The Process of Writing a Custom Flink HBase Sink

The complete official HBase sink code is at the end of this article.
Custom Flink Sinks
There are two main ways to implement a custom sink: implement the SinkFunction interface, or extend RichSinkFunction.
If you also need checkpoint support, implement the CheckpointedFunction interface in addition.
RichSinkFunction is recommended because it exposes the full lifecycle (open/invoke/close); a minimal skeleton is sketched below.
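A minimal sketch of that skeleton, assuming a String input type (the class name and method bodies are illustrative placeholders, not a real implementation):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical skeleton: lifecycle hooks only, no real sink logic.
public class MySink extends RichSinkFunction<String> implements CheckpointedFunction {

    @Override
    public void open(Configuration parameters) throws Exception {
        // create connections/clients once per parallel subtask
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // write one record to the external system
    }

    @Override
    public void close() throws Exception {
        // flush remaining data and release connections
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // flush pending writes so the checkpoint only completes once data is durable
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // restore state after a failure, if any
    }
}
```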
Implementing the HBase Sink
Simple version: it implements RichSinkFunction but is not generic (the code has to be modified for every new source) and provides no builder. This is the first version I wrote myself.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * @author WangKaiYu
 * @date 2022-02-18 15:19
 */
public class HbaseSink extends RichSinkFunction<String> {

    private final static Logger logger = LoggerFactory.getLogger(HbaseSink.class);

    private static org.apache.hadoop.conf.Configuration configuration;
    private static Connection connection = null;
    private static BufferedMutator mutator;
    private static String Tablename = "student";

    @Override
    public void open(Configuration parameters) throws Exception {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.master", "192.168.10.154:16000");
        configuration.set("hbase.zookeeper.quorum", "192.168.10.154,192.168.10.155,192.168.10.196");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (Exception e) {
            e.printStackTrace();
        }
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(Tablename));
        // write buffer size
        params.writeBufferSize(2 * 1024 * 1024);
        // maximum time before a periodic flush
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
        try {
            mutator = connection.getBufferedMutator(params);
        } catch (IOException e) {
            logger.error("Failed to obtain BufferedMutator: " + e.getMessage());
        }
    }

    @Override
    public void close() throws Exception {
        if (mutator != null) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        String familyName = "info";
        String[] values = value.split(",");
        Put put = new Put(Bytes.toBytes(values[0]));
        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes("name"), Bytes.toBytes(values[1]));
        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes("age"), Bytes.toBytes(values[2]));
        mutator.mutate(put);
        // force-flush the buffered data to HBase
        mutator.flush();
    }
}
```

Source Code Walkthrough
1. Key Classes
BufferedMutatorParams
The parameters needed to instantiate a BufferedMutator.
Its main parameters: TableName (the table name), writeBufferSize (write buffer size), maxKeyValueSize (maximum key-value size), ExecutorService (the execution thread pool), and ExceptionListener (listens for exceptions from the BufferedMutator).
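As an illustrative sketch that sets each of these (the table name, sizes, pool size, and listener body are placeholder values, not recommendations):

```java
import java.util.concurrent.Executors;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;

public class ParamsExample {
    static BufferedMutatorParams buildParams() {
        return new BufferedMutatorParams(TableName.valueOf("student"))
                .writeBufferSize(2 * 1024 * 1024)      // write buffer size in bytes
                .maxKeyValueSize(1024 * 1024)          // reject cells larger than this
                .pool(Executors.newFixedThreadPool(4)) // ExecutorService backing async operations
                .listener((RetriesExhaustedWithDetailsException e, BufferedMutator m) -> {
                    // called when buffered mutations have exhausted their retries
                    for (int i = 0; i < e.getNumExceptions(); i++) {
                        System.err.println("Failed to send put " + e.getRow(i));
                    }
                });
    }
}
```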

AsyncProcess
AsyncProcess maintains an internal thread pool. Our operations are wrapped as Runnables and handed to that pool for execution; the process stays asynchronous until the number of in-flight tasks reaches the configured maximum.
HConnectionImplementation
A connection to a single cluster. Through it the client can find the master, locate how regions are distributed, keep a cache of region locations, and recalibrate that location information when it goes stale.
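HConnectionImplementation itself is internal to the HBase client, but its region-locating role is visible through the public Connection API. A sketch, assuming a table named student exists (the row key is a placeholder):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.util.Bytes;

public class LocateExample {
    static void locate(Configuration conf) throws IOException {
        try (Connection conn = ConnectionFactory.createConnection(conf);
             RegionLocator locator = conn.getRegionLocator(TableName.valueOf("student"))) {
            // the connection caches region locations and refreshes them when they go stale
            HRegionLocation location = locator.getRegionLocation(Bytes.toBytes("row-0001"));
            System.out.println("row is served by " + location.getServerName());
        }
    }
}
```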
Building a BufferedMutator
1) First, build an HBaseConfiguration:

```java
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zookeeperHost");
```

2) Next, build the BufferedMutatorParams:

```java
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
    @Override
    public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
        for (int i = 0; i < e.getNumExceptions(); i++) {
            LOG.info("Failed to send put " + e.getRow(i) + ".");
        }
    }
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener);
params.writeBufferSize(123);
```

3) Then create the Connection:

```java
Connection conn = ConnectionFactory.createConnection(getConf());
```

4) Finally, build the BufferedMutator:

```java
BufferedMutator mutator = conn.getBufferedMutator(params);
```

How Data Is Sent
1) Build a Put or a List of Puts.
2) Call BufferedMutator.mutate.
3) Flush to HBase. A flush is triggered in three ways:
- explicitly, by calling BufferedMutator.flush;
- by calling BufferedMutator.close when sending is finished;
- automatically, when the current buffer exceeds the configured write buffer size.
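A sketch of that flow, assuming mutator was built as in the steps above (row keys, family, and values are placeholders):

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SendExample {
    static void send(BufferedMutator mutator) throws IOException {
        // 1) build the Puts
        Put put1 = new Put(Bytes.toBytes("row1"));
        put1.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Tom"));
        Put put2 = new Put(Bytes.toBytes("row2"));
        put2.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Jerry"));
        List<Put> puts = Arrays.asList(put1, put2);

        // 2) hand them to the BufferedMutator's write buffer
        mutator.mutate(puts);

        // 3) flush explicitly; close() and exceeding writeBufferSize
        //    trigger the other two flush paths
        mutator.flush();
    }
}
```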

I then found an official Flink HBase sink implementation, although the official website does not document it. Compared with the version above it is more complete: it adds checkpoint support and handling for asynchronous write failures, and its flusher thread pool defaults to a single thread.
```java
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutator.ExceptionListener;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class HBaseSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction, ExceptionListener {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class);

    private final String hTableName;
    private final byte[] serializedConfig;
    private final long bufferFlushMaxSizeInBytes;
    private final long bufferFlushMaxMutations;
    private final long bufferFlushIntervalMillis;
    private final HBaseMutationConverter<T> mutationConverter;

    private transient Connection connection;
    private transient BufferedMutator mutator;
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture<?> scheduledFuture;
    private transient AtomicLong numPendingRequests;
    private transient volatile boolean closed = false;
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    public HBaseSinkFunction(
            String hTableName,
            Configuration conf,
            HBaseMutationConverter<T> mutationConverter,
            long bufferFlushMaxSizeInBytes,
            long bufferFlushMaxMutations,
            long bufferFlushIntervalMillis) {
        this.hTableName = hTableName;
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
        this.mutationConverter = mutationConverter;
        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
    }

    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        LOG.info("start open ...");
        Configuration config = this.prepareRuntimeConfiguration();
        try {
            this.mutationConverter.open();
            this.numPendingRequests = new AtomicLong(0L);
            if (null == this.connection) {
                this.connection = ConnectionFactory.createConnection(config);
            }
            BufferedMutatorParams params =
                    (new BufferedMutatorParams(TableName.valueOf(this.hTableName))).listener(this);
            if (this.bufferFlushMaxSizeInBytes > 0L) {
                params.writeBufferSize(this.bufferFlushMaxSizeInBytes);
            }
            this.mutator = this.connection.getBufferedMutator(params);
            if (this.bufferFlushIntervalMillis > 0L && this.bufferFlushMaxMutations != 1L) {
                this.executor = Executors.newScheduledThreadPool(
                        1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                this.scheduledFuture = this.executor.scheduleWithFixedDelay(() -> {
                    if (!this.closed) {
                        try {
                            this.flush();
                        } catch (Exception var2) {
                            this.failureThrowable.compareAndSet((Throwable) null, var2);
                        }
                    }
                }, this.bufferFlushIntervalMillis, this.bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
            }
        } catch (TableNotFoundException var4) {
            LOG.error("The table " + this.hTableName + " not found ", var4);
            throw new RuntimeException("HBase table '" + this.hTableName + "' not found.", var4);
        } catch (IOException var5) {
            LOG.error("Exception while creating connection to HBase.", var5);
            throw new RuntimeException("Cannot create connection to HBase.", var5);
        }
        LOG.info("end open.");
    }

    private Configuration prepareRuntimeConfiguration() throws IOException {
        Configuration runtimeConfig = HBaseConfigurationUtil.deserializeConfiguration(
                this.serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get("hbase.zookeeper.quorum"))) {
            LOG.error("Can not connect to HBase without {} configuration", "hbase.zookeeper.quorum");
            throw new IOException("Check HBase configuration failed, lost: 'hbase.zookeeper.quorum'!");
        } else {
            return runtimeConfig;
        }
    }

    private void checkErrorAndRethrow() {
        Throwable cause = (Throwable) this.failureThrowable.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in HBaseSink.", cause);
        }
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        this.checkErrorAndRethrow();
        this.mutator.mutate(this.mutationConverter.convertToMutation(value));
        if (this.bufferFlushMaxMutations > 0L
                && this.numPendingRequests.incrementAndGet() >= this.bufferFlushMaxMutations) {
            this.flush();
        }
    }

    private void flush() throws IOException {
        this.mutator.flush();
        this.numPendingRequests.set(0L);
        this.checkErrorAndRethrow();
    }

    @Override
    public void close() throws Exception {
        this.closed = true;
        if (this.mutator != null) {
            try {
                this.mutator.close();
            } catch (IOException var3) {
                LOG.warn("Exception occurs while closing HBase BufferedMutator.", var3);
            }
            this.mutator = null;
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (IOException var2) {
                LOG.warn("Exception occurs while closing HBase Connection.", var2);
            }
            this.connection = null;
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            if (this.executor != null) {
                this.executor.shutdownNow();
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        while (this.numPendingRequests.get() != 0L) {
            this.flush();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    @Override
    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator)
            throws RetriesExhaustedWithDetailsException {
        this.failureThrowable.compareAndSet((Throwable) null, exception);
    }
}
```

### Main Code
Main constructor parameters: the table name, the HBase configuration, an HBaseMutationConverter (data converter), the maximum buffer size in bytes, the maximum number of buffered mutations, and the buffer flush interval.
```java
package com.bool.ods;

import com.bool.util.HBaseSinkFunction;
import com.bool.util.MyKafkaUtil;
import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * @author WangKaiYu
 * @date 2022-02-15 17:19
 */
public class HbaseSinkTest {
    public static void main(String[] args) throws Exception {
        String Tablename = "student";

        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.master", "192.168.10.154:16000");
        configuration.set("hbase.zookeeper.quorum", "192.168.10.154,192.168.10.155,192.168.10.196");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(5);

        DataStreamSource<String> streamSource =
                env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("test", "flink-hbase-tset"));

        // table name, HBase configuration, HBaseMutationConverter (data converter),
        // max buffer size in bytes, max buffered mutations, buffer flush interval
        streamSource.addSink(new HBaseSinkFunction<String>(Tablename, configuration,
                new HBaseMutationConverter<String>() {
                    @Override
                    public void open() {
                        // nothing to initialize
                    }

                    @Override
                    public Mutation convertToMutation(String value) {
                        String familyName = "info";
                        String[] values = value.split(",");
                        // append a timestamp to the row key so repeated records do not overwrite each other
                        Put put = new Put(Bytes.toBytes(values[0] + System.currentTimeMillis()));
                        for (int i = 1; i < values.length; i++) {
                            put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes("FN" + i), Bytes.toBytes(values[i]));
                        }
                        return put;
                    }
                }, 1024 * 1024 * 5, 1000, 3));

        env.execute();
    }
}
```
