l 主机操作系统: Windows 64 位,双核 4 线程,主频 2.2G , 10G 内存
l 虚拟软件: VMware® Workstation 9.0.0 build-812388
l 虚拟机操作系统: CentOS 64 位,单核
l 虚拟机运行环境:
Ø JDK : 1.7.0_55 64 位
Ø Hadoop : 2.2.0 (需要编译为 64 位)
Ø Scala : 2.10.4
Ø Spark : 1.1.0 (需要编译)
Ø Hive : 0.13.1
集群包含三个节点,节点之间可以免密码 SSH 访问,节点 IP 地址和主机名分布如下:
序号 | IP 地址 | 机器名 | 类型 | 核数 / 内存 | 用户名 | 目录 |
1 | 192.168.0.61 | hadoop1 | NN/DN/RM Master/Worker | 1 核 /3G | hadoop | /app 程序所在路径 /app/scala-... /app/hadoop /app/complied |
2 | 192.168.0.62 | hadoop2 | DN/NM/Worker | 1 核 /2G | hadoop | |
3 | 192.168.0.63 | hadoop3 | DN/NM/Worker | 1 核 /2G | hadoop |
SparkSQL 引入了一种新的 RDD —— SchemaRDD , SchemaRDD 由行对象( Row )以及描述行对象中每列数据类型的 Schema 组成; SchemaRDD 很象传统数据库中的表。 SchemaRDD 可以通过 RDD 、 Parquet 文件、 JSON 文件、或者通过使用 hiveql 查询 hive 数据来建立。 SchemaRDD 除了可以和 RDD 一样操作外,还可以通过 registerTempTable 注册成临时表,然后通过 SQL 语句进行操作。
值得注意的是:
l Spark1.1 使用 registerTempTable 代替 1.0 版本的 registerAsTable
l Spark1.1 在 hiveContext 中, hql() 将被弃用, sql() 将代替 hql() 来提交查询语句,统一了接口。
l 使用 registerTempTable 注册表是一个临时表,生命周期只在所定义的 sqlContext 或 hiveContext 实例之中。换而言之,在一个 sqlontext (或 hiveContext )中 registerTempTable 的表不能在另一个 sqlContext (或 hiveContext )中使用。
另外, Spark1.1 提供了语法解析器选项 spark.sql.dialect ,就目前而言, Spark1.1 提供了两种语法解析器: sql 语法解析器和 hiveql 语法解析器。
l sqlContext 现在只支持 sql 语法解析器( SQL-92 语法)
l hiveContext 现在支持 sql 语法解析器和 hivesql 语法解析器,默认为 hivesql 语法解析器,用户可以通过配置切换成 sql 语法解析器,来运行 hiveql 不支持的语法,如 select 1 。
l 切换可以通过下列方式完成:
l 在 sqlContexet 中使用 setconf 配置 spark.sql.dialect
l 在 hiveContexet 中使用 setconf 配置 spark.sql.dialect
l 在 sql 命令中使用 set spark.sql.dialect=value
SparkSQL1.1 对数据的查询分成了 2 个分支: sqlContext 和 hiveContext 。至于两者之间的关系, hiveSQL 继承了 sqlContext ,所以拥有 sqlontext 的特性之外,还拥有自身的特性(最大的特性就是支持 hive )。
使用如下命令打开 /etc/profile 文件:
sudo vi /etc/profile
设置如下参数:
export SPARK_HOME=/app/hadoop/spark-1.1.0
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export HIVE_HOME=/app/hadoop/hive-0.13.1
export PATH=$PATH:$HIVE_HOME/bin
export CLASSPATH=$CLASSPATH:$HIVE_HOME/bin
$cd /app/hadoop/hadoop-2.2.0/sbin
$./start-dfs.sh
$cd /app/hadoop/spark-1.1.0/sbin
$./start-all.sh
在 spark 客户端(在 hadoop1 节点 ) ,使用 spark-shell 连接集群
$cd /app/hadoop/spark-1.1.0/bin
$./spark-shell --master spark://hadoop1:7077 --executor-memory 1g
启动后查看启动情况,如下图所示:
Spark1.1.0 开始提供了两种方式将 RDD 转换成 SchemaRDD :
l 通过定义 Case Class ,使用反射推断 Schema ( case class 方式)
l 通过可编程接口,定义 Schema ,并应用到 RDD 上( applySchema 方式 )
前者使用简单、代码简洁,适用于已知 Schema 的源数据上;后者使用较为复杂,但可以在程序运行过程中实行,适用于未知 Schema 的 RDD 上。
对于 Case Class 方式,首先要定义 Case Class ,在 RDD 的 Transform 过程中使用 Case Class 可以隐式转化成 SchemaRDD ,然后再使用 registerTempTable 注册成表。注册成表后就可以在 sqlContext 对表进行操作,如 select 、 insert 、 join 等。注意, case class 可以是嵌套的,也可以使用类似 Sequences 或 Arrays 之类复杂的数据类型。
下面的例子是定义一个符合数据文件 /sparksql/people.txt 类型的 case clase ( Person ),然后将数据文件读入后隐式转换成 SchemaRDD : people ,并将 people 在 sqlContext 中注册成表 rddTable ,最后对表进行查询,找出年纪在 13-19 岁之间的人名。
第一步 上传测试数据
在 HDFS 中创建 /class6 目录,把配套资源 /data/class5/ people.txt 上传到该目录上
$hadoop fs -mkdir /class6
$hadoop fs -copyFromLocal /home/hadoop/upload/class6/people.* /class6
$hadoop fs -ls /
第二步 定义 sqlContext 并引入包
//sqlContext 演示
scala>val sqlContext=new org.apache.spark.sql.SQLContext(sc)
scala>import sqlContext.createSchemaRDD
第三步 定义 Person 类,读入数据并注册为临时表
//RDD1 演示
scala>case class Person(name:String,age:Int)
scala>val rddpeople=sc.textFile("hdfs://hadoop1:9000/class6/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))
scala>rddpeople.registerTempTable("rddTable")
第四步 在查询年纪在 13-19 岁之间的人员
scala>sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
上面步骤均为 trnsform 未触发 action 动作,在该步骤中查询数据并打印触发了 action 动作,如下图所示:
通过监控页面,查看任务运行情况:
applySchema 方式比较复杂,通常有 3 步过程:
l 从源 RDD 创建 rowRDD
l 创建与 rowRDD 匹配的 Schema
l 将 Schema 通过 applySchema 应用到 rowRDD
第一步 导入包创建 Schema
// 导入 SparkSQL 的数据类型和 Row
scala>import org.apache.spark.sql._
// 创建于数据结构匹配的 schema
scala>val schemaString = "name age"
scala>val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
第二步 创建 rowRDD 并读入数据
// 创建 rowRDD
scala>val rowRDD = sc.textFile("hdfs://hadoop1:9000/class6/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim))
// 用 applySchema 将 schema 应用到 rowRDD
scala>val rddpeople2 = sqlContext.applySchema(rowRDD, schema)
scala>rddpeople2.registerTempTable("rddTable2")
第三步 查询获取数据
scala>sqlContext.sql("SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
通过监控页面,查看任务运行情况:
同样得, sqlContext 可以读取 parquet 文件,由于 parquet 文件中保留了 schema 的信息,所以不需要使用 case class 来隐式转换。 sqlContext 读入 parquet 文件后直接转换成 SchemaRDD ,也可以将 SchemaRDD 保存成 parquet 文件格式。
第一步 保存成 parquest 格式文件
// 把上面步骤中的 rddpeople 保存为 parquet 格式文件到 hdfs 中
scala>rddpeople.saveAsParquetFile("hdfs://hadoop1:9000/class6/people.parquet")
第二步 读入 parquest 格式文件,注册表 parquetTable
//parquet 演示
scala>val parquetpeople = sqlContext.parquetFile("hdfs://hadoop1:9000/class6/people.parquet")
scala>parquetpeople.registerTempTable("parquetTable")
第三步 查询年龄大于等于 25 岁的人名
scala>sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
sparkSQL1.1.0 开始提供对 json 文件格式的支持,这意味着开发者可以使用更多的数据源,如鼎鼎大名的 NOSQL 数据库 MongDB 等。 sqlContext 可以从 jsonFile 或 jsonRDD 获取 schema 信息,来构建 SchemaRDD ,注册成表后就可以使用。
l jsonFile - 加载 JSON 文件目录中的数据,文件的每一行是一个 JSON 对象
l jsonRdd - 从现有的 RDD 加载数据,其中 RDD 的每个元素包含一个 JSON 对象的字符串
第一步 上传测试数据
第二步 读取数据并注册 jsonTable 表
//json 演示
scala>val jsonpeople = sqlContext.jsonFile("hdfs://hadoop1:9000/class6/people.json")
jsonpeople.registerTempTable("jsonTable")
第三步 查询年龄大于等于 25 的人名
scala>sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)
在 sqlContext 或 hiveContext 中来源于不同数据源的表在各自生命周期中可以混用,即 sqlContext 与 hiveContext 之间表不能混合使用
//sqlContext 中来自 rdd 的表 rddTable 和来自 parquet 文件的表 parquetTable 混合使用
scala>sqlContext.sql("select a.name,a.age,b.age from rddTable a join parquetTable b on a.name=b.name").collect().foreach(println)
使用 hiveContext 之前首先要确认以下两点:
l 使用的 Spark 是支持 hive
l Hive 的配置文件 hive-site.xml 已经存在 conf 目录中
前者可以查看 lib 目录下是否存在以 datanucleus 开头的 3 个 JAR 来确定,后者注意是否在 hive-site.xml 里配置了 uris 来访问 Hive Metastore 。
在 hadoop1 节点中使用如下命令启动 Hive
$nohup hive --service metastore > metastore.log 2>&1 &
在 SPARK_HOME/conf 目录下创建 hive-site.xml 文件,修改配置后需要重新启动 Spark-Shell
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop1:9083</value>
<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
</property>
</configuration>
要使用 hiveContext ,需要先构建 hiveContext :
scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
然后就可以对 Hive 数据进行操作了,下面我们将使用 Hive 中的销售数据,首先切换数据库到 hive 并查看有几个表:
// 销售数据演示
scala>hiveContext.sql("use hive")
scala>hiveContext.sql("show tables").collect().foreach(println)
// 所有订单中每年的销售单数、销售总额
// 三个表连接后以 count(distinct a.ordernumber) 计销售单数, sum(b.amount) 计销售总额
scala>hiveContext.sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear").collect().foreach(println)
结果如下:
[2004,1094,3265696]
[2005,3828,13247234]
[2006,3772,13670416]
[2007,4885,16711974]
[2008,4861,14670698]
[2009,2619,6322137]
[2010,94,210924]
通过监控页面,查看任务运行情况:
第一步 实现分析
所有订单每年最大金额订单的销售额 :
1 、先求出每份订单的销售额以其发生时间
select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber
2 、以第一步的查询作为子表,和表 tbDate 连接,求出每年最大金额订单的销售额
select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear
第二步 实现 SQL 语句
scala>hiveContext.sql("select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d on c.dateid=d.dateid group by c.theyear sort by c.theyear").collect().foreach(println)
结果如下:
[2010,13063]
[2004,23612]
[2005,38180]
[2006,36124]
[2007,159126]
[2008,55828]
[2009,25810]
第三步 监控任务运行情况
第一步 实现分析
所有订单中每年最畅销货品:
1 、求出每年每个货品的销售金额
scala>select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid
2 、求出每年单品销售的最大金额
scala>select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear
3 、求出每年与销售额最大相符的货品就是最畅销货品
scala>select distinct e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear
第二步 实现 SQL 语句
scala>hiveContext.sql("select distinct e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear").collect().foreach(println)
结果如下:
[2004,JY424420810101,53374]
[2005,24124118880102,56569]
[2006,JY425468460101,113684]
[2007,JY425468460101,70226]
[2008,E2628204040101,97981]
[2009,YL327439080102,30029]
[2010,SQ429425090101,4494]
第三步 监控任务运行情况
第一步 创建 hiveTable 从本地文件系统加载数据
// 创建一个 hiveTable 并将数据加载,注意 people.txt 第二列有空格,所以 age 取 string 类型
scala>hiveContext.sql("CREATE TABLE hiveTable(name string,age string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '/n' ")
scala>hiveContext.sql("LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/people.txt' INTO TABLE hiveTable")
第二步 创建 parquet 表,从 HDFS 加载数据
// 创建一个源自 parquet 文件的表 parquetTable2 ,然后和 hiveTable 混合使用
scala>hiveContext.parquetFile("hdfs://hadoop1:9000/class6/people.parquet").registerTempTable("parquetTable2")
第三步 两个表混合使用
scala>hiveContext.sql("select a.name,a.age,b.age from hiveTable a join parquetTable2 b on a.name=b.name").collect().foreach(println)
sparkSQL 的 cache 可以使用两种方法来实现:
l CacheTable() 方法
l CACHE TABLE 命令
千万不要先使用 cache SchemaRDD ,然后 registerAsTable ;使用 RDD 的 cache() 将使用原生态的 cache ,而不是针对 SQL 优化后的内存列存储。
第一步 对 rddTable 表进行缓存
//cache 使用
scala>val sqlContext=new org.apache.spark.sql.SQLContext(sc)
scala>import sqlContext.createSchemaRDD
scala>case class Person(name:String,age:Int)
scala>val rddpeople=sc.textFile("hdfs://hadoop1:9000/class6/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))
scala>rddpeople.registerTempTable("rddTable")
scala>sqlContext.cacheTable("rddTable")
scala>sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
在监控界面上看到该表数据已经缓存
第二步 对 parquetTable 表进行缓存
scala>val parquetpeople = sqlContext.parquetFile("hdfs://hadoop1:9000/class6/people.parquet")
scala>parquetpeople.registerTempTable("parquetTable")
scala>sqlContext.sql("CACHE TABLE parquetTable")
scala>sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
在监控界面上看到该表数据已经缓存
第三步 解除缓存
//uncache 使用
scala>sqlContext.uncacheTable("rddTable")
scala>sqlContext.sql("UNCACHE TABLE parquetTable")
SparkSQL 除了支持 HiveQL 和 SQL-92 语法外,还支持 DSL ( Domain Specific Language )。在 DSL 中,使用 Scala 符号 '+ 标示符表示基础表中的列, Spark 的 execution engine 会将这些标示符隐式转换成表达式。另外可以在 API 中找到很多 DSL 相关的方法,如 where() 、 select() 、 limit() 等等,详细资料可以查看 Catalyst 模块中的 DSL 子模块,下面为其中定义几种常用方法:
//DSL 演示
scala>import sqlContext._
scala>val teenagers_dsl = rddpeople.where('age >= 10).where('age <= 19).select('name)
scala>teenagers_dsl.map(t => "Name: " + t(0)).collect().foreach(println)
Spark 之所以万人瞩目,除了内存计算还有其 ALL-IN-ONE 的特性,实现了 One stack rule them all 。下面简单模拟了几个综合应用场景,不仅使用了 sparkSQL ,还使用了其他 Spark 组件:
l SQL On Spark :使用 sqlContext 查询年纪大于等于 10 岁的人名
l Hive On Spark :使用了 hiveContext 计算每年销售额
l 店铺分类,根据销售额对店铺分类,使用 sparkSQL 和 MLLib 聚类算法
l PageRank ,计算最有价值的网页,使用 sparkSQL 和 GraphX 的 PageRank 算法
以下实验采用 IntelliJ IDEA 调试代码,最后生成 LearnSpark.jar ,然后使用 spark-submit 提交给集群运行。
在 src->main->scala 下创建 class6 包,在该包中添加 SQLOnSpark 对象文件,具体代码如下:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
case class Person(name: String, age: Int)
object SQLOnSpark {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SQLOnSpark")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext._
val people: RDD[Person] = sc.textFile("hdfs://hadoop1:9000/class6/people.txt")
.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 10 and age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
sc.stop()
}
}
先对该代码进行编译,然后运行该程序,需要注意的是在 IDEA 中需要在 SparkConf 添加 setMaster("local") 设置为本地运行。运行时可以通过运行窗口进行观察:
打印运行结果
第一步 配置打包信息
在项目结构界面中选择 "Artifacts" ,在右边操作界面选择绿色 "+" 号,选择添加 JAR 包的 "From modules with dependencies" 方式,出现如下界面,在该界面中选择主函数入口为 SQLOnSpark :
第二步 填写该 JAR 包名称和调整输出内容
打包路径为 /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
【注意】的是默认情况下 "Output Layout" 会附带 Scala 相关的类包,由于运行环境已经有 Scala 相关类包,所以在这里去除这些包只保留项目的输出内容
第三步 输出打包文件
点击菜单 Build->Build Artifacts ,弹出选择动作,选择 Build 或者 Rebuild 动作
第四步 复制打包文件到 Spark 根目录下
cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar
cp LearnSpark.jar /app/hadoop/spark-1.1.0/
ll /app/hadoop/spark-1.1.0/
通过如下命令调用打包中的 SQLOnSpark 方法,运行结果如下:
cd /app/hadoop/spark-1.1.0
bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLOnSpark --executor-memory 1g LearnSpark.jar
在 class6 包中添加 HiveOnSpark 对象文件,具体代码如下:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
object HiveOnSpark {
case class Record(key: Int, value: String)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HiveOnSpark")
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
import hiveContext._
sql("use hive")
sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear")
.collect().foreach(println)
sc.stop()
}
}
按照 3.1.3SQL On Spark 方法进行打包
【注】 需要启动 Hive 服务,参见 2.3.1
通过如下命令调用打包中的 SQLOnSpark 方法,运行结果如下:
cd /app/hadoop/spark-1.1.0
bin/spark-submit --master spark://hadoop1:7077 --class class6.HiveOnSpark --executor-memory 1g LearnSpark.jar
通过监控页面看到名为 HiveOnSpark 的作业运行情况:
分类在实际应用中非常普遍,比如对客户进行分类、对店铺进行分类等等,对不同类别采取不同的策略,可以有效的降低企业的营运成本、增加收入。机器学习中的聚类就是一种根据不同的特征数据,结合用户指定的类别数量,将数据分成几个类的方法。下面举个简单的例子,按照销售数量和销售金额这两个特征数据,进行聚类,分出 3 个等级的店铺。
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
object SQLMLlib {
def main(args: Array[String]) {
// 屏蔽不必要的日志显示在终端上
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// 设置运行环境
val sparkConf = new SparkConf().setAppName("SQLMLlib")
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
// 使用 sparksql 查出每个店的销售数量和金额
hiveContext.sql("use hive")
hiveContext.sql("SET spark.sql.shuffle.partitions=20")
val sqldata = hiveContext.sql("select a.locationid, sum(b.qty) totalqty,sum(b.amount) totalamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.locationid")
// 将查询数据转换成向量
val parsedData = sqldata.map {
case Row(_, totalqty, totalamount) =>
val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)
Vectors.dense(features)
}
// 对数据集聚类, 3 个类, 20 次迭代,形成数据模型
// 注意这里会使用设置的 partition 数 20
val numClusters = 3
val numIterations = 20
val model = KMeans.train(parsedData, numClusters, numIterations)
// 用模型对读入的数据进行分类,并输出
// 由于 partition 没设置,输出为 200 个小文件,可以使用 bin/hdfs dfs -getmerge 合并下载到本地
val result2 = sqldata.map {
case Row(locationid, totalqty, totalamount) =>
val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)
val linevectore = Vectors.dense(features)
val prediction = model.predict(linevectore)
locationid + " " + totalqty + " " + totalamount + " " + prediction
}.saveAsTextFile(args(0))
sc.stop()
}
}
按照 3.1.3SQL On Spark 方法进行打包
通过如下命令调用打包中的 SQLOnSpark 方法:
cd /app/hadoop/spark-1.1.0
bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLMLlib --executor-memory 1g LearnSpark.jar /class6/output1
运行过程,可以发现聚类过程都是使用 20 个 partition :
查看运行结果,分为 20 个文件存放在 HDFS 中
使用 getmerge 将结果转到本地文件,并查看结果:
cd /home/hadoop/upload
hdfs dfs -getmerge /class6/output1 result.txt
最后使用 R 做示意图,用 3 种不同的颜色表示不同的类别。
PageRank, 即网页排名,又称网页级别、 Google 左侧排名或佩奇排名,是 Google 创始人拉里·佩奇和谢尔盖·布林于 1997 年构建早期的搜索系统原型时提出的链接分析算法。目前很多重要的链接分析算法都是在 PageRank 算法基础上衍生出来的。 PageRank 是 Google 用于用来标识网页的等级 / 重要性的一种方法,是 Google 用来衡量一个网站的好坏的唯一标准。在揉合了诸如 Title 标识和 Keywords 标识等所有其它因素之后, Google 通过 PageRank 来调整结果,使那些更具“等级 / 重要性”的网页在搜索结果中令网站排名获得提升,从而提高搜索结果的相关性和质量。
Spark GraphX 引入了 google 公司的图处理引擎 pregel ,可以方便的实现 PageRank 的计算。
下面实例采用的数据是 wiki 数据中含有 Berkeley 标题的网页之间连接关系,数据为两个文件: graphx-wiki-vertices.txt 和 graphx-wiki-edges.txt ,可以分别用于图计算的顶点和边。把这两个文件上传到本地文件系统 /home/hadoop/upload/class6 目录中(注:这两个文件可以从该系列附属资源 /data/class6 中获取)
第一步 上传数据
第二步 启动 SparkSQL
参见第 6 课《 SparkSQL (一) --SparkSQL 简介》 3.2.3 启动 SparkSQL
$cd /app/hadoop/spark-1.1.0
$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g
第三步 定义表并加载数据
创建 vertices 和 edges 两个表并加载数据:
spark-sql>show databases;
spark-sql>use hive;
spark-sql>CREATE TABLE vertices(ID BigInt,Title String) ROW FORMAT DELIMITED FIELDS TERMINATED BY '/t' LINES TERMINATED BY '/n'; LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/graphx-wiki-vertices.txt' INTO TABLE vertices;
spark-sql>CREATE TABLE edges(SRCID BigInt,DISTID BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY '/t' LINES TERMINATED BY '/n'; LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/graphx-wiki-edges.txt' INTO TABLE edges;
查看创建结果
spark-sql>show tables;
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx._
import org.apache.spark.sql.catalyst.expressions.Row
object SQLGraphX {
def main(args: Array[String]) {
// 屏蔽日志
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// 设置运行环境
val sparkConf = new SparkConf().setAppName("PageRank")
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
// 使用 sparksql 查出每个店的销售数量和金额
hiveContext.sql("use hive")
val verticesdata = hiveContext.sql("select id, title from vertices")
val edgesdata = hiveContext.sql("select srcid,distid from edges")
// 装载顶点和边
val vertices = verticesdata.map { case Row(id, title) => (id.toString.toLong, title.toString)}
val edges = edgesdata.map { case Row(srcid, distid) => Edge(srcid.toString.toLong, distid.toString.toLong, 0)}
// 构建图
val graph = Graph(vertices, edges, "").persist()
//pageRank 算法里面的时候使用了 cache() ,故前面 persist 的时候只能使用 MEMORY_ONLY
println("**********************************************************")
println("PageRank 计算,获取最有价值的数据 ")
println("**********************************************************")
val prGraph = graph.pageRank(0.001).cache()
val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {
(v, title, rank) => (rank.getOrElse(0.0), title)
}
titleAndPrGraph.vertices.top(10) {
Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)
}.foreach(t => println(t._2._2 + ": " + t._2._1))
sc.stop()
}
}
按照 3.1.3SQL On Spark 方法进行打包
通过如下命令调用打包中的 SQLOnSpark 方法:
cd /app/hadoop/spark-1.1.0
bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLGraphX --executor-memory 1g LearnSpark.jar
运行结果:
在现实数据处理过程中,这种涉及多个系统处理的场景很多。通常各个系统之间的数据通过磁盘落地再交给下一个处理系统进行处理。对于 Spark 来说,通过多个组件的配合,可以以流水线的方式来处理数据。从上面的代码可以看出,程序除了最后有磁盘落地外,都是在内存中计算的。避免了多个系统中交互数据的落地过程,提高了效率。这才是 spark 生态系统真正强大之处: One stack rule them all 。另外 sparkSQL+sparkStreaming 可以架构当前非常热门的 Lambda 架构体系,为 CEP 提供解决方案。也正是如此强大,才吸引了广大开源爱好者的目光,促进了 Spark 生态的高速发展。