概述

Bigflow Python是一个致力于简化分布式计算任务编写和维护的Python module,它提供了对分布式数据和计算的高层抽象,你可以使用这些抽象来编写分布式计算程序。Bigflow Python能够将这些抽象映射到不同的分布式计算框架之上。

Bigflow Python中,最重要的抽象被称为 P类型 ,P类型是分布式数据的抽象描述,非常类似于Spark中的RDD。实际上,它们都借鉴于于Google的论文<<Flume Java>>。P类型可以通过读取(read)接口得到,例如读取文件系统中的文件/目录或是数据库中的表格,或是通过Python的已有变量构造得到。P类型可以通过 变换 来描述计算,例如,映射、分组、连接等。最终,变换后的P类型可以通过写出(write)接口持久化到文件系统/数据库中,或是将其转换为内存中的Python内置变量。此外,用户还可以将一个P类型进行缓存以便于多轮迭代计算。

第二个抽象概念为 SideInputs ,它指被广播到计算中去的P类型或是Python变量。通常而言,一个计算将被并行地在计算集群中执行,这时每个计算切片都能够得到SideInputs来满足计算的需求(例如查字典)。

后端引擎

目前,Bigflow Python支持两种后端执行引擎:

  • Local -- 一个轻量的本地执行引擎,便于学习以及基于小数据集的算法验证。
  • Spark -- 开源分布式计算引擎。

后端执行引擎可以在创建Pipeline的时候指定,例如:

local_pipeline = base.Pipeline.create("local")
spark_pipeline = base.Pipeline.create("spark")

配置

Hadoop配置

Bigflow Python需要读取core-site.xml配置文件来访问HDFS。因此在使用时,用户需要首先设置好HADOOP_HOME环境变量。

bigflow-env.sh

bigflow-env.sh位于BIGFLOW_PYTHON_HOME/bigflow/bin下,其中包含一些Bigflow Python在运行时的可选参数。

  • Bigflow Python默认将log打印到屏幕,用户可以通过改变下面的参数来指定Log的输出文件:

    BIGFLOW_LOG_FILE="a local directory"
    
  • 对于一个P类型PValue,默认的str(PValue)仅会得到一个表示其具体类别的字符串。也即`print PValue`不会真正地触发计算。这时,如果希望看到一个P类型的内容,可以调用 PValue.get()

    >>> p1 = pipeline.read(input.TextFile("xxx"))
    >>> print p1
    >>> [...]
    >>> print p1.get()
    >>> ["line1", "line2", ...]
    

    用户可以设置下面的变量来改变`str(PValue)`的行为

    BIGFLOW_PTYPE_GET_ON_STR=true
    

    这时,每次`print PValue`都会隐式地执行`PValue.get()`:

    >>> print p1
    >>> ["line1", "line2", ...]
    

    这样做的好处在于可以使Bigflow Python写出的代码更像一个单机程序。

P类型(PType)

P类型是Bigflow设计中的核心抽象,其表示集群中的分布式数据集,P类型的另一个特点是数据不可变,对P类型的任何计算都产生一个新的P类型。

在Bigflow Python中,共有3中不同的P类型,其中的每一种都被设计于类似Python语言的内置类型:

  1. PCollection -- 并行化的Python list
  2. PObject -- 并行化的Python单个变量
  3. PTable -- 并行化的 Python dict

PCollection

PCollection表示并行化的Python list 。其可以通过一个Python list实例转换得到,或是读取文件得到:

>>> p1 = pipeline.read(input.TextFile("xxx"))
>>> print p1
[...]
>>> p2 = pipeline.parallelize([1, 2, 3])
>>> print p2
[...]

字符串 "[...]" 表示p1/p2均为PCollection。绝大多数的Bigflow变换均作用于PCollection上,而且结果多为PCollection。

PCollection非常类似于Apache Spark中的弹性分布式数据集(RDD)。

PObject

PObject表示单个变量。其往往是聚合类变换的结果,例如max/sum/count等。

>>> print p1.count()
o
>>> print p2.max()
o

字符"o" 表示一个PObject。PObject往往作为一个 SideInput 参与到变换中。具体的例子请参照 SideInput 部分。

PTable

PTable非常类似于并行化的Python dict, 其包含key到value的映射,但其value必须是另一个P类型。PTable往往是一个分组变换的结果:

