阅读 122

Flink源码4-Slot分配和Task执行

接上期:——》JobMaster#startJobExecution()
resetAndStartScheduler();
——》JobMaster#resetAndStartScheduler
schedulerAssignedFuture.thenRun(this::startScheduling);
——》JobMaster#startScheduling()
schedulerNG.startScheduling();

SchedulerBase# startScheduling();

startAllOperatorCoordinators();
* 注释: 开始调度
*/
startSchedulingInternal();

DefaultScheduler#startSchedulingInternal
schedulingStrategy.startScheduling();

LazyFromSourcesSchedulingStrategy#startScheduling();

* 注释: 申请 Slot 并且开始 部署 ExecutionVertices
*/
allocateSlotsAndDeployExecutionVertices(schedulingTopology.getVertices());
——》LazyFromSourcesSchedulingStrategy#allocateSlotsAndDeployExecutionVertices

* 注释: 申请 slot 和 部署
* schedulerOperations = DefaultScheduler
*/
schedulerOperations.allocateSlotsAndDeploy(Collections.

DefaultScheduler#allocateSlotsAndDeploy()

*******来到正式入口:DefaultScheduler#allocateSlotsAndDeploy()***********
流程:
1、JobMaster 发送请求申请 slot
2、ResourceManager 接收到请求,执行 slot请求处理
3、TaskManager 处理 ResourceManager 发送过来的 Slot 请求
4、JobMaster 接收到 TaskManager 发送过来的 Slot 申请处理结果

1、JobMaster 发送请求申请 slot 0:31 ~

—— 5 》DefaultScheduler#allocateSlotsAndDeploy()

* 1 注释: 申请Slot
final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
.............
* 2 注释: 部署运行
*/
waitForAllSlotsAndDeploy(deploymentHandles);

——先走 1 》DefaultScheduler#allocateSlots() 0:21
* 注释: 申请Slot
*/
final List<SlotExecutionVertexAssignment>
slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
——》DefaultScheduler#allocateSlots()

——4 》DefaultExecutionSlotAllocator#allocateSlotsFor

  • 注释: NormalSlotProviderStrategy */
    slotProviderStrategy.allocateSlot

    SlotProviderStrategy#NormalSlotProviderStrategy#allocateSlot()

    SchedulerImpl#allocateSlot()
    ——》SchedulerImpl#allocateSlotInternal
    ——》SchedulerImpl#internalAllocateSlot
    —— 》SchedulerImpl# allocateSingleSlot

    //第1 步: 从池中获取
    Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable
    if(slotAndLocality.isPresent()) {
    return //如果slot池里面有直接分配
    CompletableFuture.completedFuture(completeAllocationByAssigningPayload
    else{//否则new 一个
    return requestNewAllocatedSlot(slotRequestId, slotProfile
    }
    ——》SchedulerImpl# requestNewAllocatedSlot
    return slotPool.requestNewAllocatedSlot(slotRequestId,

    SlotPoolImpl#requestNewAllocatedSlot()
    ——》 SlotPoolImpl# requestNewAllocatedSlotInternal()
    ——》 SlotPoolImpl# requestSlotFromResourceManager()
    //真正发送请求给resourceManager
    CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway
    .requestSlot

    ResourceManager#requestSlot()
    slotManager.registerSlotRequest(slotRequest);

    SlotManagerImpl#registerSlotRequest()
    ——0 》SlotManagerImpl#internalRequestSlot()

OptionalConsumer.of(findMatchingSlot(resourceProfile))
.ifPresent(
101:) taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
.ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));

  ——1  有空闲 slot》SlotManagerImpl#findMatchingSlot
  ——2  没有slot 》    fulfillPendingSlotRequestWithPendingTaskManagerSlot        

pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
——》 SlotManagerImpl# allocateResource();
if(!resourceActions.allocateResource(defaultWorkerResourceSpec))

ResourceManager#ResourceActionsImpl#allocateResource()
// 这里2个实现 ,第一个 StandaloneResourceManager 直接返回false
// 第二个 startNewWorker ,申请新的 YarnContainer
return startNewWorker(workerResourceSpec);

返回上面101:) taskManagerSlot -> allocateSlot
—— 》 SlotManagerImpl#allocateSlot() 1:18:00

* 注释: 申请 slot
CompletableFuture<Acknowledge> requestFuture = gateway
.requestSlot(slotId, pendingSlotRequest.getJobId(),

102:) TaskExecutor#requestSlot()

allocateSlot(slotId, jobId, allocationId, resourceProfile);
——》 TaskExecutor#allocateSlot
if(taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId

TaskSlotTableImpl#allocateSlot()
slots.add(allocationId);
▲▲▲▲回到上面102:)行方法 ▲▲▲▲▲▲▲▲

       offerSlotsToJobManager(jobId);

——》 TaskExecutor#offerSlotsToJobManager
—— 》 TaskExecutor# internalOfferSlotsToJobManager


image.png

image.png

acceptedSlotsFuture .whenCompleteAsync(handleAcceptedSlotOffers
——》 TaskExecutor#handleAcceptedSlotOffers()
▲▲▲▲回到上面 ——4 》▲▲ 1:48
——4 》DefaultExecutionSlotAllocator#allocateSlotsFor()

image.png

▲▲▲▲一直回到最上面 ——5 》 ▲▲
—— 5 》DefaultScheduler#allocateSlotsAndDeploy()
* 再走 2 注释: 部署运行
* 1、申请到slot了
* 2、构建好了hander
* 3、 执行部署
*/
waitForAllSlotsAndDeploy(deploymentHandles); 1:52
▲▲▲▲至此 slot 已经 获取 ,下来将进入 task部署和 提交 》▲▲ 1:48

4.3. Task 部署和提交 2:02 ~

入口:
—— 5 》DefaultScheduler#waitForAllSlotsAndDeploy()


image.png

—— 》DefaultScheduler#deployAll()

  • 注释: 通过 deployOrHandleError 来进行部署
    * 当然,部署 Task 的时候,也有可能会报错!
    */ FutureUtils.assertNoException(slotAssigned.handle(deployOrHandleError(deploymentHandle)));

—— 》DefaultScheduler#deployOrHandleError()

  • 注释: 部署 Task(到时候根据 ExecutionVertexID 来确定 Task)
    */
    deployTaskSafe(executionVertexId);
    —— 》DefaultScheduler#deployTaskSafe()

    image.png

        /*************************************************
           注释: 根据 ExecutionVertexId 获取 ExecutionVertex
         */
        final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
    
        /*************************************************
           注释: 一个 Task 执行一个 ExecutionVertex
         *  executionVertexOperations = DefaultExecutionVertexOperations
         */
        executionVertexOperations.deploy(executionVertex);
                                                                          ↓
               DefaultExecutionVertexOperations#deploy();
      —— 》ExecutionVertex#deploy
             *  注释: 调用 Execution 的 deploy() 方法部署
     */
    currentExecution.deploy();
          —— 》Execution#deploy();
                                            ▼  
    
image.png

CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask
(deployment, rpcTimeout), executor).
thenCompose(Function.identity())

—— 》RpcTaskManagerGateway#submitTask()

              *  注释: TaskExecutor
     *  提交到对应的 slot 所在节点的 TaskExecutor 中来执行该 ExecutionVertex,其实已经变成: Task
     *  关于 Client 提交 Job 到最后变成分布式 Task 物理执行图的所有细节到此为止,结束了。
     *  从这以后,就是去到了 TaskManager 中的 某个节点 TaskExecutor 
            *  中来执行 Task 
     */
    return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
                                                            ↓   

—— 》TaskExecutor#submitTask()

* 注释: 提交 Task 让 TaskManager 启动
* 第一个参数: TaskDeploymentDescriptor 包含启动当前这个 Task 所需要的一切信息
* 注释: 构建 Task
* 内部会初始化一个执行线程。一个Task 是线程级别的执行粒度
*/
Task task = new Task(jobInformation, taskInformation, tdd.getExecutionAttemptId(),

      ——》Task 构造函数()
                                        ▼  
              *  注释: 初始化 ResultPartitionerWriter
     */
    // produced intermediate result partitions
    final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment
           ..............
           *  注释: 初始化 InputGate
     */ shuffleEnvironment.createInputGates(taskShuffleContext, this, inputGateDeploymentDescriptors)
        .toArray(new IndexedInputGate[0]);
            .............
            *  注释: 执行 Task 的线程 但是不启动 
     *  转到 Task 的 run() 方法
     */
    // finally, create the executing thread, but do not start it
    executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
             .............

image.png

* 注释: 注册 Task
*/
taskAdded = taskSlotTable.addTask(task);
................
* 注释: 如果注册成功,则通过一个线程来运行 Task
*/
task.startTaskThread();
——》Task # startTaskThread

* 注释: 这个线程,在创建 Task 对象的时候,就已经会初始化好了
* 经过转换,最终,就是调用当前类的 run() 方法
*/
executingThread.start();
—— 》Task #run()
——aa ★★》Task #dorun()

image.png

a:)      setupPartitionsAndGates(consumableNotifyingPartitionWriters, inputGates);
     ——a 》Task # setupPartitionsAndGates()

image.png

——a:1 》 partition.setup();

ResultPartition# setup();
partitionManager.registerResultPartition(this);
image.png

                                    ——a: 2》 启动
                                        // we are requesting partitions
                   for(InputGate gate : inputGates) {
                             gate.setup();
                                                  ↓ 
                               ——》 SingleInputGate#.setup();
                                             ▼       


BufferPool bufferPool = bufferPoolFactory.get();
setBufferPool(bufferPool);
▲回到最上面 ——aa 》Task #dorun ▲
——aa 》Task #dorun ()
................
b:) * 注释: 构建一个环境对象
*/

Environment env = new RuntimeEnvironment(jobId, vertexId,
c:) invokable = loadAndInstantiateInvokable(userCodeClassLoader,

d:) invokable.invoke();

StreamTask#invoke();

... beforeInvoke();
...★runMailboxLoop();
...afterInvoke();

作者:fat32jin

原文链接:https://www.jianshu.com/p/cab84b749a44

文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