阅读 108

协程coobjc源码分析:co调度

这篇文章主要通过源码分析,介绍coobjc中的co调度。这个问题搞清楚之后,co_lauch做了什么,看起来就很简单了。我们先了解coroutine和scheduler这两个关键的数据结构。

coroutine

在协程的数据结构中和调度相关的字段。

  • entry: 需要执行的任务,最终指向的是co_launch(block)中的block。

  • userdata: 一个OC的类对象COCoroutine,这个对象持有coroutine这个数据结构,和它一一对应。

  • context: 是协程执行的当前上下文。

  • pre_context: 保存的是这个协程被挂起或者执行完成后需要回复的上下文,coobjc通过切换上下文来实现函数的跳转。

  • scheduler:  co被scheduler的co_queue持有,scheduler是co调度的核心。

    struct coroutine {         coroutine_func entry;                   // Process entry.         void *userdata;                         // Userdata.         void *context;                          // Coroutine,         void *pre_context;                      // Coroutine's source process's Call stack data.                  struct coroutine_scheduler *scheduler;  // The pointer to the scheduler.         ...     };     typedef struct coroutine coroutine_t; 复制代码

coobjc通过yield,resume,add来操作co。这三个方法在co调度的时候也会被频繁用到。

coroutine_yield

void coroutine_yield(coroutine_t *co) {     if (co == NULL) {         // if null         co = coroutine_self();     }     BOOL skip = false;     coroutine_getcontext(co->context);     if (skip) {         return;     } #pragma unused(skip)     skip = true;     co->status = COROUTINE_SUSPEND;     coroutine_setcontext(co->pre_context); } 复制代码

这个函数的作用是挂起协程。下面提到的main指的是scheduler中main_coroutine的入口函数 coroutine_scheduler_main,im指的是coroutine_resume_im(coroutine_t *co) 这个函数会执行co的entry,main中有个for循环遍历co_queue取出head通过im函数执行head的entry。关于scheduler下面会有详细介绍。

coroutine_setcontext(co->pre_context);这一行可以让程序跳转到coroutine_getcontext(co->pre_context)这里。通常情况下的调用栈main()->im()->coroutine_getcontext(co->pre_context)。当yeild执行的时候,程序跳转到im函数中coroutine_getcontext(co->pre_context)的这个位置,im函数会直接return,跳转到main函数的for循环里,继续取出co_queue的head执行head的entry。当for循环再次执行到这个被挂起的co的时候,在im执行co entry 方法中,调用coroutine_setcontext(co->context),程序跳转到  coroutine_yield()方法中的  coroutine_getcontext(co->context);这一行,此时skip是yes。coroutine_yield()函数return。回到调用coroutine_yield()的地方。yield通过保存上下文,使得被挂起的co下次能够在之前的上下文环境下继续执行。

coroutine_resume

void coroutine_resume(coroutine_t *co) {     if (!co->is_scheduler) {         coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();         co->scheduler = scheduler;                  scheduler_queue_push(scheduler, co);                  if (scheduler->running_coroutine) {             // resume a sub coroutine.             scheduler_queue_push(scheduler, scheduler->running_coroutine);             coroutine_yield(scheduler->running_coroutine);         } else {             // scheduler is idle             coroutine_resume_im(co->scheduler->main_coroutine);         }     } } 复制代码

coroutine_resume 这个方法是把co push 到scheduler的协程队列里面。如果当前有协程在运行的话,那个当前运行的协程就会被挂起,push到协程队列里面,如果co_queue中只有新添加进来的co和被挂起的co,此时新添加进来的co处于queue的head会被main函数的for循环取出执行entry。如果当前没有协程在运行,就会执行scheduler中main_corroutine的entry函数,这个函数是一个for循环从队列中读取co,执行co的entry。添加到co_quue队列中的co最终会被执行。后面的判断如果没有runing_coroutine,这个时候main_coroutine被挂起,for循环不执行,需要主动触发一次main函数的调用coroutine_resume_im(co->scheduler->main_coroutine);

coroutine_add

void coroutine_add(coroutine_t *co) {     if (!co->is_scheduler) {         coroutine_scheduler_t *scheduler = coroutine_scheduler_self_create_if_not_exists();         co->scheduler = scheduler;         if (scheduler->main_coroutine->status == COROUTINE_DEAD) {             coroutine_close_ifdead(scheduler->main_coroutine);             coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);             main_co->is_scheduler = true;             main_co->scheduler = scheduler;             scheduler->main_coroutine = main_co;         }         scheduler_queue_push(scheduler, co);                  if (!scheduler->running_coroutine) {             coroutine_resume_im(co->scheduler->main_coroutine);         }     } } 复制代码

这个方法把当前co添加到scheduler协程队列里面。如果main_coroutine的状态是dead,会创建一个main_coroutine,coroutine_t *main_co = coroutine_create(coroutine_scheduler_main);这里可以看到main_coroutine的entry指向的是coroutine_scheduler_main这个函数下面还会讲到,作用就是前面一直在说的for循环。没有当前没有running_coroutine会主动触发main函数。

scheduler

scheduler是协程调度的核心。

scheduler的数据结构

    struct coroutine_scheduler {         coroutine_t         *main_coroutine;         coroutine_t         *running_coroutine;         coroutine_list_t     coroutine_queue;     };     typedef struct coroutine_scheduler coroutine_scheduler_t;              struct coroutine_list {         coroutine_t *head;         coroutine_t *tail;     };     typedef struct coroutine_list coroutine_list_t; 复制代码
  • main_coroutine:它的entry指向 coroutine_scheduler_main函数,类似于线程中的runloop提供一个for循环,不断读取协程队列中的head,执行head的入口函数,队列为空的时候main_coroutine会被挂起。

  • running_coroutine : 用来记录当前正在运行中的协程,获取或者挂起当前协程都会用到这个字段。

  • coroutine_queue: 是一个双向链表,用来保存添加到当前scheduler的协程,当协程的入口函数执行完成后,scheduler会把它从链表中清除。

scheduler的创建过程。

//scheduler 的创建 coroutine_scheduler_t *coroutine_scheduler_self_create_if_not_exists(void) {          if (!coroutine_scheduler_key) {         pthread_key_create(&coroutine_scheduler_key, coroutine_scheduler_free);     }          void *schedule = pthread_getspecific(coroutine_scheduler_key);     if (!schedule) {         schedule = coroutine_scheduler_new();         pthread_setspecific(coroutine_scheduler_key, schedule);     }     return schedule; } 复制代码

pthread_setspecificpthread_getspecific是线程存储的存取函数,表面上看起来这是一个全局变量,所有线程都可以使用它,而它的值在每个线程中都是是单独存储的。线程存储的key值是pthread_key_t类型,通过pthread_key_create创建,pthread_key_create需要两个参数第一个是字符串类型的key值,第二个参数是一个清理函数,线程释放这个key值对应的存储空间的的时候,这个清理函数会被调用。线程存储的创建方式保证了每一个线程中只有一个scheduler,并且提供了获取这个scheduler的入口。

coroutine_scheduler_main

// The main entry of the coroutine's scheduler // The scheduler is just a special coroutine, so we can use yield. void coroutine_scheduler_main(coroutine_t *scheduler_co) {          coroutine_scheduler_t *scheduler = scheduler_co->scheduler;     for (;;) {                  // Pop a coroutine from the scheduler's queue.         coroutine_t *co = scheduler_queue_pop(scheduler);         if (co == NULL) {             // Yield the scheduler, give back cpu to origin thread.             coroutine_yield(scheduler_co);                          // When some coroutine add to the scheduler's queue,             // the scheduler will resume again,             // then will resume here, continue the loop.             continue;         }         // Set scheduler's current running coroutine.         scheduler->running_coroutine = co;         // Resume the coroutine         coroutine_resume_im(co);                  // Set scheduler's current running coroutine to nil.         scheduler->running_coroutine = nil;                  // if coroutine finished, free coroutine.         if (co->status == COROUTINE_DEAD) {             coroutine_close_ifdead(co);         }     } } 复制代码

coroutine_scheduler_main函数是scheduler的runloop。一个for循环,从自己的协程队列里面读取协程。当scheduler的协程队列不为空的时候,会从队列中取出head执行入口函数。当队列里面的协程全部取出后,当前scheduler的协程队列coroutine_queue为空。main_coroutine会被coroutine_yield这个函数挂起。coroutine_yield会保存当前上下文,也就是说当main_coroutine下次被resume的时候,会从这里继续执行下去,继续for循环。

coroutine_resume_im

void coroutine_resume_im(coroutine_t *co) {     switch (co->status) {         case COROUTINE_READY:         {             co->stack_memory = coroutine_memory_malloc(co->stack_size);             co->stack_top = co->stack_memory + co->stack_size - 3 * sizeof(void *);             // get the pre context             co->pre_context = malloc(sizeof(coroutine_ucontext_t));             BOOL skip = false;             coroutine_getcontext(co->pre_context);             if (skip) {                 // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.                 return;             } #pragma unused(skip)             skip = true;                          free(co->context);             co->context = calloc(1, sizeof(coroutine_ucontext_t));             coroutine_makecontext(co->context, (IMP)coroutine_main, co, (void *)co->stack_top);             // setcontext             coroutine_begin(co->context);                          break;         }         case COROUTINE_SUSPEND:         {             BOOL skip = false;             coroutine_getcontext(co->pre_context);             if (skip) {                 // when proccess reenter(resume a coroutine), skip the remain codes, just return to pre func.                 return;             } #pragma unused(skip)             skip = true;             // setcontext             coroutine_setcontext(co->context);                          break;         }         default:             assert(false);             break;     } } 复制代码

对于im这个函数,这篇文章主要介绍的是coobjc的调度,不做详细的说明。我们只需要执行这个函数在COROUTINE_READY会执行im的entry。在COROUTINE_SUSPEND状态下会恢复之前的context也就是yield中断的地方。

我们来回顾一下co调度的整个流程。在一个线程中会创建唯一的数据构 scheduler。scheduler中包含main_co,running_co,co_queue。main_co的entry是一个for循环,在co_queue队列里面取出head co,设置head为running_co,执行head的entry。当调用coroutine_resume(co)的时候。如果running_co存在,那么running_co就会被yield挂起,main_co会从co_queue取出一个新的co执行它的entry,当for循环再次遍历到这个 被挂起的co的时候,程序会跳转到yield函数里面继续执行。如果如果runing_co不存在存在的话co会被添加到co_queue,同时resume会执行main_coroutine的entry,for循环开始。

co_lauch

co_launch 这个函数的功能类似于,dispatch_async。区别是co_launch,把需要执行的任务放到一个协程队列里面,dispatch_async是把执行任务放到一个线程队列里面执行。在调度层面通过coroutine_resume把co添加到scheduler的co_queue,在这个执行任务里面,你可以通过yield,resume来交出线程或者抢占线程。

NS_INLINE COCoroutine * _Nonnull  co_launch(void(^ _Nonnull block)(void)) {     COCoroutine *co = [COCoroutine coroutineWithBlock:block onQueue:nil];     return [co resume]; } - (COCoroutine *)resume {     COCoroutine *currentCo = [COCoroutine currentCoroutine];     BOOL isSubroutine = [currentCo.dispatch isEqualToDipatch:self.dispatch] ? YES : NO;     [self.dispatch dispatch_async_block:^{         if (self.isResume) {             return;         }         if (isSubroutine) {             self.parent = currentCo;             [currentCo addChild:self];         }         self.isResume = YES;         coroutine_resume(self.co);     }];     return self; }


作者:yuec
链接:https://juejin.cn/post/6844903881239969806


文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