libco|libco 协程的理解



学习libco,做了一些代码解读,记录在此


1 线程上下文
线程上下文实际是一个协程栈,维护协程之间的执行序列
同一个线程中的所有协程,共用一个线程上下文
线程上下文在 第一次被使用到的时候进行创建

【libco|libco 协程的理解】

// 协程公共的 线程上下文 struct stCoRoutineEnv_t { stCoRoutine_t *pCallStack[ 128 ]; // 协程栈,保存同时存在的所有协程,栈顶是当前的协程 int iCallStackSize; // 栈顶,指向当前协程的下一个位置 stCoEpoll_t *pEpoll; // 线程公共的 异步调度器 //for copy stack log lastco and nextco stCoRoutine_t* pending_co; // stCoRoutine_t* occupy_co; // }; static stCoRoutineEnv_t* g_arrCoEnvPerThread[ 204800 ] = { 0 }; void co_init_curr_thread_env() { pid_t pid = GetPid(); g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) ); stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ]; env->iCallStackSize = 0; struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL ); // 主协程,在线程上线文创建的时候创建 self->cIsMain = 1; env->pending_co = NULL; env->occupy_co = NULL; coctx_init( &self->ctx ); env->pCallStack[ env->iCallStackSize++ ] = self; // 主协程放在 协程栈的栈底 stCoEpoll_t *ev = AllocEpoll(); SetEpoll( env,ev ); }



2 协程创建
协程 = 回调 + 栈内存

协程创建仅仅是数据结构的初始化




// 协程 struct stCoRoutine_t { stCoRoutineEnv_t *env; // 协程归属的线程上下文 pfn_co_routine_t pfn; // 协程 所封装的回调函数 void *arg; // 回调函数的参数 coctx_t ctx; // 协程上下文,保存当前协程让出cpu时 寄存器的状态 char cStart; // 标志:协程上下文 是否被初始化,若否,则使用coctx_make初始化ctx char cEnd; // 标志: char cIsMain; // 标志:是否是主协程 char cEnableSysHook; // 标志:是否使用系统函数hook char cIsShareStack; // 标志:是否使用了共享栈 void *pvEnv; // 协程当前的 系统环境变量(hook开启时使用) //char sRunStack[ 1024 * 128 ]; stStackMem_t* stack_mem; // 协程当前使用的 栈内存空间 ,当使用共享栈的时候,指向共享的栈内存 //save satck buffer while confilct on same stack_buffer; char* stack_sp; // 协程切换时, 保存当前的rsp地址,即当前栈顶地址 unsigned int save_size; // 协程切换时, 保存的栈内存大小 char* save_buffer; // 协程切换时,将stack_mem拷贝到此,保存当时的栈数据 stCoSpec_t aSpec[1024]; // }; // 协程创建 int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg ) { if( !co_get_curr_thread_env() )// 线程上下文 在 第一个协程创建的时候 进行创建 { co_init_curr_thread_env(); } stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg ); *ppco = co; return 0; }struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr, pfn_co_routine_t pfn,void *arg ) { stCoRoutineAttr_t at; if( attr ) { memcpy( &at,attr,sizeof(at) ); } if( at.stack_size <= 0 ) { at.stack_size = 128 * 1024; // 默认栈内存大小128k, 主协程使用默认栈大小 } else if( at.stack_size > 1024 * 1024 * 8 )// 最大栈内存大小8M { at.stack_size = 1024 * 1024 * 8; } if( at.stack_size & 0xFFF )// 对栈内存大小进行16位对齐 { at.stack_size &= ~0xFFF; at.stack_size += 0x1000; } stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) ); memset( lp,0,(long)(sizeof(stCoRoutine_t))); lp->env = env; // 协程的 线程上下文 lp->pfn = pfn; // 协程的 回调函数 lp->arg = arg; // 协程的 回调函数 参数 stStackMem_t* stack_mem = NULL; if( at.share_stack ) { stack_mem = co_get_stackmem( at.share_stack); // 使用共享栈的时候,从共享栈中获取一块内存作为 当前协程的栈内存,栈内存其实使用的是堆内存 at.stack_size = at.share_stack->stack_size; // 此时栈内存大小 由 共享栈决定 } else// 主协程不使用共享栈 { stack_mem = co_alloc_stackmem(at.stack_size); // 不使用共享栈的时候,从堆中创建一块内存作为 当前协程的栈内存 } lp->stack_mem = stack_mem; lp->ctx.ss_sp = stack_mem->stack_buffer; // 协程上下文 保存 栈内存起始地址 lp->ctx.ss_size = at.stack_size; // 协程上下文保存 栈内存大小 lp->cStart = 0; lp->cEnd = 0; lp->cIsMain = 0; lp->cEnableSysHook = 0; // 默认不启用 系统函数hook lp->cIsShareStack = at.share_stack != NULL; // 设置是否使用共享栈的标记 lp->save_size = 0; lp->save_buffer = NULL; return lp; }



