导入包(RocketMQ 的 .NET 客户端库,可从以下仓库获取):
https://github.com/gaufung/rocketmq-client-dotnet/tree/master
文章图片
【RocketMQ C# 示例代码】
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Windows.Forms;
using java.util;
using org.apache.rocketmq.client.consumer;
using org.apache.rocketmq.client.consumer.listener;
using org.apache.rocketmq.client.producer;
using org.apache.rocketmq.common.consumer;

namespace wf_RMQ3
{
    /// <summary>
    /// Demo form: button1 publishes the text box content to a RocketMQ topic,
    /// button2 starts a push consumer that displays every message received on that topic.
    /// </summary>
    public partial class Form1 : Form
    {
        /// <summary>Name server address shared by the producer and the consumer.</summary>
        private const string NameServerAddress = "172.20.168.210:9876";

        /// <summary>Topic the producer sends to and the consumer subscribes to.</summary>
        private const string Topic = "TP01";

        /// <summary>
        /// The consumer started by <see cref="button2_Click"/>; null until the first successful start.
        /// Kept so that it is started only once and can be shut down when the form closes.
        /// </summary>
        private DefaultMQPushConsumer _consumer;

        public Form1()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Sends the content of textBox1 as a UTF-8 encoded message to <see cref="Topic"/>,
        /// then clears the text box. A producer is created, started and shut down per click.
        /// </summary>
        /// <remarks>
        /// Exceptions are deliberately not caught here: the original catch block only
        /// re-threw with <c>throw ex;</c>, which discards the original stack trace.
        /// </remarks>
        private void button1_Click(object sender, EventArgs e)
        {
            DefaultMQProducer producer = new DefaultMQProducer("PG01");
            producer.setNamesrvAddr(NameServerAddress);
            producer.setInstanceName("PER01");
            // The producer group is supplied through the constructor above,
            // so no separate setProducerGroup call is made.
            producer.start();

            try
            {
                byte[] data = Encoding.UTF8.GetBytes(textBox1.Text);

                // Fully qualified: System.Windows.Forms also declares a type named Message.
                var message = new org.apache.rocketmq.common.message.Message(Topic, data);
                producer.send(message);

                textBox1.Text = "";
            }
            finally
            {
                // Shut the producer down even when encoding or sending fails.
                producer.shutdown();
            }
        }

        /// <summary>
        /// Creates and starts a push consumer for <see cref="Topic"/> (all tags),
        /// dispatching messages to <see cref="ChainwayMessageListener"/>.
        /// Further clicks after a successful start do nothing.
        /// </summary>
        private void button2_Click(object sender, EventArgs e)
        {
            if (_consumer != null)
            {
                // Already started; do not create a second consumer with the same
                // group and instance name.
                return;
            }

            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CG01");
            consumer.setNamesrvAddr(NameServerAddress);
            consumer.subscribe(Topic, "*");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // NOTE(review): the timestamp presumably only matters when consuming
            // from a timestamp rather than from the first offset - confirm.
            consumer.setConsumeTimestamp("20170422221800");
            consumer.setInstanceName("CER01");

            // Original author's note: setting a client port for the consumer is not
            // offered by the official client; it would be useful on servers whose
            // ports are restricted by security policy.

            // Register the push callback, then start the consumer.
            consumer.registerMessageListener(new ChainwayMessageListener());
            consumer.start();

            // Only remember the consumer once it has started successfully,
            // so a failed start can be retried with another click.
            _consumer = consumer;
        }

        /// <summary>
        /// Shuts down the consumer started by <see cref="button2_Click"/>, if any,
        /// when the form is closed.
        /// </summary>
        protected override void OnFormClosed(FormClosedEventArgs e)
        {
            try
            {
                if (_consumer != null)
                {
                    _consumer.shutdown();
                    _consumer = null;
                }
            }
            finally
            {
                base.OnFormClosed(e);
            }
        }
    }

    /// <summary>
    /// Message listener that decodes each received message body as UTF-8 text,
    /// shows it in a message box and writes it to the console.
    /// </summary>
    public class ChainwayMessageListener : MessageListenerConcurrently
    {
        /// <summary>
        /// Handles one batch of messages.
        /// </summary>
        /// <param name="list">Batch of received items; entries that are not messages, or that have no body, are skipped.</param>
        /// <param name="ccc">Consume context supplied by the client library (not used here).</param>
        /// <returns>Always <c>CONSUME_SUCCESS</c>.</returns>
        public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext ccc)
        {
            for (int i = 0; i < list.size(); i++)
            {
                var msg = list.get(i) as org.apache.rocketmq.common.message.Message;
                if (msg == null)
                {
                    // "as" yields null for a null entry or an unexpected type.
                    continue;
                }

                byte[] body = msg.getBody();
                if (body == null)
                {
                    // Encoding.GetString throws on a null array.
                    continue;
                }

                var str = Encoding.UTF8.GetString(body);

                // MessageBox.Show is modal: this call does not return until the box is dismissed.
                MessageBox.Show(str);
                Console.Write(str);
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}
参考资料
https://www.cnblogs.com/gmq-sh/p/5972569.html