flink任务提交与执行5-Task的执行
在上文生成
ExecutionGraph
之后,并通过调用executionGraph.getSchedulingTopology()
方法将拓扑结构传给SchedulerNG.schedulingTopology
之后,SchedulerNG
方法开始基于schedulingTopology
来进行任务的执行。最后,使用Execution.deploy()
方法来进行任务的部署
在Execution.deploy()
方法执行完后,TaskManager
会接收到JobManager
提交的TaskDesploymentDescriptor
信息,完成Task
的任务的构建并启动运行。当全部Task
都启动了,数据开始正常的接入运行。
1. Task的创建
TaskExecutor.submitTask
方法中调用生成task
的方法,生成Task
完毕后会将Task
添加至taskSlotTable
,同时,将ACk的信息返回给JobMaster
中的schedulerNG
,表示该任务已经被成功调度和执行。同时,会使用task.starttaskThread()
命令来启动该任务
Task task = new Task(xxxx);复制代码
2. Task的结构
由于
Task
实现了Runnable
接口,Task
正常启动后,主要方法都在Run
方法中执行。
在Run
方法中,构建了RuntimeEnvironment
,主要用于构建AbstractInvokable
,通过调用invokable.invoke()
方法来进行方法执行。AbstractInvokable
是用于给TaskManager
执行的基本抽象类。
Environment env = new RuntimeEnvironment( jobId, vertexId, executionId, executionConfig, taskInfo, jobConfiguration, taskConfiguration, userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, taskStateManager, aggregateManager, accumulatorRegistry, kvStateRegistry, inputSplitProvider, distributedCacheEntries, consumableNotifyingPartitionWriters, inputGates, taskEventDispatcher, checkpointResponder, taskManagerConfig, metrics, this); // Make sure the user code classloader is accessible thread-locally. // We are setting the correct context class loader before instantiating the invokable // so that it is available to the invokable during its entire lifetime. executingThread.setContextClassLoader(userCodeClassLoader); // now load and instantiate the task's invokable code AbstractInvokable invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);复制代码
2.1 AbstractInvokable介绍
由图可知,batch任务和streaming任务的
Task
都是继承AbstractInvokable
。
图一
2.2 StreamTask
2.2.1 Streamtask的继承类
由图二知,StreamTask
主要继承了AbstractInvokable
,主要常见的有OneInputStreamtask
, twoInputStreamTask
,SourceStreamTask
。
OneInputStreamtask
主要用于执行OneInputStreamOperator
。TwoinputStreamTask
主要是用于执行TwoinputStreamOperator
以及用于在TwoInputStreamOperator
中选择一个input来进行读取。SourceStreamTask
主要是用于执行StreamSource
图二
2.2.1 Streamtask的成员变量
headOperator
: 消费输入流的头个Operator
OperatorChain
:被当前Task
执行的operatorchain
,operatorchain
会将多个operator
放在同一个task
中执行statebackend
:当前使用的statebackend
,主要是用于创建checkpoint Streams 以及创建keyed statecheckpointStorage
: 主要是用于checkpoint
和metadata stream的持久的存储timerService
: 该成员变量用于定义当前的处理时间 以及注册任务的定时器用于下次处理accumulatorMap
: 用户定义的当前任务的累加器asyncOperationsThreadPool
: 做checkpoint
操作的异步线程池,避免当前任务被堵塞mailboxProcessor
:封装了mailbox
执行逻辑的对象inputProcessor
:主要是用于处理输入数据集
图三
3. StreamTask的执行
在使用
AbstractInvokable.invoke
方法开始执行Streamtask
后,主要由beforeinvoke
,runMailboxLoop
,afterInvoke
等3个方法
public final void invoke() throws Exception { try { beforeInvoke(); // final check to exit early before starting to run if (canceled) { throw new CancelTaskException(); } // let the task do its work isRunning = true; runMailboxLoop(); // if this left the run() method cleanly despite the fact that this was canceled, // make sure the "clean shutdown" is not attempted if (canceled) { throw new CancelTaskException(); } afterInvoke(); } finally { cleanUpInvoke(); } }复制代码
beforeInvoke
主要进行一些准备和初始化的工作,比如statebackend
,operatorchain
,asyncOperationsThreadPool
等项目的初始化
runMailboxLoop
主要执行streamoperator
的计算逻辑
afterInvoke
清理和关闭operator
cleanUpInvoke
清理初始化的一些变量信息
3.1 runMailboxLoop相关方法介绍
runMailboxLoop
该方法主要执行了mailboxProcessor.runMailboxLoop()
方法,在runMailboxLoop
中不断循环processMail(localMailbox)
方法,而在创建mailboxProcessor
的过程中,processInput
作为参数传给mailboxProcessor
的mailboxDefaultAction
成员变量中。因此,runDefaultAction
会直接调用Streamtask.processInput
方法持续接入数据并且处理。同时根据processInput
返回的状态数据来判断是否需要结束当前的task
。主要由MORE_AVAILABLE
,NOTHING_AVUAILBLE
,END_OF_INPUT
状态构成。MORE_AVAILABLE
表示还有数据,可以继续执行。NOTHING_AVUAILBLE
表示暂时没有数据,需要被挂起,END_OF_INPUT
表示数据已到达最后的状态,之后不再有数据输入。
public void runMailboxLoop() throws Exception { final TaskMailbox localMailbox = mailbox; Preconditions.checkState( localMailbox.isMailboxThread(), "Method must be executed by declared mailbox thread!"); assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!"; final MailboxController defaultActionContext = new MailboxController(this); while (processMail(localMailbox)) { mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed } }复制代码
processInput方法介绍
ProcessInput
方法主要调用了 inputProcessor.processInput
。inputProcessor
是一个接口,在他的实现类StreamOneInputProcessor
中,processInput
方法如下,调用了streamTaskInput.emitNext
方法,streamTaskInput
也是一个接口。
public InputStatus processInput() throws Exception { InputStatus status = input.emitNext(output); if (status == InputStatus.END_OF_INPUT) { synchronized (lock) { operatorChain.endHeadOperatorInput(1); } } return status; }复制代码
emitNext方法介绍
这里循环调用了checkpointedInputGate.pollNext
,从网络接入的数据是从InputGate
的InputChannel
接入的。然后如果是完整的记录,则调用processElement
方法进行处理。对应processElement
方法会根据不同的方法进行处理
public InputStatus emitNext(DataOutput<T> output) throws Exception { while (true) { // get the stream element from the deserializer if (currentRecordDeserializer != null) { DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate); if (result.isBufferConsumed()) { currentRecordDeserializer.getCurrentBuffer().recycleBuffer(); currentRecordDeserializer = null; } if (result.isFullRecord()) { processElement(deserializationDelegate.getInstance(), output); return InputStatus.MORE_AVAILABLE; } } Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext(); if (bufferOrEvent.isPresent()) { processBufferOrEvent(bufferOrEvent.get()); } else { if (checkpointedInputGate.isFinished()) { checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available"); if (!checkpointedInputGate.isEmpty()) { throw new IllegalStateException("Trailing data in checkpoint barrier handler."); } return InputStatus.END_OF_INPUT; } return InputStatus.NOTHING_AVAILABLE; } } }复制代码
processElement方法介绍
StreamElement
主要为两种类型,Event
和StreamRecord
。Event
包含Watermark
,StreamStatus
,LatencyMaker
等实现。业务相关数据主要是在StreamRecord
中。会根据不同类型的数据进行不同的处理,而在output.emitRecord
方法中,实际上通过StreamTaskNetworkInput
中的Dataout
实现类将StreamRecord
发送到OperatorChain
的HeadOperator
进行处理
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception { if (recordOrMark.isRecord()){ output.emitRecord(recordOrMark.asRecord()); } else if (recordOrMark.isWatermark()) { statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel); } else if (recordOrMark.isLatencyMarker()) { output.emitLatencyMarker(recordOrMark.asLatencyMarker()); } else if (recordOrMark.isStreamStatus()) { statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel); } else { throw new UnsupportedOperationException("Unknown type of StreamElement"); } }
作者:大虾饺
链接:https://juejin.cn/post/7048568989332537351