Flink双流join的3种方式及IntervalJoin源码分析

上下观古今,起伏千万途。这篇文章主要讲述Flink双流join的3种方式及IntervalJoin源码分析相关的知识,希望能为你提供帮助。

概述
在数据库中的静态表上做 OLAP 分析时,两表 join 是非常常见的操作。同理,在流式处理作业中,有时也需要在两条流上做 join 以获得更丰富的信息。Flink DataStream API 为用户提供了3个算子来实现双流 join,分别是:

  • join()
  • coGroup()
  • intervalJoin()
join()
join() 算子提供的语义为"Window join",即按照指定字段和(滚动/滑动/会话)窗口进行 inner join,支持处理时间和事件时间两种时间特征。
paymentInfo_ds.join(orderInfo__ds)
.where(_.order_id)
.equalTo(_.order_id)
.window(TumblingEventTimeWindows.of(Time.seconds(20)))
.apply(new JoinFunction[PaymentInfo,OrderInfo,PaymentWide]
override def join(first: PaymentInfo, second: OrderInfo): PaymentWide =
//处理逻辑
new PaymentWide(first, second)

)

coGroup()
只有 inner join 肯定还不够,如何实现 left/right outer join 呢?答案就是利用 coGroup() 算子。
它的调用方式类似于 join() 算子,也需要开窗,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流和/或右流的数据并输出。
paymentInfo_wm_ds.coGroup(orderInfo_ds)
.where(_.order_id)
.equalTo(_.order_id)
.window(TumblingEventTimeWindows.of(Time.seconds(20)))
.apply(new CoGroupFunction[PaymentInfo,OrderInfo,PaymentWide]()
override def coGroup(first: lang.Iterable[PaymentInfo], second: lang.Iterable[OrderInfo], out: Collector[PaymentWide]): Unit =
val f = first.iterator()
while (f.hasNext)
//处理左流数据

//处理右流数据


)

intervalJoin()
join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。
所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联,即:??right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]??
interval join 也是 inner join,虽然不需要开窗,但是需要用户指定偏移区间的上下界,并且只支持事件时间。注意在运行之前,需要分别在两个流上应用 assignTimestampsAndWatermarks() 方法获取事件时间戳和水印。
paymentInfo_ds.keyBy(_.order_id)
.intervalJoin(orderWide_wm_ds.keyBy(_.order_id))
.between(Time.minutes(-15), Time.minutes(0))
.process(new ProcessJoinFunction[PaymentInfo, OrderInfo, PaymentWide]()
override def processElement(in1: PaymentInfo,
in2: OrderInfo,
context: ProcessJoinFunction[PaymentInfo, OrderInfo, PaymentWide]#Context,
collector: Collector[PaymentWide]): Unit =
collector.collect(new PaymentWide(in1, in2))

)


由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 ??upperBoundExclusive()/lowerBoundExclusive()?? 方法。
interval join 的实现原理及源码分析
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
/**
* Completes the join operation with the given user function that is executed for each joined pair
* of elements. This methods allows for passing explicit type information for the output type.
*
* @param processJoinFunction The user-defined process join function.
* @param outputTypeThe type information for the output type.
* @param < OUT> The output type.
* @return The transformed @link DataStream.
*/
@PublicEvolving
public < OUT> SingleOutputStreamOperator< OUT> process(
ProcessJoinFunction< IN1, IN2, OUT> processJoinFunction,
TypeInformation< OUT> outputType)
Preconditions.checkNotNull(processJoinFunction);
Preconditions.checkNotNull(outputType);

final ProcessJoinFunction< IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

final IntervalJoinOperator< KEY, IN1, IN2, OUT> operator =
new IntervalJoinOperator< > (
lowerBound,
upperBound,
lowerBoundInclusive,
upperBoundInclusive,
left.getType().createSerializer(left.getExecutionConfig()),
right.getType().createSerializer(right.getExecutionConfig()),
cleanedUdf
);

return left
.connect(right)
.keyBy(keySelector1, keySelector2)
.transform("Interval Join", outputType, operator);

可见是先对两条流执行 connect() 和 keyBy() 操作,然后利用 IntervalJoinOperator 算子进行转换。在 IntervalJoinOperator 中,会利用两个 MapState 分别缓存左流和右流的数据。
org.apache.flink.streaming.api.operators.co.IntervalJoinOperator#initializeState 
private transient MapState< Long, List< BufferEntry< T1> > > leftBuffer;
private transient MapState< Long, List< BufferEntry< T2> > > rightBuffer;

@Override
public void initializeState(StateInitializationContext context) throws Exception
super.initializeState(context);

this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor< > (
LEFT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer< > (new BufferEntrySerializer< > (leftTypeSerializer))
));

this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor< > (
RIGHT_BUFFER,
LongSerializer.INSTANCE,
new ListSerializer< > (new BufferEntrySerializer< > (rightTypeSerializer))
));


  • Long 表示事件时间戳
  • List表示该时刻到来的数据记录
