ApacheBeam:PCollection

那么,为什么 Beam 需要 PCollection 这样一个全新的抽象数据结构呢?
我们知道,不同的技术系统有不同的数据结构。比如,C++ 里有 vector、unordered_map,安卓有 ListView。相比它们而言,其实 Beam 的数据结构体系是很单调的,几乎所有数据都能表达为 PCollection。
PCollection,就是 Parallel Collection,意思是可并行计算的数据集。如果你之前学习了 Spark 的章节,就会发现 PCollection 和 RDD 十分相似。
在一个分布式计算系统中,我们作为架构设计者需要为用户隐藏的实现细节有很多,其中就包括了数据是怎样表达和存储的。
这个数据可能是来自于内存的数据(内部可能只是由一个 C++ array 存储);也有可能是来自外部文件(由几个文件存储);或者是来自于 MySQL 数据库(由数据库的行来表达)。
如果没有一个统一的数据抽象的话,开发者就需要不停地更改代码。比如,在小规模测试的时候用 C++ vector,等到了真实的生产环境,我再换 MySQL row。沉溺于这样的实现细节会让开发者无法集中注意力在真正重要的事情上,那就是“你想怎样处理数据”。
清楚了这些,你就能明白我们需要一层抽象来表达数据,而这层抽象就是 PCollection。
PCollection 的创建完全取决于你的需求。比如,在测试中 PCollection 往往来自于代码生成的伪造数据,或者从文件中读取。
Python

lines = (p| beam.Create(['To be, or not to be: that is the question. ']))lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt'

Java
PCollection lines = p.apply("ReadMyFile", TextIO.read().from("protocol://path/to/some/inputData.txt"));

为什么 PCollection 需要 Coders? 与普通编程相比,PCollection 的另一个不同之处是,你需要为 PCollection 的元素编写 Coder。例如,你有一个自己的类 MyClass,那么 PCollection一定需要实现 Coder
刚开始使用 Beam 时,你可能会感觉这很不方便。例如,你只是要去读取 MySQL 的一个表,也得为此实现 Coder。
Coder 的作用和 Beam 的本质紧密相关。因为你的计算流程最终会运行在一个分布式系统。所以,所有的数据都有可能在网络上的计算机之间相互传递。而 Coder 就是在告诉 Beam,怎样把你的数据类型序列化和逆序列化,以方便在网络上传输。
Coder 需要注册进全局的 CoderRegistry 中,简单来说,是为你自己的数据类型建立与 Coder 的对应关系。不然每次你都需要手动指定 Coder。
Python
apache_beam.coders.registry.register_coder(int, BigEndianIntegerCoder)

Java
PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); CoderRegistry cr = p.getCoderRegistry(); cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);

为什么 PCollection 是无序的? 讲完为什么 PCollection 需要 Coder 之后,我们再来看下,为什么 PCollection 是无序的。
PCollection 的无序特性其实也和它的分布式本质有关。一旦一个 PCollection 被分配到不同的机器上执行,那么为了保证最大的处理输出,不同机器都是独立运行的。所以,它的执行顺序就无从得知了。可能是第一个元素先被运行,也可能是第二个元素先被运行。所以,肯定不会有 PCollection[2] 这样的运算符。
为什么 PCollection 没有固定大小? 无序也就算了,为什么 PCollection 还没有固定大小呢?
前面的章节中讲到过,Beam 想要统一批处理和流处理,所以它要统一表达有界数据和无界数据。正因为如此,PCollection 并没有限制它的容量。如前面所说,它可能表达内存上的一个数组,也可能表达整个数据库的所有数据。
一个 PCollection 可以是有界的,也可以是无界的。一个有界的 PCollection 表达了一个已知大小的固定的数据集。一个无界的 PCollection 表达了一个无限大小的数据集。事实上一个 PCollection 是否有界,往往取决于它是如何产生的。
从批处理的数据源中读取,比如一个文件或者是一个数据库,就会产生有界的 PColleciton。如果从流式的或者是持续更新的数据库中读取,比如 pub/sub 或者 kafka,会产生一个无界的 PCollection。
但是,PCollection 的有界和无界特性会影响到 Beam 的数据处理方式。一个批处理作业往往处理有界数据。而无界的 PCollection 需要流式的作业来连续处理。
在实现中,Beam 也是用 window 来分割持续更新的无界数据。所以,一个流数据可以被持续地拆分成不同的小块。这样的处理方式我们会在实战部分展开。
如何理解 PCollection 的不可变性? 在普通编程语言中,大部分数据结构都是可变的。
Python
Alist = []alist.append(1)

C++
Std::vector list; list.push_back(1);

但是 PCollection 不提供任何修改它所承载数据的方式。修改一个 PCollection 的唯一方式就是去转化 (Transform) 它,下一讲会展开讲 Transformation。
但是在这一讲,我们需要理解的是,Beam 的 PCollection 都是延迟执行(deferred execution)的模式。也就是说,当你下面这样的语句的时候,什么也不会发生。
Java
PCollection p1 = ...; PCollection p2 = doSomeWork(p1);

这样的语句执行完,p2 这个 PCollection 仅仅会记录下自己是由 doSomeWork 这个操作计算而来的,和计算自己所需要的数据 p1。当你执行写完 100 行的 beam 的运算操作,最终的结果仅仅是生成了一个有向无环图(DAG),也就是执行计划(execution plan)。
为什么这么设计呢?如果你记得我们在专栏第一部分讲到的大规模数据框架设计,可能会有印象。这样的有向无环图是框架能够自动优化执行计划的核心。
ApacheBeam:PCollection
文章图片
image.png 类似图中这样的数据处理流程,在 Beam 获知了整个数据处理流程后,就会被优化成下图所示。
ApacheBeam:PCollection
文章图片
image.png 这样的优化,在 Beam 中被称为 sibling fusion。类似的操作优化我们后面会继续介绍。在这个小标题下,我想用这个优化的例子说明,PCollection 下的数据不可变是因为改变本身毫无意义。
【ApacheBeam:PCollection】例如,在刚才这个例子中,你会发现,优化后的执行计划里已经没有了数据 A0。因为,Beam 发现数据 A0 作为中间结果并不影响最终输出。另外,由于 Beam 的分布式本质,即使你想要去修改一个 PCollection 的底层表达数据,也需要在多个机器上查找,毫无实现的价值。

    推荐阅读