flink学习

flink搭建
环境:centos7 docker
1.docker pull flink
2.创建文件docker-compose.yml,内容如下:

version: "2.1" services: jobmanager: image: flink:1.9.2-scala_2.12 expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: image: flink:1.9.2-scala_2.12 expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - "jobmanager:jobmanager" environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager

3.在文件目录下执行命令 docker-compose up -d
4.主机ip+8081查看控制台是否显示
flink学习
文章图片

flink项目搭建
1.maven创建flink项目
$ mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.13.1

2.pom.xml文件添加rabbitmq依赖 依赖如下
org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-rabbitmq_${scala.binary.version} ${flink.version} org.apache.logging.log4j log4j-slf4j-impl ${log4j.version} org.apache.logging.log4j log4j-api ${log4j.version} org.apache.logging.log4j log4j-core ${log4j.version}

【flink学习】3.WordCountJob
public class WordCountJob { public static void main(String[] args) throws Exception { //1.获取flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("1.117.78.150") .setPort(5672) .setUserName("guest") .setPassword("guest") .setVirtualHost("/") .build(); //2.连接socket获取输入的数据(数据源Data Source) //2.1. rabbitmq连接的配置,2.rabbitmq的队列名,消费的队列名 DataStream dataStreamSource = env.addSource(new RMQSource(connectionConfig, "test.flink",true, new SimpleStringSchema())); dataStreamSource.print(); //输出Source的信息//3.数据转换 //MapFunction:第一个参数是你接收的数据的类型 //MapFunction:第二个参数是返回数据的类型 DataStream MessageVo = dataStreamSource.map( (MapFunction) s -> s ); MessageVo.addSink(new MySink()); //5.这一行代码一定要实现,否则程序不执行 env.execute("liu er gou"); //SpringApplication.run(PaymentFlinkApplication.class, args); } }

4.MySink
public class MySink extends RichSinkFunction {private static final Logger log = LoggerFactory.getLogger(MySink.class); @Override public void invoke(String value, Context context) throws Exception { log.info(value); }@Override public void close() throws Exception { log.info("MySink close"); } }

5.maven install 拿到jar包
flink学习
文章图片

6.打开flink控制台 上传jar包 运行WordCountJob类 没有报错表示运行成功 同时控制台会有相应的job
flink学习
文章图片

flink学习
文章图片

7.rabbit队列发送消息操作 查看程序是否成功 flink控制台会显示
flink学习
文章图片

    推荐阅读