Java&Go高性能队列之Disruptor性能测试#yyds干货盘点#

盛年不重来,一日难再晨,及时当勉励,岁月不待人。这篇文章主要讲述Java&Go高性能队列之Disruptor性能测试#yyds干货盘点#相关的知识,希望能为你提供帮助。
之前写过Java& Go高性能队列之LinkedBlockingQueue性能测试之后,就一直准备这这篇文章,作为准备内容的过程中也写过一些Disruptor高性能消息队列的应用文章:高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿。
Disruptor以高性能出名,下面我来测试一下三种场景下性能表现。
有一些基本的设定和用词规范,大家可以翻看Java& Go高性能队列之LinkedBlockingQueue性能测试。
结论总体来说,com.lmax.disruptor.dsl.Disruptor消费性能是非常厉害的,几乎是测不到顶。但是在生产方面,性能会随着Event的增加会下降很多
还是在50万QPS级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来几点比较通用的参考:

  • Disruptor消费者能力超强,即使在超高消费者数量(1000),依然保持非常高性能
  • 保证无消息积压前提下,com.lmax.disruptor.AbstractSequencer#bufferSize大小对性能影响不大
  • 在单生产者场景下,Disruptor生产速率与java.util.concurrent.LinkedBlockingQueue一样具有性能不稳定的问题
  • Disruptor性能瓶颈在于生产者,消息对象大小对性能影响较大,多生产者对总体性能影响不大,队列积压对性能影响也不大
  • 如果降低Event体积会极大提升性能,以后尽量使用java.lang.String,这点已经在日志回放系统印证了
简介这里再多唠叨两句。
测试结果这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。这里我采用的是com.lmax.disruptor.dsl.ProducerType#MULTI消费模式,注册消费者用的是com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool方法。
数据说明这里我用了三种org.apache.http.client.methods.HttpGet,创建方法均使用原生API,为了区分大小的区别,我会响应增加一些header和URL长度。
小对象:
def get = new HttpGet()

中对象:
def get = new HttpGet(url) get.addHeader("token", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION)

大对象:
def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION)

生产者
对象大小 队列长度 (百万) 线程数 速率(/ms)
1 1 890
1 5 1041
1 10 1100
0.5 1 755
0.5 5 597
0.5 10 612
0.5 10 580
1 1 360
1 5 394
1 10 419
1 20 401
0.5 1 256
0.5 5 426
1 1 201
1 5 243
1 10 242
0.5 1 194
0.5 5 215
0.5 10 195
测试过程中超大com.lmax.disruptor.AbstractSequencer#bufferSize会导致com.lmax.disruptor.dsl.Disruptor耗时非常长,自测1024 * 1024 再高就感觉很吃力了,所以没测试超过1百万的消息队列长度。由于并没有设定com.lmax.disruptor.AbstractSequencer#bufferSize的测试场景,所以本次测试总是用这个设置。
测试结果规律倒是挺明显的:
  1. 消息总量越大,QPS越大
  2. 生产者线程数对QPS影响不大
  3. 消息体尽可能小
消费者对于Disruptor框架来讲,单独的消费者用例比较难构建,我用了一个取巧的办法,会对性能测试结果有一些影响,这里可以通过后来分享测试用例的时候会详细说说。不过对于Disruptor逆天的消费能力,这点误差可以忽略。
对象大小 队列长度 (百万) 线程数 速率(/ms)
1 1 10526
1 5 6060
1 10 5376
1 20 4672
1 20 4219
1 1 12345
1 5 8130
1 10 5586
1 1 16129
1 5 5681
1 10 5649
0.5 1 8474
0.5 5 4761
0.5 10 3846
测试结论也挺明显的,基本与java.util.concurrent.LinkedBlockingQueue一致。
  1. 数据上看长度越长越好
  2. 消费者线程越少越好
  3. 消息体尽可能小
PS:关于Disruptor消费能力,我测试了一个1百万大对象消息,1000线程的消费者用例,QPS=3412/ms,这个跟我后面基于Disruptor设计的新性能测试模型有关系,表明消费者线程数即使增加到1000,Disruptor依然保持了非常高的性能。
生产者 & 消费者这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的2倍。
对象大小 次数 (百万) 线程数 队列长度 (百万) 速率(/ms)
1 1 0.1 16949
1 1 0.2 8403
1 1 0.5 5555
1 5 0.1 5181
1 10 0.1 1295
1 1 0.1 21276
1 1 0.2 16949
1 5 0.2 15625
1 10 0.2 574
2 1 0.2 34920
2 5 0.2 24752
2 10 0.2 789
1 1 0.1 44000
1 1 0.2 25000
1 5 0.2 11764
1 10 0.2 278
次轮整个测试过程都是几乎崩溃的,因为同样的用例执行起来误差太大了,最大的能有接近2倍的差距。以下结论仅供参考:
  1. 消息队列积累消息越少,速率越快
  2. 消费速率随时间推移越来越快
  3. 消息体尽可能小
