转载

解读2015之Spark篇:新生态系统的形成

编者按:2015年,整个IT技术领域发生了许多深刻而又复杂的变化,InfoQ策划了“解读2015”年终技术盘点系列文章,希望能够给读者清晰地梳理出技术领域在这一年的发展变化,回顾过去,继续前行。本文是大数据解读2015之Spark篇,明略数据的梁堰波为大家解读Spark在2015年的快速发展,后续InfoQ会有更多关于大数据生态技术的总结。

2015年的Spark社区的进展实在是太快了,我发现1月份出版的一本参考书到现在已经有很多内容是过时的了。社区大踏步前行的同时,用户和应用案例也越来越多,应用行业越来越广泛。到年底了我们来梳理下Spark这快速发展的一年。

先从全局有个认识,我尝试用三句话来概括下Spark最主要的变化,然后在接下来的篇幅选取一些重点内容展开。

  • Spark生态系统渐趋完善。支持的外部数据源越来越多,支持的算子越来越丰富,自身的机器学习算法越来越完善。同时在API支持上也有很大进步,新增加的R语言API使得Spark能被更多的行业所接受。
  • Spark的应用范围和规模在不断扩大。在互联网和电子商务行业的应用不断增多、规模不断扩大的基础上,越来越多的金融、电信、制造业等传统行业也开始使用Spark解决他们遇到的大数据问题。
  • Spark自身的性能和稳定性在不断提升。Tungsten项目让Spark跑的越来越快,越来越多的代码贡献者和使用经验让Spark越来越稳定。相信明年发布的Spark 2.0将是一个里程碑式的版本。

转向以DataFrame为核心

在传统意义上Spark的核心是RDD和RDD之上的各种transformation和action,也就是各种算子,RDD可以认为是分布式的Java对象的集合。2013年推出了DataFrame,可以看做分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点就是有执行计划的优化器,这样用户只需要指定自己的操作逻辑,DataFrame的优化器会帮助用户选择一条效率最优的执行路径。同时Tungsten优化(下一章重点讲)使得DataFrame的存储和计算效率比RDD高很多。Spark的机器学习项目MLlib的ML pipeline就是完全基于DataFrame的,而且未来Streaming也会以DataFrame为核心。

解读2015之Spark篇:新生态系统的形成

(图片引自Databricks)

Tungsten让Spark越来越快

那么为什么DataFrame比RDD在存储和计算上的效率更高呢?这主要得益于Tungsten项目。Tungsten做的优化概括起来说就是由Spark自己来管理内存而不是使用JVM,这样可以避免JVM GC带来的性能损失;内存中的Java对象被存储成Spark自己的二进制格式,更加紧凑,节省内存空间,而且能更好的估计数据量大小和内存使用情况;计算直接发生在二进制格式上,省去了序列化和反序列化时间。

像传统的Hadoop/Hive系统,磁盘IO是一个很大的瓶颈。而对于像Spark这样的计算框架,主要的瓶颈在于CPU和内存。下面看看Tungsten主要做了哪些优化:

1,基于JVM的语言带来的问题:GC问题和Java对象的内存开销。例如一个字符串”abcd”理论上只有4个bytes,但是用Java String类型来存储却需要48个bytes。Spark的改进就是自己管理内存,不用JVM来管理了,使用的工具是sun.misc.Unsafe。DataFrame的每一行就是一个UnsafeRow,这块内存存的啥东西只有Spark自己能读懂。有了这种特有的二进制存储格式后,DataFrame的算子直接操控二进制数据,同时又省去了很多序列化和反序列化的开销。

2,Cache-aware的计算。现在Spark已经是内存计算引擎了,但是能不能更进一步呢,能不能更好的利用CPU的L1/L2/L3缓存的优势呢,因为CPU缓存的访问效率更高。这个优化点也不是意淫出来的,是在profile了很多Spark应用之后得到的结论,发现很多CPU的时间浪费在等待从内存中取数据的过程。所以在Tungsten中就设计和实现了一系列的cache-friendly的算法和数据结构来加速这个过程,例如aggregations, joins和shuffle操作中进行快速排序和hash操作。

以sort为例,Spark已经实现了cache-aware的sort算法,比原来的性能提升至少有3倍。在传统的排序中是通过指针来索引数据的,但是缺点就是CPU cache命中率不够高,因为我们需要随机访问record做比较。实际上quicksort算法是能够非常好的利用cache的,主要是我们的record不是连续存储的。Spark的优化就是存储一个key prefix和指针在一起,那么就可以通过比较key prefix来直接实现排序,这样CPU cache的命中率就会高很多。例如如果我们需要排序的列是一个string类型,那么我们可以拿这个string的UTF-8编码的前8个字节来做key prefix,并进行排序。

解读2015之Spark篇:新生态系统的形成

