Spark是一个用来实现快速而通用的集群计算的平台。扩展了广泛使用的MapReduce计算模型,而且高效地支持更多的计算模式,包括交互式查询和流处理。在处理大规模数据集的时候,速度是非常重要的。Spark的一个重要特点就是能够在内存中计算,因而更快。即使在磁盘上进行的复杂计算,Spark依然比MapReduce更加高效。
Spark运行模式中Hadoop YARN的集群方式最为常用的方式,目前Spark的运行模式主要有以下几种:
Spark已经发展成为包含众多子项目的大数据计算平台。伯克利将Spark的整个生态系统称为伯克利数据分析栈(BDAS)。其核心框架是Spark,同时BDAS涵盖支持结构化数据SQL查询与分析的查询引擎Spark SQL和Shark,提供机器学习功能的系统MLbase及底层的分布式机器学习库MLlib、并行图计算框架GraphX、流计算框架Spark Streaming、采样近似计算查询引擎BlinkDB、内存分布式文件系统Tachyon、资源管理框架Mesos等子项目
Spark Core是Spark的核心组件,其基于核心的RDD(Resilient Distributed DataSet 弹性分布式数据集)抽象,提供了分布式作业分发、调度及丰富的RDD操作,这些操作通过Java、Python、Scala、R语言接口暴露。RDD是分布于集群各节点上的、不可变的弹性数据集,容纳的是Java/Python/Scala/R的对象实例。Spark定义了RDD之上的两种操作:Transformation和Action,RDD上运行一个操作之后,生成另一个RDD或者执行某些动作。
Spark Streaming基于Spark的计算性能进行流式的分析,将流式数据根据时间切分为小批量的数据(RDD),并在RDD上执行Transformation、Action操作完成分析。这种方式使得现有的很多批处理代码可以直接工作在流式处理的模式下。但是这种模式以牺牲一定的延时为代价,相比于其他基于事件或者消息的流式处理框架(Storm,Samza,Flink),延时比较大。Spark Streaming内置支持来自Kafka,Flume,ZeroMQ,Kinesis,Twitter,TCP/IP Socket的数据。
Spark SQL引入了称为DataFrames的数据抽象,提供对结构化和半结构化数据操作的支持。Spark提供了一套DSL用于DataFrame的操作,DSL可以通过Scala,Java,Python来表示。同时Spark SQK提供了对标准SQL语言的支持,包括命令行接口和ODBC/JDBC支持(驱动)
MLib是基于Spark的机器学习框架,由于Spark的分布式内存框架,其实现的常用算法包括 概要统计、分类和回归、协同过滤、聚类分析、降维、特征提取与转换函数。
GraphX则是基于Spark的分布式图处理框架,对标到Hadoop体系下基于MapReduce(因此基于磁盘)的图处理框架Giraph.由于RDD是不可变的,因此GraphX不适合需要更新操作的场景。GraphX提供了两套API,一套类似于Google Pregel提供的API,另一套则更像是MapReduce的风格。
Spark的整体流程为:Client提交应用,Master找到一个Worker启动Driver,Driver向Master或者资源管理器申请资源,之后将应用转化为RDD Graph,再由DAGScheduler将RDD Graph转化为Stage的有向无环图提交给TaskScheduler,由TaskScheduler提交任务给Executor执行。在任务执行的过程中,其他组件协同工作,确保整个应用顺利执行。
Spark提供了Java编程接口,通常可以先获取JavaSparkContext,让过创建RDD对象,然后执行响应的操作即可。可以通过Maven加入如下配置
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.0.2</version> </dependency>
然后按照如下模式进行相关的业务代码开发
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; public class HelloSpark{ public static void main(String[] args){ // 1 创建一个sparkconf 对象并配置 // 使用setMaster 可以设置spark集群可以链接集群的URL,如果设置local 代表在本地运行而不是在集群运行 SparkConf conf = new SparkConf().setMaster("local").setAppName("HelloSpark"); // 2 创建javasparkContext对象 // sparkcontext 是一个入口,主要作用就是初始化spark应用程序所需的一些核心组件,例如调度器,task, // 还会注册spark,sparkMaster结点上注册。 try (JavaSparkContext jsc = new JavaSparkContext(conf)) { // 3do something here } } }
完成本地测试后可以到响应的程序,提交至spark中运行,通过可以编写响应的脚本
/opt/spark/bin/spark-submit / # 用这个命令启动 --class com.xxx.HelloSpark / # 配置类名 --num-executors 3 / # 配置在三个结点上运行 --driver-memory 100m / # drive内存 --executor-memory 100m / # 配置execute内存 --executor-cores 3 / # 内核运行单元数 /opt/spark-script/java/HelloSpark-0.0.1-SNAPSHOT.jar / # 运行的jar包
下面是一个使用spark进行词汇统计的小程序
package com.opslab.spark; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; public class _01WordCount{ public static void main(String[] args){ SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCount"); try (JavaSparkContext jsc = new JavaSparkContext(conf)) { /* * 第3步:根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext来创建RDD * JavaRDD的创建基本有三种方式:根据外部的数据来源(例如HDFS)、根据Scala集合、由其它的RDD操作 * 数据会被JavaRDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴 */ JavaRDD<String> lines = jsc.textFile("D:/workspace/opslabSpark/resources/README.md"); /* * 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * 第4.1步:讲每一行的字符串拆分成单个的单词 */ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s)throws Exception { return (new ArrayList<String>(Arrays.asList(s.split(" ")))).iterator(); } //如果是scala由于Sam转化所以可以写成一行代码 }); /* * 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * 第4.2步:在单词拆分的基础上对每个单词实例计数为1,也就是word => (word, 1) */ JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word)throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(word, 1); } }); /* * 第4步:对初始的RDD进行Transformation级别的处理,例如map、filter等高阶函数等的编程,来进行具体的数据计算 * 第4.3步:在每个单词实例计数为1基础之上统计每个单词在文件中出现的总次数 */ JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { //对相同的Key,进行Value的累计(包括Local和Reducer级别同时Reduce) @Override public Integer call(Integer v1, Integer v2)throws Exception { // TODO Auto-generated method stub return v1 + v2; } }); wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> pairs)throws Exception { // TODO Auto-generated method stub System.out.println(pairs._1 + " : " + pairs._2); } }); // 5. 结果输出 // 5.1 结果输出到HDFS //wordsCount.saveAsTextFile("D:/workspace/opslabJava/spark/out/sparkout/wordcount"); // 单独出来结果集 wordsCount.foreachPartition(new VoidFunction<java.util.Iterator<Tuple2<String, Integer>>>() { @Override public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator)throws Exception { Tuple2<String, Integer> t2 = tuple2Iterator.next(); System.out.println(t2._1()); System.out.println(t2._2()); } }); } } }
RDD支持两种类型的算子(operation):transformation算子 和 action算子
transformation算子可以将已有RDD转换得到一个新的RDD,而action算子则是基于数据集计算,并将结果返回给驱动器(driver)。例如,map是一个transformation算子,它将数据集中每个元素传给一个指定的函数,并将该函数返回结果构建为一个新的RDD;而 reduce是一个action算子,它可以将RDD中所有元素传给指定的聚合函数,并将最终的聚合结果返回给驱动器(还有一个reduceByKey算子,其返回的聚合结果是一个数据集)。
Spark中所有transformation算子都是懒惰的,也就是说,这些算子并不立即计算结果,而是记录下对基础数据集(如:一个数据文件)的转换操作。只有等到某个action算子需要计算一个结果返回给驱动器的时候,transformation算子所记录的操作才会被计算。这种设计使Spark可以运行得更加高效 – 例如,map算子创建了一个数据集,同时该数据集下一步会调用reduce算子,那么Spark将只会返回reduce的最终聚合结果(单独的一个数据)给驱动器,而不是将map所产生的数据集整个返回给驱动器。默认情况下,每次调用action算子的时候,每个由transformation转换得到的RDD都会被重新计算。然而,你也可以通过调用persist(或者cache)操作来持久化一个RDD,这意味着Spark将会把RDD的元素都保存在集群中,因此下一次访问这些元素的速度将大大提高。同时,Spark还支持将RDD元素持久化到内存或者磁盘上,甚至可以支持跨节点多副本。
以下是Spark支持的一些常用transformation算子。详细请参考 RDD API doc (Scala, Java, Python, R) 以及 键值对 RDD 函数 (Scala, Java) 。Java的相关API可以查阅 http://spark.apache.org/docs/latest/api/java/index.html
) 对。
注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。
注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。
package com.opslab.spark; import com.google.common.collect.Lists; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import java.util.List; public class ApiMap{ public static void main(String[] args){ //演示api函数map SparkConf conf = new SparkConf().setMaster("local").setAppName("APIMap"); try (JavaSparkContext jsc = new JavaSparkContext(conf)) { List<Integer> list = Lists.newArrayList(1, 2, 3); JavaRDD<Integer> javaRDD = jsc.parallelize(list); //对数据集执行map操作,返回一个新的javaRDD JavaRDD<Integer> mapRDD = javaRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer integer)throws Exception { return integer * integer; } }); //mapRDD.collect(); mapRDD.foreach(new VoidFunction<Integer>() { @Override public void call(Integer integer)throws Exception { System.out.println(integer); } }); //与上面的方式相同,只是使用了便捷的lambda JavaRDD<Integer> mapRDD1 = javaRDD.map(s -> s * s); //mapRDD.collect(); mapRDD1.foreach(new VoidFunction<Integer>() { @Override public void call(Integer integer)throws Exception { System.out.println(integer); } }); } } }
以下是Spark支持的一些常用action算子。详细请参考 RDD API doc (Scala, Java, Python, R) 以及 键值对 RDD 函数 (Scala, Java) 。
package com.opslab.spark; import com.google.common.collect.Lists; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import java.util.List; public class ApiReduce{ public static void main(String[] args){ //演示api函数map SparkConf conf = new SparkConf().setMaster("local").setAppName("APIMap"); try (JavaSparkContext jsc = new JavaSparkContext(conf)) { List<Integer> list = Lists.newArrayList(1, 2, 3); JavaRDD<Integer> javaRDD = jsc.parallelize(list); //对数据集执行map操作,返回一个新的javaRDD Integer reduce = javaRDD.reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2)throws Exception { return integer + integer2; } }); System.out.println(reduce); } } }