RabbitMQ 工具类（带连接池）

前言 【RabbitMQ 工具类带连接池】本文介绍一个简单的 RabbitMQ 工具类, 它带有连接池, 可以复用 Connection. 其中的配置参数是通过 Apollo 配置的, 也可以直接写死, 或者从配置文件中读取.
代码

/**
 * Spring configuration that also acts as a small RabbitMQ helper backed by a bounded pool of
 * reusable {@link Connection}s.
 *
 * <p>Broker host, port and credentials are injected from the {@code mq.calltopay.rabbit.*}
 * properties. Every operation borrows a connection, opens a short-lived channel on it, and
 * always closes the channel and hands the connection back, even when the broker call fails.
 *
 * <p>Access to the idle-connection list is serialized on the list's monitor; connections are
 * opened and closed outside that monitor so network I/O never blocks other borrowers.
 */
@Configuration
@EnableApolloConfig
public class MqUtil {

    /** Idle connections waiting to be reused. Guarded by {@code synchronized (connections)}. */
    private final List<Connection> connections = new ArrayList<>();

    /** Maximum number of idle connections kept; surplus connections are closed on return. */
    private final int maxConnection = 20;

    @Value("${mq.calltopay.rabbit.host}")
    private String host;

    @Value("${mq.calltopay.rabbit.port}")
    private int port;

    @Value("${mq.calltopay.rabbit.username}")
    private String username;

    @Value("${mq.calltopay.rabbit.password}")
    private String password;

    // NOTE(review): this field is injected with the bean produced by getRabbitMqConnection()
    // of this same class. That is a self-referential wiring; confirm it is accepted by the
    // Spring version in use. All internal code goes through getFactory(), so the class also
    // works when the field has not been injected.
    @Autowired
    @Qualifier("mqConnectionFactory")
    public ConnectionFactory factory;

    /**
     * Exposes the connection factory as the {@code mqConnectionFactory} bean.
     *
     * @return the configured factory
     */
    @Bean(name = "mqConnectionFactory")
    public ConnectionFactory getRabbitMqConnection() {
        return getFactory();
    }

    /**
     * Returns the connection factory, creating and configuring it on first use.
     *
     * @return the configured factory, never {@code null}
     */
    public ConnectionFactory getFactory() {
        initFactory();
        return factory;
    }

    /** Builds the factory from the injected settings if it has not been set yet. */
    private void initFactory() {
        if (factory != null) {
            return;
        }
        // Configure a local instance first so a partially configured factory is never visible
        // through the public field.
        ConnectionFactory created = new ConnectionFactory();
        created.setHost(host);
        created.setPort(port);
        created.setUsername(username);
        created.setPassword(password);
        factory = created;
    }

    /**
     * Publishes a persistent text message to a queue through the default exchange.
     *
     * @param message text payload, encoded as UTF-8
     * @param queue   name of the target queue (used as the routing key)
     * @throws Exception if a connection or channel cannot be obtained or publishing fails
     */
    public void sendMsg(String message, String queue) throws Exception {
        sendMsg(message, "", queue);
    }

    /**
     * Publishes a persistent text message to an exchange with the given routing key.
     *
     * @param message  text payload, encoded as UTF-8
     * @param exchange exchange to publish to
     * @param route    routing key
     * @throws Exception if a connection or channel cannot be obtained or publishing fails
     */
    public void sendMsg(String message, String exchange, String route) throws Exception {
        Connection connection = getConnection();
        try {
            Channel channel = openChannel(connection);
            try {
                channel.basicPublish(
                        exchange,
                        route,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            } finally {
                closeChannel(channel);
            }
        } finally {
            // Always give the connection back; setConnection discards it if it has died.
            setConnection(connection);
        }
    }

    /**
     * Fetches a single message from a queue using {@code basic.get}.
     *
     * @param queue   name of the queue to read from
     * @param autoAck value passed through to {@code Channel.basicGet}
     * @return the result of {@code Channel.basicGet} for the queue
     * @throws Exception if a connection or channel cannot be obtained or the fetch fails
     */
    public GetResponse basicGet(String queue, boolean autoAck) throws Exception {
        Connection connection = getConnection();
        try {
            Channel channel = openChannel(connection);
            try {
                return channel.basicGet(queue, autoAck);
            } finally {
                closeChannel(channel);
            }
        } finally {
            setConnection(connection);
        }
    }

    /**
     * Borrows a connection: the first pooled connection that is still open, or a newly created
     * one when the pool holds no usable connection.
     *
     * @return an open connection; hand it back with {@link #setConnection(Connection)}
     * @throws Exception if a new connection cannot be established
     */
    public Connection getConnection() throws Exception {
        Connection pooled;
        while ((pooled = pollIdleConnection()) != null) {
            if (pooled.isOpen()) {
                return pooled;
            }
            // Closed while idle: drop it and try the next pooled entry.
        }
        return getFactory().newConnection();
    }

    /**
     * Hands a connection back to the pool. {@code null} and already-closed connections are
     * ignored; when the pool is full the connection is closed instead of being abandoned.
     *
     * @param connection the connection to return
     * @throws Exception if closing a surplus connection fails
     */
    public void setConnection(Connection connection) throws Exception {
        if (connection == null || !connection.isOpen()) {
            return;
        }
        boolean kept;
        synchronized (connections) {
            kept = connections.size() < maxConnection;
            if (kept) {
                connections.add(connection);
            }
        }
        if (!kept) {
            // Close outside the monitor so other threads are not blocked on network I/O.
            connection.close();
        }
    }

    /** Removes and returns the oldest idle connection, or {@code null} if the pool is empty. */
    private Connection pollIdleConnection() {
        synchronized (connections) {
            return connections.isEmpty() ? null : connections.remove(0);
        }
    }

    /** Opens a channel, failing with a clear error when the connection cannot provide one. */
    private static Channel openChannel(Connection connection) throws Exception {
        Channel channel = connection.createChannel();
        if (channel == null) {
            // The client documents createChannel() as returning null when none is available.
            throw new IllegalStateException("No channel available on RabbitMQ connection");
        }
        return channel;
    }

    /** Closes the channel unless it is already closed, so cleanup does not mask the real error. */
    private static void closeChannel(Channel channel) throws Exception {
        if (channel.isOpen()) {
            channel.close();
        }
    }
}

    推荐阅读