创建生产者步骤
- 创建工厂 connectionFactory
- 获取连接
- 通过connection创建一个channel
- 发布消息
4.1. 指定 exchange(交换机)
4.2. 指定 routing key(路由规则,路由到哪一个queue)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wx
* @date 2021-01-10
*/
public class producer {public static void main(String[] args) throws IOException, TimeoutException {//1.创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2.获取连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个channel
Channel chennel = connection.createChannel();
//4.发布消息
for(int i = 0;
i < 5;
i++){String msg = "hello rabbitmq";
//1.exchange2.routing key
//默认交换器隐式绑定到每个队列,其路由键等于队列名. 如果没有队列名,则会被删除
chennel.basicPublish("", "test001", null, msg.getBytes());
}
chennel.close();
connection.close();
}
}
创建消费者步骤
- 创建工厂 connectionFactory
- 获取连接
- 通过connection创建一个channel
- 声明一个队列(queue)
- 创建消费者
- 设置channel
- 获取消息
import com.rabbitmq.client.*;
import javax.security.auth.callback.Callback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author wx
* @date 2021-01-10
*/
public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//1,创建工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//2,获取连接
Connection connection = connectionFactory.newConnection();
//3,通过connection创建一个channel
Channel channel = connection.createChannel();
//4,声明一个队列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
//5,创建消费者//
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//6,设置channel
channel.basicConsume(queueName, true, queueingConsumer);
//7,获取消息
while (true){QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}
推荐阅读
- 面经|面经-美团
- 消息队列|RabbitMQ——发布确认高级
- SpringCloud|Spring Cloud Stream函数式编程整合消息中间件
- 消息中间件|windows下安装RabbitMQ
- RabbitMQ|消息中间件RabbitMQ(五)——实现RPC调用
- 分布式|MQ介绍,RabbitMQ在SpringAMQP中的使用
- 微服务|【黑马-SpringCloud技术栈】【09】消息队列—RabbitMQ_SpringAMQP
- 学习记录|windows docker-desktop + k8s 部署rabbitmq
- java基础|mq消息积压怎么对应