ZeroMQ学习 (七) 多帧消息

9 多帧消息
? ZMQ消息可以包含多个帧,这在实际应用中非常常见.
? 多帧消息的每一帧都是一个zmq_msg结构,也就是说,当你在收发含有五个帧的消息时,你需要处理五个zmq_msg结构。你可以将这些帧放入一个数据结构中,或者直接一个个地处理它们。
下面的代码演示如何发送多帧消息:

zmq_msg_send (&message, socket, ZMQ_SNDMORE); ... zmq_msg_send (&message, socket, ZMQ_SNDMORE); ... zmq_msg_send (&message, socket, 0);

然后我们看看如何接收并处理这些消息,这段代码对单帧消息和多帧消息都适用:
while (1) { // 处理一帧消息 zmq_msg_t message; zmq_msg_init (&message); zmq_msg_recv (&message, socket, 0); zmq_msg_close (&message); // 已到达最后一帧 int64_t more; zmq_getsockopt (socket, ZMQ_RCVMORE, &more, sizeof (more)); if (!more) break; }

关于多帧消息,你需要了解的还有:
  • 在发送多帧消息时,只有当最后一帧提交发送了,整个消息才会被发送;
  • 多帧消息是整体传输的,不会只收到一部分;
  • 多帧消息的每一帧都是一个zmq_msg结构;
  • 无论你是否检查套接字的ZMQ_RCVMORE选项,你都会收到所有的消息;
  • 发送时,ZMQ会将开始的消息帧缓存在内存中,直到收到最后一帧才会发送;
  • 我们无法在发送了一部分消息后取消发送,只能关闭该套接字。


还是发布者:
// //Shows how to handle Ctrl-C // #include #include #include #include #include //--------------------------------------------------------------------- //消息处理 // //程序开始运行时调用s_catch_signals()函数; //在循环中判断s_interrupted是否为1,是则跳出循环; ? static int interrupted = 0; void signal_handler (int sig) { (void)sig; interrupted = 1; } ? void catch_signals (void) { struct sigaction action; action.sa_handler = signal_handler; action.sa_flags = 0; sigemptyset (&action.sa_mask); sigaction (SIGINT, &action, NULL); } ? int main (void) { catch_signals (); void *context = zmq_ctx_new(); //与客户端通信的套接字 void *publisher = zmq_socket (context, ZMQ_PUB); zmq_bind (publisher, "tcp://*:5555"); //zmq_bind (publisher, "ipc:///tmp/myipc"); char buf[256]; int i = 0; while (1) { //每次发送两帧 zmq_send(publisher, "1000", 4, ZMQ_SNDMORE); sprintf(buf, "helloworld%d", i++); zmq_send (publisher, buf, strlen(buf),0); sleep(1); } //程序不会运行到这里,以下只是演示我们应该如何结束 zmq_close (publisher); zmq_ctx_destroy (context); return 0; }



订阅者一直收就好了
? #include #include #include #include void dump_msg(const void * data, int size) { unsigned char * ptr = (unsigned char *)data; printf ("[%03d] ", size); int i = 0; for (i = 0; i < size; i++) printf ("%02X", ptr[i]); printf ("\n"); } ? int main (void) { void *context = zmq_ctx_new(); void *subscriber = zmq_socket (context, ZMQ_SUB); /*订阅1000的内容*/ zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "1000", 4); ?zmq_connect (subscriber, "tcp://127.0.0.1:5555"); //zmq_connect (subscriber, "ipc:///tmp/myipc"); while (1) { zmq_msg_t msg; zmq_msg_init(&msg); zmq_msg_recv (&msg, subscriber, 0); dump_msg(zmq_msg_data(&msg), zmq_msg_size(&msg)); zmq_msg_close(&msg); } //程序不会运行到这里,以下只是演示我们应该如何结束 zmq_close (subscriber); zmq_ctx_destroy(context); return 0; }


【ZeroMQ学习 (七) 多帧消息】

    推荐阅读