今天应上级要求,撰写JAVA代码测试RabbitMQ。
以下为环境要求:
RabbitMQ环境不在此电脑,并且无法通讯,需要写完之后打成jar包,运行在另一台主机。开发工具Myeclipse10,JDK1.6,RabbitMQ3.4,所需jar包在www.rabbitmq.com上下载的rabbitmq-java-client-bin-3.6.1。
第一步,建立工程rabbitMQTest,
文章图片
,结构如图所示。所需Jar包放在了lib目录下,
文章图片
第二步,新建class名为,Send.java,来为RabbitMQ测试发送数据。代码如下
package com.mq;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
public class Send {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws java.io.IOException, TimeoutException, InterruptedException {
System.out.println("----------main method is begin----------");
//创建工厂
ConnectionFactory factory = new ConnectionFactory();
System.out.println("----------factory successful!----------");
//设置IP,端口,账户和密码
factory.setHost("192.168.228.87");
factory.setPort(5672);
factory.setUsername("mq");
factory.setPassword("mq");
//创建连接对象
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("----------connected successful!----------");
// 指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
//共发送10万条数据,每隔1S发送1次
int index=0;
while (index<100000){
Thread.sleep(1*1000);
channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
index++;
}
channel.close();
connection.close();
}private static String getMessage(String[] strings) {
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0)
return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1;
i < length;
i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
第三步,新建Class名为Recv.java, 来为RabbitMQ测试接收 数据。代码如下
package com.mq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void recv() throws java.io.IOException, java.lang.InterruptedException, TimeoutException {
System.out.print("----------main method is begin----------");
ConnectionFactory factory = new ConnectionFactory();
System.out.println("----------factory successful!----------");
factory.setHost("192.168.228.87");
factory.setPort(5672);
factory.setUsername("mq");
factory.setPassword("mq");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("----------connected successful!----------");
// 指定队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 指定该线程同时只接收一条消息
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
// 打开消息应答机制
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");
// 返回接收到消息的确认信息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.')
Thread.sleep(1000);
}
}
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
recv();
}
}
最后,打成Jar包,分别为Send.jar和Recv.jar,在下图2的第1步中Send.jar要选Send的main方法,Recv.jar要选Recv的main方法,这里只对Sen.jar贴图:
【Java实现RabbitMQ以及Jar包的实现】
文章图片
图1
文章图片
图2
打好的包,即可在另一台装有rabbitMQ环境和JRE环境中运行,具体使用的命令是:
文章图片
推荐阅读
- 程序员|程序员“真实”日常(每天敲代码不到 1 小时)
- Java|angular.js 翻页组件
- Java|java--NoSuchMethodError解决办法
- Flutter|Flutter 基于NestedScrollView+RefreshIndicator完美解决滑动冲突
- andorid|Android属性动画Property Animation系列一之ValueAnimator
- Android|无限自动轮播+自定义小圆点
- 技术|报表开发之批量导入导出Excel
- android|Linux驱动子系统之I2C(一)
- android|Android LCD(二)(LCD常用接口原理篇)
- android|Android LCD(一)(LCD基本原理篇)