深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)

消息机制 Pulsar 采用发布-订阅(pub-sub)的设计模式 。 该设计模式中,producer 发布消息到 topic, Consumer 订阅 topic、处理发布的消息,并在处理完成后发送确认。
一旦创建订阅,即使 consumer 断开连接,Pulsar 仍然可以保存所有消息。 在 consumer 确认消息已处理成功后,才会删除消息。
主题(Topic) 逻辑上一个Topic是日志结构,每个消息都在这个日志结构中有一个偏移量。Apache Pulsar使用游标来跟踪偏移量(Cursor Tracking)。
Pulsar 支持两种基本的 topic 类型:持久 topic 与非持久 topic。

{persistent|non-persistent}://tenant/namespace/topic

  • Non-Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \ list public/default $ $PULSAR_HOME/bin/pulsar-admin topics \ create persistent://public/default/input-seed-avro-topic $ $PULSAR_HOME/bin/pulsar-admin topics \ lookup persistent://public/default/input-seed-avro-topic $ $PULSAR_HOME/bin/pulsar-admin topics \ delete persistent://public/default/input-seed-avro-topic $ $PULSAR_HOME/bin/pulsar-admin topics \ stats persistent://public/default/input-seed-avro-topic $ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool

Partitioned topics
$ $PULSAR_HOME/bin/pulsar-admin topics \ create-partitioned-topic persistent://public/default/output-seed-avro-topic \ --partitions 2 $ $PULSAR_HOME/bin/pulsar-admin topics \ list-partitioned-topics public/default $ $PULSAR_HOME/bin/pulsar-admin topics \ get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic $ $PULSAR_HOME/bin/pulsar-admin topics \ delete-partitioned-topic persistent://public/default/output-seed-avro-topic

消息(Message) Messages are the basic "unit" of Pulsar.
public interface Message { Map getProperties(); boolean hasProperty(String var1); String getProperty(String var1); byte[] getData(); T getValue(); MessageId getMessageId(); long getPublishTime(); long getEventTime(); long getSequenceId(); String getProducerName(); boolean hasKey(); String getKey(); boolean hasBase64EncodedKey(); byte[] getKeyBytes(); boolean hasOrderingKey(); byte[] getOrderingKey(); String getTopicName(); Optional getEncryptionCtx(); int getRedeliveryCount(); byte[] getSchemaVersion(); boolean isReplicated(); String getReplicatedFrom(); }

