Fllink实时计算运用(七)Flink 自定义序列化Protobuf接入实现方案
1. 自定义序列化接入方案(Protobuf)
在实际应用场景中, 会存在各种复杂传输对象,同时要求较高的传输处理性能, 这就需要采用自定义的序列化方式做相应实现, 这里以Protobuf为例做讲解。
功能: kafka对同一Topic的生产与消费,采用Protobuf做序列化与反序列化传输, 验证能否正常解析数据。
- 通过protobuf脚本生成JAVA文件
syntax = "proto3"; option java_package = "com.itcast.flink.connectors.kafka.proto"; option java_outer_classname = "AccessLogProto"; // 消息结构定义 message AccessLog {string ip = 1; string time = 2; string type = 3; string api = 4; string num = 5; }
@echo off
for %%i in (proto/*.proto) do (
d:/TestCode/protoc.exe --proto_path=./proto--java_out=../java./proto/%%i
echo generate %%i to java file successfully!
)
注意, 路径要配置正确。
- 自定义序列化实现
添加POM依赖:
org.apache.flink flink-connector-kafka_2.111.11.2 com.google.protobuf protobuf-java3.8.0 org.springframework spring-beans5.1.8.RELEASE
@Data
public class AccessLog implements Serializable {private String ip;
private String time;
private String type;
private String api;
private Integer num;
}
CustomSerialSchema:
/**
* 自定义序列化实现(Protobuf)
*/
public class CustomSerialSchema implements DeserializationSchema, SerializationSchema {private static final long serialVersionUID = 1L;
private transient Charset charset;
public CustomSerialSchema() {
this(StandardCharsets.UTF_8);
}public CustomSerialSchema(Charset charset) {
this.charset = checkNotNull(charset);
}public Charset getCharset() {
return charset;
}/**
* 反序列化实现
* @param message
* @return
*/
@Override
public AccessLog deserialize(byte[] message) {
AccessLog accessLog = null;
try {
AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
accessLog = new AccessLog();
BeanUtils.copyProperties(accessLogProto, accessLog);
return accessLog;
} catch (Exception e) {
e.printStackTrace();
}
return accessLog;
}@Override
public boolean isEndOfStream(AccessLog nextElement) {
return false;
}/**
* 序列化处理
* @param element
* @return
*/
@Override
public byte[] serialize(AccessLog element) {
AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
BeanUtils.copyProperties(element, builder);
return builder.build().toByteArray();
}/**
* 定义消息类型
* @return
*/
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(AccessLog.class);
}
}
- 通过flink对kafka消息生产者的实现
public class KafkaSinkApplication {public static void main(String[] args) throws Exception {// 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 读取Socket数据源 DataStreamSource socketStr = env.socketTextStream("localhost", 9911, "\n"); // 3. 转换处理流数据 SingleOutputStreamOperator outputStream = socketStr.map(new MapFunction() { @Override public AccessLog map(String value) throws Exception { System.out.println(value); // 根据分隔符解析数据 String[] arrValue = https://www.it610.com/article/value.split("\t"); // 将数据组装为对象 AccessLog log = new AccessLog(); log.setNum(1); for(int i=0; i
[root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server10.10.20.132:9092--topic flink-serial
1601649380422GET"
getAccount
1601649381422POSTaddOrder
1601649382422POST"
- 【Fllink实时计算运用(七)Flink 自定义序列化Protobuf接入实现方案】通过flink对kafka消息订阅者的实现
public class KafkaSourceApplication {public static void main(String[] args) throws Exception {// 1. 创建运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置kafka服务连接信息 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "10.10.20.132:9092"); properties.setProperty("group.id", "fink_group"); // 3. 创建Kafka消费端 FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer( "flink-serial",// 目标 topic new CustomSerialSchema(),// 自定义序列化 properties); // 4. 读取Kafka数据源 DataStreamSource socketStr = env.addSource(kafkaProducer); socketStr.print().setParallelism(1); // 5. 执行任务 env.execute("job"); }}
本文由mirson创作分享,如需进一步交流,请加QQ群:19310171或访问www.softart.cn
推荐阅读
- 使用协程爬取网页,计算网页数据大小
- ACSL|ACSL 美国计算机科学联赛 2016-2017 R4 摩天大楼-Skyscraper 题解
- 首屏时间,你说你优化了,那你倒是计算出给给我看啊!
- ATAN2根据xy坐标计算角度
- 历史上的今天|【历史上的今天】2 月 16 日(世界上第一个 BBS 诞生;中国计算机教育开端;IBM 机器人赢得智能竞赛)
- 计算机网络基础TCP\HTTP\HTTPS
- Elasticsearch|Elasticsearch 简介
- 计算机网络|计算机网络——DHCP协议详解
- 实时|实时 OLAP 系统 Druid
- SRS(简单实时视频服务)|SRS(简单实时视频服务) 笔记(1)- 体验