3 协程执行
协程执行实际是协程的切换,包括协程上下文(寄存器状态)的切换,回调执行之后会再次切换回来
如果协程中创建了新的协程,则会有嵌套的协程切换




// 协程执行 void co_resume( stCoRoutine_t *co ) { stCoRoutineEnv_t *env = co->env; stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ]; // 获取当前协程, 即调用该函数的协程 if( !co->cStart ) { coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 ); // 初始化 新协程的 寄存器状态,新协程上下文 的 返回地址是对 回调函数的封装 co->cStart = 1; } env->pCallStack[ env->iCallStackSize++ ] = co; // 将新协程入栈 co_swap( lpCurrRoutine, co ); // 协程切换:当前协程挂起,保存状态, 切换到新协程 }// 协程切换 void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co) { stCoRoutineEnv_t* env = co_get_curr_thread_env(); //get curr stack sp char c; // 在当前协程中申请一个字节的栈内存,此时压栈后寄存器 rsp指向该内存的地址 curr->stack_sp= &c; // 该内存的地址 即 rsp寄存器保存的值 保存到 stack_sp if (!pending_co->cIsShareStack)// 非共享栈的情况下,不需要保存 占用共享栈 的协程 { env->pending_co = NULL; env->occupy_co = NULL; } else// 共享栈的情况下, { env->pending_co = pending_co; // 保存当前占用共享栈的协程地址(新协程) 到 线程上下文 pending_co //get last occupy co on the same stack mem stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co; // 从 新协程的共享栈中 获取 前一个占用该共享栈的 协程地址 //set pending co to occupy thest stack mem; pending_co->stack_mem->occupy_co = pending_co; // 更新占用共享栈的协程地址 为 新协程env->occupy_co = occupy_co; // 保存 前一个占用新协程共享栈的 协程地址 到 线程上下文 occupy_co if (occupy_co && occupy_co != pending_co)// 如果 之前有协程占用共享栈 && 前一个协程 不是 新协程 { save_stack_buffer(occupy_co); // 复制 前一个协程的栈内存 到 occupy_co->save_buffer中去 } } //swap context// 以上的指令执行也都是在 旧协程(curr)的栈空间中 coctx_swap(&(curr->ctx),&(pending_co->ctx) ); // 切换协程的栈上下文(也就是寄存器状态),并 在新协程的栈空间中 执行协程(pending_co), // 直到协程指令结束或者被主动挂起 // 回调执行之后,协程会被再次调用本函数切换到 协程栈的下一个协程,即会切换回旧协程,即会产生嵌套 // 如果回调中产生了新的协程,则会产生多次嵌套// 之后接着回到 旧协程 的栈空间中执行 //stack buffer may be overwrite, so get again; stCoRoutineEnv_t* curr_env = co_get_curr_thread_env(); stCoRoutine_t* update_occupy_co =curr_env->occupy_co; // stCoRoutine_t* update_pending_co = curr_env->pending_co; // if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co) { //resume stack buffer if (update_pending_co->save_buffer && update_pending_co->save_size > 0) { memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size); } } }// 协程回调的封装 static int CoRoutineFunc( stCoRoutine_t *co,void * ) {// 此时位于 协程(co)的 栈内存中 if( co->pfn ) { co->pfn( co->arg ); // 执行回调,直到结束或者主动挂起 } co->cEnd = 1; // 回调执行结束 stCoRoutineEnv_t *env = co->env; co_yield_env( env ); // 当前协程执行完之后,将协程切换为下一个协程(协程执行序列维护在线程上下文中) return 0; }void co_yield_env( stCoRoutineEnv_t *env ) { stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ]; stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ]; env->iCallStackSize--; co_swap( curr, last); // 当前协程执行完之后,将协程切换为协程栈中的下一个协程 }





    推荐阅读