十分钟手撕Flink双流JOIN面试

一箫一剑平生意,负尽狂名十五年。这篇文章主要讲述十分钟手撕Flink双流JOIN面试相关的知识,希望能为你提供帮助。
今天和大家聊聊Flink双流Join问题。这是一个高频面试点,也是工作中常遇到的一种真实场景。
如何保证Flink双流Join??准确性???和??及时性???、除了??窗口join??还存在哪些实现方式、究竟如何回答才能完全打动面试官呢。。你将在本文中找到答案。

1 引子1.1 数据库SQL中的JOIN
我们先来看看数据库SQL中的JOIN操作。如下所示的订单查询SQL,通过将订单表的??id???和订单详情表??order_id??关联,获取所有订单下的商品信息。

select 
      a.id  as  订单id,
      a.order_date  as  下单时间,
      a.order_amount  as  订单金额,
      b.order_detail_id  as  订单详情id,
      b.goods_name  as  商品名称,
      b.goods_price  as  商品价格,
      b.order_id  as  订单id
from 
      dwd_order_info_pfd  a
right  join 
      dwd_order_detail_pfd  b
on  a.id  =  b.order_id

这是一段很简单的SQL代码,就不详细展开叙述了。此处主要引出SQL中的JOIN类型,这里用到的是 ??right join?? , 即右连接。

  • ??left join??:保留左表全部数据和右表关联数据,右表非关联数据置NULL
  • ??right join??: 保留右表全部数据和左表关联数据,左表非关联数据置NULL
  • ??inner join??: 保留左表关联数据和右边关联数据
  • ??cross join??: 保留左表和右表数据笛卡尔积


基于关联键值逐行关联匹配,过滤表数据并生成最终结果,提供给下游数据分析使用。
就此打住,关于数据库SQL中的JOIN原理不再多赘述,感兴趣的话大家可自行研究,下面我们将目光转移到大数据领域看看吧。



1.2 离线场景下的JOIN
假设存在这样一个场景:
已知mysql数据库中订单表和订单明细表,且满足一对多的关系,统计T-1天所有订单的商品分布详情。
聪明的大家肯定已经给出了答案,没错~就是上面的SQL:
select  a.*,  b.*
from 
      dwd_order_info_pfd  a
right  join 
      dwd_order_detail_pfd  b
on  a.id  =  b.order_id

现在修改下条件:已知订单表和订单明细表均为??亿级别??数据,求相同场景下的分析结果。
咋办?此时关系型数据库貌似不大合适了~开始放大招:使用??大数据计算引擎??来解决。
考虑到T-1统计场景对时效性要求很低,可以使用Hive SQL来处理,底层跑Mapreduce任务。如果想提高运行速度,换成Flink或Spark计算引擎,使用内存计算。

至于查询SQL和上面一样,并将其封装成一个定时调度任务, 等系统调度运行。如果结果不正确的话,由于数据源和数据??静态???不变,大不了重跑,看起来感觉??皆大欢喜??~
可是好景不长,产品冤家此时又给了你一个无法拒绝的需求:??我要实时统计!!??
2 实时场景下的JOIN还是上面的场景,此时数据源换成了??实时???订单流和??实时???订单明细流,比如??Kafka??的两个topic,要求实时统计每分钟内所有订单下的商品分布详情。

现在情况貌似变得复杂了起来,简单分析下:
  1. 数据源。实时数据流,和静态流不同,数据是实时流入的且动态变化,需要计算程序支持实时处理机制。
  2. 关联性。前面提到??静态???数据执行多次join操作,左表和右表能关联的数据是很恒定的;而??实时数据流??(左右表)如果进入时机不一致,原本可以关联的数据会关联不上或者发生错误。
  3. 延迟性。实时统计,提供分钟甚至秒级别响应结果。
由于流数据join的特殊性,在满足??实时处理机制???、??低延迟???、??强关联性??的前提下,看来需要制定完善的数据方案,才能实现真正的流数据JOIN。


2.1 方案思路
我们知道订单数据和订单明细数据是一对多的关系,即一条订单数据对应着多条商品明细数据,毕竟买一件商品也是那么多邮费,不如打包团购。。而一条明细数据仅对应一条订单数据。
这样,双流join策略可以考虑如下思路:

  • 当数据流为订单数据时。无条件保留,无论当前是否关联到明细数据,均留作后续join使用。
  • 当数据流为明细数据时。在关联到其订单数据后,就可以say goodbye了,否则暂时保留等待下一次与订单数据的邂逅。
  • 完成所有处于同一时段内的订单数据和订单明细数据join, 清空存储状态


实际生产场景中,需要考虑更多的复杂情况,包括JOIN过程的数据丢失等异常情况的处理,此处仅示意。
好了,看起来我们已经有了一个马马虎虎的实时流JOIN方案雏形。
貌似可以准备动手大干一场了~ 别着急,有人已经帮我们偷偷的实现了:??Apache Flink??


3 Flink的双流JOIN
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
——来自Flink官网定义

这里我们只需要知道Flink是一个实时计算引擎就行了,主要关注其如何实现双流JOIN。
3.1 内部运行机制

  • ??内存计算???:Flink任务优先在内存中计算,内存不够时保存到访问高效的磁盘,提供??秒级??延迟响应。
  • ??状态强一致性??:Flink使用一致性快照保存状态,并定期检查本地状态、持久存储来保证状态一致性。
  • ??分布式执行??: Flink应用程序可以划分为无数个并行任务在集群中执行,几乎无限量使用CPU、主内存、磁盘和网络IO。
  • ??内置高级编程模型???:Flink编程模型抽象为SQL、Table、DataStream|DataSet API、Process四层,并封装成丰富功能的算子,其中就包含??JOIN类型??的算子。

