Java|RabbitMQ-使用Java操作简单队列 simple queues


文章目录

  • RabbitMQ-使用Java操作简单队列 simple queues
        • 1.获取连接工具类

RabbitMQ-使用Java操作简单队列 simple queues 1.获取连接工具类
【Java|RabbitMQ-使用Java操作简单队列 simple queues】?为了方便每次获取连接,封装一个简单的工具类
package com.ithzk.rabbitmq.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author hzk * @date 2018/3/7 */ public class RabbitMQConnectionUtils {/** * 若connection timeout 连接不上尝试关闭防火墙或者放开端口访问 * @return * @throws IOException * @throws TimeoutException */ public static Connection getConnection() throws IOException, TimeoutException { //定义一个连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置服务器地址 connectionFactory.setHost("10.200.52.118"); //AMQP 15672 connectionFactory.setPort(5672); //vhost connectionFactory.setVirtualHost("/vhost_hzk"); //设置用户名 connectionFactory.setUsername("hzk"); //设置密码 connectionFactory.setPassword("hzk"); return connectionFactory.newConnection(); } }

2.消息生产者/发送者
package com.ithzk.rabbitmq.simple; import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author hzk * @date 2018/3/7 */ public class Send {private static final String QUEUE_NAME="test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException {//获取一个连接 Connection connection = RabbitMQConnectionUtils.getConnection(); //从连接中获取一个通道 Channel channel = connection.createChannel(); //创建队列申明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); String msg = "simgple queue"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); System.out.println("send msg:"+msg); channel.close(); connection.close(); } }

3.消息消费者/接收者
?新、老两种接收消息的方式
package com.ithzk.rabbitmq.simple; import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils; import com.rabbitmq.client.*; import org.junit.Test; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * @author hzk * @date 2018/3/7 */ public class Recv {private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException {//获取连接 Connection connection = RabbitMQConnectionUtils.getConnection(); //从连接中获取通道 Channel channel = connection.createChannel(); //创建队列申明 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) {/** * 获取到达消息 * @param consumerTag * @param envelope * @param properties * @param body * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("recv msg:" + msg); } }; //监听队列 channel.basicConsume(QUEUE_NAME,true,consumer); }/** * 老方式接收队列信息 * @throws IOException * @throws TimeoutException * @throws InterruptedException */ @Test public void oldRecv() throws IOException, TimeoutException, InterruptedException { //获取连接 Connection connection = RabbitMQConnectionUtils.getConnection(); //从连接中获取通道 Channel channel = connection.createChannel(); //定义队列消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //监听队列 channel.basicConsume(QUEUE_NAME,true,queueingConsumer); while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("old Recv msg:"+msg); }}}

?ps:如若连接超时 connection timeout 检查防火墙配置,可临时关闭防火墙或者放开对应端口访问
service iptables stop 临时关闭防火墙直到手动启动或者是重启电脑 iptables -I INPUT -p tcp --dport xxx -j ACCEPT开放端口访问

    推荐阅读