flink学习
flink搭建
环境:centos7 docker
1.docker pull flink
2.创建文件docker-compose.yml,内容如下:
version: "2.1"
services:
jobmanager:
image: flink:1.9.2-scala_2.12
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
taskmanager:
image: flink:1.9.2-scala_2.12
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager
3.在文件目录下执行命令 docker-compose up -d
4.主机ip+8081查看控制台是否显示
文章图片
flink项目搭建
1.maven创建flink项目
$ mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-quickstart-java
-DarchetypeVersion=1.13.1
2.pom.xml文件添加rabbitmq依赖 依赖如下
org.apache.flink
flink-java
${flink.version}
org.apache.flink
flink-streaming-java_${scala.binary.version}
${flink.version}
org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}
org.apache.flink
flink-connector-rabbitmq_${scala.binary.version}
${flink.version}
org.apache.logging.log4j
log4j-slf4j-impl
${log4j.version}
org.apache.logging.log4j
log4j-api
${log4j.version}
org.apache.logging.log4j
log4j-core
${log4j.version}
【flink学习】3.WordCountJob
public class WordCountJob {
public static void main(String[] args) throws Exception {
//1.获取flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("1.117.78.150")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
//2.连接socket获取输入的数据(数据源Data Source)
//2.1. rabbitmq连接的配置,2.rabbitmq的队列名,消费的队列名
DataStream dataStreamSource = env.addSource(new RMQSource(connectionConfig,
"test.flink",true, new SimpleStringSchema()));
dataStreamSource.print();
//输出Source的信息//3.数据转换
//MapFunction:第一个参数是你接收的数据的类型
//MapFunction:第二个参数是返回数据的类型
DataStream MessageVo = dataStreamSource.map(
(MapFunction) s -> s
);
MessageVo.addSink(new MySink());
//5.这一行代码一定要实现,否则程序不执行
env.execute("liu er gou");
//SpringApplication.run(PaymentFlinkApplication.class, args);
}
}
4.MySink
public class MySink extends RichSinkFunction {private static final Logger log = LoggerFactory.getLogger(MySink.class);
@Override
public void invoke(String value, Context context) throws Exception {
log.info(value);
}@Override
public void close() throws Exception {
log.info("MySink close");
}
}
5.maven install 拿到jar包
文章图片
6.打开flink控制台 上传jar包 运行WordCountJob类 没有报错表示运行成功 同时控制台会有相应的job
文章图片
文章图片
7.rabbit队列发送消息操作 查看程序是否成功 flink控制台会显示
文章图片
推荐阅读
- 由浅入深理解AOP
- 继续努力,自主学习家庭Day135(20181015)
- python学习之|python学习之 实现QQ自动发送消息
- 一起来学习C语言的字符串转换函数
- 定制一套英文学习方案
- 漫画初学者如何学习漫画背景的透视画法(这篇教程请收藏好了!)
- 《深度倾听》第5天──「RIA学习力」便签输出第16期
- 如何更好的去学习
- 【韩语学习】(韩语随堂笔记整理)
- 焦点学习田源分享第267天《来访》