前言 【RabbitMQ 工具类(带连接池)】本文介绍一个简单的 RabbitMQ 工具类,它通过连接池复用 Connection。其中的配置参数是从 Apollo 读取的,也可以直接写死,或者从配置文件中读取。
代码
/**
 * RabbitMQ helper that publishes and fetches messages while reusing connections
 * through a small pool of idle connections.
 *
 * <p>Broker settings are injected from the {@code mq.calltopay.rabbit.*} properties.
 * Every operation opens one short-lived channel on a pooled connection, closes the
 * channel again, and then hands the connection back to the pool.
 *
 * <p>Thread-safety: the idle pool is only touched while holding its own monitor, and the
 * factory is created under the instance monitor. Network I/O (opening or closing a
 * connection) is never performed while the pool monitor is held.
 */
@Configuration
@EnableApolloConfig
public class MqUtil {

    /** Upper bound on the number of idle connections kept for reuse. */
    private static final int MAX_IDLE_CONNECTIONS = 20;

    /** Idle, reusable connections; every access is synchronized on this list. */
    private final List<Connection> connections = new ArrayList<>();

    @Value("${mq.calltopay.rabbit.host}")
    private String host;

    @Value("${mq.calltopay.rabbit.port}")
    private int port;

    @Value("${mq.calltopay.rabbit.username}")
    private String username;

    @Value("${mq.calltopay.rabbit.password}")
    private String password;

    /**
     * Connection factory, created lazily by {@link #getFactory()}.
     *
     * <p>Deliberately not {@code @Autowired}: injecting the bean that this same class
     * produces in {@link #getRabbitMqConnection()} is a self-referencing (circular)
     * dependency. Prefer {@link #getFactory()} over reading this field directly, because
     * the field stays {@code null} until the factory has been requested once.
     */
    public ConnectionFactory factory;

    /**
     * Exposes the connection factory as the Spring bean {@code mqConnectionFactory}.
     *
     * @return the shared, configured connection factory
     */
    @Bean(name = "mqConnectionFactory")
    public ConnectionFactory getRabbitMqConnection() {
        return getFactory();
    }

    /**
     * Returns the shared connection factory, creating and configuring it on first use.
     *
     * @return the configured connection factory, never {@code null}
     */
    public synchronized ConnectionFactory getFactory() {
        initFactory();
        return factory;
    }

    /** Builds the factory from the injected broker settings if it does not exist yet. */
    private synchronized void initFactory() {
        if (factory != null) {
            return;
        }
        // Configure a local instance first so the public field never exposes a
        // half-configured factory.
        ConnectionFactory created = new ConnectionFactory();
        created.setHost(host);
        created.setPort(port);
        created.setUsername(username);
        created.setPassword(password);
        factory = created;
    }

    /**
     * Publishes a persistent text message straight to a queue via the default exchange.
     *
     * @param message message body, encoded as UTF-8
     * @param queue   name of the target queue (used as the routing key)
     * @throws Exception if no connection or channel can be obtained, or publishing fails
     */
    public void sendMsg(String message, String queue) throws Exception {
        // The empty exchange name selects the default exchange, which routes by queue name.
        sendMsg(message, "", queue);
    }

    /**
     * Publishes a persistent text message to an exchange with the given routing key.
     *
     * @param message  message body, encoded as UTF-8
     * @param exchange name of the exchange to publish to
     * @param route    routing key
     * @throws Exception if no connection or channel can be obtained, or publishing fails
     */
    public void sendMsg(String message, String exchange, String route) throws Exception {
        byte[] body = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Connection connection = getConnection();
        try {
            Channel channel = openChannel(connection);
            try {
                channel.basicPublish(exchange, route, MessageProperties.PERSISTENT_TEXT_PLAIN, body);
            } finally {
                closeChannel(channel);
            }
        } finally {
            // Always hand the connection back, even when publishing failed.
            setConnection(connection);
        }
    }

    /**
     * Fetches a single message from a queue.
     *
     * <p>NOTE(review): the channel used for the fetch is closed before this method
     * returns, so with {@code autoAck == false} the caller has no channel left to
     * acknowledge on and the broker will presumably requeue the message - confirm that
     * callers only rely on {@code autoAck == true}.
     *
     * @param queue   name of the queue to read from
     * @param autoAck whether the broker should consider the message acknowledged on delivery
     * @return the response returned by the client library for this fetch
     * @throws Exception if no connection or channel can be obtained, or the fetch fails
     */
    public GetResponse basicGet(String queue, boolean autoAck) throws Exception {
        Connection connection = getConnection();
        try {
            Channel channel = openChannel(connection);
            try {
                return channel.basicGet(queue, autoAck);
            } finally {
                closeChannel(channel);
            }
        } finally {
            setConnection(connection);
        }
    }

    /**
     * Borrows an open connection: an idle pooled one if available, otherwise a new one.
     * Pooled connections that have been closed in the meantime are discarded.
     *
     * <p>The caller should give the connection back with {@link #setConnection(Connection)}.
     *
     * @return an open connection
     * @throws Exception if a new connection cannot be established
     */
    public Connection getConnection() throws Exception {
        Connection idle;
        while ((idle = pollIdleConnection()) != null) {
            if (idle.isOpen()) {
                return idle;
            }
            // Stale entry: it is already closed, so simply drop it and try the next one.
        }
        return getFactory().newConnection();
    }

    /**
     * Gives a borrowed connection back. Open connections are kept for reuse while the
     * idle pool has room; a surplus connection is closed instead of being left open
     * without an owner. {@code null} and already-closed connections are ignored.
     *
     * @param connection the connection to return; must not be used by the caller afterwards
     * @throws Exception if closing a surplus connection fails
     */
    public void setConnection(Connection connection) throws Exception {
        if (connection == null || !connection.isOpen()) {
            return;
        }
        boolean pooled = false;
        synchronized (connections) {
            if (connections.size() < MAX_IDLE_CONNECTIONS) {
                connections.add(connection);
                pooled = true;
            }
        }
        // Close outside the lock: closing talks to the broker and may block.
        if (!pooled && connection.isOpen()) {
            connection.close();
        }
    }

    /** Removes and returns the oldest idle connection, or {@code null} if the pool is empty. */
    private Connection pollIdleConnection() {
        synchronized (connections) {
            return connections.isEmpty() ? null : connections.remove(0);
        }
    }

    /** Opens a channel on the connection, failing loudly when none is available. */
    private static Channel openChannel(Connection connection) throws Exception {
        Channel channel = connection.createChannel();
        if (channel == null) {
            throw new IllegalStateException("No RabbitMQ channel available on the connection");
        }
        return channel;
    }

    /** Closes the channel unless it has already been shut down (e.g. by a channel error). */
    private static void closeChannel(Channel channel) throws Exception {
        if (channel.isOpen()) {
            channel.close();
        }
    }
}
推荐阅读
- Kafka|Kafka VS RocketMQ VS RabbitMQ
- RabbitMQ|RabbitMQ 四种类型发送接收数据方式
- mac下安装rabbitmq遇到的坑
- rocketmq-连接rocketmq遇到的异常
- RabbitMQ读写消息的Java Demo
- RabbitMQ 常见面试题
- mac 安装 RabbitMQ
- Go|RabbitMQ系列笔记封装篇
- Java|RabbitMQ-使用Java操作简单队列 simple queues