当左流和右流有数据到达时,会分别调用 processElement1() 和 processElement2() 方法,它们都调用了 processElement() 方法,代码如下。
/**
* Process a @link StreamRecord from the left stream. Whenever an @link StreamRecord
* arrives at the left stream, it will get added to the left buffer. Possible join candidates
* for that element will be looked up from the right buffer and if the pair lies within the
* user defined boundaries, it gets passed to the @link ProcessJoinFunction.
*
* @param record An incoming record to be joined
* @throws Exception Can throw an Exception during state access
*/
@Override
public void processElement1(StreamRecord< T1> record) throws Exception
processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);


/**
* Process a @link StreamRecord from the right stream. Whenever a @link StreamRecord
* arrives at the right stream, it will get added to the right buffer. Possible join candidates
* for that element will be looked up from the left buffer and if the pair lies within the user
* defined boundaries, it gets passed to the @link ProcessJoinFunction.
*
* @param record An incoming record to be joined
* @throws Exception Can throw an exception during state access
*/
@Override
public void processElement2(StreamRecord< T2> record) throws Exception
processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);


@SuppressWarnings("unchecked")
private < THIS, OTHER> void processElement(
final StreamRecord< THIS> record,
final MapState< Long, List< IntervalJoinOperator.BufferEntry< THIS> > > ourBuffer,
final MapState< Long, List< IntervalJoinOperator.BufferEntry< OTHER> > > otherBuffer,
final long relativeLowerBound,
final long relativeUpperBound,
final boolean isLeft) throws Exception

final THIS ourValue = https://www.songbingjia.com/android/record.getValue();
final long ourTimestamp = record.getTimestamp();

if (ourTimestamp == Long.MIN_VALUE)
throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
"interval stream joins need to have timestamps meaningful timestamps.");


if (isLate(ourTimestamp))
return;


addToBuffer(ourBuffer, ourValue, ourTimestamp);

for (Map.Entry< Long, List< BufferEntry< OTHER> > > bucket: otherBuffer.entries())
final long timestamp= bucket.getKey();

if (timestamp < ourTimestamp + relativeLowerBound ||
timestamp > ourTimestamp + relativeUpperBound)
continue;


for (BufferEntry< OTHER> entry: bucket.getValue())
if (isLeft)
collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
else
collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);




long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
if (isLeft)
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
else
internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);



  • 取得当前流 StreamRecord 的时间戳,调用 isLate() 方法判断它是否是迟到数据(即时间戳小于当前水印值),如是则丢弃。
  • 调用 addToBuffer() 方法,将时间戳和数据一起插入当前流对应的 MapState。
  • 遍历另外一个流的 MapState,如果数据满足前述的时间区间条件,则调用 collect() 方法将该条数据投递给用户定义的 ProcessJoinFunction 进行处理。collect() 方法的代码如下,注意结果对应的时间戳是左右流时间戳里较大的那个。
private boolean isLate(long timestamp)
long currentWatermark = internalTimerService.currentWatermark();
return currentWatermark != Long.MIN_VALUE & & timestamp < currentWatermark;


private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception
final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

collector.setAbsoluteTimestamp(resultTimestamp);
context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

userFunction.processElement(left, right, context, collector);


private static < T> void addToBuffer(
final MapState< Long, List< IntervalJoinOperator.BufferEntry< T> > > buffer,
final T value,
final long timestamp) throws Exception
List< BufferEntry< T> > elemsInBucket = buffer.get(timestamp);
if (elemsInBucket == null)
elemsInBucket = new ArrayList< > ();

elemsInBucket.add(new BufferEntry< > (value, false));
buffer.put(timestamp, elemsInBucket);


调用 TimerService.registerEventTimeTimer() 注册时间戳为 timestamp + relativeUpperBound 的定时器,该定时器负责在水印超过区间的上界时执行状态的清理逻辑,防止数据堆积。注意左右流的定时器所属的 namespace 是不同的,具体逻辑则位于 onEventTime() 方法中。
@Override
public void onEventTime(InternalTimer< K, String> timer) throws Exception

long timerTimestamp = timer.getTimestamp();
String namespace = timer.getNamespace();

logger.trace("onEventTime @ ", timerTimestamp);

switch (namespace)
case CLEANUP_NAMESPACE_LEFT:
long timestamp = (upperBound < = 0L) ? timerTimestamp : timerTimestamp - upperBound;
logger.trace("Removing from left buffer @ ", timestamp);
leftBuffer.remove(timestamp);
break;

case CLEANUP_NAMESPACE_RIGHT:
long timestamp = (lowerBound < = 0L) ? timerTimestamp + lowerBound : timerTimestamp;
logger.trace("Removing from right buffer @ ", timestamp);
rightBuffer.remove(timestamp);
break;

default:
throw new RuntimeException("Invalid namespace " + namespace);



【Flink双流join的3种方式及IntervalJoin源码分析】


    推荐阅读