盛年不重来,一日难再晨,及时当勉励,岁月不待人。这篇文章主要讲述Java&Go高性能队列之Disruptor性能测试#yyds干货盘点#相关的知识,希望能为你提供帮助。
之前写过Java&
Go高性能队列之LinkedBlockingQueue性能测试之后,就一直准备这这篇文章,作为准备内容的过程中也写过一些Disruptor
高性能消息队列的应用文章:高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿。
Disruptor
以高性能出名,下面我来测试一下三种场景下性能表现。
有一些基本的设定和用词规范,大家可以翻看Java&
Go高性能队列之LinkedBlockingQueue性能测试。
结论总体来说,com.lmax.disruptor.dsl.Disruptor
消费性能是非常厉害的,几乎是测不到顶。但是在生产方面,性能会随着Event的增加会下降很多
还是在50万QPS级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来几点比较通用的参考:
- 从
Disruptor
消费者能力超强,即使在超高消费者数量(1000),依然保持非常高性能 - 保证无消息积压前提下,
com.lmax.disruptor.AbstractSequencer#bufferSize
大小对性能影响不大 - 在单生产者场景下,
Disruptor
生产速率与java.util.concurrent.LinkedBlockingQueue
一样具有性能不稳定的问题 Disruptor
性能瓶颈在于生产者,消息对象大小对性能影响较大,多生产者对总体性能影响不大,队列积压对性能影响也不大- 如果降低Event体积会极大提升性能,以后尽量使用
java.lang.String
,这点已经在日志回放系统印证了
测试结果这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。这里我采用的是
com.lmax.disruptor.dsl.ProducerType#MULTI
消费模式,注册消费者用的是com.lmax.disruptor.dsl.Disruptor#handleEventsWithWorkerPool
方法。数据说明这里我用了三种
org.apache.http.client.methods.HttpGet
,创建方法均使用原生API,为了区分大小的区别,我会响应增加一些header和URL长度。小对象:
def get = new HttpGet()
中对象:
def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
大对象:
def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
生产者
对象大小 | 队列长度 (百万) | 线程数 | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 890 |
小 | 1 | 5 | 1041 |
小 | 1 | 10 | 1100 |
小 | 0.5 | 1 | 755 |
小 | 0.5 | 5 | 597 |
小 | 0.5 | 10 | 612 |
小 | 0.5 | 10 | 580 |
中 | 1 | 1 | 360 |
中 | 1 | 5 | 394 |
中 | 1 | 10 | 419 |
中 | 1 | 20 | 401 |
中 | 0.5 | 1 | 256 |
中 | 0.5 | 5 | 426 |
大 | 1 | 1 | 201 |
大 | 1 | 5 | 243 |
大 | 1 | 10 | 242 |
大 | 0.5 | 1 | 194 |
大 | 0.5 | 5 | 215 |
大 | 0.5 | 10 | 195 |
com.lmax.disruptor.AbstractSequencer#bufferSize
会导致com.lmax.disruptor.dsl.Disruptor
耗时非常长,自测1024 * 1024 再高就感觉很吃力了,所以没测试超过1百万的消息队列长度。由于并没有设定com.lmax.disruptor.AbstractSequencer#bufferSize
的测试场景,所以本次测试总是用这个设置。测试结果规律倒是挺明显的:
- 消息总量越大,QPS越大
- 生产者线程数对QPS影响不大
- 消息体尽可能小
Disruptor
框架来讲,单独的消费者用例比较难构建,我用了一个取巧的办法,会对性能测试结果有一些影响,这里可以通过后来分享测试用例的时候会详细说说。不过对于Disruptor
逆天的消费能力,这点误差可以忽略。对象大小 | 队列长度 (百万) | 线程数 | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 10526 |
小 | 1 | 5 | 6060 |
小 | 1 | 10 | 5376 |
小 | 1 | 20 | 4672 |
小 | 1 | 20 | 4219 |
中 | 1 | 1 | 12345 |
中 | 1 | 5 | 8130 |
中 | 1 | 10 | 5586 |
大 | 1 | 1 | 16129 |
大 | 1 | 5 | 5681 |
大 | 1 | 10 | 5649 |
大 | 0.5 | 1 | 8474 |
大 | 0.5 | 5 | 4761 |
大 | 0.5 | 10 | 3846 |
java.util.concurrent.LinkedBlockingQueue
一致。- 数据上看长度越长越好
- 消费者线程越少越好
- 消息体尽可能小
Disruptor
消费能力,我测试了一个1百万大对象消息,1000线程的消费者用例,QPS=3412/ms,这个跟我后面基于Disruptor
设计的新性能测试模型有关系,表明消费者线程数即使增加到1000,Disruptor
依然保持了非常高的性能。生产者 & 消费者这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的2倍。
对象大小 | 次数 (百万) | 线程数 | 队列长度 (百万) | 速率(/ms) |
---|---|---|---|---|
小 | 1 | 1 | 0.1 | 16949 |
小 | 1 | 1 | 0.2 | 8403 |
小 | 1 | 1 | 0.5 | 5555 |
小 | 1 | 5 | 0.1 | 5181 |
小 | 1 | 10 | 0.1 | 1295 |
中 | 1 | 1 | 0.1 | 21276 |
中 | 1 | 1 | 0.2 | 16949 |
中 | 1 | 5 | 0.2 | 15625 |
中 | 1 | 10 | 0.2 | 574 |
中 | 2 | 1 | 0.2 | 34920 |
中 | 2 | 5 | 0.2 | 24752 |
中 | 2 | 10 | 0.2 | 789 |
大 | 1 | 1 | 0.1 | 44000 |
大 | 1 | 1 | 0.2 | 25000 |
大 | 1 | 5 | 0.2 | 11764 |
大 | 1 | 10 | 0.2 | 278 |
- 消息队列积累消息越少,速率越快
- 消费速率随时间推移越来越快
- 消息体尽可能小
Disruptor
消费太快了,是生产者的数倍之多。最后测试出来的结果其实就是生产者的速率。当线程数比较少的时候,Disruptor
总是有消息堆积的,所以生产者速率不会成为瓶颈,这个也跟用例设计有关系。基准测试请翻阅上期的测试文章内容Java& Go高性能队列之LinkedBlockingQueue性能测试。
测试用例测试用例使用Groovy语言编写,自从我自定义了异步关键字
fun
和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。本期我又额外使用了自定义统计时间的关键字time
以及利用闭包实现自定义等待方法,其他内容均与上期文章相同。Disruptor
有个先天的优势,就是必需得设置ringBufferSize
,相当于提前分配内存了。这点是我之前没想到的,当我回去复测LinkedBlockingQueue
的时候发现并没有明显的性能差异,对于测试结果影响可忽略。【Java&Go高性能队列之Disruptor性能测试#yyds干货盘点#】PS:这里用到了一些
sleep()
,会导致一些误差,这个以我能力暂无法避免,经过测试对结论影响不大,对数据影响有限。生产者
import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicIntegerclass DisProduce extends SourceCode static AtomicInteger index = new AtomicInteger(1)static int total = 50_0000static int size = 10static int threadNum = 10static int piece = total / sizestatic def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"public static void main(String[] args)
Disruptor<
FunEvent>
disruptor = new Disruptor<
FunEvent>
(
FunEvent::new,
1024 * 1024,
ThreadPoolUtil.getFactory(),
ProducerType.MULTI,
new YieldingWaitStrategy()
);
disruptor.start();
RingBuffer<
FunEvent>
ringBuffer = disruptor.getRingBuffer();
def latch = new CountDownLatch(threadNum)
def ss = Time.getTimeStamp()
def funtester =
fun
(total / threadNum).times
if (index.getAndIncrement() % piece == 0)
def l = Time.getTimeStamp() - ss
output("$formatLong(index.get())添加总消耗$formatLong(l)")
ss = Time.getTimeStamp()//def get = new HttpGet()//def get = new HttpGet(url)
//get.addHeader("token", token)
//get.addHeader(HttpClientConstant.USER_AGENT)
//get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
ringBuffer.publishEvent((event, sequence) ->
event.setRequest(get))latch.countDown()//fun
//while (true)
//sleep(1.0)
//output(disruptor.getRingBuffer().getBufferSize())
//
//
def start = Time.getTimeStamp()
threadNum.times funtester()
latch.await()
def end = Time.getTimeStamp()
outRGB("每毫秒速率$total / (end - start)")disruptor.shutdown();
/**
* 消费者
*/
private static class FunEventHandler implements EventHandler<
FunEvent>
, WorkHandler<
FunEvent>
public void onEvent(FunEvent event, long sequence, boolean endOfBatch) public void onEvent(FunEvent event) private static class FunEvent HttpRequestBase requestHttpRequestBase getRequest()
return requestvoid setRequest(HttpRequestBase request)
this.request = request
;
消费者
import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.event.EventThread
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectorsclass DisConsumer extends SourceCode static AtomicInteger index = new AtomicInteger(1)static int total = 50_0000static int threadNum = 10static def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"static def key = truepublic static void main(String[] args) Disruptor<
FunEvent>
disruptor = new Disruptor<
FunEvent>
(
FunEvent::new,
1024 * 1024,
ThreadPoolUtil.getFactory(),
ProducerType.MULTI,
new YieldingWaitStrategy()
);
def funs = range(threadNum).mapToObj(f ->
new FunEventHandler()).collect(Collectors.toList())
disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[])
disruptor.start();
RingBuffer<
FunEvent>
ringBuffer = disruptor.getRingBuffer();
def ss = Time.getTimeStamp()
time
total.times
//def get = new HttpGet()//def get = new HttpGet(url)
//get.addHeader("token", token)
//get.addHeader(HttpClientConstant.USER_AGENT)
//get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)ringBuffer.publishEvent((event, sequence) ->
event.setRequest(get));
output("数据$total 构建完成!")
def start = Time.getTimeStamp()
key = false
waitFor !disruptor.hasBacklog() , 0.01
def end = Time.getTimeStamp()
output(end - start)
outRGB("每毫秒速率$total / (end - start)")disruptor.shutdown();
/**
* 消费者
*/
private static class FunEventHandler implements EventHandler<
FunEvent>
, WorkHandler<
FunEvent>
public void onEvent(FunEvent event, long sequence, boolean endOfBatch)
if (key) sleep(0.05)public void onEvent(FunEvent event)
if (key) sleep(0.05)private static class FunEvent HttpRequestBase requestHttpRequestBase getRequest()
return requestvoid setRequest(HttpRequestBase request)
this.request = request
;
生产者 & 消费者
import com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.frame.execute.ThreadPoolUtil
import com.funtester.utils.Time
import com.lmax.disruptor.EventHandler
import com.lmax.disruptor.RingBuffer
import com.lmax.disruptor.WorkHandler
import com.lmax.disruptor.YieldingWaitStrategy
import com.lmax.disruptor.dsl.Disruptor
import com.lmax.disruptor.dsl.ProducerType
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.atomic.AtomicInteger
import java.util.stream.Collectorsclass DisBoth extends SourceCode static AtomicInteger index = new AtomicInteger(1)static int total = 100_0000static int threadNum = 5static int buffer = 20_0000static def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"static def key = truepublic static void main(String[] args) Disruptor<
FunEvent>
disruptor = new Disruptor<
FunEvent>
(
FunEvent::new,
1024 * 256,
ThreadPoolUtil.getFactory(),
ProducerType.MULTI,
new YieldingWaitStrategy()
);
def funs = range(threadNum).mapToObj(f ->
new FunEventHandler()).collect(Collectors.toList())
disruptor.handleEventsWithWorkerPool(funs as FunEventHandler[])
disruptor.start();
RingBuffer<
FunEvent>
ringBuffer = disruptor.getRingBuffer();
def produces =
fun
while (true)
if (index.getAndIncrement() >
total) break
//def get = new HttpGet()//def get = new HttpGet(url)
//get.addHeader("token", token)
//get.addHeader(HttpClientConstant.USER_AGENT)
//get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
ringBuffer.publishEvent((event, sequence) ->
event.setRequest(get));
time
buffer.times
//def get = new HttpGet()//def get = new HttpGet(url)
//get.addHeader("token", token)
//get.addHeader(HttpClientConstant.USER_AGENT)
//get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
ringBuffer.publishEvent((event, sequence) ->
event.setRequest(get));
output("数据$buffer 构建完成!")
def start = Time.getTimeStamp()
key = false
threadNum.times produces()
waitFor !disruptor.hasBacklog() , 0.01
def end = Time.getTimeStamp()
def l = end - start
output(l)
outRGB("每毫秒速率$(total + buffer) / l")disruptor.shutdown();
/**
* 消费者
*/
private static class FunEventHandler implements EventHandler<
FunEvent>
, WorkHandler<
FunEvent>
public void onEvent(FunEvent event, long sequence, boolean endOfBatch)
if (key) sleep(0.05)public void onEvent(FunEvent event)
if (key) sleep(0.05)private static class FunEvent HttpRequestBase requestHttpRequestBase getRequest()
return requestvoid setRequest(HttpRequestBase request)
this.request = request
;
Have Fun ~ Tester !
- FunTester2021年总结
- FunTester原创大赏
- Groovy语言学习笔记大赏【FunTester】
- Golang语言HTTP客户端实践
- 反射访问和修改private变量
- 如何突破职业瓶颈
- 如何选择API测试工具
- 如何从测试自动化中实现价值
- Groovy中的list
- 重放浏览器请求多链路性能测试实践
- 性能测试如何减少本机误差
- 莫要寻找可能不存在的答案
- 分段随机实践—模拟线上流量
- 千万级日志回放引擎设计稿
推荐阅读
- JavaScript 数据结构之 Set
- CPU中的MESI缓存最终一致性---CPU为什么需要缓存
- 北亚数据恢复Hp DL380服务器raid磁盘故障导致数据库所在盘无法识别的数据库数据恢复案例
- #yyds干货盘点#让SpringBoot不需要ControllerServiceDAOMapper, 这款工具绝了!
- MySQL高级(进阶)SQL语句
- Python While循环语句用法示例介绍
- Python使用.kv文件在Kivy中使用按钮
- Python使用Pandas和XlsxWriter |S–1
- Python使用Pandas和XlsxWriter |S–2