欠伸展肢体,吟咏心自愉。这篇文章主要讲述Java&Go高性能队列之LinkedBlockingQueue性能测试#yyds干货盘点#相关的知识,希望能为你提供帮助。
在写完高性能队列Disruptor在测试中应用和千万级日志回放引擎设计稿之后,我就一直在准备java &
Go 语言几种高性能消息队列的性能测试,其中选取了几种基准测试场景以及在性能测试中的应用场景。
测试场景设计的思路参考的两个方面:
- 消息体大小,我用的不同大小GET请求区分
- 生产者和消费者线程数,Go语言中称协程goroutine
结论总体来说,
java.util.concurrent.LinkedBlockingQueue
性能还是在50万QPS级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来三点比较通用的参考:- 消息体尽可能小
- 线程数增益有限
- 尽量避免消息积压
java.util.concurrent.LinkedBlockingQueue
,分解名字可以得到这是个由链表实现的阻塞单向的对象。官方给的定义是:在我查到的几种JDK自带的队列实现类中,
java.util.concurrent.LinkedBlockingQueue
性能是最高的,还有一个候选的类java.util.concurrent.ArrayBlockingQueue
,资料说java.util.concurrent.LinkedBlockingQueue
性能大概是java.util.concurrent.ArrayBlockingQueue
性能的2 ~ 3倍,差距过于明显,这个有机会再来测试。测试结果这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。
数据说明这里我用了三种
org.apache.http.client.methods.HttpGet
,创建方法均使用原生API,为了区分大小的区别,我会响应增加一些header和URL长度。小对象:
def get = new HttpGet()
中对象:
def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
大对象:
def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
生产者
对象大小 | 队列长度 (百万) | 线程数 | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 838 |
小 | 1 | 5 | 837 |
小 | 1 | 10 | 823 |
小 | 5 | 1 | 483 |
小 | 10 | 1 | 450 |
中 | 1 | 1 | 301 |
中 | 1 | 5 | 322 |
中 | 1 | 10 | 320 |
中 | 1 | 20 | 271 |
中 | 5 | 1 | 失败 |
中 | 10 | 1 | 失败 |
中 | 0.5 | 1 | 351 |
中 | 0.5 | 5 | 375 |
大 | 1 | 1 | 214 |
大 | 1 | 5 | 240 |
大 | 1 | 10 | 241 |
大 | 0.5 | 1 | 209 |
大 | 0.5 | 5 | 250 |
大 | 0.5 | 10 | 246 |
大 | 0.2 | 1 | 217 |
大 | 0.2 | 5 | 309 |
大 | 0.2 | 10 | 321 |
大 | 0.2 | 20 | 243 |
针对
org.apache.http.client.methods.HttpRequestBase
消息体结论如下:- 长度保持在十万量级
- 生产者线程数5-10线程
- 消息体尽可能小
对象大小 | 队列长度 (百万) | 线程数 | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 1893 |
小 | 1 | 5 | 1706 |
小 | 1 | 10 | 1594 |
小 | 1 | 20 | 1672 |
小 | 2 | 1 | 2544 |
小 | 2 | 5 | 2024 |
小 | 5 | 1 | 3419 |
中 | 1 | 1 | 1897 |
中 | 1 | 5 | 1485 |
中 | 1 | 10 | 1345 |
中 | 1 | 20 | 1430 |
中 | 2 | 1 | 2971 |
中 | 2 | 5 | 1576 |
大 | 1 | 1 | 1980 |
大 | 1 | 5 | 1623 |
大 | 1 | 10 | 1689 |
大 | 0.5 | 1 | 1136 |
大 | 0.5 | 5 | 1096 |
大 | 0.5 | 10 | 1072 |
org.apache.http.client.methods.HttpRequestBase
消息体结论如下:- 数据上看长度越长越好
- 消费者线程越少越好
- 消息体尽可能小
生产者 & 消费者这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的2倍。
对象大小 | 次数 (百万) | 线程数 | 队列长度 (百万) | 速率(/ms) |
---|---|---|---|---|
小 | 1 | 1 | 0.1 | 1326 |
小 | 1 | 1 | 0.2 | 1050 |
小 | 1 | 1 | 0.5 | 1054 |
小 | 1 | 5 | 0.1 | 1091 |
小 | 1 | 10 | 0.1 | 1128 |
小 | 2 | 1 | 0.1 | 1798 |
小 | 2 | 1 | 0.2 | 1122 |
小 | 2 | 5 | 0.2 | 946 |
小 | 5 | 5 | 0.1 | 1079 |
小 | 5 | 10 | 0.1 | 1179 |
中 | 1 | 1 | 0.1 | 632 |
中 | 1 | 1 | 0.2 | 664 |
中 | 1 | 5 | 0.2 | 718 |
中 | 1 | 10 | 0.2 | 683 |
中 | 2 | 1 | 0.2 | 675 |
中 | 2 | 5 | 0.2 | 735 |
中 | 2 | 10 | 0.2 | 788 |
中 | 2 | 15 | 0.2 | 828 |
大 | 1 | 1 | 0.1 | 505 |
大 | 1 | 1 | 0.2 | 558 |
大 | 1 | 5 | 0.2 | 609 |
大 | 1 | 10 | 0.2 | 496 |
大 | 2 | 1 | 0.2 | 523 |
大 | 2 | 5 | 0.2 | 759 |
大 | 2 | 10 | 0.2 | 668 |
org.apache.http.client.methods.HttpRequestBase
消息体结论如下:- 消息队列积累消息越少,速率越快
- 消费速率随时间推移越来越快,不明显
- 消息体尽可能小
fun
和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。所以这个用例对于Java同学来讲可能有点看着熟悉,仔细阅读起来有点费劲,我会尽量写一些注释。大家可以把终点放在测试结果上,这可以对以后大家使用java.util.concurrent.LinkedBlockingQueue
类有个基本的参考。测试用例会根据上述的测试场景进行微调,例如线程数、消息体对象的大小等等,这个我会着重进行三种用例场景的测试。当然在工作中使用场景肯定比我提到的三种复杂多,各位有兴趣可以自己亲自上手测试,这里我就不班门弄斧了。
生产者场景
package com.funtest.groovytestimport com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicIntegerclass QueueT extends SourceCode static AtomicInteger index = new AtomicInteger(0)static int total = 100_0000static int size = 10static int threadNum = 1static int piece = total / sizestatic def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"public static void main(String[] args) LinkedBlockingQueue<
HttpRequestBase>
linkedQ = new LinkedBlockingQueue<
>
()def start = Time.getTimeStamp()
def latch = new CountDownLatch(threadNum)
def ts = []
def barrier = new CyclicBarrier(threadNum + 1)
def funtester = //创建异步闭包的方法
fun
barrier.await()
while (true)
if (index.getAndIncrement() % piece == 0)
def l = Time.getTimeStamp() - start
ts <
<
l
output("$formatLong(index.get())添加总消耗$formatLong(l)")
start = Time.getTimeStamp()if (index.get() >
total) breakdef get = new HttpGet(url)
get.addHeader("token",token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
linkedQ.put(get)latch.countDown()threadNum.times funtester()
def st = Time.getTimeStamp()
barrier.await()
latch.await()
def et = Time.getTimeStamp()
outRGB("每毫秒速率$total / (et - st)")
outRGB(CountUtil.index(ts).toString())
消费者场景
package com.funtest.groovytestimport com.funtester.config.HttpClientConstant
import com.funtester.frame.SourceCode
import com.funtester.utils.CountUtil
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicIntegerclass QueueTconsume extends SourceCode static AtomicInteger index = new AtomicInteger(1)static int total = 100_0000static int size = 10static int threadNum = 5static int piece = total / sizestatic def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"public static void main(String[] args) LinkedBlockingQueue<
HttpRequestBase>
linkedQ = new LinkedBlockingQueue<
>
()
def pwait = new CountDownLatch(10)
def produces =
fun
while (true)
if (linkedQ.size() >
total) break
def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
linkedQ.add(get)pwait.countDown()10.times produces()
pwait.await()
outRGB("数据构造完成!$linkedQ.size()")def start = Time.getTimeStamp()
def barrier = new CyclicBarrier(threadNum + 1 )
def latch = new CountDownLatch(threadNum)
def ts = []
def funtester =
fun
barrier.await()
while (true)
if (index.getAndIncrement() % piece == 0)
def l = Time.getTimeStamp() - start
ts <
<
l
output("$formatLong(index.get())消费总消耗$formatLong(l)")
start = Time.getTimeStamp()def poll = linkedQ.poll(100, TimeUnit.MILLISECONDS)
if (poll == null) breaklatch.countDown()threadNum.times funtester()
def st = Time.getTimeStamp()
barrier.await()
latch.await()
def et = Time.getTimeStamp()
outRGB("每毫秒速率$total / (et - st)")
outRGB(CountUtil.index(ts).toString())
生产者 & 消费者 场景这里我引入了另外一个变量:初始队列长度length,用例运行之前将队列按照这个长度进行单线程填充。
package com.funtest.groovytestimport com.funtester.frame.SourceCode
import com.funtester.utils.Time
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBaseimport java.util.concurrent.CountDownLatch
import java.util.concurrent.CyclicBarrier
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicIntegerclass QueueBoth extends SourceCode static AtomicInteger index = new AtomicInteger(1)static int total = 500_0000static int length = 50_0000static int threadNum = 5static def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"public static void main(String[] args)
LinkedBlockingQueue<
HttpRequestBase>
linkedQ = new LinkedBlockingQueue<
>
()def latch = new CountDownLatch(threadNum * 2)
def barrier = new CyclicBarrier(threadNum * 2 + 1)
def ts = []
def funtester = f ->
fun
barrier.await()
while (true)
if (index.getAndIncrement() >
total) break
f()latch.countDown()def produces =
def get = new HttpGet(url)
get.addHeader("token", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)
linkedQ.put(get)length.times produces()threadNum.times
funtester produces
funtester linkedQ.poll(100, TimeUnit.MILLISECONDS)def st = Time.getTimeStamp()
barrier.await()
latch.await()
def et = Time.getTimeStamp()
outRGB("每毫秒速率$total / (et - st) / 2")
补充 性能非常不稳定其中有两个问题需要补充说明,
java.util.concurrent.LinkedBlockingQueue
性能在测试过程中非常不稳定,我每次打印日志以1/10为节点打印时间戳,下面分享一些在队列长度100万时,生产者模式中的日志:INFO->
23.731 F-2107,942添加总消耗523
INFO->
23.897 F-10 200,061添加总消耗165
INFO->
24.137 F-9300,024添加总消耗239
INFO->
24.320 F-2400,037添加总消耗182
INFO->
25.200 F-5500,065添加总消耗879
INFO->
25.411 F-2600,094添加总消耗211
INFO->
25.604 F-8700,090添加总消耗193
INFO->
26.868 F-1800,047添加总消耗1,264
INFO->
26.927 F-4900,053添加总消耗57
INFO->
28.454 F-31,000,009添加总消耗1,527
INFO->
28.457 main 每毫秒速率190.0779319521
INFO->
28.476 main 平均值:524.0 ,最大值1527.0 ,最小值:57.0 ,中位数:239.0 p99:1527.0 p95:1527.0INFO->
43.930 F-10 112,384添加总消耗385
INFO->
44.072 F-9200,159添加总消耗140
INFO->
44.296 F-1300,058添加总消耗223
INFO->
44.445 F-7400,075添加总消耗149
INFO->
45.311 F-10 500,086添加总消耗866
INFO->
45.498 F-8600,080添加总消耗187
INFO->
45.700 F-1700,088添加总消耗202
INFO->
45.760 F-9800,057添加总消耗59
INFO->
47.245 F-6900,095添加总消耗1,485
INFO->
47.303 F-61,000,009添加总消耗58
INFO->
47.305 main 每毫秒速率262.7430373095
INFO->
47.320 main 平均值:375.4 ,最大值1485.0 ,最小值:58.0 ,中位数:202.0 p99:1485.0 p95:1485.0INFO->
00.916 F-1100,000添加总消耗568
INFO->
01.269 F-1200,000添加总消耗353
INFO->
01.461 F-1300,000添加总消耗192
INFO->
01.635 F-1400,000添加总消耗174
INFO->
02.536 F-1500,000添加总消耗899
INFO->
02.777 F-1600,000添加总消耗240
INFO->
03.015 F-1700,000添加总消耗237
INFO->
03.107 F-1800,000添加总消耗91
INFO->
04.519 F-1900,000添加总消耗1,412
INFO->
05.940 F-11,000,000添加总消耗96
INFO->
05.943 main 每毫秒速率184.5358922310
INFO->
05.959 main 平均值:426.2 ,最大值1412.0 ,最小值:91.0 ,中位数:240.0 p99:1412.0 p95:1412.0
可以看出最大值最小值能相差十几倍,甚至二十几倍,这种情况随着消息队列总长度增长而增长,大多数发生在80万 ~ 100万阶段,如果将长度降低到50万,这种情况就会得到明显改善。所以还有一个附加观点:消息队列长度应当尽可能少一些。
基准测试下面是我使用FunTester性能测试框架对三种消息对象的生产代码进行的测试结果。
测试对象 | 线程数 | 个数(百万) | 速率(/ms) |
---|---|---|---|
小 | 1 | 1 | 5681 |
小 | 5 | 1 | 8010 |
小 | 5 | 5 | 15105 |
中 | 1 | 1 | 1287 |
中 | 5 | 1 | 2329 |
中 | 5 | 5 | 4176 |
大 | 1 | 1 | 807 |
大 | 5 | 1 | 2084 |
大 | 5 | 5 | 3185 |
package com.funtest.groovytestimport com.funtester.base.constaint.FixedThread
import com.funtester.config.HttpClientConstant
import com.funtester.frame.execute.Concurrent
import com.funtester.httpclient.FunLibrary
import org.apache.http.client.methods.HttpGetclass TTT extends FunLibrary static int total = 100_0000static int thread = 1static int times = total / threadstatic def url = "http://localhost:12345/funtester"static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"public static void main(String[] args)
RUNUP_TIME = 0
def tasks = []
thread.times tasks <
<
new FunTester(times)
new Concurrent(tasks,"测试生产者代码性能").start()private static class FunTester extends FixedThread FunTester(int limit)
super(null, limit, true)@Override
protected void doing() throws Exception
//def get = new HttpGet()//def get = new HttpGet(url)
//get.addHeader("token", token)
//get.addHeader(HttpClientConstant.USER_AGENT)
//get.addHeader(HttpClientConstant.CONNECTION)def get = new HttpGet(url + token)
get.addHeader("token", token)
get.addHeader("token1", token)
get.addHeader("token5", token)
get.addHeader("token4", token)
get.addHeader("token3", token)
get.addHeader("token2", token)
get.addHeader(HttpClientConstant.USER_AGENT)
get.addHeader(HttpClientConstant.CONNECTION)@Override
FixedThread clone()
return new FunTester(limit)
Have Fun ~ Tester !
- FunTester2021年总结
- 2022年度计划手册模板
- 分享一份Fiddler学习包
- Selenium自动化的JUnit参数化实践
- 性能测试框架中QPS取样器实现
- 如何测试非固定型概率算法P=p(1+0.1*N)
- 移动测试工程师职业
- Groovy热更新Java实践
- Java线程安全ReentrantLock
- 接口测试代码覆盖率(jacoco)方案分享
- Selenium Python使用技巧(三)
- 控制台彩色输出
推荐阅读
- 教程手把手教你如何搭建Hadoop单机伪集群
- OpenHarmony 源码解析之账号子系统
- 云系统中的异常检测方法
- Spring专场「MVC容器」不看源码就带你认识核心流程以及运作原理
- 1月活动|51CTO博客#IT话题共建#,挑战7日连更!
- MyBatis 快速入门
- Go 每日一库之 gorilla/securecookie
- Spring MVC day01 请求参数问题及常用注解
- 题解——冒泡+二分查找