QUAGGA 线程机制解析


文章目录

  • QUAGGA 线程机制解析
    • 1 引言
      • 1.1 说在前面的话
      • 1.2 QUAGGA 线程机制简介
    • 2 重要的结构体解析
      • 2.1 线程结构体 thread
      • 线程对CPU使用的情况 struct cpu_thread_history
      • 线程管理结构 thread_master
    • 3事件主循环顺序
      • 3.1 源码解析
      • 3.2 线程之间状态转换图
      • 3.3 线程调度时序图
    • 4 API的使用
      • 4.1 主要的外部接口
      • 4.2 内部接口
    • 5 总结

QUAGGA 线程机制解析 1 引言 1.1 说在前面的话
最近研读 QUAGGA1.2.4源码,发现QUAGG源码中线模块设计很不错,所以就分析了相关源码,QUAGGA中线程设计的框架和LIBEVENT的主框架设计有异曲同工之妙,都是REACTOR模式,本文将详细介绍quagga中线程中的各种事件,read,write,timer,signal等,本文涉及的主要知识点如下:
  • IO多路复用技术(IOCP select epoll poll等)
  • Reactor 模式
  • 常见数据结构(list queue等)
1.2 QUAGGA 线程机制简介
quagga中的线程是“假线程”,它并没有实现线程中的优先级抢占问题。在quagga的线程管理中,有一个虚拟的时间轴,必须等前一时间的线程处理完,才看下一时间的线程是否触发。由于电脑处理速度较快且处理每个线程的时间间隔较小,所以可以达到类似“并行处理”的多线程效果。
quagga中的线程是分队列调度的,每个队列以一个链表或者队列的方式实现。线程队列可以分成5个队列:event、timer、ready、read、write。队列的优先级由高到低排列。但是,read和write队列并不参与到优先级的排列中,实际操作时,如果read和write队列中的线程就绪,就加入ready队列中,等待调度。调度时,首先进行event队列中线程的调度,其次是ready和timer。
线程主要涉及的文件 thread.c thread.h,本文以quagga1.2.4 进行解析
2 重要的结构体解析 2.1 线程结构体 thread
/* Thread itself. */ struct thread { thread_type type; /* thread type */ thread_type add_type; /* thread type */ struct thread *next; /* next pointer of the thread */ struct thread *prev; /* previous pointer of the thread */ struct thread_master *master; /* pointer to the struct thread_master. */ int (*func) (struct thread *); /* event function */ void *arg; /* event argument */ union { int val; /* second argument of the event. */ int fd; /* file descriptor in case of read/write. */ struct timeval sands; /* rest of time sands value. */ } u; int index; /* used for timers to store position in queue */ struct timeval real; struct cpu_thread_history *hist; /* cache pointer to cpu_history */ const char *funcname; const char *schedfrom; int schedfrom_line; };

以下对结构体中各个变量进行解析:
  • type : 线程类型,共有7种,在thread.h中用宏定义好:
    /* Thread types. */ #define THREAD_READ0// 写绪队列 #define THREAD_WRITE1// 写队列 #define THREAD_TIMER2// 定时器队列 #define THREAD_EVENT3// 事件队列 #define THREAD_READY4// 就绪队列 #define THREAD_BACKGROUND5// 需要后台运行队列 #define THREAD_UNUSED6// 空闲队列 #define THREAD_EXECUTE7// 执行队列

  • add_type: 不懂
  • next:指向下一个线程
  • prev:指向前一个线程
  • master:指向线程管理结构由 thread_master管理所有的thread
  • func:该线程对应的事件处理函数 原型:
  • arg:传递给事件回调函数的第一个参数
  • 联合体 u中变量:
    • val: 事件回调函数使用的第二个参数,暂时不清楚具体用法
    • fd:读写事件使用的文件描述符 /
    • sands:定时器的时间间隔
  • index :定时器事件在队列中的位置,所有的timer都在一个队列中
  • real:暂时不清楚具体用法
  • hist: 统计该线程 thread对CPU的使用情况 下面单独介绍
  • funcname :#f 调用的函数名字 调试信息
  • schedfrom:FILE 调用的文件信息 调试信息
  • schedfrom_line:LINE 调用行号 调试信息
