花门楼前见秋草,岂能贫贱相看老。这篇文章主要讲述ActiveMQ Message Persistence相关的知识,希望能为你提供帮助。
1、JMS规范支持两种类型的消息传递:persistent and non-persistent。ActiveMQ支持这两种类型的传递方式;persistent 好处:
1)消息对消费者总是可用;
2)系统宕机后,消息不被丢失。
2、ActiveMQ可将消息存储在三种类型介质中:file-based(存储在文件中)、in-memory(存储在内存中)、relational databases(存储在关系数据库中)。
3、ActiveMQ中存储两种Destination,主题(Topic)与 队列(Queue)。主题对应的Domain是发布-订阅模型(Pub/Sub),队列对应的Domain是点对点模型(P2P)。
1)队列存储:按FIFO的顺序存储消息,对于众多的消费者,只有其中一个消费者可以接收该消息。也只有当消费者消费了之后,该消息才能删除;
2)Topic存储:分为两种订阅者,持久订阅者(durable subscribes)和非持久订阅者(non-durable subscribes),对于 durable subscribers而言,尽管当前它是不活跃的(没有连接到borker),也不会错失发给broker的消息(类似于QQ的离线消息)。持久订阅者维护者一个消息指针,指针总是指向该订阅者需要的下一个消息。
为什么要这样设计?
通过指针这种形式,可以很好地解决上面三种情况下出现的问题。
只有当所有的消费者都获得了该消息时,消息才能从broker中删除。
二、activemq存储:
1、AMQ Message Store:
“日志”(journal)由存储在特定长度的滚动的数据文件中,数据文件中包含消息和命令(例如事务边界和消息删除),如果已经达到当前使用的数据文件的最大长度,则会创建一个新的数据文件。
1)数据文件中的所有消息都是引用计数的,这样一旦该数据文件中的每个消息不再需要,数据文件就可以被删除或归档。
2)日志只会将消息附加到当前数据文件的末尾,因此存储速度非常快。
2、KahaDB:
自ActiveMQ5.4以来,KahaDB成为了ActiveMQ默认的持久化存储方式。相比于原来的AMQ存储方式,官方宣称KahaDB使用了更少的文件描述符,并且提供了更快的存储恢复机制。
1)配置:
在 conf/activemq.xml 中配置如下:
<
broker brokerName="broker" ... >
<
persistenceAdapter>
<
kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
<
/persistenceAdapter>
...
<
/broker>
【ActiveMQ Message Persistence】指定了kahaDB,并表明数据存储在 "activemq-data"目录下,日志文件最大长度是32MB。
另外,一些关于KahaDB的配置选项如下:
2)文件结构:
可以看出,上面directory一共有四个文件:
A、db.data:消息的索引文件,本质上是B-Tree的实现,使用B-Tree作为索引指向db-*.log里面存储的消息。
B、db.redo:主要用来进行消息恢复。
C、db-*.log:存储消息的内容。对于一个消息而言,不仅仅有消息本身的数据(message data),而且还有(Destinations、订阅关系、事务...)。log以日志形式存储消息,而且新的数据总是以APPEND的方式追加到日志文件末尾。因此,消息的存储是很快的。比如,对于持久化消息,Producer把消息发送给Broker,Broker先把消息存储到磁盘中(enableJournalDiskSyncs配置选项),然后再向Producer返回Acknowledge。Append方式在一定程度上减少了Broker向Producer返回Acknowledge的时间。
D、lock文件。
3)KahaDB的Architecture:
从上图中可以看出:图中各个部分与KahaDB配置的存储目录下的文件是一 一对应的。
A、在内存(cache)中的那部分B-Tree是Metadata Cache:
通过将索引缓存到内存中,可以加快查询的速度(quick retrival of message data)。但是需要定时将 Metadata Cache 与 Metadata Store同步。这个同步过程就称为:check point。由checkpointInterval选项 决定每隔多久时间进行一次checkpoint操作。
B、BTree Indexes则是保存在磁盘上的,称为Metadata Store,它对应于文件db.data,它就是对Data Logs以B树的形式 索引。有了它,Broker(消息服务器)可以快速地重启恢复,因为它是消息的索引,根据它就能恢复出每条消息的location。如果Metadata Store被损坏,则只能扫描整个Data Logs来重建B树了,这个过程是很复杂且缓慢的。
C、Data Logs则对应于文件 db-*.log,默认是32MB,Data Logs以日志形式存储消息,它是生产者生产的数据的真正载体。
D、Redo Log则对应于文件 db.redo,redo log的原理用到了“Double Write”。关于“Double Write”可参考(https://www.percona.com/blog/2006/08/04/innodb-double-write/)简要记录下自己的理解:因为磁盘的页大小与操作系统的页大小不一样,磁盘的页大小一般是16KB,而OS的页大小是4KB。而数据写入磁盘是以磁盘页大小为单位进行的,即一次写一个磁盘页大小,这就需要4个OS的页大小(4*4=16)。如果在写入过程中出现故障(突然断电)就会导致只写入了一部分数据(partial page write)。而采用了“Double Write”之后,将数据写入磁盘时,先写到一个Recovery Buffer中,然后再写到真正的目的文件中。在ActiveMQ的源码PageFile.java中有相应的实现。
扩展知识:Linux中的日志文件系统:
因为Linux的 ext文件系统采用索引节点来存储文件的元数据,每次数据写入磁盘之后,需要更新索引节点表。而写入磁盘与更新索引节点表并不是“原子操作”,比如,在数据写入磁盘后,系统发生故障,由于没有更新索引,所以之前写入的数据就再也找不到了。
因此,日志文件系统给Linux系统增加了一层安全性:数据写入存储设备之前,先将数据(或者只将索引节点信息写日志)写入到临时文件中,该临时文件称日志。如果在数据写入时发生故障,还可以通过日志来进行一定的恢复。
3、jdbc:
1)配置:
conf\\activemq.xml进行配置:
<
!--
<
persistenceAdapter>
<
kahaDB directory="$activemq.data/kahadb"/>
<
/persistenceAdapter>
-->
<
persistenceAdapter>
<
jdbcPersistenceAdapter dataSource="#mysqlDataSource"/>
<
/persistenceAdapter>
<
!-- MySQL DataSource -->
<
bean id="mysqlDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<
property name="driverClassName" value="https://www.songbingjia.com/android/com.mysql.jdbc.Driver"/>
<
property name="url" value="https://www.songbingjia.com/android/jdbc:mysql://127.0.0.1:3306/amq_db?relaxAutoCommit=true"/>
<
property name="username" value="https://www.songbingjia.com/android/root"/>
<
property name="password" value="https://www.songbingjia.com/android/root"/>
<
property name="poolPreparedStatements" value="https://www.songbingjia.com/android/true"/>
<
/bean>
2)手动将mysql的jar包拷贝到apache-activemq-5.8.0\\lib\\目录下。
3)我们重新启动消息服务器,就可以发现activeMQ在数据库中新建了3张表:activemq_acks
,activemq_lock
,activemq_msgs 。
推荐阅读
- ConcurrentMap.putIfAbsent(key,value) 用法
- spring4.1.8扩展实战之四(感知spring容器变化(SmartLifecycle接口))
- hadoop各种发行版本
- 分布式系统架构演进
- spring依赖注入引发的一点思考
- 客户端HttpClient4处理 Servlet Gzip后的内容
- httpClient4发送gzip的post数据,servlet接收并解压
- elasticsearch基础知识以及创建索引
- 使用maven构建springmvc-mybatis项目