文章目录
- RabbitMQ-使用Java操作简单队列 simple queues
- 1.获取连接工具类
RabbitMQ-使用Java操作简单队列 simple queues 1.获取连接工具类
【Java|RabbitMQ-使用Java操作简单队列 simple queues】?为了方便每次获取连接,封装一个简单的工具类
package com.ithzk.rabbitmq.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/7
*/
public class RabbitMQConnectionUtils {/**
* 若connection timeout 连接不上尝试关闭防火墙或者放开端口访问
* @return
* @throws IOException
* @throws TimeoutException
*/
public static Connection getConnection() throws IOException, TimeoutException {
//定义一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置服务器地址
connectionFactory.setHost("10.200.52.118");
//AMQP 15672
connectionFactory.setPort(5672);
//vhost
connectionFactory.setVirtualHost("/vhost_hzk");
//设置用户名
connectionFactory.setUsername("hzk");
//设置密码
connectionFactory.setPassword("hzk");
return connectionFactory.newConnection();
}
}
2.消息生产者/发送者
package com.ithzk.rabbitmq.simple;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/7
*/
public class Send {private static final String QUEUE_NAME="test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {//获取一个连接
Connection connection = RabbitMQConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//创建队列申明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg = "simgple queue";
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("send msg:"+msg);
channel.close();
connection.close();
}
}
3.消息消费者/接收者
?新、老两种接收消息的方式
package com.ithzk.rabbitmq.simple;
import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author hzk
* @date 2018/3/7
*/
public class Recv {private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {//获取连接
Connection connection = RabbitMQConnectionUtils.getConnection();
//从连接中获取通道
Channel channel = connection.createChannel();
//创建队列申明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {/**
* 获取到达消息
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("recv msg:" + msg);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
}/**
* 老方式接收队列信息
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void oldRecv() throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = RabbitMQConnectionUtils.getConnection();
//从连接中获取通道
Channel channel = connection.createChannel();
//定义队列消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//监听队列
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("old Recv msg:"+msg);
}}}
?ps:如若连接超时 connection timeout 检查防火墙配置,可临时关闭防火墙或者放开对应端口访问
service iptables stop 临时关闭防火墙直到手动启动或者是重启电脑
iptables -I INPUT -p tcp --dport xxx -j ACCEPT开放端口访问
推荐阅读
- Java|Java基础——数组
- 人工智能|干货!人体姿态估计与运动预测
- java简介|Java是什么(Java能用来干什么?)
- Java|规范的打印日志
- Linux|109 个实用 shell 脚本
- 程序员|【高级Java架构师系统学习】毕业一年萌新的Java大厂面经,最新整理
- Spring注解驱动第十讲--@Autowired使用
- SqlServer|sql server的UPDLOCK、HOLDLOCK试验
- jvm|【JVM】JVM08(java内存模型解析[JMM])
- 技术|为参加2021年蓝桥杯Java软件开发大学B组细心整理常见基础知识、搜索和常用算法解析例题(持续更新...)