阿里RocketMq试用记录+简单的Spring集成

CSDN学院招募微信小程序讲师啦 程序猿全指南,让【移动开发】更简单! 【观点】移动原生App开发 PK HTML 5开发 云端应用征文大赛,秀绝招,赢无人机! 阿里RocketMq试用记录+简单的Spring集成标签: springRocketMq2015-11-04 09:34 5029人阅读评论(0)收藏举报 本文章已收录于: 分类: 中间件 版权声明:本文为博主原创文章,未经博主允许不得转载。
目录(?)[+]

  1. RocketMq试用简单的Spring集成
    1. RocketMq
    2. 核心原理
      1. 1 数据结构
      2. 2 刷盘策略
      3. 3 内存机制
      4. 4 工作模式
    3. 环境安装
      1. 1 JAVA环境安装
      2. 2 RocketMq安装
    4. 测试网络拓扑
    5. 启停操作
    6. 运维指令
    7. 基本测试
    8. 宕机实验
    9. 遗留问题
    10. RocketMqSpring源码下载
    11. 参考文献
RocketMq试用+简单的Spring集成 经过2天的试用初步了解了一下RocketMq的基本用法,搜索了一下度娘,没有找到spring的例子,所以简单搞了一点代码感受一下。
1.RocketMq
RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ,有以下特点: 1) 能够保证严格的消息顺序 2) 提供丰富的消息拉取模式 3) 高效的订阅者水平扩展能力 4)实时的消息订阅机制 5)亿级消息堆积能力

2.核心原理 2.1. 数据结构
阿里RocketMq试用记录+简单的Spring集成
文章图片

阿里RocketMq试用记录+简单的Spring集成
文章图片

(1)所有数据单独储存到commit Log ,完全顺序写,随机读
(2)对最终用户展现的队列实际只储存消息在Commit Log 的位置信息,并且串行方式刷盘
(3)按照MessageId查询消息
阿里RocketMq试用记录+简单的Spring集成
文章图片

(4)根据查询的key的hashcode%slotNum得到具体的槽位置
阿里RocketMq试用记录+简单的Spring集成
文章图片

(5)根据slotValue(slot对应位置的值)查找到索引项列表的最后一项
(6)遍历索引项列表返回查询时间范围内的结果集
2.2. 刷盘策略
rocketmq中的所有消息都是持久化的,先写入系统pagecache,然后刷盘,可以保证内存与磁盘都有一份数据,访问时,可以直接从内存读取
使用简单的符号标识不同的标题,将某些文字标记为粗体或者斜体,创建一个链接等,详细语法参考帮助?。
本编辑器支持 Markdown Extra ,扩展了很多好用的功能。具体请参考[Github][2].
2.3. 内存机制
阿里RocketMq试用记录+简单的Spring集成
文章图片

2.4. 工作模式
阿里RocketMq试用记录+简单的Spring集成
文章图片

3. 环境安装 3.1. JAVA环境安装
安装
rpm -ivh jdk-7u80-linux-x64.rpm

  • 1
  • 1
环境变量
JAVA_HOME=/usr/java/jdk1.7.0_80 CLASSPATH=.:$JAVA_HOME/lib.tools.jar PATH=$JAVA_HOME/bin:$PATH export JAVA_HOME CLASSPATH PATH export ROCKETMQ_HOME=/usr/local/service/alibaba-rocketmq

  • 1
  • 2
  • 3
  • 4
  • 5
  • 1
  • 2
  • 3
  • 4
  • 5
3.2. RocketMq安装
https://github.com/alibaba/RocketMQ/releases下载3.2.6,解压

4. 测试网络拓扑 阿里RocketMq试用记录+简单的Spring集成
文章图片

因为手里没有其他服务器,105那台缺少一个slave,在同步双写模式下,发送消息会返回 SLAVE_NOT_AVAILABLE,不过消息已经发送成功,只是slave没有写成功。
5. 启停操作
这里只给出一个基本的示例,各个模式的启停在本文最后的参考文献中会有详细的说明。这里不再赘述。

  • 启动nameserver
nohup ./mqnamesrv &

  • 1
  • 1
  • 停止nameServer
./mqshutdown namesrv

  • 1
  • 1
  • 启动broker(单master)(多master,多master+slave)对应的(异步复制,同步双写)
nohup sh mqbroker -n 192.168.146.109:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &

  • 1
  • 1
  • 停止broker
./mqshutdown broker

  • 1
  • 1
6. 运维指令
  • 查看集群情况
./mqadmin clusterList -n 127.0.0.1:9876

  • 1
  • 1
  • 查看broker状态
