Apache Beam是一个开源的数据处理编程库,由Google共享给Apache的项目,前不久刚刚成为Apache TLP项目。它提供了一个高级的、统一的编程模型,允许我们通过构建Pipeline的方式实现批量、流数据处理,并且构建好的Pipeline能够运行在底层不同的执行引擎上。刚刚接触该开源项目时,我的第一感觉就是:在编程API的设计上,数据集及其操作的抽象有点类似Apache Crunch(MapReduce Pipeline编程库)项目;而在支持统一数据处理模型上,能够让人想到Apache Flink项目。如果深入了解Apache Beam,你会发现未来Apache Beam很可能成为数据处理领域唯一一个能够将不同的数据应用统一起来的编程库。
Apache Beam目前最新版本为0.5.0-SNAPSHOT,最新的Release版本为0.4.0,很多特性还在开发中。在网上找到一个由Andrew Psaltis在2016年6月份演讲的《Apache Beam: The Case for Unifying Streaming API’s》,引用了其中一个Apache Beam的架构图,如下图所示:
上图中,我们可以看到,Apache Beam核心的主要有两层:
在Pipeline构建层,针对不同的编程语言,构建一组用于定义Pipeline相关抽象,提供编程API,这一层被称为Beam SDKs。最终的用户(具有不同编程语言技能的人员)可以基于这些抽象的Beam SDK来构建数据处理Pipeline。
Runner适配层,主要是用来对接底层的计算引擎,用来执行上层用户开发好的Pipeline程序。
我们先根据官网文档,了解一下Apache Beam的Roadmap。首先,下面的三个特性,或者说是Apache Beam的目标:
基于单一的编程模型,能够实现批处理(Batch processing)、流处理(Streaming Processing),通常的做法是把待处理的数据集(Dataset)统一,一般会把有界(Bound)数据集作为无界(Unbound)数据集的一种特殊情况来看待,比如Apache Flink便是按照这种方式处理,在差异化的API层之上构建一个统一的API层。
在多个不同的计算环境下,都能够执行已经定义好的数据处理Pipeline。也就是说,对数据集处理的定义(即构建的Data Pipeline),与最终所要Deploy的执行环境完全无关。这对实现数据处理的企业是非常友好的,当下数据处理新技术不断涌现,企业数据处理平台也为了能够与时俱进并提高处理效率,当然希望在底层计算平台升级的过程中无需重写上层已定义的Data Pipeline。
目前,Apache Beam项目开发整体来看还处在初期,初步决定底层执行环境支持主流的计算平台:Apache Apex、Apache Flink、Apache Spark、Google Cloud Dataflow。实际上,Apache Beam的这种统一编程模型,可以支持任意的计算引擎,通过Data Pipeline层与执行引擎层之间开发一个类似Driver的连接器即可实现。
实现任意可以共享的Beam SDK、IO connector、Transform库。
在使用Apache Beam构建数据处理程序,首先需要使用Beam SDK中的类创建一个Driver程序,在Driver程序中创建一个满足我们数据处理需求的Pipeline,Pipeline中包括输入(Inputs)、转换(Transformations)、输出(Outputs)三个核心的组件。然后,根据我们选择的Beam SDK来确定底层使用Pipeline Runner(执行引擎,或计算引擎),将我们定义好的Pipeline运行在Pipeline Runner上。
Apache Beam SDKs提供一组抽象,用来简化大规模分布式数据处理。同一个Beam抽象,能够同时适应批量处理、流处理两种数据源。下面,我们了解一下Apache Beam的一些关键抽象:
一个Pipeline是对一个数据处理任务抽象,它包含了我们在对给定数据集处理的全部逻辑,主要包括从数据源读取数据(可能从多个数据源读取)、在给定的数据集上执行Transform操作(中间可能是一个DAG图,通过多个Transform连接,而Transform的输出和输出都可能是一个数据集)、将Transform的数据结果写入到指定对的存储系统中。
一个PCollection是对分布式数据集的抽象,他可以是输入数据集、中间结果数据集、输出数据集。每一个由PCollection表征的数据集作为输入时,都会存在一个或多个Transform作用在其上(对数据集进行处理的逻辑)。
一个Transform表示数据处理过程中一个步骤(Step),对应于Pipeline中一个操作,每一个Transform会以一个或多个PCollection作为输入,经过处理后输出一个或多个PCollection。
Apache Beam提供了Source和Sink的API,用来表示读取和写入数据。Source表示从一个外部的数据源读入数据到Pipeline,而Sink表示经过Pipeline处理后将数据写入到外部存储系统
PipelineRunner是实际用来处理Pipeline逻辑的底层组件,它能够将用户构建的Pipeline翻译成底层计算引擎能够处理的Job,并执行Pipeline的处理逻辑。
Apache Beam还在开发之中,后续对应的API设计可能会有所变化,不过从当前版本来看,基于对数据处理领域对象的抽象,API的设计风格大量使用泛型来定义,具有很高的抽象级别。下面我们分别对感兴趣的的设计来详细说明。
Source表示数据输入的抽象,在API定义上分成两大类:一类是面向数据批处理的,称为BoundedSource,它能够从输入的数据集读取有限的数据记录,知道数据具有有限性的特点,从而能够对输入数据进行切分,分成一定大小的分片,进而实现数据的并行处理;另一类是面向数据流处理的,称为UnboundedSource,它所表示的数据是连续不断地进行输入,从而能够实现支持流式数据所特有的一些操作,如Checkpointing、Watermarks等。
Source对应的类设计,如下类图所示:
目前,Apache Beam支持BoundedSource的数据源主要有:HDFS、MongoDB、Elasticsearch、File等,支持UnboundedSource的数据源主要有:Kinesis、Pubsub、Socker等。未来,任何具有Bounded或Unbounded两类特性的数据源都可以在Apache Beam的抽象基础上实现对应的Source。
Sink表示任何经过Pipeline中一个或多个PTransform处理过的PCollection,最终会输出到特定的存储中。与Source对应,其实Sink主要也是具有两种类型:一种是直接写入特定存储的Bounded类型,如文件系统;另一种是写入具有Unbounded特性的存储或系统中,如Flink。在API设计上,Sink的类图如下所示:
可见,基于Sink的抽象,可以实现任意可以写入的存储系统。
下面,我们来看一下PipelineRunner的类设计以及目前开发中的PipelineRunner,如下图所示:
目前,PipelineRunner有DirectRunner、DataflowRunner、SparkRunner、ApexRunner、FlinkRunner,待这些主流的PipelineRunner稳定以后,如果有其他新的计算引擎框架出现,可以在PipelineRunner这一层进行扩展实现。
这些PipelineRunner中,DirectRunner是最简单的PipelineRunner,它非常有用,比如我们实现了一个从HDFS读取数据,但是需要在Spark集群上运行的ETL程序,使用DirectRunner可以在本地非常容易地调试ETL程序,调试到程序的数据处理逻辑没有问题了,再最终在实际的生产环境Spark集群上运行。如果特定的PipelineRunner所对应的计算引擎没有很好的支撑调试功能,使用DirectRunner是非常方便的。
PCollection是对分布式数据集的抽象,主要用作输入、输出、中间结果集。其中,在Apache Beam中对数据及其数据集的抽象有几类,我们画到一张类图上,如下图所示:
PCollection是对数据集的抽象,包括输入输出,而基于Window的数据处理有对应的Window相关的抽象,还有一类就是TupleTag,针对具有CoGroup操作的情况下用来标记对应数据中的Tuple数据,具体如何使用可以后面我们实现的Join的例子。
一个Pipeline是由一个或多个PTransform构建而成的DAG图,其中每一个PTransform都具有输入和输出,所以PTransform是Apache Beam中非常核心的组件,我按照PTransform的做了一下分类,如下类图所示:
通过上图可以看出,PTransform针对不同输入或输出的数据的特征,实现了一个算子(Operator)的集合,而Apache Beam除了期望实现一些通用的PTransform实现来供数据处理的开发人员开箱即用,同时也在API的抽象级别上做的非常Open,如果你想实现自己的PTransform来处理指定数据集,只需要自定义即可。而且,随着社区的活跃及其在实际应用场景中推广和使用,会很快构建一个庞大的PTransform实现库,任何有数据处理需求的开发人员都可以共享这些组件。
这里,单独把Combine这类合并数据集的实现拿出来,它的抽象很有趣,主要面向globally 和per-key这两类抽象,实现了一个非常丰富的PTransform算子库,对应的类图如下所示:
通过上图可以看出,作用在一个数据集上具有Combine特征的基本操作:Max、Min、Top、Mean、Sum、Count等等。
Window是用来处理某一个Micro batch的数据记录可以进行Merge这种场景的需求,通常用在Streaming处理的情况下。Apache Beam也提供了对Window的抽象,其中对于某一个Window下的数据的处理,是通过WindowFn接口来定义的,与该接口相关的处理类,如下类图所示:
首先说明一下,为了简单起见,我直接在代码中显式配置指定PipelineRunner,示例代码片段如下所示:
PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class);
如果要部署到服务器上,可以通过命令行的方式指定PipelineRunner,比如要在Spark集群上运行,类似如下所示命令行:
spark-submit --class org.shirdrn.beam.examples.MinimalWordCountBasedSparkRunner 2017-01-18 --master spark://myserver:7077 target/my-beam-apps-0.0.1-SNAPSHOT-shaded.jar --runner=SparkRunner
下面,我们从几个典型的例子来看(基于Apache Beam软件包的examples有所改动),Apache Beam如何构建Pipeline并运行在指定的PipelineRunner上:
我们根据Apache Beam的MinimalWordCount示例代码开始,看如何构建一个Pipeline,并最终执行它。 MinimalWordCount的实现,代码如下所示:
package org.shirdrn.beam.examples; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; public class MinimalWordCount { @SuppressWarnings("serial") public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式) Pipeline pipeline = Pipeline.create(options); pipeline.apply(TextIO.Read.from("/tmp/dataset/apache_beam.txt")) // 读取本地文件,构建第一个PTransform .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { // 对文件中每一行进行处理(实际上Split) @ProcessElement public void processElement(ProcessContext c) { for (String word : c.element().split("[//s://,//.//-]+")) { if (!word.isEmpty()) { c.output(word); } } } })) .apply(Count.<String> perElement()) // 统计每一个Word的Count .apply("ConcatResultKVs", MapElements.via( // 拼接最后的格式化输出(Key为Word,Value为Count) new SimpleFunction<KV<String, Long>, String>() { @Override public String apply(KV<String, Long> input) { return input.getKey() + ": " + input.getValue(); } })) .apply(TextIO.Write.to("wordcount")); // 输出结果 pipeline.run().waitUntilFinish(); } }
Pipeline的具体含义,可以看上面代码的注释信息。下面,我们考虑以HDFS数据源作为Source,如何构建第一个PTransform,代码片段如下所示:
PCollection<KV<LongWritable, Text>> resultCollection = pipeline.apply(HDFSFileSource.readFrom( "hdfs://myserver:8020/data/ds/beam.txt", TextInputFormat.class, LongWritable.class, Text.class))
可以看到,返回的是具有键值分别为LongWritable、Text类型的KV对象集合,后续处理和上面处理逻辑类似。如果使用Maven构建Project,需要加上如下依赖(这里beam.version的值可以为最新Release版本0.4.0):
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-hdfs</artifactId> <version>${beam.version}</version> </dependency>
去重也是对数据集比较常见的操作,使用Apache Beam来实现,示例代码如下所示:
package org.shirdrn.beam.examples; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Distinct; public class DistinctExample { public static void main(String[] args) throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式) Pipeline pipeline = Pipeline.create(options); pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_ID_FILE.txt")) .apply(Distinct.<String> create()) // 创建一个处理String类型的PTransform:Distinct .apply(TextIO.Write.to("deduped.txt")); // 输出结果 pipeline.run().waitUntilFinish(); } }
对数据进行分组操作也非常普遍,我们拿一个最基础的PTransform实现GroupByKey来实现一个例子,代码如下所示:
package org.shirdrn.beam.examples; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.runners.direct.repackaged.com.google.common.base.Joiner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.KV; public class GroupByKeyExample { @SuppressWarnings("serial") public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式) Pipeline pipeline = Pipeline.create(options); pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_INFO_FILE.txt")) .apply("ExtractFields", ParDo.of(new DoFn<String, KV<String, String>>() { @ProcessElement public void processElement(ProcessContext c) { // file format example: 35451605324179 3G CMCC String[] values = c.element().split("/t"); if(values.length == 3) { c.output(KV.of(values[1], values[0])); } } })) .apply("GroupByKey", GroupByKey.<String, String>create()) // 创建一个GroupByKey实例的PTransform .apply("ConcatResults", MapElements.via( new SimpleFunction<KV<String, Iterable<String>>, String>() { @Override public String apply(KV<String, Iterable<String>> input) { return new StringBuffer() .append(input.getKey()).append("/t") .append(Joiner.on(",").join(input.getValue())) .toString(); } })) .apply(TextIO.Write.to("grouppedResults")); pipeline.run().waitUntilFinish(); } }
使用DirectRunner运行,输出文件名称类似于grouppedResults-00000-of-00002、grouppedResults-00001-of-00002等等。
最后,我们通过实现一个Join的例子,其中,用户的基本信息包含ID和名称,对应文件格式如下所示:
35451605324179 Jack 35236905298306 Jim 35236905519469 John 35237005022314 Linda
另一个文件是用户使用手机的部分信息,文件格式如下所示:
35451605324179 3G 中国移动 35236905298306 2G 中国电信 35236905519469 4G 中国移动
我们希望通过Join操作后,能够知道用户使用的什么网络(用户名+网络),使用Apache Beam实现,具体实现代码如下所示:
package org.shirdrn.beam.examples; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; public class JoinExample { @SuppressWarnings("serial") public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.create(); options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式) Pipeline pipeline = Pipeline.create(options); // create ID info collection final PCollection<KV<String, String>> idInfoCollection = pipeline .apply(TextIO.Read.from("/tmp/dataset/MY_ID_INFO_FILE.txt")) .apply("CreateUserIdInfoPairs", MapElements.via( new SimpleFunction<String, KV<String, String>>() { @Override public KV<String, String> apply(String input) { // line format example: 35451605324179 Jack String[] values = input.split("/t"); return KV.of(values[0], values[1]); } })); // create operation collection final PCollection<KV<String, String>> opCollection = pipeline .apply(TextIO.Read.from("/tmp/dataset/MY_ID_OP_INFO_FILE.txt")) .apply("CreateIdOperationPairs", MapElements.via( new SimpleFunction<String, KV<String, String>>() { @Override public KV<String, String> apply(String input) { // line format example: 35237005342309 3G CMCC String[] values = input.split("/t"); return KV.of(values[0], values[1]); } })); final TupleTag<String> idInfoTag = new TupleTag<String>(); final TupleTag<String> opInfoTag = new TupleTag<String>(); final PCollection<KV<String, CoGbkResult>> coGrouppedCollection = KeyedPCollectionTuple .of(idInfoTag, idInfoCollection) .and(opInfoTag, opCollection) .apply(CoGroupByKey.<String>create()); final PCollection<KV<String, String>> finalResultCollection = coGrouppedCollection .apply("", ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { @ProcessElement public void processElement(ProcessContext c) { KV<String, CoGbkResult> e = c.element(); String id = e.getKey(); String name = e.getValue().getOnly(idInfoTag); for (String eventInfo : c.element().getValue().getAll(opInfoTag)) { // Generate a string that combines information from both collection values c.output(KV.of(id, "/t" + name + "/t" + eventInfo)); } } })); PCollection<String> formattedResults = finalResultCollection .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getKey() + "/t" + c.element().getValue()); } })); formattedResults.apply(TextIO.Write.to("joinedResults")); pipeline.run().waitUntilFinish(); } }
本文基于 署名-非商业性使用-相同方式共享 4.0 许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。