关于这个优化可以参见SPARK-9457 和org.apache.spark.shuffle.sort下面的类,最重要的是ShuffleExternalSorter和ShuffleInMemorySorter两个类。

3,运行时代码生成

运行时代码生成能免去昂贵的虚函数调用,同时也省去了对Java基本类型装箱之类的操作了。Spark SQL将运行时代码生成用于表达式的求值,效果显著。

除了这些优化,我认为还有两个很重要的变化:

1,Unified Memory Management

在以前Spark的内存显式的被分为三部分:execution,storage和其他。execution内存用于shuffle, join, sort和aggregation等操作,而storage内存主要用于cache数据。在1.6版本之前是通过spark.shuffle.memoryFraction和spark.storage.memoryFraction两个参数来配置用于execution和storage的内存份额。从1.6开始这两部分内存合在一起统一管理了,也就是说如果现在没有execution的需要,那么所有的内存都可以给storage用,反过来也是一样的。同时execution可以evict storage的部分内存,但是反过来不行。在新的内存管理框架上使用两个参数来控制spark.memory.fraction和spark.memory.storageFraction。

2,Adaptive query execution

这个特性说大了就是所有数据库最核心的一个功能query execution optimization,可以做的东西非常多。我们自己写Spark程序中经常会碰到一个job跑到最后每个分区的数据量很小的情况,这是因为以前的Spark不会估计下游RDD的每个分区的数据量大小,并根据数据量大小来调整分区个数。以前遇到这种问题就需要手工repartition,用户自己要心里有数到哪个阶段的RDD的partition数据变多了还是变少了,需要跟着调整分区的数目,非常不灵活。从1.6版本开始有了部分支持,主要是能够估计在join和aggregate操作中Shuffle之后的分区的数目,动态调整下游task的数目,从而提高执行效率。

DataFrame和SQL API

Spark从API的角度看,可以分为两大类:

  • 类似于Python的Pandas和R语言的DataFrame API,用户可以使用Scala/Java/Python/R四种语言调用这个API处理数据;
  • SQL语言API。又分为两种:一个是普通的Spark SQL,一种是Hive SQL。

虽然API不同,但是背后解析出来的算子是一样的,DataFrame的各种算子其实就是各种SQL的语法。Spark在SQL语法的支持越来越丰富的同时内置的SQL函数得到了很大的增强,目前已经有超过100个这样的常用函数(string, math, date, time, type conversion, condition),可以说最常见的SQL内置函数都有了。

作为一个类SQL的分析工具,聚合函数是非常核心的。Spark 1.5和1.6在聚合函数上都有很大改进:实现了一个新的聚合函数接口,支持了一些build-in的聚合函数(例如max/min/count/sum/avg/first/corr/stddev/variance/skewness/kurtosis以及一些窗口函数等),同时基于新接口实现了相应的UDAF接口。新的聚合函数接口是AggregateFunction,有两种具体的实现:ImperativeAggregate和DeclarativeAggregate。ImperativeAggregate类型的聚合操作就是通过用户定义三个动作 initialize/update/merge的逻辑来实现聚合的;而DeclarativeAggregate则是通过指定initialValues/updateExpressions/mergeExpressions这三个表达式然后通过代码生成的方式来做聚合的操作。这两种方式各有利弊,一般来说代码生成效率更高,但是像variance/stddev/skewness/kurtosis这样的多个表达式需要依赖同一个中间表达式的场景下,代码生成的执行路径由于不能共享中间的结果,从而导致其不如ImperativeAggregate效率更高,所以在Spark内部的实现中这几个聚合函数也是通过ImperativeAggregate来实现的。

SQL API上另一个变化是可以直接在文件上进行SQL操作,不需要把这个文件注册成一个table。例如支持”select a, b from json.`path/to/json/files`”这样的语法,这个应该是从Apache Drill借鉴过来的。

另外一个里程碑式的特性就是Dataset API(SPARK-9999)。Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。这个强类型的值是以编码的二进制形式被存储的,这种存储格式可以不用反序列化就直接可以被上面的算子(例如sort,Shuffle等)操作。所以在创建Dataset的时候需要指定用于这个编码工作的Encoder。

这样一些需要强类型的地方就可以使用Dataset API,不失DataFrame的那些优点,同时又可以帮我们做类型检查。所以从某种角度上说这个Dataset API在将来是要替换掉RDD的。

外部数据源和Hive支持

这个feature可以说是建立起Spark生态系统的基础,使得Spark与大数据生态圈的其他组件联系起来了。可以这么理解,你无论数据是在HDFS上,还是在Cassandra里面,抑或关系型数据库里面,我Spark都可以拿过来做分析和处理,或者机器学习,我这边处理完了你让我写到哪去我就可以写出去。这个特性使得Spark成为了大数据处理的核心一环。目前Spark支持的外部数据源有很多种,主流的像Parquet,JSON,JDBC,ORC,AVRO,HBase,Cassandra,AWS S3,AWS Redshift等。

