学习libco,做了一些代码解读,记录在此
1 线程上下文
线程上下文实际是一个协程栈,维护协程之间的执行序列
同一个线程中的所有协程,共用一个线程上下文
线程上下文在 第一次被使用到的时候进行创建
【libco|libco 协程的理解】
// 协程公共的 线程上下文
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ];
// 协程栈,保存同时存在的所有协程,栈顶是当前的协程
int iCallStackSize;
// 栈顶,指向当前协程的下一个位置
stCoEpoll_t *pEpoll;
// 线程公共的 异步调度器 //for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
//
stCoRoutine_t* occupy_co;
//
};
static stCoRoutineEnv_t* g_arrCoEnvPerThread[ 204800 ] = { 0 };
void co_init_curr_thread_env()
{
pid_t pid = GetPid();
g_arrCoEnvPerThread[ pid ] = (stCoRoutineEnv_t*)calloc( 1,sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = g_arrCoEnvPerThread[ pid ];
env->iCallStackSize = 0;
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
// 主协程,在线程上线文创建的时候创建
self->cIsMain = 1;
env->pending_co = NULL;
env->occupy_co = NULL;
coctx_init( &self->ctx );
env->pCallStack[ env->iCallStackSize++ ] = self;
// 主协程放在 协程栈的栈底 stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}
2 协程创建
协程 = 回调 + 栈内存
协程创建仅仅是数据结构的初始化
// 协程
struct stCoRoutine_t
{
stCoRoutineEnv_t *env;
// 协程归属的线程上下文
pfn_co_routine_t pfn;
// 协程 所封装的回调函数
void *arg;
// 回调函数的参数
coctx_t ctx;
// 协程上下文,保存当前协程让出cpu时 寄存器的状态 char cStart;
// 标志:协程上下文 是否被初始化,若否,则使用coctx_make初始化ctx
char cEnd;
// 标志:
char cIsMain;
// 标志:是否是主协程
char cEnableSysHook;
// 标志:是否使用系统函数hook
char cIsShareStack;
// 标志:是否使用了共享栈 void *pvEnv;
// 协程当前的 系统环境变量(hook开启时使用) //char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem;
// 协程当前使用的 栈内存空间 ,当使用共享栈的时候,指向共享的栈内存 //save satck buffer while confilct on same stack_buffer;
char* stack_sp;
// 协程切换时, 保存当前的rsp地址,即当前栈顶地址
unsigned int save_size;
// 协程切换时, 保存的栈内存大小
char* save_buffer;
// 协程切换时,将stack_mem拷贝到此,保存当时的栈数据 stCoSpec_t aSpec[1024];
// };
// 协程创建
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() )// 线程上下文 在 第一个协程创建的时候 进行创建
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{ stCoRoutineAttr_t at;
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
// 默认栈内存大小128k, 主协程使用默认栈大小
}
else if( at.stack_size > 1024 * 1024 * 8 )// 最大栈内存大小8M
{
at.stack_size = 1024 * 1024 * 8;
} if( at.stack_size & 0xFFF )// 对栈内存大小进行16位对齐
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
} stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
memset( lp,0,(long)(sizeof(stCoRoutine_t)));
lp->env = env;
// 协程的 线程上下文
lp->pfn = pfn;
// 协程的 回调函数
lp->arg = arg;
// 协程的 回调函数 参数 stStackMem_t* stack_mem = NULL;
if( at.share_stack )
{
stack_mem = co_get_stackmem( at.share_stack);
// 使用共享栈的时候,从共享栈中获取一块内存作为 当前协程的栈内存,栈内存其实使用的是堆内存
at.stack_size = at.share_stack->stack_size;
// 此时栈内存大小 由 共享栈决定
}
else// 主协程不使用共享栈
{
stack_mem = co_alloc_stackmem(at.stack_size);
// 不使用共享栈的时候,从堆中创建一块内存作为 当前协程的栈内存
}
lp->stack_mem = stack_mem;
lp->ctx.ss_sp = stack_mem->stack_buffer;
// 协程上下文 保存 栈内存起始地址
lp->ctx.ss_size = at.stack_size;
// 协程上下文保存 栈内存大小 lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
// 默认不启用 系统函数hook
lp->cIsShareStack = at.share_stack != NULL;
// 设置是否使用共享栈的标记 lp->save_size = 0;
lp->save_buffer = NULL;
return lp;
}
3 协程执行
协程执行实际是协程的切换,包括协程上下文(寄存器状态)的切换,回调执行之后会再次切换回来
如果协程中创建了新的协程,则会有嵌套的协程切换
// 协程执行
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
// 获取当前协程, 即调用该函数的协程
if( !co->cStart )
{
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
// 初始化 新协程的 寄存器状态,新协程上下文 的 返回地址是对 回调函数的封装
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co;
// 将新协程入栈
co_swap( lpCurrRoutine, co );
// 协程切换:当前协程挂起,保存状态, 切换到新协程
}// 协程切换
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();
//get curr stack sp
char c;
// 在当前协程中申请一个字节的栈内存,此时压栈后寄存器 rsp指向该内存的地址
curr->stack_sp= &c;
// 该内存的地址 即 rsp寄存器保存的值 保存到 stack_sp if (!pending_co->cIsShareStack)// 非共享栈的情况下,不需要保存 占用共享栈 的协程
{
env->pending_co = NULL;
env->occupy_co = NULL;
}
else// 共享栈的情况下,
{
env->pending_co = pending_co;
// 保存当前占用共享栈的协程地址(新协程) 到 线程上下文 pending_co
//get last occupy co on the same stack mem
stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
// 从 新协程的共享栈中 获取 前一个占用该共享栈的 协程地址
//set pending co to occupy thest stack mem;
pending_co->stack_mem->occupy_co = pending_co;
// 更新占用共享栈的协程地址 为 新协程env->occupy_co = occupy_co;
// 保存 前一个占用新协程共享栈的 协程地址 到 线程上下文 occupy_co
if (occupy_co && occupy_co != pending_co)// 如果 之前有协程占用共享栈 && 前一个协程 不是 新协程
{
save_stack_buffer(occupy_co);
// 复制 前一个协程的栈内存 到 occupy_co->save_buffer中去
}
} //swap context// 以上的指令执行也都是在 旧协程(curr)的栈空间中
coctx_swap(&(curr->ctx),&(pending_co->ctx) );
// 切换协程的栈上下文(也就是寄存器状态),并 在新协程的栈空间中 执行协程(pending_co),
// 直到协程指令结束或者被主动挂起
// 回调执行之后,协程会被再次调用本函数切换到 协程栈的下一个协程,即会切换回旧协程,即会产生嵌套
// 如果回调中产生了新的协程,则会产生多次嵌套// 之后接着回到 旧协程 的栈空间中执行
//stack buffer may be overwrite, so get again;
stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
stCoRoutine_t* update_occupy_co =curr_env->occupy_co;
//
stCoRoutine_t* update_pending_co = curr_env->pending_co;
//
if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
{
//resume stack buffer
if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
{
memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
}
}
}// 协程回调的封装
static int CoRoutineFunc( stCoRoutine_t *co,void * )
{// 此时位于 协程(co)的 栈内存中
if( co->pfn )
{
co->pfn( co->arg );
// 执行回调,直到结束或者主动挂起
}
co->cEnd = 1;
// 回调执行结束 stCoRoutineEnv_t *env = co->env;
co_yield_env( env );
// 当前协程执行完之后,将协程切换为下一个协程(协程执行序列维护在线程上下文中) return 0;
}void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
// 当前协程执行完之后,将协程切换为协程栈中的下一个协程
}