深入浅出Apache|深入浅出Apache Pulsar(3)(Pulsar Schema)

Pulsar Schema Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types to more complex application-specific types.

  • 类型安全(序列化和反序列化)
  • Schema 帮助 Pulsar 保留了数据在其他系统中原有的含义
Schema类型(Schema type)
  • Primitive type
Producer producer = client.newProducer(Schema.STRING).create(); producer.newMessage().value("Hello Pulsar!").send(); Consumer consumer = client.newConsumer(Schema.STRING).subscribe(); consumer.receive();

  • Complex type
1. keyvalue key/value pair.
Schema> schema = Schema.KeyValue( Schema.INT32, Schema.STRING, KeyValueEncodingType.SEPARATED ); // Producer Producer> producer = client.newProducer(schema) .topic(TOPIC) .create(); final int key = 100; final String value = "https://www.it610.com/article/value-100"; producer.newMessage().value(new KeyValue<>(key, value)).send(); // Consumer Consumer> consumer = client.newConsumer(schema) .topic(TOPIC).subscriptionName(SubscriptionName).subscribe(); Message> msg = consumer.receive();

2.struct AVRO, JSON, and Protobuf.
Producer producer = client.newProducer(Schema.AVRO(User.class)).create(); producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send(); Consumer consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe(); User user = consumer.receive();

Schema工作方式(How does schema work) Producer
深入浅出Apache|深入浅出Apache Pulsar(3)(Pulsar Schema)
文章图片

Consumer
深入浅出Apache|深入浅出Apache Pulsar(3)(Pulsar Schema)
文章图片

Schema管理(Schema manual management) 查询Schema
$ $PULSAR_HOME/bin/pulsar-admin schemas \ get persistent://public/default/spirit-avro-topic $ $PULSAR_HOME/bin/pulsar-admin schemas \ get persistent://public/default/spirit-avro-topic \ --version=2

更新Schema
$ $PULSAR_HOME/bin/pulsar-admin schemas upload \ persistent://public/default/test-topic \ --filename $PULSAR_HOME/connectors/json-schema.json

提取Schema
$ $PULSAR_HOME/bin/pulsar-admin schemas \ extract persistent://public/default/test-topic\ --classname com.cloudwise.modal.Packet \ --jar ~/cloudwise-pulsar-1.0.0-RELEASE.jar \ --type json public void schemaInfo() { System.out.println("AvroSchema:" + AvroSchema.of(SeedEvent.class).getSchemaInfo()); System.out.println("Schema.AVRO:" + Schema.AVRO(SeedEvent.class).getSchemaInfo()); }

删除Schema
$ $PULSAR_HOME/bin/pulsar-admin schemas \ delete persistent://public/default/spirit-avro-topic

更多福利 云智慧已开源集轻量级、聚合型、智能运维为一体的综合运维管理平台OMP(Operation Management Platform) ,具备 纳管、部署、监控、巡检、自愈、备份、恢复 等功能,可为用户提供便捷的运维能力和业务管理,在提高运维人员等工作效率的同时,极大提升了业务的连续性和安全性。点击下方地址链接,欢迎大家给OMP点赞送star,了解更多相关内容~
GitHub地址:https://github.com/CloudWise-OpenSource/OMP
Gitee地址:https://gitee.com/CloudWise/OMP
微信扫描识别下方二维码,备注【OMP】加入AIOps社区运维管理平台OMP开发者交流群,与更多行业大佬一起交流学习~
深入浅出Apache|深入浅出Apache Pulsar(3)(Pulsar Schema)
文章图片

系列阅读 【深入浅出Apache|深入浅出Apache Pulsar(3)(Pulsar Schema)】深入浅出Apache Pulsar(1):Pulsar vs Kafka
深入浅出Apache Pulsar(2):Pulsar消息机制

    推荐阅读