在这些外部数据源中,Parquet是其中最核心的,Spark的Parquet支持也有了很大的改进:修复了越来越多的bug,Parquet的版本升级到1.7;更快的metadata discovery和schema merging;能够读取其他工具或者库生成的非标准的parquet文件;以及更快更鲁棒的动态分区插入;对于flat schema的Parquet格式的数据的读性能提升了大约1倍(SPARK-11787)。

另外在Hive支持方面,越来越多的Hive特有的SQL语法被加入到Spark中,例如DISTRIBUTE BY... SORT等。支持连接Hive 1.2版本的metastore,同时支持metastore partition pruning(通过spark.sql.hive.metastorePartitionPruning=true开启,默认为false)。因为很多公司的Hive集群都升级到了1.2以上,那么这个改进对于需要访问Hive元数据的Spark集群来说非常重要。

机器学习算法

Spark在机器学习方面的发展很快,目前已经支持了主流的统计和机器学习算法。虽然和单机的机器学习库相比MLlib还有一定的差距;但是纵观所有基于分布式架构的开源机器学习库,MLlib是我认为的计算效率最高的。下面列出了目前MLlib支持的主要的机器学习算法:

Discrete

Continous

Supervised

Classification

LogisticRegression(with Elastic-Net)

SVM

DecisionTree

RandomForest

GBT

NaiveBayes

MultilayerPerceptron

OneVsRest

Regression

LinearRegression(with Elastic-Net)

DecisionTree

RandomForest

GBT

AFTSurvivalRegression

IsotonicRegression

Unsupervised

Clustering

KMeans

GaussianMixture

LDA

PowerIterationClustering

BisectingKMeans

Dimensionality Reduction, matrix factorization

PCA

SVD

ALS

WLS

下面简单说下其中一些亮点:

1,MLlib已经有了对Generialized Linear Model(GLM)的初步支持:GLM是统计学里一系列应用非常广泛的模型,指定不同的family可以得到不同的模型。例如“Gaussian” family相当于LinearRegression模型, “Bionomial” family相当于LogisticRegression模型,“Poisson” family相当于SurvivalRegression模型。目前MLlib已经提供了这三种模型的机器学习解法和LinearRegression的normal equation解法。

下面详细说说这两种解法:一种是利用WeightedLeastSquares(WLS)优化方法;另一种是利用L-BFGS优化方法。前者是通过解normal equation的思路求解,只需要对所有的数据过一遍(不需要迭代)即可得到最后的模型(好像很神奇),同时可以算出像coefficients standard errors, p-value, t-value等统计指标帮助用户理解模型,这种解法是目前LinearRegression的默认解法。不过有个限制就是样本feature的维度不能超过4096,但对样本数目没有限制。后者就是传统机器学习的解法,是通过迭代寻找最优解的方法。

而且目前MLlib也在进一步研究IterativelyReweightedLeastSquares(IRLS)算法,然后结合WLS和IRLS就可以使Spark支持类似R GLM的大多数功能。这样对于前面所有的三种模型都将提供两种解法,用户可以根据实际情况选择合适的解法。这个全部做完预计要在下一个版本Spark 2.0了。

2,ALS算法可以说是目前MLlib被应用最多的算法,ALS的数学原理其实比较容易理解,但是如何在分布式系统中高效实现是一个比较复杂的问题。MLlib里使用分块计算的思路,合理的设计数据分区和 RDD 缓存来减少数据交换,有效的降低了通信复杂度。这个算法的实现思路充分说明了一个问题:同样的算法,在分布式系统上实现时,不同的选择会带来性能上巨大的差异。

3,目前的主要的分类模型都支持使用predictRaw, predictProbability, predict分别可以得到原始预测、概率预测和最后的分类预测。同时这些分类模型也支持通过设置thresholds指定各个类的阈值。不过目前这些预测函数还只支持批量预测,也就是对一个DataFrame进行预测,不支持对单个instance进行预测。不过单个instance的预测的支持也已经在roadmap中了。

4,RandomForestClassificationModel和RandomForestRegressionModel模型都支持输出feature importance

5,GMM EM算法实现了当feature维度或者cluster数目比较大的时候的分布式矩阵求逆计算。实验表明当feature维度>30,cluster数目>10的时候,这个优化性能提升明显。

6,在深度学习方面,MLlib已经实现了神经网络算法MultilayerPerceptronClassifier(MLPC)。 这是一个基于 前馈神经网络 的分类器,它是一种在输入层与输出层之间含有一层或多层隐含结点的具有正向传播机制的神经网络模型,中间的节点使用sigmoid (logistic)函数,输出层的节点使用softmax函数。输出层的节点的数目表示分类器有几类。MLPC学习过程中使用 BP算法 ,优化问题抽象成logistic loss function并使用L-BFGS进行优化。

