Flink Study Notes: Custom Sink Functions

Flink Sink Overview

  • Flink has two key concepts, Source and Sink: the Source determines where our data comes from, and the Sink determines where it ends up.
  • Flink ships with a rich set of built-in sinks, for example Kafka, CSV files, Elasticsearch, and sockets (see the short sketch below).
  • When the sink we need is not yet implemented, we can define our own.
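As a point of reference before customizing anything, attaching a built-in sink is a one-liner on a DataStream. A minimal sketch (the output path and job name are arbitrary examples, not from the original):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BuiltinSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // writeAsText is one of the simple built-in sinks; print() is another.
        env.fromElements("a", "b", "c").writeAsText("/tmp/flink-sink-demo");
        env.execute("builtin-sink-demo");
    }
}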
Custom Sink Functions
  • Here we build a KuduSink that writes to Kudu.
  • A custom sink must either implement the SinkFunction interface or extend the RichSinkFunction abstract class; let's read the source code to compare the two:
SinkFunction, which is an interface:
package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;

/**
 * Interface for implementing user defined sink functionality.
 *
 * @param <IN> Input type parameter.
 */
@Public
public interface SinkFunction<IN> extends Function, Serializable {

    /** @deprecated Use {@link #invoke(Object, Context)}. */
    @Deprecated
    default void invoke(IN value) throws Exception {}

    /**
     * Writes the given value to the sink. This function is called for every record.
     *
     * You have to override this method when implementing a {@code SinkFunction}, this is a
     * {@code default} method for backward compatibility with the old-style method only.
     *
     * @param value The input record.
     * @param context Additional context about the input record.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the
     *     operation to fail and may trigger recovery.
     */
    default void invoke(IN value, Context context) throws Exception {
        invoke(value);
    }

    /**
     * Context that {@link SinkFunction SinkFunctions} can use for getting additional data about
     * an input record.
     *
     * The context is only valid for the duration of a
     * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
     * afterwards!
     *
     * @param <T> The type of elements accepted by the sink.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface Context<T> {

        /** Returns the current processing time. */
        long currentProcessingTime();

        /** Returns the current event-time watermark. */
        long currentWatermark();

        /**
         * Returns the timestamp of the current input record or {@code null} if the element does
         * not have an assigned timestamp.
         */
        Long timestamp();
    }
}
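For contrast, implementing the interface directly only requires overriding invoke. A minimal sketch (the PrintSink name is ours, not from the original):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class PrintSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        // A stateless sink: no lifecycle hooks, no runtime context available.
        System.out.println(value);
    }
}

It is attached like any other sink: stream.addSink(new PrintSink()).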

RichSinkFunction, which is an abstract class:
package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.AbstractRichFunction;

/**
 * A {@link org.apache.flink.api.common.functions.RichFunction} version of {@link SinkFunction}.
 */
@Public
public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {

    private static final long serialVersionUID = 1L;
}

  • As the source shows, the RichSinkFunction abstract class implements the SinkFunction interface and also extends AbstractRichFunction, which gives it lifecycle methods (open/close) and access to the runtime context, making it more flexible to use. In practice, custom sinks are therefore usually written by extending RichSinkFunction.
  • The KuduSink below extends RichSinkFunction and overrides the open, close, and invoke methods: open initializes the Kudu client and related configuration, invoke performs the write for each incoming record, and close releases all resources.
package test;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

public class SinkKudu extends RichSinkFunction<Map<String, Object>> {

    private final static Logger logger = Logger.getLogger(SinkKudu.class);

    private KuduClient client;
    private KuduTable table;
    private String kuduMaster;
    private String tableName;
    private Schema schema;
    private KuduSession kuduSession;
    private ByteArrayOutputStream out;
    private ObjectOutputStream os;

    public SinkKudu(String kuduMaster, String tableName) {
        this.kuduMaster = kuduMaster;
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the Kudu client, table handle, and session once per task.
        out = new ByteArrayOutputStream();
        os = new ObjectOutputStream(out);
        client = new KuduClient.KuduClientBuilder(kuduMaster).build();
        table = client.openTable(tableName);
        schema = table.getSchema();
        kuduSession = client.newSession();
        kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
    }

    @Override
    public void invoke(Map<String, Object> map) {
        if (map == null) {
            return;
        }
        try {
            int columnCount = schema.getColumnCount();
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            for (int i = 0; i < columnCount; i++) {
                Object value = map.get(schema.getColumnByIndex(i).getName());
                insertData(row, schema.getColumnByIndex(i).getType(),
                        schema.getColumnByIndex(i).getName(), value);
            }
            OperationResponse response = kuduSession.apply(insert);
            if (response != null && response.hasRowError()) {
                logger.error(response.getRowError().toString());
            }
        } catch (Exception e) {
            logger.error(e);
        }
    }

    @Override
    public void close() throws Exception {
        try {
            kuduSession.close();
            client.close();
            os.close();
            out.close();
        } catch (Exception e) {
            logger.error(e);
        }
    }

    // Write one column value into the row, dispatching on the Kudu column type.
    private void insertData(PartialRow row, Type type, String columnName, Object value) throws IOException {
        try {
            switch (type) {
                case STRING:
                    row.addString(columnName, value.toString());
                    return;
                case INT32:
                    row.addInt(columnName, Integer.valueOf(value.toString()));
                    return;
                case INT64:
                    row.addLong(columnName, Long.valueOf(value.toString()));
                    return;
                case DOUBLE:
                    row.addDouble(columnName, Double.valueOf(value.toString()));
                    return;
                case BOOL:
                    row.addBoolean(columnName, (Boolean) value);
                    return;
                case INT8:
                    row.addByte(columnName, (byte) value);
                    return;
                case INT16:
                    row.addShort(columnName, (short) value);
                    return;
                case BINARY:
                    os.writeObject(value);
                    row.addBinary(columnName, out.toByteArray());
                    return;
                case FLOAT:
                    row.addFloat(columnName, Float.valueOf(String.valueOf(value)));
                    return;
                default:
                    throw new UnsupportedOperationException("Unknown type " + type);
            }
        } catch (Exception e) {
            logger.error("Failed to insert data", e);
        }
    }
}
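One design note on the flush mode: with AUTO_FLUSH_BACKGROUND the session buffers operations and flushes them asynchronously, so apply() can return before the write is durable and, depending on the client version, may return null; errors that surface after the fact are retrieved through the session's pending-errors API rather than the per-operation response. Treat the response check above as best-effort logging, not a delivery guarantee.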

Test example (the original note omits the UserInfo POJO, which has three fields: userid, name, and age; a minimal version is sketched below).
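For reference, a minimal UserInfo POJO consistent with the three fields named above could look like this (a sketch; the original class is not shown):

package test;

public class UserInfo {
    // Public fields so the MapFunction in the test can read them directly.
    public String userid;
    public String name;
    public int age;

    public UserInfo() {}  // Flink POJOs need a public no-arg constructor

    public UserInfo(String userid, String name, int age) {
        this.userid = userid;
        this.name = name;
        this.age = age;
    }
}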
package test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.HashMap;
import java.util.Map;

public class SinkTest {

    public static void main(String[] args) throws Exception {
        // Initialize the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build a bounded demo source.
        DataStreamSource<UserInfo> dataSource = env.fromElements(
                new UserInfo("001", "Jack", 18),
                new UserInfo("002", "Rose", 20),
                new UserInfo("003", "Cris", 22),
                new UserInfo("004", "Lily", 19),
                new UserInfo("005", "Lucy", 21),
                new UserInfo("006", "Json", 24));

        // Map each POJO to the column-name -> value map expected by SinkKudu.
        SingleOutputStreamOperator<Map<String, Object>> mapSource = dataSource.map(
                new MapFunction<UserInfo, Map<String, Object>>() {
                    @Override
                    public Map<String, Object> map(UserInfo value) throws Exception {
                        Map<String, Object> map = new HashMap<>();
                        map.put("userid", value.userid);
                        map.put("name", value.name);
                        map.put("age", value.age);
                        return map;
                    }
                });

        // Sink to Kudu.
        String kuduMaster = "host";
        String tableInfo = "tablename";
        mapSource.addSink(new SinkKudu(kuduMaster, tableInfo));

        env.execute("sink-test");
    }
}

Summary
  • Here we defined the SinkKudu function and verified it with a simple test job. Of course, the source can be swapped for a Kafka consumer to process a continuous stream: reading from Kafka with Flink and writing to Kudu is a common production pattern for real-time ETL (a sketch follows below).
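A hedged sketch of that Kafka-to-Kudu shape, assuming the universal FlinkKafkaConsumer connector, a simple CSV payload "userid,name,age", and placeholder topic/host names (none of these come from the original):

package test;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaToKuduJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-host:9092"); // placeholder
        props.setProperty("group.id", "kudu-etl");                 // placeholder

        env.addSource(new FlinkKafkaConsumer<>("user-topic", new SimpleStringSchema(), props))
           .map(new MapFunction<String, Map<String, Object>>() {
               @Override
               public Map<String, Object> map(String record) throws Exception {
                   // Assume a CSV payload "userid,name,age"; swap in real
                   // JSON parsing for production use.
                   String[] parts = record.split(",");
                   Map<String, Object> row = new HashMap<>();
                   row.put("userid", parts[0]);
                   row.put("name", parts[1]);
                   row.put("age", Integer.valueOf(parts[2]));
                   return row;
               }
           })
           .addSink(new SinkKudu("kudu-master-host", "tablename"));

        env.execute("kafka-to-kudu");
    }
}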