仔细看看,我们前面章节讨论的实时流JOIN方案的前提是否都满足了呢?

  1. ??实时处理机制??: Flink天生即实时计算引擎
  2. ??低延迟??: Flink内存计算秒级延迟
  3. ??强关联性??: Flink状态一致性和join类算子
不由感叹, 这个Flink果然强啊~
保持好奇心,我们去瞅瞅Flink双流join的真正奥义!!
3.2 JOIN实现机制
Flink双流JOIN主要分为两大类。一类是基于原生State的Connect算子操作,另一类是基于窗口的JOIN操作。其中基于窗口的JOIN可细分为??window join???和??interval join??两种。

  • 实现原理:底层原理依赖Flink的??State状态存储??,通过将数据存储到State中进行关联join, 最终输出结果。

恍然大悟, Flink原来是通过State状态来缓存等待join的实时流。
这里给大家抛出一个问题:
用redis存储可不可以,state存储相比redis存储的区别?
更多细节欢迎大家一起探讨,添加个人微信: ??youlong525??拉您进群,还有免费Flink PDF领取~
回到正题,这几种方式到底是如何实现双流JOIN的?我们接着往下看。
注意: 后面内容将多以??文字??? + ??代码??的形式呈现,避免枯燥,我放了一堆原创示意图~

4 基于Window Join的双流JOIN实现机制顾名思义,此类方式利用Flink的??窗口机制??实现双流join。通俗理解,将两条实时流中元素分配到同一个时间窗口中完成Join。

  • 底层原理: 两条实时流数据缓存在??Window State???中,当窗口触发计算时,执行join操作。
4.1 join算子
先看看Window join实现方式之一的join算子。这里涉及到Flink中的窗口(window)概念,因此Window Joinan按照窗口类型区分的话某种程度来说可以细分出3种:

  • Tumbling Window Join (滚动窗口)
  • Sliding Window Join (滑动窗口)
  • Session Widnow Join(会话窗口)
两条流数据按照关联主键在(滚动、滑动、会话)窗口内进行??inner join??, 底层基于State存储,并支持处理时间和事件时间两种时间特征,看下源码:



源码核心总结:windows窗口 + state存储 + 双层for循环执行join()
现在让我们把时间轴往回拉一点点,在??实时场景JOIN??那里我们收到了这样的需求:统计每分钟内所有订单下的商品明细分布。
OK, 使用join算子小试牛刀一下。我们定义60秒的滚动窗口,将订单流和订单明细流通过order_id关联,得到如下的程序:
val  env  =  ...
//  kafka  订单流
val  orderStream  =  ... 
//  kafka  订单明细流
val  orderDetailStream  =  ...
       
orderStream.join(orderDetailStream)
        .where(r  =>   r._1)    //订单id
        .equalTo(r  =>   r._2)  //订单id
        .window(TumblingProcessTimeWindows.of(
                    Time.seconds(60)))
        .apply  (r1,  r2)  =>   r1  +  "  :  "  +  r2
        .print()

整个代码其实很简单,概要总结下:


  • 定义两条输入实时流A、B
  • A流调用join(b流)算子
  • 关联关系定义: where为A流关联键,equalTo为B流关联键,都是订单id
  • 定义window窗口(60s间隔)
  • apply方法定义逻辑输出
这样只要程序稳定运行,就能够持续不断的计算每分钟内订单分布详情,貌似解决问题了奥~
还是别高兴太早,别忘了此时的join类型是??inner join??。复习一下知识: inner join指的是仅保留两条流关联上的数据。
这样双流中没关联上的数据岂不是都丢掉了?别担心,Flink还提供了另一个window join操作: ??coGroup??算子。
4.2 coGroup算子
coGroup算子也是基于window窗口机制,不过coGroup算子比Join算子更加灵活,可以按照用户指定的逻辑匹配左流或右流数据并输出。
换句话说,我们通过自己指定双流的输出来达到left join和right join的目的。
【十分钟手撕Flink双流JOIN面试】现在来看看在相同场景下coGroup算子是如何实现left join:
#这里看看java算子的写法
orderDetailStream
    .coGroup(orderStream)
    .where(r  ->   r.getOrderId())
    .equalTo(r  ->   r.getOrderId())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
    .apply(new  CoGroupFunction< OrderDetail,  Order,  Tuple2< String,  Long> > () 
        @Override
        public  void  coGroup(Iterable< OrderDetail>   orderDetailRecords,  Iterable< Order>   orderRecords,  Collector< Tuple2< String,  Long> >   collector)   
            for  (OrderDetail  orderDetaill  :  orderDetailRecords) 
                boolean  flag  =  false;
                for  (Order  orderRecord  :  orderRecords) 
                    //  右流中有对应的记录
                    collector.collect(new  Tuple2< > (orderDetailRecords.getGoods_name(),  orderDetailRecords.getGoods_price()));
                    flag  =  true;
               
                if  (!flag) 
                    //  右流中没有对应的记录
                    collector.collect(new 

    推荐阅读