消息队列新实现(Workflow|消息队列新实现:Workflow msgqueue代码详解)
PART - 0 一个小小的创新
开源项目Workflow中有许多小创新的基础模块,今天来介绍最常用的传统数据结构:多生产者-多消费者-消息队列。
着重声明一下,这里说的消息队列不是kafka这样的消息队列服务,而是单机内的传统数据结构,主要用于机器资源的协调调度,比如线程/协程调度、网络异步资源收发等,即可以协调执行资源,又可以给数据资源当临时buffer用。
前段时间介绍过300行代码的线程池、以及在线程池之上用作计算调度的200行的Executor,而今天介绍的msgqueue,更简单、更常用,代码不到200行,极度治愈懒癌晚期的你~msgqueue模块同样非常独立,可以直接拿出来使(把)用(玩)。
一句话概括:内部使用两个队列,分别拆开了生产者和消费者之间的争抢,提升了吞吐的同时,依然维持了比较优秀的长尾、且兼顾保序与代码极简的优良传统;而其中极度节约的链表实现方式,在减少内存分配上也有可学习之处。
代码位置:https://github.com/sogou/workflow/blob/master/src/kernel/msgqueue.c
PART - 1 消息队列常见实现
认识比较早的小伙伴可能知道,其实在项目开源以前,我就为消息队列写过一系列的文章和对比了,以下按个人看过的一些给出常见的实现,也欢迎大家评论里补充看过觉得很棒的代码:
- 简单粗暴版:一把锁两个条件变量的基本实现
由于太简单了,大家手写即可~ - 双队列版:Workflow的msgqueue
https://github.com/sogou/work... - 链表版:LinkedBlockingQueue
https://developer.android.com...
(这个BlockingQueue系列还蛮多实现的) - grpc版:mpmc queue
https://github.com/grpc/grpc/... - 无锁版:内核的单生产者单消费者kfifo
https://github.com/torvalds/l... - 不保序版:go的调度workstealing
https://github.com/golang/go/...
PART - 2 msgqueue的算法 Workflow的msgqueue很简单,两张图足以说清楚内部结构和流程:
文章图片
几个特点:
- 内部有两个list,生产者把消息放到生产队列,消费者从消费队列取消息;
- 使用了两把锁分别管理两个队列;
- 使用了两个条件变量分别管理生产者和消费者的等待唤醒;
- 队列可以有block / nonblock两种状态;
- 如果为block,则生产者队列最大长度为maxlen,如果为nonblock,不限制最大长度;
文章图片
算法很简单步骤:
- 当get_list(也就是消费者队列)不为空,消费者可以拿到一个消息;
- 否则消费者会等待,直到put_list(也就是生产者队列)不为空,然后交换两个队列;
很久以前我在个人的queue项目中有很简单的压测数据:GitHub - holmes1412/queue: some different implements of queue and test,不太完善,仅供参考,毕竟我智商摆在这._.
也推荐通过以下小项目,看看msgqueue如何重构线程池来达到性能的飞跃::https://github.com/Barenboim/msgqueue_thrdpool
PART - 3 代码详解 还是按照以前介绍过的万能七步,我们可以跟着这100多行代码把队列学习一遍~
第一步:通过头文件看接口 打开msgqueue.h:
msgqueue_t *msgqueue_create(size_t maxlen, int linkoff);
void msgqueue_put(void *msg, msgqueue_t *queue);
void *msgqueue_get(msgqueue_t *queue);
void msgqueue_set_nonblock(msgqueue_t *queue);
void msgqueue_set_block(msgqueue_t *queue);
void msgqueue_destroy(msgqueue_t *queue);
msgqueue_create()
函数创建一个消息队列。- 参数maxlen代表生产队列的最大长度,默认的模式下,达到最大长度时生产者将会阻塞。
- 而第二个参数linkoff是这个模块的一个亮点。它让用户指定一个消息的偏移量,每条消息的这个位置用户需要预留一个指针大小的空间,用于内部拉链。这一个简单的设计,避免进出消息队列时,多一次内存分配与释放。
文章图片
了解过
msgqueue_create()
接口,msgqueue_get(
)和msgqueue_put()
就无须过多介绍了。注意msg的linkoff位置,需要预留一个指针。第二步:.h接口上的数据结构 上述接口上的msgqueue_t,是消息队列的真身,看起来实现在msgqueue.c里。
typedef struct __msgqueue msgqueue_t;
第三步:.c文件的内部数据结构 接下来是激动人心的时刻,为了方便起见,这里直接用了
void **
去做链表,这样做的一大优势是:充分利用用户分配的msg内存,消息队列内部可以省去分配释放空间的开销(我可真是个小机灵鬼(?′ ▽ `?)?
当然怎么实现不重要,无需纠结~
struct __msgqueue
{
size_t msg_max;
size_t msg_cnt;
int linkoff;
int nonblock;
void *head1;
void *head2;
void **get_head;
void **put_head;
void **put_tail;
pthread_mutex_t get_mutex;
pthread_mutex_t put_mutex;
pthread_cond_t get_cond;
pthread_cond_t put_cond;
};
这里就可以对得上先前讲述的:
- 两个内部队列:get_head, put_head;
- 两把锁:get_mutex, put_mutex;
- 两个条件变量:get_cond, put_cond;
- 另外的msg_max和msg_cnt很好理解,分别是内部生产者队列允许的最大长度,以及生产者队列当前实际长度。
- nonblock显然是标志这个队列是否为阻塞模式,为了简化,我们下面的代码都只讨论阻塞模式。
- 我们看到有put_tail,但是没有get_tail,因为消费者get时只管从head里拿就可以了,只有生产者put才需要通过head和tail,来保证消息的全局有序。
- linkoff已经介绍过,这是内部链表算偏移量的关键点。
msgqueue_create()
,基本足以看清楚内部数据管理方式了:msgqueue_t *msgqueue_create(size_t maxlen, int linkoff)
{
// 各种初始化,最后设置queue的成员变量如下:
queue->msg_max = maxlen;
queue->linkoff = linkoff;
queue->head1 = NULL;
queue->head2 = NULL;
// 借助两个head分别作为两个内部队列的位置
queue->get_head = &queue->head1;
queue->put_head = &queue->head2;
// 一开始队列为空,所以生产者队尾也等于队头
queue->put_tail = &queue->head2;
queue->msg_cnt = 0;
queue->nonblock = 0;
...
}
msgqueue_create(
)的接口会传入linkoff,之后这个消息队列里都是用这个来作为每一条消息的实际长度,从而计算出下一个位置的偏移量应该是多少。然后看看生产者接口
msgqueue_put()
:void msgqueue_put(void *msg, msgqueue_t *queue)
{
// 1. 通过create的时候传进来的linkoffset,算出消息尾部的偏移量
void **link = (void **)((char *)msg + queue->linkoff);
// 2. 设置为空,用于表示生产者队列末尾的后面没有其他数据
*link = NULL;
// 3. 加生产者锁
pthread_mutex_lock(&queue->put_mutex);
// 4. 如果当前已经有msg_max个消息的话
//就要等待消费者通过put_cond来叫醒我
while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock)
pthread_cond_wait(&queue->put_cond, &queue->put_mutex);
// 5. put_tail指向这条消息尾部,维护生产者队列的消息个数
*queue->put_tail = link;
queue->put_tail = link;
queue->msg_cnt++;
pthread_mutex_unlock(&queue->put_mutex);
// 6. 如果有消费者在等,通过get_cond叫醒他~
pthread_cond_signal(&queue->get_cond);
}
对应的,消费者接口
msgqueue_get()
:void *msgqueue_get(msgqueue_t *queue)
{
void *msg;
// 1. 加消费者锁
pthread_mutex_lock(&queue->get_mutex);
// 2. 如果目前get_head不为空,表示有数据;
//如果空,那么通过__msgqueue_swap()切换队列,也可以拿到数据
if (*queue->get_head || __msgqueue_swap(queue) > 0)
{
// 3. 对应put中的计算方式,根据尾巴的偏移量把消息起始偏移量算出来
msg = (char *)*queue->get_head - queue->linkoff;
// 4. 往后挪,这时候的*get_head就是下一条数据的偏移量尾部了
*queue->get_head = *(void **)*queue->get_head;
}
else
{
// 5. 没有数据,同时设置errno~~~
msg = NULL;
errno = ENOENT;
}pthread_mutex_unlock(&queue->get_mutex);
return msg;
}
第五步:其他核心函数的实现 毫无疑问,还有一个核心函数是
__msgqueue_swap( )
,这是切换队列的算法的关键:static size_t __msgqueue_swap(msgqueue_t *queue)
{
// 1. 用临时变量记录下当前的get队列偏移量
void **get_head = queue->get_head;
size_t cnt;
// 2. 把刚才的生产者队列换给消费者队列
queue->get_head = queue->put_head;
// 3. 只有这个地方才会同时持有消费者锁和生产者锁
pthread_mutex_lock(&queue->put_mutex);
// 4. 如果当前对列本身就是空的
//这里就会帮等待下一个来临的生产者通get_cond叫醒我
while (queue->msg_cnt == 0 && !queue->nonblock)
pthread_cond_wait(&queue->get_cond, &queue->put_mutex);
cnt = queue->msg_cnt;
// 5. 如果当前对列是满的,说明可能有生产者在等待
//通过put_cond叫醒生产者(可能有多个,所以用broadcast)
if (cnt > queue->msg_max - 1)
pthread_cond_broadcast(&queue->put_cond);
// 6. 把第一行的临时变量换给生产者队列,清空生产者队列
queue->put_head = get_head;
queue->put_tail = get_head;
queue->msg_cnt = 0;
pthread_mutex_unlock(&queue->put_mutex);
// 7. 返回刚才多少个,这个会影响get里的逻辑
return cnt;
}
第六步:把函数关联起来 这个模块接口比较简单,已经在put和get中分别按照生产者和消费者的流程互相关联上了。这里为了加深理解,我们换个角度,从两个锁和两个条件变量的维度进行整理:
- put_mutex: 生产者之间抢put锁,只有当消费队列为空,且有消费者要进行swap的时候,才会同时抢put锁;
- get_mutex: 消费者之间抢get锁,如果消费队列为空,那么抢到get锁的那个消费者会进入swap去换队列;
- put_cond: 等不到存放空间的生产者会等待在put条件变量上,由换完队列的那个消费者叫醒0到多个生产者;
- get_cond: 如果发现消费队列为空,如上所述会有一个消费者进入swap,此时会先看看生产队列有没有东西,如果连生产队列都没有余粮,这个消费者就会等待在get条件变量上,由下一个来的生产者唤醒这一个消费者;
void msgqueue_set_nonblock(msgqueue_t *queue)
{
queue->nonblock = 1;
pthread_mutex_lock(&queue->put_mutex);
// 叫醒一个消费者
pthread_cond_signal(&queue->get_cond);
// 叫醒所有生产者
pthread_cond_broadcast(&queue->put_cond);
pthread_mutex_unlock(&queue->put_mutex);
}
这里会发现,使用signal和broadcast的逻辑和原先put_cont、get_cond的使用是非常统一的。
【消息队列新实现(Workflow|消息队列新实现:Workflow msgqueue代码详解)】我们刚才看过两个地方有判断nonblock,都按照block方式去理解了。那么如果设置了nonblock的话流程是怎样的呢?这个不细说了,留给大家当作课后习题→_→(不是作者懒
PART - 4 总结 其实写了这么多篇,可能大家可以发现,Workflow内部有很多新思路、新算法、新数据结构,虽然并不都像Executor那样的大创新,但许多细微的地方在做法上都可以有小创新,从而实现性能的大幅提升。
这也是我参与这个项目时最大的感慨:传统的、原始的代码,依然值得我们琢磨、优化,努力做到精益求精。
另外的一点工程经验,就是这么多篇文章都在体现的通用性:工程实践上有很多trade off的事情,比如在消息队列里就是吞吐和延迟,如果增加手段提升吞吐,则往往意味着每一个请求的延迟都会增加哪怕一点点的额外开销。
客观来说,msgqueue本身实现还是非常克制、极其精简了,而实际上消息队列有非常非常多的实现。为什么workflow没有用上更复杂更高效的数据结构呢?原因都在通用性上:
- 吞吐与延迟:用更复杂的算法往往有整体收益,但是否所有场景下整体收益能够比复杂代码引入的开销大,还是要实测才能知道,一个通用的框架还是比较适合使用通用的模型;
- 分析场景瓶颈所在:对于Workflow实际网络收发来说,只有消息非常小、QPS非常高、无需任何序列化/反序列化/业务计算逻辑的场景下,瓶颈才会落到msgqueue上;
推荐阅读
- 857. 雇佣 K 名工人的最低成本 : 枚举 + 优先队列(堆)运用题
- 消息队列的流派
- 笔记|15天完成民猫电商毕设——消息模块收尾(12th day)
- 开源|免费开源iPhone推送消息工具+服务端-Bark
- 焦点|再现“抢跑”!大千生态涨停后牵出控制权变更,内幕消息提前泄露?
- redis|springboot使用redisTemplate+websocket实现集群消息的发布订阅
- 面经|面经-美团
- 消息队列|RabbitMQ——发布确认高级
- 最详解消息队列以及RabbbitMQ之HelloWorld
- SpringCloud|Spring Cloud Stream函数式编程整合消息中间件