Flink SQL快速构建实时计算平台

1 描述 利用FLINK SQL 构建一个数据实时分析平台,主要包含以下两个需要实现的需求点
1 FLINK SQL 从KAFKA消费数据,并将数据写入到ES 通过KIBANA查询
2 FLINK SQL 从KAFKA消费数据,计算每十分钟用户独立数据(UV)
2 系统版本 1 Java 8+
2 flink 1.10 +
3 elasticsearch 6+
4 kibana 6+
3 数据处理流程 Flink SQL快速构建实时计算平台
文章图片



4 启动Kafka并往里面写入数据 4.1 启动ZK
$ZOOKEEPER_HOME/bin/zkServer.sh start
Flink SQL快速构建实时计算平台
文章图片

4.2 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
Flink SQL快速构建实时计算平台
文章图片

4.3 启动一个消费者
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_behavior

4.4 用JAVA模拟数据写入数据到KAFKA
JAVA 关键代码如下

package com.bigdata.kafka.sourceparse.producer; import com.bigdata.kafka.sourceparse.KafkaConstants; import com.bigdata.kafka.sourceparse.bean.UserBehaviorObj; import com.bigdata.kafka.sourceparse.utils.JsonUtils; import kafka.utils.Json; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Random; /** * Created by 袁海龙 on 2018/11/15. */ public class KafkaProducerDemo {static SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd"); static SimpleDateFormat sdf2=new SimpleDateFormat("HH:mm:ss"); public static void main(String[] args) { Map kafkaParam=new HashMap<>(); String TOPIC=args.length==0?KafkaConstants.TOPIC:args[0]; System.out.println("TOPIC:"+TOPIC); kafkaParam.put("bootstrap.servers", KafkaConstants.KAFKA_BOOTSTRAP_SERVER); kafkaParam.put("key.serializer", KafkaConstants.KAFKA_SERIALIZER); kafkaParam.put("value.serializer", KafkaConstants.KAFKA_SERIALIZER); //自定义拦截器 //kafkaParam.put("interceptor.classes", "com.bigdata.kafka.sourceparse.producer.MyProducerInterceptor"); KafkaProducer kafkaProducer=new KafkaProducer(kafkaParam); int index=0; while (index<10000000) { String msg = genMessage(); ProducerRecord record = new ProducerRecord(TOPIC, msg); kafkaProducer.send(record); try { Thread.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } } kafkaProducer.close(); } /** * 构造消息 * @return */ public staticString genMessage(){ Date date=new Date(); StringBuffer ts=new StringBuffer(); ts.append(sdf1.format(date)) .append("T") .append(sdf2.format(date)) .append("Z"); String[] behavior={"pv","fav","chat","uv"}; UserBehaviorObj userBehaviorObj=new UserBehaviorObj(); userBehaviorObj.setUser_id(new Random().nextInt(1000000)); userBehaviorObj.setItem_id(new Random().nextInt(1000000)); userBehaviorObj.setCategory_id(new Random().nextInt(1000000)); userBehaviorObj.setBehavior(behavior[new Random().nextInt(behavior.length)]); userBehaviorObj.setTs(ts.toString()); String str=JsonUtils.toJson(userBehaviorObj); System.out.println(str); return str; }}


4.4 启动JAVA程序 消费者会一直收到数据如下
Flink SQL快速构建实时计算平台
文章图片

5 启动Flink ,ES ,KIBANA 5.1 启动Flink
$FLINK_HOME/bin/start-cluster.sh
Flink SQL快速构建实时计算平台
文章图片

在浏览器http://192.168.112.100:8081查看Flink是否已经启动
Flink SQL快速构建实时计算平台
文章图片


5.2 启动ES
在es用户下启动(es6 以后为了安全不能在root下启动)
用su es命令切换到es目录下
bin/elasticsearch
Flink SQL快速构建实时计算平台
文章图片

启动完成后在浏览器输入http://192.168.112.100:9200/查看是否启动成功


Flink SQL快速构建实时计算平台
文章图片


注意如果启动中出现如下错误

Flink SQL快速构建实时计算平台
文章图片

执行以下命令后在启动
echo 262144 > /proc/sys/vm/max_map_count
5.3 启动kibana
bin/kibana --allow-root
通过浏览器输入http://192.168.112.100:5601查看kibana启动是否成功
Flink SQL快速构建实时计算平台
文章图片


6 利用FLINK SQL 构建Flink实时计算平台 6.1 进入FLINK SQL 交互式界面
$FLINK_HOME/bin/sql-client.sh embedded
6.1 创建一个user_behavior 表接收kafka中Topic为user_behavior的数据
create table user_behavior( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), WATERMARK FOR ts as ts - INTERVAL '5' SECOND ) with( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = 'user_behavior', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'format.type' = 'json' )

Flink SQL快速构建实时计算平台
文章图片

【Flink SQL快速构建实时计算平台】查看user_behavior 表中是否持续有数据进来(java一直在模拟发送数据)
select * from user_behavior;
Flink SQL快速构建实时计算平台
文章图片



创建一个user_details表
create table user_details( user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3) ) with( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://192.168.112.100:9200', 'connector.index' = 'user_details', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert' );

Flink SQL快速构建实时计算平台
文章图片

将user_behavior表的数据写入user_details,user_details会将数据写入到elasticsearch
insert into user_details
select user_id,item_id,category_id,behavior,ts from user_behavior;
Flink SQL快速构建实时计算平台
文章图片

执行了以上insert语句后可以去Flink的界面查看 他启动了一个任务在执行
Flink SQL快速构建实时计算平台
文章图片

同样我可以去kibana上面查看数据是否存在(关于kibana的具体怎么建立关系可以自行百度)
Flink SQL快速构建实时计算平台
文章图片

我们也可以利用Kibana 做很多有趣的可视化图


6.2 每十分钟统计一次最大的用户独立数
创建一个用户独立输的表
create table user_uv( time_str STRING, uv BIGINT ) with( 'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://192.168.112.100:9200', 'connector.index' = 'user_uv', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert' );


Flink SQL快速构建实时计算平台
文章图片

创建一个视图,便于后续计算

--每10分钟统计UV数据 create view uv_per_10min as select max(SUBSTR(DATE_FORMAT(ts,'HH:mm'),1,4) || '0') over w as time_str, count(distinct user_id) over w as uv from user_behavior window w as (order by proctime rows between unbounded preceding and CURRENT ROW) ;

Flink SQL快速构建实时计算平台
文章图片


写入数据
insert into user_uv select time_str,max(uv) uv from uv_per_10min group by time_str;

Flink SQL快速构建实时计算平台
文章图片

同样到这里以后可以去flink上看多了一个任务

Flink SQL快速构建实时计算平台
文章图片


到Kibana查询数据

Flink SQL快速构建实时计算平台
文章图片

到此利用Flink SQL 构建一个完整的实时分析就完成了
上面利用到的Flink,ES ,Kibana,Kafka等环境需要自己搭建,网上资料很多



    推荐阅读