MQ入门

世事洞明皆学问,人情练达即文章。这篇文章主要讲述MQ入门相关的知识,希望能为你提供帮助。
MQ1. MQ引言1.1 什么是MQ
MQ(Message Queue):翻译为消息队列,通过典型的生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断从队列获取消息,因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统之间解耦,别名为消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
1.2 MQ有哪些
当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ、炽手可热的kafka、阿里巴巴自主研发的RocketMQ等
1.3 不同MQ特点

# 1.ActiveMQ
ActiveMQ是Apache出品,最流行的、能力强劲的开源消息总线,它是一个完全支持JMS规范的消息中间件,丰富的API多种集群架构模式让ActiveMQ在业界成为老牌的消息中间件,在中小型企业颇受欢迎。

# 2.Kafka
kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,准求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事物,对消息的重复、丢失、错误没有严格要求。

# 3.RocketMQ
RocketMQ是阿里开源的消息中间件,它是纯java开发,具有高吞吐量、高可用性、适合大规模分布式应用的特点,RockerMQ思路起源于Kafka,但并不是Kafka的一个Copy, 它对消息的可靠传输及事物特性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

# 4.RabbitMQ
RabbitMQ是使用Erlang语音开发的开源消息队列系统,基于AMQP协议来实现,AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

RabbitMQ比kafka可靠,kafka更适合IO高吞吐的处理,一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用,比如ELK日志收集
2. MQ实战2.1 第一种模型(直连)

说明:
  • P:生产者,也就是要发送消息的程序
  • C:消费者,消息的接受者,会一直等待消息的到来
  • queue: 消息队列,图中红色部分,类似一个邮箱,可以缓存消息;生产者可以向其中投递消息,消费者从其中取出消息
引入依赖
< dependency>
< groupId> com.rabbitmq< /groupId>
< artifactId> amqp-client< /artifactId>
< version> 5.14.0< /version>
< /dependency>

开发生产者
public class Producer

public static void main(String[] args) throws IOException, TimeoutException
batchProducer();


static void batchProducer() throws IOException, TimeoutException
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");

// 连接对象
Connection connection = connectionFactory.newConnection();

// 创建通道
Channel channel = connection.createChannel();
// 通道对应的队列,不存在自动创建 | String queue, boolean durable, boolean exclusive, boolean autoDelete, Map< String, Object> arguments
channel.queueDeclare("direct", false, false ,false, null);

// 发布消息 参数1:交换机名称 参数2:队列名称 | String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish("", "direct", null, ("你好,我是消息").getBytes());


channel.close();
connection.close();


【MQ入门】开发消费者
// 消费者
public class Consumer1
// 消费者
public static void main(String[] args) throws IOException, TimeoutException
amqpConsumer();


static void amqpConsumer() throws IOException, TimeoutException
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");

// 连接对象
Connection connection = connectionFactory.newConnection();

// 创建通道
Channel channel = connection.createChannel();
// 通道对应的队列,不存在自动创建 | String queue, boolean durable, boolean exclusive, boolean autoDelete, Map< String, Object> arguments
channel.queueDeclare("direct", false, false ,false, null);

// 消费消息 参数2:true自动确认消息 false手动确认消息 | String queue, boolean autoAck, Consumer callback
channel.basicConsume("direct", false, new DefaultConsumer(channel)
@Override // 参数body是从消息队列取出的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("body = " + new String(body));

//参数1:要确认队列中哪个消息(消息标志)参数2:是否同时开启多个参数确认
channel.basicAck(envelope.getDeliveryTag(), false);

);

// channel.close();
// connection.close();


2.2 第二种模型(work queue)
Work queues,也被称为(Toask queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息消费的速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

说明:
  • P:生产者:任务的发布者
  • C1:消费者,领取任务并且完成任务,假设完成速度较慢
  • C2:消费者2,领取任务并且完成任务,假设完成速度快
开发生产者(批量生产消息)
public static void main(String[] args) throws IOException, TimeoutException
batchProducer();


static void batchProducer() throws IOException, TimeoutException
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");

// 连接对象
Connection connection = connectionFactory.newConnection();

// 创建通道
Channel channel = connection.createChannel();
// 通道对应的队列,不存在自动创建
channel.queueDeclare("hello", false, false ,false, null);

// 发布消息
for (int i=1; i< =20; i++)
channel.basicPublish("", "work", null, ("你好,我是消息" + i).getBytes());


channel.close();
connection.close();

开发消费者1和消费者2
public class Consumer2
// 消费者
public static void main(String[] args) throws IOException, TimeoutException
amqpConsumer();


static void

    推荐阅读