生产者(Producer)
public void send() throws PulsarClientException { final String serviceUrl = "pulsar://server-100:6650"; // final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650"; // http://pulsar.apache.org/docs/en/client-libraries-java/#client final PulsarClient client = PulsarClient.builder() .serviceUrl(serviceUrl) .connectionTimeout(10000, TimeUnit.MILLISECONDS) .build(); final String topic = "persistent://public/default/topic-sensor-temp"; // http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer final Producer producer = client.newProducer() .producerName("sensor-temp") .topic(topic) .compressionType(CompressionType.LZ4) .enableChunking(true) .enableBatching(true) .batchingMaxBytes(1024) .batchingMaxMessages(10) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .blockIfQueueFull(true) .maxPendingMessages(512) .sendTimeout(1, TimeUnit.SECONDS) .create(); MessageId mid = producer.send("sensor-temp".getBytes()); System.out.printf("\nmessage with ID %s successfully sent", mid); mid = producer.newMessage() .key("sensor-temp-key") .value("sensor-temp-key".getBytes()) .property("my-key", "my-value") .property("my-other-key", "my-other-value") .send(); System.out.printf("message-key with ID %s successfully sent", mid); producer.close(); client.close(); }

消费者(Consumer)
public void consume() throws PulsarClientException { final String serviceUrl = "pulsar://server-101:6650"; final String topic = "input-seed-avro-topic"; final PulsarClient client = PulsarClient.builder() .serviceUrl(serviceUrl) .enableTcpNoDelay(true) .build(); final Consumer consumer = client .newConsumer() .consumerName("seed-avro-consumer") .subscriptionName("seed-avro-subscription") .subscriptionType(SubscriptionType.Exclusive) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .topic(topic) .receiverQueueSize(10) .subscribe(); final AvroSchema schema = AvroSchema.of(SeedEvent.class); while (true) { try { final Message msg = consumer.receive(); LOG.info("接收消息:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}", Thread.currentThread().getId(), msg.getTopicName(), msg.getMessageId(), msg.getSequenceId(), msg.getEventTime(), msg.getPublishTime(), msg.getProducerName(), msg.getKey(), schema.decode(msg.getValue())); try { consumer.acknowledge(msg); } catch (final PulsarClientException e) { consumer.negativeAcknowledge(msg); LOG.error("acknowledge:" + e.getLocalizedMessage(), e); } } catch (final PulsarClientException e) { LOG.error("receive:" + e.getLocalizedMessage(), e); } } }

订阅(Subscriptions)
消费者通过订阅来消费Topic中的消息。订阅是游标(跟踪偏移量)的逻辑实体,一个Topic可以添加多个订阅。订阅不包含消息的数据,只包含元数据和游标。
每个Subscription都存储一个Cursor。Cursor是日志中的当前偏移量。Subscription将其Cursor存储至BookKeeper的Ledger中。这使Cursor跟踪可以像Topic一样进行扩展。
订阅类型(subscription-type) 深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

  • Exclusive 独享
    一个订阅只能有一个消息者消费消息。
深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

  • Failover 灾备
    一个订阅同时只有一个消费者,可以有多个备份消费者。一旦主消费者故障则备份消费者接管。不会出现同时有两个活跃的消费者。
深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

  • Shared 共享
    一个订阅中同时可以有多个消费者,多个消费者共享Topic中的消息。
深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

  • Key_Shared
深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

有序性保证(Ordering guarantee)
如果对顺序性有要求,可以使用 Exclusive 和 Failover 的订阅模式,这样同一个 Topic 只有一个 Consumer 在消费,可以保证顺序性。
如果使用 Shared 订阅模式,多个 Consumer 可以并发消费同一个 Topic。通过动态增加 Consumer 的数量,可以加速 Topic 的消费,减少消息在服务端的堆积。
KeyShared 模式保证在 Shared 模式下同一个 Key 的消息也会发送到同一个 Consumer,在并发的同时也保证了顺序性。
多主题订阅(Multi-topic subscriptions)
Pattern:
  • persistent://public/default/.*
  • persistent://public/default/foo.*
Reader
public void read() throws IOException { final String serviceUrl = "pulsar://server-101:6650"; final PulsarClient client = PulsarClient.builder() .serviceUrl(serviceUrl) .build(); // http://pulsar.apache.org/docs/en/client-libraries-java/#reader final Reader reader = client.newReader() .topic("my-topic") .startMessageId(MessageId.earliest()) // MessageId.latest .create(); while (true) final Message message = reader.readNext(); System.out.println(new String(message.getData())); } }

深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

分片主题(Partitioned topics) 深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

消息保留和过期(Message retention and expiry)
如果没有对Topic设置数据保留策略,一旦一个Topic的所有订阅的游标都已经成功消费到一个偏移量时,此偏移量前面的消息就会被自动删除。
如果Topic设置了数据保留策略,已经消费确认的消息超过保留策略阈值(Topic的消息存储大小、Topic中消息保留的时间)后会被删除。
深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

conf/broker.conf
# Default message retention time # 默认0, 修改为3天=60*24*3 defaultRetentionTimeInMinutes=4320 # Default retention size # 默认为0, 修改为10G defaultRetentionSizeInMB=10240 # Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0) ttlDurationDefaultInSeconds=0

retention policy (for a namespace)
$ $PULSAR_HOME/bin/pulsar-admin namespaces \ get-retention public/default $ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool $ $PULSAR_HOME/bin/pulsar-admin namespaces \ set-retention public/default \ --size 1024M \ --time 5m $ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \ --header "Content-Type:application/json" \ --data '{ "retentionTimeInMinutes" : 5, "retentionSizeInMB" : 1024 }'

message expiry / message-ttl
$ $PULSAR_HOME/bin/pulsar-admin namespaces \ get-message-ttl public/default $ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL $ $PULSAR_HOME/bin/pulsar-admin namespaces \ set-message-ttl public/default \ --messageTTL 1800 $ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \ --header "Content-Type:application/json" \ --data '1800'

更多福利 云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维管理平台OMP(Operation Management Platform) ,具备 纳管、部署、监控、巡检、自愈、备份、恢复 等功能,可为用户提供便捷的运维能力和业务管理,在提高运维人员等工作效率的同时,极大提升了业务的连续性和安全性。点击下方地址链接,欢迎大家给OMP点赞送star,了解更多相关内容~
GitHub地址: https://github.com/CloudWise-OpenSource/OMP
Gitee地址:https://gitee.com/CloudWise/OMP
微信扫描识别下方二维码,备注【OMP】加入AIOps社区运维管理平台OMP开发者交流群,与更多行业大佬一起交流学习~
深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)
文章图片

系列阅读 【深入浅出Apache|深入浅出Apache Pulsar(2)(Pulsar消息机制)】深入浅出Apache Pulsar(1):Pulsar vs Kafka

    推荐阅读