Flink SQL快速构建实时计算平台
1 描述 利用FLINK SQL 构建一个数据实时分析平台,主要包含以下两个需要实现的需求点
1 FLINK SQL 从KAFKA消费数据,并将数据写入到ES 通过KIBANA查询
2 FLINK SQL 从KAFKA消费数据,计算每十分钟用户独立数据(UV)
2 系统版本 1 Java 8+
2 flink 1.10 +
3 elasticsearch 6+
4 kibana 6+
3 数据处理流程
文章图片
4 启动Kafka并往里面写入数据 4.1 启动ZK
$ZOOKEEPER_HOME/bin/zkServer.sh start
文章图片
4.2 $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
文章图片
4.3 启动一个消费者
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_behavior
4.4 用JAVA模拟数据写入数据到KAFKA
JAVA 关键代码如下
package com.bigdata.kafka.sourceparse.producer;
import com.bigdata.kafka.sourceparse.KafkaConstants;
import com.bigdata.kafka.sourceparse.bean.UserBehaviorObj;
import com.bigdata.kafka.sourceparse.utils.JsonUtils;
import kafka.utils.Json;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* Created by 袁海龙 on 2018/11/15.
*/
public class KafkaProducerDemo {static SimpleDateFormat sdf1=new SimpleDateFormat("yyyy-MM-dd");
static SimpleDateFormat sdf2=new SimpleDateFormat("HH:mm:ss");
public static void main(String[] args) {
Map kafkaParam=new HashMap<>();
String TOPIC=args.length==0?KafkaConstants.TOPIC:args[0];
System.out.println("TOPIC:"+TOPIC);
kafkaParam.put("bootstrap.servers", KafkaConstants.KAFKA_BOOTSTRAP_SERVER);
kafkaParam.put("key.serializer", KafkaConstants.KAFKA_SERIALIZER);
kafkaParam.put("value.serializer", KafkaConstants.KAFKA_SERIALIZER);
//自定义拦截器
//kafkaParam.put("interceptor.classes", "com.bigdata.kafka.sourceparse.producer.MyProducerInterceptor");
KafkaProducer kafkaProducer=new KafkaProducer(kafkaParam);
int index=0;
while (index<10000000) {
String msg = genMessage();
ProducerRecord record = new ProducerRecord(TOPIC, msg);
kafkaProducer.send(record);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
kafkaProducer.close();
}
/**
* 构造消息
* @return
*/
public staticString genMessage(){
Date date=new Date();
StringBuffer ts=new StringBuffer();
ts.append(sdf1.format(date))
.append("T")
.append(sdf2.format(date))
.append("Z");
String[] behavior={"pv","fav","chat","uv"};
UserBehaviorObj userBehaviorObj=new UserBehaviorObj();
userBehaviorObj.setUser_id(new Random().nextInt(1000000));
userBehaviorObj.setItem_id(new Random().nextInt(1000000));
userBehaviorObj.setCategory_id(new Random().nextInt(1000000));
userBehaviorObj.setBehavior(behavior[new Random().nextInt(behavior.length)]);
userBehaviorObj.setTs(ts.toString());
String str=JsonUtils.toJson(userBehaviorObj);
System.out.println(str);
return str;
}}
4.4 启动JAVA程序 消费者会一直收到数据如下
文章图片
5 启动Flink ,ES ,KIBANA 5.1 启动Flink
$FLINK_HOME/bin/start-cluster.sh
文章图片
在浏览器http://192.168.112.100:8081查看Flink是否已经启动
文章图片
5.2 启动ES
在es用户下启动(es6 以后为了安全不能在root下启动)
用su es命令切换到es目录下
bin/elasticsearch
文章图片
启动完成后在浏览器输入http://192.168.112.100:9200/查看是否启动成功
文章图片
注意如果启动中出现如下错误
文章图片
执行以下命令后在启动
echo 262144 > /proc/sys/vm/max_map_count
5.3 启动kibana
bin/kibana --allow-root
通过浏览器输入http://192.168.112.100:5601查看kibana启动是否成功
文章图片
6 利用FLINK SQL 构建Flink实时计算平台 6.1 进入FLINK SQL 交互式界面
$FLINK_HOME/bin/sql-client.sh embedded
6.1 创建一个user_behavior 表接收kafka中Topic为user_behavior的数据
create table user_behavior(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),
WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) with(
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'user_behavior',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'json'
)
文章图片
【Flink SQL快速构建实时计算平台】查看user_behavior 表中是否持续有数据进来(java一直在模拟发送数据)
select * from user_behavior;
文章图片
创建一个user_details表
create table user_details(
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
)
with(
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://192.168.112.100:9200',
'connector.index' = 'user_details',
'connector.document-type' = 'user_behavior',
'format.type' = 'json',
'update-mode' = 'upsert'
);
文章图片
将user_behavior表的数据写入user_details,user_details会将数据写入到elasticsearch
insert into user_details
select user_id,item_id,category_id,behavior,ts from user_behavior;
文章图片
执行了以上insert语句后可以去Flink的界面查看 他启动了一个任务在执行
文章图片
同样我可以去kibana上面查看数据是否存在(关于kibana的具体怎么建立关系可以自行百度)
文章图片
我们也可以利用Kibana 做很多有趣的可视化图
6.2 每十分钟统计一次最大的用户独立数
创建一个用户独立输的表
create table user_uv(
time_str STRING,
uv BIGINT
)
with(
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts' = 'http://192.168.112.100:9200',
'connector.index' = 'user_uv',
'connector.document-type' = 'user_behavior',
'format.type' = 'json',
'update-mode' = 'upsert'
);
文章图片
创建一个视图,便于后续计算
--每10分钟统计UV数据
create view uv_per_10min as
select
max(SUBSTR(DATE_FORMAT(ts,'HH:mm'),1,4) || '0') over w as time_str,
count(distinct user_id) over w as uv
from user_behavior
window w as (order by proctime rows between unbounded preceding and CURRENT ROW)
;
文章图片
写入数据
insert into user_uv
select time_str,max(uv) uv from uv_per_10min
group by time_str;
文章图片
同样到这里以后可以去flink上看多了一个任务
文章图片
到Kibana查询数据
文章图片
到此利用Flink SQL 构建一个完整的实时分析就完成了
上面利用到的Flink,ES ,Kibana,Kafka等环境需要自己搭建,网上资料很多
推荐阅读
- py连接mysql
- 2019-01-18Mysql中主机名的问题
- MySql数据库备份与恢复
- mysql|InnoDB数据页结构
- 数据库|SQL行转列方式优化查询性能实践
- mysql中视图事务索引与权限管理
- MYSQL主从同步的实现
- MySQL数据库的基本操作
- neo4j|neo4j cql语句 快速查询手册
- javaweb|基于Servlet+jsp+mysql开发javaWeb学生成绩管理系统