阅读 299

flink任务提交与执行5-Task的执行

在上文生成ExecutionGraph之后,并通过调用executionGraph.getSchedulingTopology()方法将拓扑结构传给SchedulerNG.schedulingTopology之后,SchedulerNG方法开始基于schedulingTopology来进行任务的执行。最后,使用Execution.deploy()方法来进行任务的部署

Execution.deploy()方法执行完后,TaskManager会接收到JobManager提交的TaskDesploymentDescriptor信息,完成Task的任务的构建并启动运行。当全部Task都启动了,数据开始正常的接入运行。

1. Task的创建

  • TaskExecutor.submitTask方法中调用生成task的方法,生成Task完毕后会将Task添加至taskSlotTable,同时,将ACk的信息返回给JobMaster中的schedulerNG,表示该任务已经被成功调度和执行。同时,会使用task.starttaskThread()命令来启动该任务

Task task = new Task(xxxx);复制代码

2. Task的结构

  • 由于Task实现了Runnable接口,Task正常启动后,主要方法都在Run方法中执行。

Run方法中,构建了RuntimeEnvironment,主要用于构建AbstractInvokable,通过调用invokable.invoke()方法来进行方法执行。AbstractInvokable是用于给TaskManager执行的基本抽象类。

	Environment env = new RuntimeEnvironment(
				jobId,
				vertexId,
				executionId,
				executionConfig,
				taskInfo,
				jobConfiguration,
				taskConfiguration,
				userCodeClassLoader,
				memoryManager,
				ioManager,
				broadcastVariableManager,
				taskStateManager,
				aggregateManager,
				accumulatorRegistry,
				kvStateRegistry,
				inputSplitProvider,
				distributedCacheEntries,
				consumableNotifyingPartitionWriters,
				inputGates,
				taskEventDispatcher,
				checkpointResponder,
				taskManagerConfig,
				metrics,
				this);
				
		// Make sure the user code classloader is accessible thread-locally.
			// We are setting the correct context class loader before instantiating the invokable
			// so that it is available to the invokable during its entire lifetime.
			executingThread.setContextClassLoader(userCodeClassLoader);

		// now load and instantiate the task's invokable code
		AbstractInvokable	invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);复制代码

2.1 AbstractInvokable介绍

  • 由图可知,batch任务和streaming任务的Task都是继承AbstractInvokable

abstractinvokable继承.PNG图一

2.2 StreamTask

2.2.1 Streamtask的继承类

由图二知,StreamTask主要继承了AbstractInvokable,主要常见的有OneInputStreamtask, twoInputStreamTaskSourceStreamTask

  • OneInputStreamtask主要用于执行OneInputStreamOperator

  • TwoinputStreamTask主要是用于执行TwoinputStreamOperator以及用于在TwoInputStreamOperator中选择一个input来进行读取。

  • SourceStreamTask主要是用于执行StreamSource

streamtask.PNG

图二

2.2.1 Streamtask的成员变量

  • headOperator: 消费输入流的头个Operator

  • OperatorChain:被当前Task执行的operatorchainoperatorchain会将多个operator放在同一个task中执行

  • statebackend:当前使用的statebackend,主要是用于创建checkpoint Streams 以及创建keyed state

  • checkpointStorage: 主要是用于checkpoint和metadata stream的持久的存储

  • timerService: 该成员变量用于定义当前的处理时间 以及注册任务的定时器用于下次处理

  • accumulatorMap: 用户定义的当前任务的累加器

  • asyncOperationsThreadPool: 做checkpoint操作的异步线程池,避免当前任务被堵塞

  • mailboxProcessor:封装了mailbox执行逻辑的对象

  • inputProcessor:主要是用于处理输入数据集

StreamTask成员变量.PNG图三

3. StreamTask的执行

在使用AbstractInvokable.invoke方法开始执行Streamtask后,主要由beforeinvokerunMailboxLoopafterInvoke等3个方法

public final void invoke() throws Exception {
		try {
			beforeInvoke();

			// final check to exit early before starting to run
			if (canceled) {
				throw new CancelTaskException();
			}

			// let the task do its work
			isRunning = true;
			runMailboxLoop();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			afterInvoke();
		}
		finally {
			cleanUpInvoke();
		}
	}复制代码
  • beforeInvoke