>>> p3 = pipeline.parallelize([("A", 1), ("A", 2), ("B", 1)]
>>> p4 = p3.group_by_key()  # group_by_key() 的输入PCollection的所有元素必须是有两个元素的tuple或list。第一个元素为key,第二个元素为value。
>>> print p4
{k0: [...]}
>>> print p4.get()
{"A": [1, 2], "B": [1]}

字符串 {k0: [...]} 表示 p4 是一个PTable。 p4 的key类型为Python str,value为一个PCollection。

>>> p5 = p4.apply_values(transforms.max)  # 将变换 `transforms.max` 作用到p4的value,也即PCollection上。
{k0: o}

p5 也是一个PTable,它的key与 p4 一致,value变为了PObject类型( transforms.max 的结果)。

由于PTable仍然是一个P类型,因此它可以作为另一个PTable的value。例如:

>>> p6 = pipeline.parallelize({"A": {"a": 1, "e": 2}, "B": {"b": 3, "f": 4}})
>>> print p6
{k0: {k1: o}}  # 两层key,最内层value为PObject

>>> p7 = pipeline.parallelize({"A": {"a", [1, 2]}, "B": {"b": [3, 4]}})
>>> print p7
{k0: {k1: [...]}}  # 两层key,最内层value为PCollection

也即,PTable可以无限嵌套,

P类型可能通过下面的三种情况构造:

P类型可以通过下面的三种情况使用:

P类型的不断变换构造出一个有向无环图,最终完整地表达出用户的计算逻辑。

变换

Bigflow Python提供了一系列的变换原语方便用户表达计算。

bigflow.transforms.accumulate(pcollection, ...) 将给定的PCollection按照一个初始值和方法聚合为PObject
bigflow.transforms.aggregate(pcollection, ...) 将给定的PCollection按照初始值、初段聚合方法和汇总方法聚合为PObject
bigflow.transforms.cartesian(*pcollections, ...) 对多个输入PCollection求笛卡尔积,返回一个PCollection
bigflow.transforms.cogroup(*pcollections, ...) 对传入的所有pcollection进行协同分组。
bigflow.transforms.combine(pcollection, fn, ...) 给定一个合并函数,聚合输入PCollection中所有元素,这些元素以迭代器的形式给出
bigflow.transforms.distinct(pcollection, ...) 返回给定PCollection中所有不重复元素
bigflow.transforms.diff(pcollection1, ...) 对于给定的PCollection1和PCollection2,返回两者不相同的元素
bigflow.transforms.extract_keys(ptable, ...) 提取给定PTable中所有的key
bigflow.transforms.extract_values(ptable, ...) 提取给定PTable中所有的value
bigflow.transforms.filter(pcollection, fn, ...) 对于给定的PCollection和一个断言函数,返回只满足断言函数元素的PCollection
bigflow.transforms.first(pcollection, **options) 取出PCollection中的第一个元素
bigflow.transforms.flatten(ptable, **options) 对于给定PTable中的key和value中每一个元素,构造(key, value)对,结果保存在PCollection中
bigflow.transforms.flatten_values(ptable, ...) 等价于 extract_values(ptable)
bigflow.transforms.group_by(pcollection, ...) 利用给定的key_extractor和value_extractor对输入PCollection分组,返回一个表示分组结果的PTable
bigflow.transforms.group_by_key(pcollection, ...) 利用给定的PCollection,使用一个默认的key/value提取函数对输入的PCollection分组 ,返回一个表示分组的PTable
bigflow.transforms.is_empty(pcollection) 对于输入PCollection,返回其是否为空
bigflow.transforms.intersection(...[, ...]) 对于给定的PCollection1和PCollection2,返回所有同时存在于PCollection1和PCollection2 中的元素,即取两者交集
bigflow.transforms.join(*pcollections, **options) 对于多个输入PCollection,根据key对PCollection做内连接操作 ,连接结果为(key, (value1, value2, .
bigflow.transforms.left_join(*pcollections, ...) 对于多个输入PCollection,根据key对PCollection做左连接操作 ,连接结果为(key, (value 1, value 2, .
bigflow.transforms.right_join(*pcollections, ...) 对于多个输入PCollection,根据key对PCollection做右连接操作 ,连接结果为(key, (value 1, value 2, .
bigflow.transforms.full_join(*pcollections, ...) 对于多个输入PCollection,根据key对PCollection做全连接操作 ,连接结果为(key, (value 1, value 2, .
bigflow.transforms.max(pcollection[, key]) 得到输入PCollection中最大的元素
bigflow.transforms.max_elements(pcollection, n) 得到输入PCollection中前n大的元素
bigflow.transforms.min(pcollection[, key]) 得到输入PCollection中最小的元素
bigflow.transforms.map(pvalue, fn, ...) 对PCollection中的每个元素做一对一映射
bigflow.transforms.flat_map(pvalue, fn, ...) 对PCollection中的每个元素做一对N映射
bigflow.transforms.reduce(pcollection, fn, ...) 对于属于PCollection,使用给定的fn将所有元素规约为单个元素
bigflow.transforms.sort(pcollection[, reverse]) 对于输入PCollection,将其进行排序
bigflow.transforms.sum(pcollection, **options) 对于输入PCollection,求其所有包含元素相加的结果
bigflow.transforms.take(pcollection, n, ...) 取给定PCollection中的任意n个元素。 (如果总元素数量不足n,则返回输入pcollection)
bigflow.transforms.transform(pcollection, ...) 对给定PCollection进行任意的变换,结果为另一个PCollection
bigflow.transforms.union(*pvalues, **options) 对于多个输入PCollection/PObject,返回包含它们所有元素的PCollection
bigflow.transforms.to_list_pobject(pvalue, ...) 对于给定的PCollection,聚合为PObject,PObject的内容为list
bigflow.transforms.pipe(pvalue, command, ...) 对于给定的PCollection/PTable,返回通过command处理后的PCollection

以上的所有变换均需要作用于P类型之上,产生另一个P类型。例如,将 transforms.map 作用于PCollection上产生一个新的PCollection。如果把PCollection看作Python内置的 listtransforms.map 的用法非常类似于Python内置的 map() 方法:

python_list = [1, 2, 3]  # Python list
pcollection = pipeline.parallelize([1, 2, 3])  # 通过Python list构造出一个PCollection

map(lambda x: x + 1, python_list)  # 结果: [2, 3, 4]
transforms.map(pcollection, lambda x: x + 1).get()  # 结果: [2, 3, 4]

PCollection同样有一个成员方法 PCollection.map(function):

pcollection.map(lambda x: x + 1).get()  # 结果: [2, 3, 4]

实际上, PCollection.map(function) 内部的实现就是 transforms.map(self)

此外,之前的例子也可以这样写:

pcollection.apply(transforms.map, lambda x: x + 1)

关于 apply() 方法,请见下节。

apply/apply_values

所有的P类型均定义了一个成员方法 apply() ,其定义非常简单:

p.apply(transform, *args)

等价于:

transform(p, *args)

用户可以根据自己的风格喜好编写代码:

p.flat_map(...)
 .map(...)
 .reduce(...)

或者这样写:

p.apply(transforms.flat_map, ...)
 .apply(transforms.map, ...)
 .apply(transforms.reduce, ...)

使用 apply() 的一个优点在于,用于可以将多个变换放到一个自定义的方法中,apply这个方法:

def count_youth_number(ages):
# 假定age是一个PCollection
    return ages.filter(lambda x: x <= 18).count()

ages.apply(count_youth_number)

可以看到,apply自定义方法保持了风格的统一,同时使得代码更具有内聚性,更容易复用。

类似于 apply() ,PTable的成员方法 apply_values() 能够将一个变换作用在它的value上:

>>> pt = pipeline.parallelize({"A": [("a", 1), ("a", 2)], "B": [("b", 3), ("b", 4)]})
>>> print pt
{k0: [...]}  # pt的value是一个PCollection
>>> po = pt.apply_values(transforms.count)  # transforms.count(PCollection) --> PObject
>>> print po
{k0: o}  # po的value是一个PObject

value中的PCollection还可以用group_by_key进行再分组:

>>> pt2 = pt.apply_values(transforms.group_by_key)
>>> print pt2
{k0: {k1: [...]}}  # pt2有两层key(经过了两次分组)
>>> print pt2.get()
{"A": {"a": [1, 2]}, "B": {"b": [3, 4]}}

可以看到,最终结果是一个嵌套的PTable。

PTable可以通过 flatten() 方法转换("打平")为一个PCollection: 每个key和其对应的value中所有元素将构成一个(k, v)的Python tuple:

>>> pc = pt.flatten()
>>> print pc
[...]
>>> print pc.get()
[("B", ("b", 3)), ("B", ("b", 4)), ("A", ("a", 1)), ("A", ("a", 2))]

对于多层嵌套的PTable, flatten() 将把所有的key均打平,最终结果还是一个PCollection:

>>> print pt2.flatten().get()
[("B", ("b", 3)), ("B", ("b", 4)), ("A", ("a", 1)), ("A", ("a", 2))]

这种情况下,如果希望仅打平value中的PTable,可以使用apply_values,即 apply_values(transforms.flatten):

>>> print pt2.apply_values(transforms.flatten).get()
{"A": [("a", 1), ("a", 2)], "B": [("b", 3), ("b", 4)]}

输入/输出

Bigflow Python提供了Pipeline.read()方法从外部存储读取数据,以及Pipeline.write()方法将数据写出。输入/输出类型被抽象为 input.Sourceoutput.Target ,例如,input.TextFile是一个对应于文本文件的Source。

目前,Pipeline.read()/write()的结果只能为PCollection。所有已实现的Source/Target如下:

注意,这里的SequenceFile实际上为Hadoop中的SequenceFileAsBinaryInputFormat/SequenceFileAsBinaryOutputFormat。也就是说,Key/Value均为BytesWritable。

SideInputs

当Bigflow Python提交任务时,计算被并行执行。但某些情况下,用户会期望将某些PType传入到另一个PType的变换中参与计算(由于变换实际在运行时被分布地计算,因此更确切地说,是被 广播 到了变换)。例如,如果希望在Bigflow中进行 mapper-side join ,一个较小的PCollection可以以字典的方式传入到较大PCollection/PTable的 flat_map() 变换里面。当框架遍历较大PCollection/PTable每个元素时,可以直接对较小PCollection进行遍历来查找满足需要的连接条件。

大部分的分布式计算引擎本身提供了一些机制来满足这样的场景,例如Hadoop的 DistributeCache 或是Spark中的 Broadcast varirables

在Bigflow中,这样的机制被抽象为 SideInputs :你可以将一个P类型作为参数传入到另一个P类型的变换中。

绝大多数的变换均支持SideInputs,例如 map(), flat_map(), filter() 等:

>>> p1 = pipeline.parallelize([3, 7, 1])
>>> p2 = pipeline.parallelize(4)

>>> result = p1.filter(lambda x, threshold: x < threshold , p2)  # 注意这里的lambda表达式:`threshold` 为 `p2` 在运行时的值
>>> print result.get(result)
[3, 1]

上面的例子中,p2是一个包含单个元素 4 的PObject。它被作为SideInputs传入到 filter() 变换中,因此对应的变换表达式从原来的一个输入x,变成了有两个输入: xthresholdthreshold 为PObject在运行时的值,也就是4。

闭包

SideInputs的用法有些类似于 闭包 。当然,闭包在Bigflow Python中也同样支持:

>>> threshold = ...  # 假定我们经过某些计算,得到了一个Python int,值为4
>>> print threshold
4
>>> result = p1.filter(lambda x: x < threshold)
[3, 1]

这时候,我们没有使用SideInputs: threshold 是一个内存变量,它被filter中的lambda方法所捕获,同时也能够被正确地处理。

附带文件

通常而言,当使用Bigflow Python编写的代码被提交到分布式环境中运行时,相关的方法能够自动地被序列化/反序列化到集群中。然而用户的代码可能需要从本机import一些自己定义的库。此外,用户还可能需要将一些资源文件一起随代码进行提交,并在运行时读取资源。

对于这样的需求,可以使用 Pipeline.add_file() 方法:

>>> import user_defined_module
>>> pipeline.add_file("PATH_OF_user_defined_module", 'REL_PATH_ON_CLUSTER')
>>> c = pipeline.read(...)
>>> d = c.map(user_defined_module.some_function)
>>> d.get()

如果需要添加的文件较多,可以使用 Pipeline.add_directory() 方法,将一个目录下的所有文件一起打包:

>>> from aa.bb import cc, dd, ...
>>> pipeline.add_directory("PATH_OF_a_module", "REL_PATH_ON_CLUSTER")
>>> ...

缓存机制

Pipeline的运行有两种机制触发:

  • 调用 PType.get() 或者 Pipeline.get(PType) ,两者等价。
  • 调用 Pipeline.run()

当Pipeline通过 PType.get() 触发运行时,P类型中的数据实际上首先被缓存起来,然后读取到内存中。因此如果一个P类型被调用多次 PType.get() ,从第二次开始,Pipeline并不会真正地做计算,而是直接读取缓存数据。除了 get() 方法之外,显示地调用 PType.cache() 也可以要求Bigflow Python在运行时将P类型数据缓存,并在之后的计算中直接读取缓存数据。缓存机制在需要多轮迭代计算时会很有用,例如实现PageRank算法。:

>>> lines = pipeline.read(input.TextFile("hdfs:///lines.txt")
>>> len = lines.count()
>>> lines.cache()
>>> while len.get() > 100:
...     lines = lines.flat_map(some_transforms)
...     lines.cache()
...     len = lines.count()
...
>>> pipeline.write(lines, .output.TextFile("some output path"))

PCollection的顺序

对于分布式计算而言,元素的顺序往往难以保证。通常而言,PCollection应当被认为是无序数据集,只有 sort() 方法的 直接结果 保证元素的顺序:

p = pipeline.parallelize([...])
sorted = p.sort()  # sorted中元素降序排列

如上文所言,任何基于sorted的后续变换均不再保证顺序,例如:

mapped = sorted.map(lambda x: x)

此时不应当再假定mapped中元素仍然有序。