其中当线程数超过10的时候,出现了非常明显的性能下滑,这个可以通过上面两组数据得出原因,Disruptor消费太快了,是生产者的数倍之多。最后测试出来的结果其实就是生产者的速率。当线程数比较少的时候,Disruptor总是有消息堆积的,所以生产者速率不会成为瓶颈,这个也跟用例设计有关系。
基准测试请翻阅上期的测试文章内容Java& Go高性能队列之LinkedBlockingQueue性能测试。
测试用例测试用例使用Groovy语言编写,自从我自定义了异步关键字fun和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。本期我又额外使用了自定义统计时间的关键字time以及利用闭包实现自定义等待方法,其他内容均与上期文章相同。
Disruptor有个先天的优势,就是必需得设置ringBufferSize,相当于提前分配内存了。这点是我之前没想到的,当我回去复测LinkedBlockingQueue的时候发现并没有明显的性能差异,对于测试结果影响可忽略。
【Java&Go高性能队列之Disruptor性能测试#yyds干货盘点#】PS:这里用到了一些sleep(),会导致一些误差,这个以我能力暂无法避免,经过测试对结论影响不大,对数据影响有限。
生产者
import com.funtester.config.HttpClientConstant import com.funtester.frame.SourceCode import com.funtester.frame.execute.ThreadPoolUtil import com.funtester.utils.Time import com.lmax.disruptor.EventHandler import com.lmax.disruptor.RingBuffer import com.lmax.disruptor.WorkHandler import com.lmax.disruptor.YieldingWaitStrategy import com.lmax.disruptor.dsl.Disruptor import com.lmax.disruptor.dsl.ProducerType import org.apache.http.client.methods.HttpGet import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicIntegerclass DisProduce extends SourceCode static AtomicInteger index = new AtomicInteger(1)static int total = 50_0000static int size = 10static int threadNum = 10static int piece = total / sizestatic def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"public static void main(String[] args) Disruptor< FunEvent> disruptor = new Disruptor< FunEvent> ( FunEvent::new, 1024 * 1024, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); disruptor.start(); RingBuffer< FunEvent> ringBuffer = disruptor.getRingBuffer(); def latch = new CountDownLatch(threadNum) def ss = Time.getTimeStamp() def funtester = fun (total / threadNum).times if (index.getAndIncrement() % piece == 0) def l = Time.getTimeStamp() - ss output("$formatLong(index.get())添加总消耗$formatLong(l)") ss = Time.getTimeStamp()//def get = new HttpGet()//def get = new HttpGet(url) //get.addHeader("token", token) //get.addHeader(HttpClientConstant.USER_AGENT) //get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) ringBuffer.publishEvent((event, sequence) -> event.setRequest(get))latch.countDown()//fun //while (true) //sleep(1.0) //output(disruptor.getRingBuffer().getBufferSize()) // // def start = Time.getTimeStamp() threadNum.times funtester() latch.await() def end = Time.getTimeStamp() outRGB("每毫秒速率$total / (end - start)")disruptor.shutdown(); /** * 消费者 */ private static class FunEventHandler implements EventHandler< FunEvent> , WorkHandler< FunEvent> public void onEvent(FunEvent event, long sequence, boolean endOfBatch) public void onEvent(FunEvent event) private static class FunEvent HttpRequestBase requestHttpRequestBase getRequest() return requestvoid setRequest(HttpRequestBase request) this.request = request ;