主要进行一些准备和初始化的工作,比如statebackendoperatorchainasyncOperationsThreadPool等项目的初始化

  • runMailboxLoop

主要执行streamoperator的计算逻辑

  • afterInvoke

清理和关闭operator

  • cleanUpInvoke
    清理初始化的一些变量信息

3.1 runMailboxLoop相关方法介绍

  • runMailboxLoop
    该方法主要执行了 mailboxProcessor.runMailboxLoop()方法,在runMailboxLoop中不断循环processMail(localMailbox)方法,而在创建mailboxProcessor的过程中,processInput作为参数传给mailboxProcessormailboxDefaultAction成员变量中。因此,runDefaultAction会直接调用Streamtask.processInput方法持续接入数据并且处理。同时根据processInput返回的状态数据来判断是否需要结束当前的task。主要由MORE_AVAILABLENOTHING_AVUAILBLEEND_OF_INPUT状态构成。MORE_AVAILABLE表示还有数据,可以继续执行。NOTHING_AVUAILBLE表示暂时没有数据,需要被挂起,END_OF_INPUT表示数据已到达最后的状态,之后不再有数据输入。

public void runMailboxLoop() throws Exception {

		final TaskMailbox localMailbox = mailbox;

		Preconditions.checkState(
			localMailbox.isMailboxThread(),
			"Method must be executed by declared mailbox thread!");

		assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

		final MailboxController defaultActionContext = new MailboxController(this);

		while (processMail(localMailbox)) {
			mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
		}
	}复制代码
  • processInput方法介绍

ProcessInput方法主要调用了 inputProcessor.processInputinputProcessor是一个接口,在他的实现类StreamOneInputProcessor中,processInput方法如下,调用了streamTaskInput.emitNext方法,streamTaskInput也是一个接口。

public InputStatus processInput() throws Exception {
		InputStatus status = input.emitNext(output);

		if (status == InputStatus.END_OF_INPUT) {
			synchronized (lock) {
				operatorChain.endHeadOperatorInput(1);
			}
		}

		return status;
	}复制代码
  • emitNext方法介绍

这里循环调用了checkpointedInputGate.pollNext,从网络接入的数据是从InputGateInputChannel接入的。然后如果是完整的记录,则调用processElement方法进行处理。对应processElement方法会根据不同的方法进行处理

	public InputStatus emitNext(DataOutput<T> output) throws Exception {

		while (true) {
			// get the stream element from the deserializer
			if (currentRecordDeserializer != null) {
				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
				if (result.isBufferConsumed()) {
					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
					currentRecordDeserializer = null;
				}

				if (result.isFullRecord()) {
					processElement(deserializationDelegate.getInstance(), output);
					return InputStatus.MORE_AVAILABLE;
				}
			}

			Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
			if (bufferOrEvent.isPresent()) {
				processBufferOrEvent(bufferOrEvent.get());
			} else {
				if (checkpointedInputGate.isFinished()) {
					checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
					if (!checkpointedInputGate.isEmpty()) {
						throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
					}
					return InputStatus.END_OF_INPUT;
				}
				return InputStatus.NOTHING_AVAILABLE;
			}
		}
	}复制代码
  • processElement方法介绍

StreamElement主要为两种类型,EventStreamRecordEvent包含WatermarkStreamStatusLatencyMaker等实现。业务相关数据主要是在StreamRecord中。会根据不同类型的数据进行不同的处理,而在output.emitRecord方法中,实际上通过StreamTaskNetworkInput中的Dataout实现类将StreamRecord发送到OperatorChainHeadOperator进行处理

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
		if (recordOrMark.isRecord()){
			output.emitRecord(recordOrMark.asRecord());
		} else if (recordOrMark.isWatermark()) {
			statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
		} else if (recordOrMark.isLatencyMarker()) {
			output.emitLatencyMarker(recordOrMark.asLatencyMarker());
		} else if (recordOrMark.isStreamStatus()) {
			statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
		} else {
			throw new UnsupportedOperationException("Unknown type of StreamElement");
		}
	}


作者:大虾饺
链接:https://juejin.cn/post/7048568989332537351


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