7,除了我们经常说的机器学习和数据挖掘算法,MLlib在统计检验算法上也有很大的进步。例如 A/B test ,  Kolmogorov–Smirnov 检验等。A/B测试可以说是很多大数据应用的基础,很多结论最终都是通过A/B测试得到的。

机器学习Pipeline

MLlib最大的变化就是从一个机器学习的library开始转向构建一个机器学习工作流的系统,这些变化发生在ML包里面。MLlib模块下现在有两个包:MLlib和ML。ML把整个机器学习的过程抽象成 Pipeline ,一个Pipeline是由多个Stage组成,每个Stage是Transformer或者Estimator。

以前机器学习工程师要花费大量时间在training model之前的feature的抽取、转换等准备工作。ML提供了多个Transformer,极大提高了这些工作的效率。在1.6版本之后,已经有了30+个feature transformer,像OneHotEncoder, StringIndexer, PCA, VectorSlicer, VectorAssembler, SQLTransformer, HashingTF, IDF, Word2Vec等。这些工具使得各种feature提取、转换等准备工作。

原始数据经过上面的各种feature transformer之后就可以丢到算法里去训练模型了,这些算法叫Estimator,得到的模型是Transformer。上一章节已经提到了主流的算法支持,所以现在可以得到模型了。得到的模型怎么评价好坏呢?MLlib提供了像BinaryClassificationEvaluator等一系列的evaluation工具,这些工具里面定义了像AUC等一系列的指标用于评价模型的好坏。

这样一个机器学习的pipeline就可以跑起来了,MLlib进一步提供了像CrossValidator这样的工具帮助我们做模型和pipeline的调优,选出最佳的模型参数等。另外一个重要特性就是pipeline的持久化。现在的ML pipeline和大多数的Transformers/Estimators都支持了持久化,可以save/load。这样就可以把模型或者pipeline存储、导出到其他应用,扫除了MLlib在生产环境应用的最后一个障碍。

另外一个显著变化就是ML框架下所有的数据源都是基于DataFrame,所有的模型也尽量都基于Spark的数据类型(Vector, Matrix)表示。在ML里面的public API下基本上看不到对RDD的直接操作了,这也与前面讲的Spark的未来变化是一致的。

Python是机器学习领域最流行的语言,ML/MLlib的Python API也在不断加强,越来越多的算法和功能的Python API基本上与Scala API对等了。

SparkR

R语言在统计领域应用非常广泛,R语言本身的架构可以比较容易支持其他执行后端。SparkR就是把Spark作为R语言的执行后端。目前SparkR提供的接口包括了DataFrame的绝大多数函数以及机器学习GLM函数,未来还会支持更多的功能。SparkR既可以利用R接口的易用性以及在传统行业的既有市场空间,又可以利用Spark强大的分布式数据处理能力。SparkR在推出不久就获得了很高的用户关注度和使用率。笔者在和一些传统行业的大数据从业者交流中经常会被问到这样的问题:我以前用R写的程序,现在数据量大了,传统R跑不动了,能不能直接放到Spark上跑。相信SparkR就是在朝着解决这个问题的方向努力。

在统计和机器学习方面一个重要的feature就是R formula的支持,R用户可以用他们非常熟悉的formula来定义一个GLM模型。目前已经支持最基本的'.', '~', ':', '+'和 '-',未来还会增强这项功能。同时SparkR的GLM函数不只提供了训练一个GLM模型和预测的能力,也提供了对模型的R类似的统计指标输出。目前SparkR跑出的GLM模型和传统R跑出的模型的结果是一样的,同时也会输出coefficients standard errors, p-values, t-values等统计信息。

Spark 2.0

Spark streaming等组件在这一年也有很大的变化,笔者由于精力有限,在此不一一列出。Spark 2.0预计明年三四月份发布,将会确立以DataFrame和Dataset为核心的体系架构,RDD会慢慢退出历史舞台;同时在各方面的性能上会有很大的提升,当然我们也期待着稳定性方面的提升。从1.x到2.x还会放弃Hadoop 1.x的支持,RPC系统从Akka迁移到Netty等。快速发展的社区,越来越多的应用,性能和稳定性方面的不断提升使得Spark在未来的若干年内还是大数据处理工具的首选。

作者介绍:

梁堰波,明略数据技术合伙人,开源爱好者,Apache Spark项目核心贡献者。北京航空航天大学计算机硕士,曾就职于Yahoo!、美团网、法国电信从事机器学习和推荐系统相关的工作,在大数据、机器学习和分布式系统领域具备丰富的项目经验。

正文到此结束
Loading...