Nginx|nginx master 和 worker 之间的通信

Nginx|nginx master 和 worker 之间的通信
文章图片


文章目录

    • 从 ngx_master_process_cycle 说起
    • ngx_start_worker_processes
    • ngx_spawn_process
    • ngx_worker_process_cycle
    • ngx_worker_process_init

从 ngx_master_process_cycle 说起 简单做个伪代码,看一下流程哈:
void ngx_master_process_cycle(ngx_cycle_t *cycle) { ··· // 启动各个worker进程 ngx_start_worker_processes(cycle, ccf->worker_processes, NGX_PROCESS_RESPAWN); ···for (; ; ) {··· if (···) {// 这里主要是向各个进程发送命令 ngx_signal_worker_processes(cycle, ngx_signal_value(NGX_SHUTDOWN_SIGNAL)); } } }

怎么通信先别管,再看一下是怎么启动工作进程的呗:
ngx_start_worker_processes 先了解一下 ngx_channel_t,有点重要哈:
master进程每次发送给worker进程的指令用如下的一个结构来完成封装
typedef struct {// 传递的 TCP 消息中的命令 ngx_uint_tcommand; // 进程 ID,一般是发送命令方的进程 ID ngx_pid_tpid; // 表示发送命令方在 ngx_processes 进程数组间的序号 ngx_int_tslot; // 通信的套接字句柄 ngx_fd_tfd; }ngx_channel_t; Nginx 针对 command 成员定义了如下命令: // 打开频道,使用频道这种方式通信前必须发送的命令 #define NGX_CMD_OPEN_CHANNEL 1// 关闭已经打开的频道,实际上也就是关闭套接字 #define NGX_CMD_CLOSE_CHANNEL 2// 要求接收方正常地退出进程 #define NGX_CMD_QUIT 3// 要求接收方强制地结束进程 #define NGX_CMD_TERMINATE 4// 要求接收方重新打开进程已经打开过的文件 #define NGX_CMD_REOPEN 5

static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) {··· ngx_channel_t ch; ··· ch.command = NGX_CMD_OPEN_CHANNEL; //当前是新建了一个进程 for (i = 0; i < n; i++) {//spawn,生成一个子进程 // ngx_worker_process_cycle:该子进程所进行的事件循环,这里先不管它什么循环 // worker进程在一个无限for循环中,不断的检查相应的事件模型中是否存在对应的事件, // 然后将accept事件和read、write事件分开放入两个队列中,最后在事件循环中不断的处理事件 ngx_spawn_process(cycle, ngx_worker_process_cycle, (void *) (intptr_t) i, "worker process", type); // 下面的这段代码的主要作用是将新建进程这个事件通知到其他的进程, // 其就会向ngx_processes数组的每个进程的channel[0]上写入当前广播的事件,也即这里的ch, // 因为子进程之间也需要通信 ch.pid = ngx_processes[ngx_process_slot].pid; ch.slot = ngx_process_slot; //该新建进程所存放的数组位置 ch.fd = ngx_processes[ngx_process_slot].channel[0]; // 将当前创建了子进程的事件广播给其余的进程 ngx_pass_open_channel(cycle, &ch); } }

我们来看它怎么个生成法:
ngx_spawn_process 这里啊,需要关注一下:
typedef struct {... // socketpair 创建的套接字对 ngx_socket_t channel[2]; }ngx_processes_t;

ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {··· // 在ngx_processes数组中存储了当前创建的所有进程,而ngx_last_process理解为end, //只不过ngx_processes中记录的进程有可能有部分已经失效了。 //当前循环就是从头开始查找是否有某个进程已经失效了, //如果已经失效了,则复用该进程位置,否则直接使用ngx_last_process所指向的位置 for (s = 0; s < ngx_last_process; s++) {if (ngx_processes[s].pid == -1) {break; } } // 这里说明所创建的进程数达到了最大限度 if (s == NGX_MAX_PROCESSES) {··· return NGX_INVALID_PID; } // NGX_PROCESS_DETACHED标志表示当前fork出来的进程与原来的父进程没有任何关系,比如进行nginx升级时, // 新生成的master进程就与原先的master进程没有关系 if (respawn != NGX_PROCESS_DETACHED) { //这里为什么采用sockpair? /* 这里的socketpair()方法的主要作用是生成一对套接字流,用于主进程和子进程的通信, 这一对套接字会存储在ngx_processes[s].channel中,本质上这个字段是一个长度为2的整型数组。 在主进程和子进程 进行通信的之前,主进程会关闭其中一个,而子进程会关闭另一个, 然后相互之间往未关闭的另一个文件描述符中写入或读取数据即可实现通信。 AF_UNIX表示当前使用的是UNIX文件形式的socket地址族SOCK_STREAM指定了当前套接字建立的通信方式是管道流, 并且这个管道流是双向的,即管道双方都可以进行读写操作第三个参数protocol必须为0。 */ if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) {··· return NGX_INVALID_PID; } ··· // 将ngx_processes[s].channel[0]设置为非阻塞模式 if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {··· ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 将ngx_processes[s].channel[1]设置为非阻塞模式 if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {··· ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } on = 1; // 将ngx_processes[s].channel[0]套接字管道设置为异步模式 if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {··· ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // 当前还处于主进程中,这里的ngx_pid指向了主进程的进程id,当前方法的作用主要是将 // ngx_processes[s].channel[0]的操作权限设置给主进程,也就是说主进程通过向 // ngx_processes[s].channel[0]写入和读取数据来与子进程进行通信 if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {··· ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // FD_CLOEXEC表示当前指定的套接字管道在子进程中可以使用,但是在execl()执行的程序中不可使用 if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {··· ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // FD_CLOEXEC表示当前指定的套接字管道在子进程中可以使用,但是在execl()执行的程序中不可使用 if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {··· ngx_close_channel(ngx_processes[s].channel, cycle->log); return NGX_INVALID_PID; } // ngx_processes[s].channel[1]是用于给子进程监听相关事件使用的,当父进程向 // ngx_processes[s].channel[0]发布事件之后, // ngx_processes[s].channel[1]中就会接收到对应的事件,从而进行相应的处理 ngx_channel = ngx_processes[s].channel[1]; } else {// 如果是NGX_PROCESS_DETACHED模式,则表示当前是另外新起的一个master进程,因而将其管道值都置为-1 ngx_processes[s].channel[0] = -1; ngx_processes[s].channel[1] = -1; } ngx_process_slot = s; // fork()产生一个新的进程 pid = fork(); switch (pid) { case -1: // fork出错 ··· return NGX_INVALID_PID; case 0: // 子进程执行的分支,这里的proc()方法是外部传进来的,也就是说,当前方法只是创建一个新的进程, // 具体的进程处理逻辑,将交由外部代码块进行定义ngx_getpid()方法获取的就是当前新创建的子进程的进程id ngx_pid = ngx_getpid(); proc(cycle, data); break; default: // 父进程会走到这里 break; } ··· // 父进程会走到这里,当前的pid是fork()之后父进程得到的新创建的子进程的pid ngx_processes[s].pid = pid; ngx_processes[s].exited = 0; ···// 设置当前进程的各个属性,并且存储到ngx_processes数组中的对应位置 ngx_processes[s].proc = proc; ngx_processes[s].data = https://www.it610.com/article/data; ngx_processes[s].name = name; ngx_processes[s].exiting = 0; ··· if (s == ngx_last_process) {ngx_last_process++; } return pid; }

现在看到,master进程使用sockpair开了两个套接字,其中,第0号位用于master进程向worker进程发送信息,并设置异步,制定1号为子进程可用,接下来我们看看子进程被创建出来执行什么工作:
ngx_worker_process_cycle
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) {··· //初始化,开启个监听啥的,先等等,后面展开 ngx_worker_process_init(cycle, worker); ··· for (; ; ) {//如果收到退出信号 ··· // 这里通过检查相应的事件模型中是否存在对应的事件,然后将其放入队列中进行处理, // 这里是worker进程处理事件的核心方法 ngx_process_events_and_timers(cycle); // 如果当前nginx已经终止,则退出当前进程 if (ngx_terminate) {ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting"); ngx_worker_process_exit(cycle); } if (ngx_quit) {··· } ··· } }

这一段是没看到我们想看的哈,看来还需要在深入一个函数去看看:
ngx_worker_process_init
/** * 这里主要是对当前进程进行初始化,为其设置优先级和打开的文件限制等参数。 * 最后会为当前进程添加一个监听channel[1]的连接,以不断读取master进程的消息,从而进行相应的处理 */ static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) {··· // 设置当前进程的优先级 if (worker >= 0 && ccf->priority != 0) {if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {··· } } // 设置当前进程能够打开的文件句柄数 // 简而言之就是设置核心文件能够使用的最大大小 ···// 需要注意的是,对于cache manager和cache loader进程,这里的worker传入的是-1, // 表示这两个进程不需要设置亲核性 if (worker >= 0) {// 获取当前worker的CPU亲核性 cpu_affinity = ngx_get_cpu_affinity(worker); if (cpu_affinity) {// 设置worker的亲核性 ngx_setaffinity(cpu_affinity, cycle->log); //就是绑定个CPU,后面专门出一篇写这个,这个我也是要收入囊中的 } } ···// 初始化空的set指令集合 sigemptyset(&set); // ◆ SIG_BLOCK:将 set 参数指向信号集中的信号加入到信号掩码中。 // ◆ SIG_UNBLOCK:将 set 参数指向的信号集中的信号从信号掩码中删除。 // ◆ SIG_SETMASK:将 set 参数指向信号集设置为信号掩码。 // 这里就是直接初始化要阻塞的信号集,默认为空集 if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {··· } ··· // 调用各个模块的init_process()方法进行进程模块的初始化,与本篇关系不大,后面讲模块了再说 ··· // 这里主要是关闭当前进程中各个模块的channel[0]管道句柄 for (n = 0; n < ngx_last_process; n++) { ··· //关闭父进程的channel[1] if (close(ngx_processes[n].channel[1]) == -1) { //这是个全局的processes,好吧 ··· } } // 关闭当前进程的channel[0]管道句柄 if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {··· } ··· // ngx_channel指向的是当前进程的channel[1]句柄,也即监听master进程发送消息的句柄。 // 当前方法中,首先会为当前的句柄创建一个connection对象,并且将其封装为一个事件, //然后将该事件添加到对应的事件模型队列中以监听当前句柄的事件,事件的处理逻辑则主要有这里的ngx_channel_handler()方法进行。 //这里的ngx_channel_handler的主要处理逻辑是:根据当前收到的消息设置当前进程的一些标志位,或者更新某些缓存数据, //如此,在当前进行的事件循环中,通过不断检查这些标志位,从而实现在事件进程中处理真正的逻辑。 //因而这里的ngx_channel_handler的处理效率是非常高的 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT, ngx_channel_handler) == NGX_ERROR) {exit(2); } }

这里worker进程的初始化过程主要做了三件事:
为worker进程设置优先级和提升打开文件的权限; 设置worker进程的亲核性; 关闭当前进程与master进程通信的管道数组中的channel[0],然后监听channel[1],以处理master进程的消息;

当收到master进程发过来的命令后,就调用ngx_channel_handler处理。
【Nginx|nginx master 和 worker 之间的通信】至此,master-worker 之间的通信就讲完了。

    推荐阅读