Flink Stream Processing: Accessing MySQL

The business scenario:
Overview: collect data from factory equipment.
Flink connects to EMQTT and ingests industrial IoT data for stream processing. The factory device messages do not carry a machID, so the matching machID has to be looked up by gateMac in the device-information base table in MySQL.
Approach (1) for accessing MySQL:
Load all device base information from MySQL when the Flink job initializes. The problem: whenever a new device is added, the Flink job has to be restarted to re-read all machIDs from MySQL. That works poorly, so this approach was rejected; a sketch of it follows.
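For contrast, approach (1) boils down to something like the following (a minimal sketch, not the author's code; loadDeviceMap() is a hypothetical one-time JDBC query, and DeviceData is assumed to expose getGateMac()/setMachID() matching the model class used later):

import java.util.Map;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class StaticDeviceLookup extends RichMapFunction<DeviceData, DeviceData> {
    private Map<String, String> deviceMap;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Hypothetical helper: one JDBC query at job start. The table is never
        // refreshed afterwards, which is exactly why a newly added device
        // forces a job restart.
        deviceMap = loadDeviceMap();
    }

    @Override
    public DeviceData map(DeviceData value) {
        value.setMachID(deviceMap.get(value.getGateMac()));
        return value;
    }
}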
Approach (2):
Read MySQL in Flink as its own stream that re-queries every 5 minutes (a timed refresh), and write the result into a map.

private static Map<String, String> deviceMap = new Hashtable<>();

/**
 * Fetch machID from MySQL, refreshed every five minutes -- the part this post explains
 */
DataStream<Map<String, String>> deviceStream = env.addSource(new JdbcReader());
deviceStream.broadcast().map(new MapFunction<Map<String, String>, Object>() {
    @Override
    public Object map(Map<String, String> value) {
        deviceMap = value;
        return null;
    }
});
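Note that the map() above is used purely for its side effect of swapping the fresh table into the static deviceMap; the null it returns is never consumed. Flink still executes the operator, because every transformation is registered with the environment when it is created, but terminating the branch with a no-op sink makes the intent explicit (an optional addition, not in the original code):

deviceStream.broadcast()
        .map(new MapFunction<Map<String, String>, Object>() {
            @Override
            public Object map(Map<String, String> value) {
                deviceMap = value; // side effect only: replace the lookup table
                return null;
            }
        })
        .addSink(new DiscardingSink<>()); // org.apache.flink.streaming.api.functions.sink.DiscardingSink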


EmqttFlinkMain.java (the main Flink data-processing job)
package com.flink;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.flink.config.flinkConstants;
import com.flink.model.DeviceAlarm;
import com.flink.model.DeviceData;
import com.flink.utils.emqtt.EmqttSource;
import com.flink.utils.mysql.JdbcReader;
import com.flink.utils.mysql.JdbcWriter;
import com.flink.utils.opentsdb.OpnetsdbWriter;
import com.flink.utils.redis.RedisWriter;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.*;

public class EmqttFlinkMain {

    private static Map<String, String> deviceMap = new Hashtable<>();

    public static void main(String[] args) throws Exception {
        flinkConstants fc = flinkConstants.getInstance();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /**
         * Fetch machID from MySQL, refreshed every five minutes -- the part this post explains
         */
        DataStream<Map<String, String>> deviceStream = env.addSource(new JdbcReader());
        deviceStream.broadcast().map(new MapFunction<Map<String, String>, Object>() {
            @Override
            public Object map(Map<String, String> value) {
                deviceMap = value;
                return null;
            }
        });

        // ======================================================================
        // emqtt
        DataStream<Tuple2<String, String>> inputStream = env.addSource(new EmqttSource());

        /**
         * Data messages
         */
        DataStream<DeviceData> dataStream = inputStream
                .rebalance()
                .flatMap(new FlatMapFunction<Tuple2<String, String>, DeviceData>() {
                    @Override
                    public void flatMap(Tuple2<String, String> value, Collector<DeviceData> out) {
                        String message = value.f0;
                        String topic = value.f1;
                        List<DeviceData> d = DataHandle(message, topic);
                        for (DeviceData line : d) {
                            out.collect(line);
                        }
                    }
                });

        // Write to OpenTSDB
        dataStream.addSink(new OpnetsdbWriter()).name("opentsdb");

        // Write to Redis
        SingleOutputStreamOperator<Tuple2<String, String>> keyedStream = dataStream
                .map(new MapFunction<DeviceData, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(DeviceData value) {
                        String key = value.getCompID() + "/" + value.getMachID() + "/" + value.getOperationValue();
                        return Tuple2.of(key, value.getOperationData());
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(3))
                .process(new ProcessWindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple2<String, String>> elements,
                                        Collector<Tuple2<String, String>> out) throws Exception {
                        // Emit only the last element of each window
                        Iterator<Tuple2<String, String>> iter = elements.iterator();
                        while (iter.hasNext()) {
                            Tuple2<String, String> temp = iter.next();
                            if (!iter.hasNext()) {
                                out.collect(temp);
                            }
                        }
                    }
                });
        keyedStream.addSink(new RedisWriter()).name("redis");

        /**
         * Alarm messages
         */
        // Write to MySQL
        DataStream<List<DeviceAlarm>> alarmStream = inputStream.filter(new FilterFunction<Tuple2<String, String>>() {
            @Override
            public boolean filter(Tuple2<String, String> value) throws Exception {
                JSONObject AlarmObject = JSON.parseObject(value.f0);
                String dataType = (String) AlarmObject.get("type");
                return dataType.equals("Alarm") || dataType.equals("alarm");
            }
        }).map(new MapFunction<Tuple2<String, String>, List<DeviceAlarm>>() {
            @Override
            public List<DeviceAlarm> map(Tuple2<String, String> s) throws Exception {
                // alarmDnalysis(...) parses alarm messages; its definition is not shown in the post
                return alarmDnalysis(s.f0, s.f1);
            }
        });

        // Use JdbcWriter
        alarmStream.addSink(new JdbcWriter()).name("mysql").setParallelism(3);

        // Use JdbcWriterAsyncFunction
        //// create async function, which will *wait* for a while to simulate the process of async i/o
        //AsyncFunction<List<DeviceAlarm>, String> function = new JdbcWriterAsyncFunction();
        //
        //// add async operator to streaming job
        //AsyncDataStream.orderedWait(
        //        alarmStream,
        //        function,
        //        10000L,
        //        TimeUnit.MILLISECONDS,
        //        20).name("async write mysql").setParallelism(3);

        env.execute("EmqttFlinkMain");
    }

    private static List<DeviceData> DataHandle(String message, String topic) {
        List<DeviceData> d = new ArrayList<>();
        try {
            JSONObject DataObject = JSON.parseObject(message);
            String dataType = (String) DataObject.get("type");
            if (dataType.equals("Data") || dataType.equals("data")) {
                String[] array = topic.split("/");
                JSONArray dataList = JSON.parseArray(DataObject.get("values").toString());
                String machID = deviceMap.get(array[1]);
                if (machID != null) {
                    for (int i = 0; i < dataList.size(); i++) {
                        DeviceData d1 = new DeviceData();
                        JSONObject dataDict = dataList.getJSONObject(i);
                        d1.setMachID(machID);
                        d1.setCompID(array[0]);
                        d1.setGateMac(array[1]);
                        d1.setOperationValue(dataDict.get("name").toString());
                        d1.setOperationData(dataDict.get("data").toString());
                        d1.setGatherTime(dataDict.get("time").toString());
                        d.add(d1);
                    }
                } else {
                    System.out.println("Unable to parse data");
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
        return d;
    }
}
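For reference, DataHandle expects the EMQTT topic to be of the form compID/gateMac/... and the payload to be JSON shaped as below (field names come from the parsing code above; the concrete values are invented for illustration):

topic:   C001/00-1B-44-11-3A-B7/data
payload: {"type": "Data", "values": [{"name": "spindleSpeed", "data": "1200", "time": "2019-05-01 12:00:00"}]}

Given a deviceMap entry 00-1B-44-11-3A-B7 -> M123, this produces one DeviceData with compID=C001, gateMac=00-1B-44-11-3A-B7, machID=M123.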


A custom Flink MySQL source

JdbcReader.java (reads MySQL)
package com.flink.utils.mysql;

import com.flink.config.flinkConstants;
import com.mysql.jdbc.Driver;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Hashtable;
import java.util.Map;

// RichSourceFunction (non-parallel) vs. RichParallelSourceFunction
public class JdbcReader extends RichSourceFunction<Map<String, String>> {

    private static final Logger logger = LoggerFactory.getLogger(JdbcReader.class);

    private Connection connection = null;
    private PreparedStatement ps = null;
    private volatile boolean isRunning = true;

    // Opens the database connection; the flinkConstants class supplies the configuration
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        flinkConstants fc = flinkConstants.getInstance();
        DriverManager.registerDriver(new Driver());
        String db_url = "jdbc:mysql://" + fc.JDBC_HOST + ":" + fc.JDBC_PORT + "/" + fc.JDBC_DATABASE;
        connection = DriverManager.getConnection(db_url, fc.JDBC_USERNAME, fc.JDBC_PASSWORD); // get the connection
        ps = connection.prepareStatement("select machID, gateMac from dac_machinestatus where operationFlag=9");
    }

    // Runs the query and emits the result
    @Override
    public void run(SourceContext<Map<String, String>> ctx) throws Exception {
        Map<String, String> DeviceMap = new Hashtable<>();
        try {
            while (isRunning) {
                ResultSet resultSet = ps.executeQuery();
                while (resultSet.next()) {
                    String gateMac = resultSet.getString("gateMac");
                    String machID = resultSet.getString("machID");
                    if (!(gateMac.isEmpty() && machID.isEmpty())) {
                        DeviceMap.put(gateMac, machID);
                    }
                }
                System.out.println("DeviceMap>>>>>>" + DeviceMap);
                ctx.collect(DeviceMap); // emit the result
                DeviceMap.clear();
                Thread.sleep(5000 * 60); // refresh every 5 minutes
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
    }

    // Closes the database connection
    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            logger.error("runException:{}", e);
        }
        isRunning = false;
    }
}
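One caveat in this source: cancel() closes the JDBC resources first and only flips isRunning at the end, so a thread parked in Thread.sleep(5000 * 60) may not notice the flag for up to five minutes. A common way to make the wait interruptible (a sketch, not the author's code) is to wait on a monitor and notify it from cancel():

// Sketch: interruptible refresh wait for JdbcReader
private final Object waitLock = new Object();

@Override
public void run(SourceContext<Map<String, String>> ctx) throws Exception {
    while (isRunning) {
        // ... query MySQL and ctx.collect(...) as above ...
        synchronized (waitLock) {
            waitLock.wait(5000 * 60); // returns early when cancel() notifies
        }
    }
}

@Override
public void cancel() {
    isRunning = false;            // flip the flag first
    synchronized (waitLock) {
        waitLock.notifyAll();     // wake the sleeping run() loop immediately
    }
    // ... then close ps and connection as above ...
}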


Interestingly, a problem turned up during development:
With RichSourceFunction reading MySQL and storing the result in public static Map DeviceMap = new Hashtable();, only one server in the cluster ended up with data in DeviceMap; on all the others it stayed empty. The reason is that a static field lives inside a single JVM: only the TaskManager actually running the source's downstream map sees the update.
The fix: keep the RichSourceFunction reading MySQL, but add a broadcast() before the map operator, so the data from that one node is broadcast to every node. After that change, DeviceMap is populated on every server in the cluster.
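As an aside, Flink 1.5+ offers the broadcast state API for exactly this enrich-with-a-slowly-changing-table pattern; it keeps the lookup table in managed state on every parallel instance instead of a static field. A sketch under that assumption (the stream types follow the code above; the DeviceData construction is elided):

// Requires: org.apache.flink.api.common.state.MapStateDescriptor,
// org.apache.flink.api.common.state.BroadcastState,
// org.apache.flink.api.common.typeinfo.BasicTypeInfo,
// org.apache.flink.streaming.api.datastream.BroadcastStream,
// org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction

final MapStateDescriptor<String, String> deviceDesc = new MapStateDescriptor<>(
        "deviceMap", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

BroadcastStream<Map<String, String>> deviceBroadcast = deviceStream.broadcast(deviceDesc);

DataStream<DeviceData> enriched = inputStream
        .connect(deviceBroadcast)
        .process(new BroadcastProcessFunction<Tuple2<String, String>, Map<String, String>, DeviceData>() {
            @Override
            public void processElement(Tuple2<String, String> value, ReadOnlyContext ctx,
                                       Collector<DeviceData> out) throws Exception {
                String gateMac = value.f1.split("/")[1]; // topic format: compID/gateMac/...
                String machID = ctx.getBroadcastState(deviceDesc).get(gateMac);
                if (machID != null) {
                    // build DeviceData as in DataHandle(...) and out.collect(...) it
                }
            }

            @Override
            public void processBroadcastElement(Map<String, String> value, Context ctx,
                                                Collector<DeviceData> out) throws Exception {
                BroadcastState<String, String> state = ctx.getBroadcastState(deviceDesc);
                state.clear(); // replace the whole table on each 5-minute refresh
                for (Map.Entry<String, String> e : value.entrySet()) {
                    state.put(e.getKey(), e.getValue());
                }
            }
        });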


pom.xml
Project: FlinkDataHandle : FlinkDataHandle : 1.0-SNAPSHOT (jar), source encoding UTF-8, Scala 2.11.11 (binary 2.11), Java 1.8. Other version properties defined: 2.2.0, 0.9.1.1, 5.1.26, 1.1.41, 1.2.0, 1.4.2 (these include ${flink.version} and ${hbase.version}, which the dependencies reference).

Repositories:
- aliyun: http://maven.aliyun.com/nexus/content/groups/public/
- cloudera: https://repository.cloudera.com/artifactory/cloudera-repos/
- jboss: http://repository.jboss.org/nexus/content/groups/public

Dependencies (groupId : artifactId : version):
- org.apache.flink : flink-streaming-java_${scala.compat.version} : ${flink.version}
- org.apache.flink : flink-connector-twitter_${scala.compat.version} : ${flink.version}
- org.apache.flink : flink-connector-kafka-0.10_${scala.compat.version} : ${flink.version}
- org.apache.flink : flink-statebackend-rocksdb_${scala.compat.version} : ${flink.version}
- org.apache.flink : flink-table_${scala.compat.version} : ${flink.version}
- org.apache.flink : flink-streaming-scala_${scala.compat.version} : ${flink.version}
- org.apache.flink : flink-hbase_${scala.compat.version} : ${flink.version}
- org.apache.flink : flink-connector-redis_2.11 : 1.1.5
- org.apache.hadoop : hadoop-client : 2.6.0
- org.apache.hbase : hbase-server : ${hbase.version}
- org.apache.hbase : hbase-common : ${hbase.version}
- org.apache.commons : commons-pool2 : 2.4.2
- org.json : json : 20180130
- com.alibaba : fastjson : 1.2.47
- org.slf4j : slf4j-api : 1.7.25
- org.slf4j : slf4j-log4j12 : 1.7.25 (test)
- redis.clients : jedis : 2.3.0
- org.eclipse.paho : org.eclipse.paho.client.mqttv3 : 1.0.2
- org.apache.flink : flink-connector-rabbitmq_2.11 : 1.7.2
- org.fusesource.mqtt-client : mqtt-client : 1.12
- org.apache.flink : flink-table_2.11 : 1.7.2
- mysql : mysql-connector-java : 5.1.35
- org.apache.commons : commons-dbcp2 : 2.1.1

Build (sources in src/main/java, tests in src/test/java):
- maven-assembly-plugin: descriptor jar-with-dependencies, mainClass com.flink.EmqttFlinkMain, execution make-assembly bound to the package phase, goal single
- exec-maven-plugin 1.2.1: goal exec:java, classpath scope compile, mainClass com.stars
- maven-compiler-plugin: source/target 1.8