./mqadmin brokerStatus -n 127.0.0.1:9876 -b 192.168.146.105:10911

  • 1
  • 1
  • 查看topic列表
./mqadmin topicList -n 127.0.0.1:9876

  • 1
  • 1
  • 查看topic状态
./mqadmin topicStatus -n 127.0.0.1:9876 -t PushTopic

  • 1
  • 1
  • 查看topic路由
./mqadmin topicRoute-n 127.0.0.1:9876 -t PushTopic

  • 1
  • 1
7. 基本测试 基本测试采用Java直接编码的方式生产和消费消息,例子来源于参考文献的《RocketMQ开发教程》。本文最后的代码示例,采用了spring的形式。
  • Producer
package com.jd.wxz; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args){ DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("192.168.146.109:9876"); try { producer.start(); Message msg = new Message("PushTopic", "push", "1", "Just for test.".getBytes()); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PushTopic", "push", "2", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); msg = new Message("PullTopic", "pull", "1", "Just for test.".getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); }finally{ producer.shutdown(); } } }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • Consumer
package com.sean; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("192.168.146.109:9876"); try { //订阅PushTopic下Tag为push的消息 consumer.subscribe("PushTopic", "push"); //程序第一次启动从消息队列头取数据 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List list, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); System.out.println(msg.toString()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
运行结果:
阿里RocketMq试用记录+简单的Spring集成
文章图片

阿里RocketMq试用记录+简单的Spring集成
文章图片

服务端监控:
阿里RocketMq试用记录+简单的Spring集成
文章图片

8.宕机实验 阿里RocketMq试用记录+简单的Spring集成
文章图片

9.遗留问题
1)关闭master 自动切换到slave无法实现,官方资料上没有明确指明,第三方文档里有(见文献3)。 2)在开发机服务器上运行os.sh进行优化,导致网络无法连接,运维帮忙重启才恢复。

10.RocketMq+Spring源码下载 戳我
11.参考文献 1) 《RocketMq入门(上)》
2) 《RocketMq入门(下)》
3) 《Rokectmq开发教程》
4) 《阿里Rocketmq Quict Start》
【阿里RocketMq试用记录+简单的Spring集成】5) 《RocketMQ与Kafka对比(18项差异)》
6) 《RocketMq命令整理》
7) 《RocketMq原理简介》
0
0
我的同类文章 http://blog.csdn.net 参考知识库
Linux知识库 9149关注|3461收录
Java SE知识库 20836关注|468收录
Java EE知识库 13999关注|1215收录
Java 知识库 22124关注|1436收录
软件测试知识库 3312关注|310收录
算法与数据结构知识库 12521关注|2320收录
更多资料请参考: 猜你在找 大数据平台hadoop运维之hadoop快速入门 spring3.2入门到大神(备java基础、jsp、servlet,javaee精髓) 途牛冯学奎:复杂环境以及高速业务发展下途牛度假订单的建设与运维 WEB安全攻防技术精讲视频教程(全漏洞原理+攻击手段+测试方法+预防措施) 如何使用Spring XD构建Data Microservices Spring集成Quartz的简单配置 Activiti环境配置项目搭建与Spring集成简单示例 简单明了地解释JMS 相关概念 以及JMS的实例代码下载 传统使用方式非Spring框架下的集成 SpringMVC+Spring4+Mybatis3集成开发简单Web项目+源码下载 持久化框架SpringMVC+Spring4+Mybatis3 集成开发简单Web项目+源码下载 查看评论
暂无评论

发表评论
  • 用 户 名:
  • liuguoyun_123456
  • 评论内容:
  • HTML/XML objective-c Delphi Ruby PHP C# C++ JavaScript Visual Basic Python Java CSS SQL 其它
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场 核心技术类目 全部主题 Hadoop AWS 移动游戏 Java Android iOS Swift 智能硬件 Docker OpenStack VPN Spark ERP IE10 Eclipse CRM JavaScript 数据库 Ubuntu NFC WAP jQuery BI HTML5 Spring Apache .NET API HTML SDK IIS Fedora XML LBS Unity Splashtop UML components Windows Mobile Rails QEMU KDE Cassandra CloudStack FTC coremail OPhone CouchBase 云计算 iOS6 Rackspace Web App SpringSide Maemo Compuware 大数据 aptech Perl Tornado Ruby Hibernate ThinkPHP HBase Pure Solr Angular Cloud Foundry Redis Scala Django Bootstrap 转载于:https://www.cnblogs.com/jobs-lgy/p/6307556.html

    推荐阅读