消费者
import com.funtester.config.HttpClientConstant import com.funtester.frame.SourceCode import com.funtester.frame.event.EventThread import com.funtester.frame.execute.ThreadPoolUtil import com.funtester.utils.Time import com.lmax.disruptor.EventHandler import com.lmax.disruptor.RingBuffer import com.lmax.disruptor.WorkHandler import com.lmax.disruptor.YieldingWaitStrategy import com.lmax.disruptor.dsl.Disruptor import com.lmax.disruptor.dsl.ProducerType import org.apache.http.client.methods.HttpGet import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.atomic.AtomicInteger import java.util.stream.Collectorsclass DisConsumer extends SourceCode static AtomicInteger index = new AtomicInteger(1)static int total = 50_0000static int threadNum = 10static def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"static def key = truepublic static void main(String[] args) Disruptor< FunEvent> disruptor = new Disruptor< FunEvent> ( FunEvent::new, 1024 * 1024, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); def funs = range(threadNum).mapToObj(f -> new FunEventHandler()).collect(Collectors.toList()) disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[]) disruptor.start(); RingBuffer< FunEvent> ringBuffer = disruptor.getRingBuffer(); def ss = Time.getTimeStamp() time total.times //def get = new HttpGet()//def get = new HttpGet(url) //get.addHeader("token", token) //get.addHeader(HttpClientConstant.USER_AGENT) //get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION)ringBuffer.publishEvent((event, sequence) -> event.setRequest(get)); output("数据$total 构建完成!") def start = Time.getTimeStamp() key = false waitFor !disruptor.hasBacklog() , 0.01 def end = Time.getTimeStamp() output(end - start) outRGB("每毫秒速率$total / (end - start)")disruptor.shutdown(); /** * 消费者 */ private static class FunEventHandler implements EventHandler< FunEvent> , WorkHandler< FunEvent> public void onEvent(FunEvent event, long sequence, boolean endOfBatch) if (key) sleep(0.05)public void onEvent(FunEvent event) if (key) sleep(0.05)private static class FunEvent HttpRequestBase requestHttpRequestBase getRequest() return requestvoid setRequest(HttpRequestBase request) this.request = request ;

生产者 & 消费者
import com.funtester.config.HttpClientConstant import com.funtester.frame.SourceCode import com.funtester.frame.execute.ThreadPoolUtil import com.funtester.utils.Time import com.lmax.disruptor.EventHandler import com.lmax.disruptor.RingBuffer import com.lmax.disruptor.WorkHandler import com.lmax.disruptor.YieldingWaitStrategy import com.lmax.disruptor.dsl.Disruptor import com.lmax.disruptor.dsl.ProducerType import org.apache.http.client.methods.HttpGet import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.atomic.AtomicInteger import java.util.stream.Collectorsclass DisBoth extends SourceCode static AtomicInteger index = new AtomicInteger(1)static int total = 100_0000static int threadNum = 5static int buffer = 20_0000static def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"static def key = truepublic static void main(String[] args) Disruptor< FunEvent> disruptor = new Disruptor< FunEvent> ( FunEvent::new, 1024 * 256, ThreadPoolUtil.getFactory(), ProducerType.MULTI, new YieldingWaitStrategy() ); def funs = range(threadNum).mapToObj(f -> new FunEventHandler()).collect(Collectors.toList()) disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[]) disruptor.start(); RingBuffer< FunEvent> ringBuffer = disruptor.getRingBuffer(); def produces = fun while (true) if (index.getAndIncrement() > total) break //def get = new HttpGet()//def get = new HttpGet(url) //get.addHeader("token", token) //get.addHeader(HttpClientConstant.USER_AGENT) //get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) ringBuffer.publishEvent((event, sequence) -> event.setRequest(get)); time buffer.times //def get = new HttpGet()//def get = new HttpGet(url) //get.addHeader("token", token) //get.addHeader(HttpClientConstant.USER_AGENT) //get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token) get.addHeader("token", token) get.addHeader("token1", token) get.addHeader("token5", token) get.addHeader("token4", token) get.addHeader("token3", token) get.addHeader("token2", token) get.addHeader(HttpClientConstant.USER_AGENT) get.addHeader(HttpClientConstant.CONNECTION) ringBuffer.publishEvent((event, sequence) -> event.setRequest(get)); output("数据$buffer 构建完成!") def start = Time.getTimeStamp() key = false threadNum.times produces() waitFor !disruptor.hasBacklog() , 0.01 def end = Time.getTimeStamp() def l = end - start output(l) outRGB("每毫秒速率$(total + buffer) / l")disruptor.shutdown(); /** * 消费者 */ private static class FunEventHandler implements EventHandler< FunEvent> , WorkHandler< FunEvent> public void onEvent(FunEvent event, long sequence, boolean endOfBatch) if (key) sleep(0.05)public void onEvent(FunEvent event) if (key) sleep(0.05)private static class FunEvent HttpRequestBase requestHttpRequestBase getRequest() return requestvoid setRequest(HttpRequestBase request) this.request = request ;

Have Fun ~ Tester !
  • FunTester2021年总结
  • FunTester原创大赏
  • Groovy语言学习笔记大赏【FunTester】
  • Golang语言HTTP客户端实践
  • 反射访问和修改private变量
  • 如何突破职业瓶颈
  • 如何选择API测试工具
  • 如何从测试自动化中实现价值
  • Groovy中的list
  • 重放浏览器请求多链路性能测试实践
  • 性能测试如何减少本机误差
  • 莫要寻找可能不存在的答案
  • 分段随机实践—模拟线上流量
  • 千万级日志回放引擎设计稿

    推荐阅读