线程对CPU使用的情况 struct cpu_thread_history
struct cpu_thread_history { int (*func)(struct thread *); unsigned int total_calls; struct time_stats { unsigned long total, max; } real; #ifdef HAVE_RUSAGE struct time_stats cpu; #endif thread_type types; const char *funcname; };

Router> show thread cpu CPU (user+system): Real (wall-clock): Runtime(ms)Invoked Avg uSec Max uSecs Avg uSec Max uSecsTypeThread 2.25217132442147465 RWvty_accept 2.6292797384109422 RWB work_queue_run 1.381178132196355 RWThello_timer 6.26261102442116465 RWTEXB TOTAL

线程管理结构 thread_master
/* Master of the theads. */ struct thread_master { struct thread **read; struct thread **write; struct pqueue *timer; struct thread_list event; struct thread_list ready; struct thread_list unuse; struct pqueue *background; int fd_limit; thread_fd_set readfd; thread_fd_set writefd; thread_fd_set exceptfd; unsigned long alloc; };

以下对上述结构的解释:
  • read 读事件列表 数据结构:指针数组
  • write 写事件列表 数据结构:指针数组
  • timer 定时器队列 数据结构:队列
  • event 事件列表 数据结构:双向链表
  • ready 就绪列表 数据结构:双向链表
  • unuse // 没有用到的线程列表 数据结构:双向链表
  • background 需要单独执行一个线程的列表,通常这个事件比较耗时 数据结构:队列
  • fd_limit 可以读写的最大文件描述符的个数
  • readfd 读事件文件描述符 用于IO复用
  • writefd 写事件文件描述符 用于IO复用
  • exceptfd 异常事件文件描述符 用于IO复用
  • alloc 线程的数量
各种事件的处理顺序:
singal > ready > event > timer > read > write > background
3事件主循环顺序 3.1 源码解析
/* Co-operative thread main loop */ void thread_main (struct thread_master *master) { struct thread *t; // 此过程不断的获取需要执行事件,然后去执行 while ((t = thread_fetch (master))) // 执行就绪的事件 thread_call (t); }/* Fetch next ready thread. */ static struct thread * thread_fetch (struct thread_master *m) { struct thread *thread; thread_fd_set readfd; thread_fd_set writefd; thread_fd_set exceptfd; struct timeval timer_val = { .tv_sec = 0, .tv_usec = 0 }; struct timeval timer_val_bg; struct timeval *timer_wait = &timer_val; struct timeval *timer_wait_bg; while (1) { int num = 0; /* Signals pre-empt everything */ // 优先处理信号事件 quagga_sigevent_process (); /* Drain the ready queue of already scheduled jobs, before scheduling * more. */ // 查看就绪的队列,如果有就绪的事件则取出来,立刻返回 if ((thread = thread_trim_head (&m->ready)) != NULL) { printf("func %s file %s line %d\n",__FUNCTION__,__FILE__,__LINE__); return thread; }/* To be fair to all kinds of threads, and avoid starvation, we * need to be careful to consider all thread types for scheduling * in each quanta. I.e. we should not return early from here on. *//* Normal event are the next highest priority.*/ //查看事件的队列,如果有将事件推动到就绪的队列之中 thread_process (&m->event); printf("func %s file %s line %d\n",__FUNCTION__,__FILE__,__LINE__); // 所有需要检测是文件描述符 /* Structure copy.*/ readfd = fd_copy_fd_set(m->readfd); writefd = fd_copy_fd_set(m->writefd); exceptfd = fd_copy_fd_set(m->exceptfd); //如果没有就绪的事件,则进行定时器、IO的检测 /* Calculate select wait timer if nothing else to do */ if (m->ready.count == 0) { //计算 select 需要的超时时间,例如有两个定时器,一个5秒,还剩 3秒就到时间了 一个10秒,还有2秒就到执行时间了, // 计算出最短的时间,也就是2秒,用2秒作为 select的超时时间,巧妙的将定时器和IO复用整合在一起 quagga_get_relative (NULL); timer_wait = thread_timer_wait (m->timer, &timer_val); timer_wait_bg = thread_timer_wait (m->background, &timer_val_bg); printf("func %s file %s line %d\n",__FUNCTION__,__FILE__,__LINE__); if (timer_wait_bg && (!timer_wait || (timeval_cmp (*timer_wait, *timer_wait_bg) > 0))) timer_wait = timer_wait_bg; } //进行检测IO的检测 quagga 使用的是 select num = fd_select (FD_SETSIZE, &readfd, &writefd, &exceptfd, timer_wait); printf("func %s file %s line %d\n",__FUNCTION__,__FILE__,__LINE__); /* Signals should get quick treatment */ if (num < 0) { if (errno == EINTR) continue; /* signal received - process it */ zlog_warn ("select() error: %s", safe_strerror (errno)); return NULL; }/* Check foreground timers.Historically, they have had higher priority than I/O threads, so let's push them onto the ready list in front of the I/O threads. */ quagga_get_relative (NULL); // 处理定时器事件,如果定时器到了将其放入到就绪队列中去 thread_timer_process (m->timer, &relative_time); printf("func %s file %s line %d\n",__FUNCTION__,__FILE__,__LINE__); /* Got IO, process it */ // 处理select检测到文件描述符,如果有则将读写事件加入到就绪队列中去 if (num > 0) {thread_process_fds (m, &readfd, &writefd, num); printf("func %s file %s line %d\n",__FUNCTION__,__FILE__,__LINE__); }#if 0 /* If any threads were made ready above (I/O or foreground timer), perhaps we should avoid adding background timers to the ready list at this time.If this is code is uncommented, then background timer threads will not run unless there is nothing else to do. */ if ((thread = thread_trim_head (&m->ready)) != NULL) return thread; #endif/* Background timer/events, lowest priority */ // 处理 background事件,和定时器类似 thread_timer_process (m->background, &relative_time); if ((thread = thread_trim_head (&m->ready)) != NULL) return thread; } }

3.2 线程之间状态转换图
QUAGGA 线程机制解析
文章图片

3.3 线程调度时序图
QUAGGA 线程机制解析
文章图片

4 API的使用 4.1 主要的外部接口
/* 创建一个新的thread_master ,分配内存,初始化各个链表以及队列,一般情况下一个进程创建一个 */ extern struct thread_master *thread_master_create (void); /* 释放掉 创建时分配的内存 */ extern void thread_master_free (struct thread_master *); // 向读队列中添加一个读事件 extern struct thread *funcname_thread_add_read (struct thread_master *, int (*)(struct thread *), void *, int, debugargdef); //向写队列中添加一个写事件 extern struct thread *funcname_thread_add_write (struct thread_master *, int (*)(struct thread *), void *, int, debugargdef); // 添加一个定时器 秒为单位 extern struct thread *funcname_thread_add_timer (struct thread_master *, int (*)(struct thread *), void *, long, debugargdef); // 添加一个定时器 微妙为单位 extern struct thread *funcname_thread_add_timer_msec (struct thread_master *, int (*)(struct thread *), void *, long, debugargdef); // 添加一个定时器 定时器单位 struct timeval * extern struct thread *funcname_thread_add_timer_tv (struct thread_master *, int (*)(struct thread *), void *, struct timeval *, debugargdef); // 添加一个事件优先级大约 读写 定时器 extern struct thread *funcname_thread_add_event (struct thread_master *, int (*)(struct thread *), void *, int, debugargdef); // 添加一个 background事件 这种事件往往比较耗时,需要后台运行 extern struct thread *funcname_thread_add_background (struct thread_master *, int (*func)(struct thread *), void *arg, long milliseconds_to_delay, debugargdef); // 添加一个立即执行的事件,该函数立刻执行该事件 extern struct thread *funcname_thread_execute (struct thread_master *, int (*)(struct thread *), void *, int, debugargdef); #undef debugargdef // 取消某个线程 extern void thread_cancel (struct thread *); // 取消某个事件 extern unsigned int thread_cancel_event (struct thread_master *, void *); // 线程主线程 负责事件的调度执行,是个死循环 extern void thread_main (struct thread_master *); // /* Return remain time in second. */ extern unsigned long thread_timer_remain_second (struct thread *); // 与上面类似 只不过返回不是秒而是 struct timeval extern struct timeval thread_timer_remain(struct thread*); // extern int thread_should_yield (struct thread *);

4.2 内部接口
这里不进行详细的介绍,函数都比较简短小巧,也比较容易理解,主要分为以下几类:
  • 时间相关的函数接口
  • 线程相关数据的增删改查 (struct thread * struct pqueue * struct thread_list )
  • CPU信息记录相关的接口
5 总结 【QUAGGA 线程机制解析】本博文对quagga线程机制进行相关的介绍,由于quagga是一个非常优秀的开源工具,线程模块设计得也比较精妙,所以分析了一下,增加自己的设计能力,后续可以将此模块进行移植出来,作为一个 Reactor进行使用。

    推荐阅读