转载

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

【注】该系列文章以及使用到安装包/测试数据 可以在《 倾情大奉送--Spark入门实战系列 》获取

1 运行环境说明

1.1   硬软件环境

l   主机操作系统: Windows 64 位,双核 4 线程,主频 2.2G10G 内存

l   虚拟软件: VMware® Workstation 9.0.0 build-812388

l   虚拟机操作系统: CentOS 64 位,单核

l   虚拟机运行环境:

Ø   JDK1.7.0_55 64

Ø   Hadoop2.2.0 (需要编译为 64 位)

Ø   Scala2.10.4

Ø   Spark1.1.0 (需要编译)

Ø   Hive0.13.1

1.2   机器网络环境

集群包含三个节点,节点之间可以免密码 SSH 访问,节点 IP 地址和主机名分布如下:

序号

IP 地址

机器名

类型

核数 / 内存

用户名

目录

1

192.168.0.61

hadoop1

NN/DN/RM

Master/Worker

1 /3G

hadoop

/app 程序所在路径

/app/scala-...

/app/hadoop

/app/complied

2

192.168.0.62

hadoop2

DN/NM/Worker

1 /2G

hadoop

3

192.168.0.63

hadoop3

DN/NM/Worker

1 /2G

hadoop

2 Spark 基础应用

SparkSQL 引入了一种新的 RDD —— SchemaRDDSchemaRDD 由行对象( Row )以及描述行对象中每列数据类型的 Schema 组成; SchemaRDD 很象传统数据库中的表。 SchemaRDD 可以通过 RDDParquet 文件、 JSON 文件、或者通过使用 hiveql 查询 hive 数据来建立。 SchemaRDD 除了可以和 RDD 一样操作外,还可以通过 registerTempTable 注册成临时表,然后通过 SQL 语句进行操作。

值得注意的是:

l Spark1.1 使用 registerTempTable 代替 1.0 版本的 registerAsTable

l Spark1.1hiveContext 中, hql() 将被弃用, sql() 将代替 hql() 来提交查询语句,统一了接口。

l 使用 registerTempTable 注册表是一个临时表,生命周期只在所定义的 sqlContexthiveContext 实例之中。换而言之,在一个 sqlontext (或 hiveContext )中 registerTempTable 的表不能在另一个 sqlContext (或 hiveContext )中使用。

另外, Spark1.1 提供了语法解析器选项 spark.sql.dialect ,就目前而言, Spark1.1 提供了两种语法解析器: sql 语法解析器和 hiveql 语法解析器。

l sqlContext 现在只支持 sql 语法解析器( SQL-92 语法)

l hiveContext 现在支持 sql 语法解析器和 hivesql 语法解析器,默认为 hivesql 语法解析器,用户可以通过配置切换成 sql 语法解析器,来运行 hiveql 不支持的语法,如 select 1

l 切换可以通过下列方式完成:

l sqlContexet 中使用 setconf 配置 spark.sql.dialect

l hiveContexet 中使用 setconf 配置 spark.sql.dialect

l sql 命令中使用 set spark.sql.dialect=value

SparkSQL1.1 对数据的查询分成了 2 个分支: sqlContexthiveContext 。至于两者之间的关系, hiveSQL 继承了 sqlContext ,所以拥有 sqlontext 的特性之外,还拥有自身的特性(最大的特性就是支持 hive )。

2.1   启动 Spark shell

2.1.1  环境设置

使用如下命令打开 /etc/profile 文件:

sudo vi /etc/profile

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

设置如下参数:

export SPARK_HOME=/app/hadoop/spark-1.1.0

export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

export HIVE_HOME=/app/hadoop/hive-0.13.1

export PATH=$PATH:$HIVE_HOME/bin

export CLASSPATH=$CLASSPATH:$HIVE_HOME/bin

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.1.2  启动 HDFS

$cd /app/hadoop/hadoop-2.2.0/sbin

$./start-dfs.sh

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.1.3  启动 Spark 集群

$cd /app/hadoop/spark-1.1.0/sbin

$./start-all.sh

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.1.4  启动 Spark-Shell

spark 客户端(在 hadoop1 节点 ) ,使用 spark-shell 连接集群

$cd /app/hadoop/spark-1.1.0/bin

$./spark-shell --master spark://hadoop1:7077 --executor-memory 1g

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

启动后查看启动情况,如下图所示:

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.2   sqlContext 演示

Spark1.1.0 开始提供了两种方式将 RDD 转换成 SchemaRDD

l 通过定义 Case Class ,使用反射推断 Schemacase class 方式)

l 通过可编程接口,定义 Schema ,并应用到 RDD 上( applySchema 方式 )

前者使用简单、代码简洁,适用于已知 Schema 的源数据上;后者使用较为复杂,但可以在程序运行过程中实行,适用于未知 SchemaRDD 上。

2.2.1  使用 Case Class 定义 RDD 演示

对于 Case Class 方式,首先要定义 Case Class ,在 RDDTransform 过程中使用 Case Class 可以隐式转化成 SchemaRDD ,然后再使用 registerTempTable 注册成表。注册成表后就可以在 sqlContext 对表进行操作,如 selectinsertjoin 等。注意, case class 可以是嵌套的,也可以使用类似 SequencesArrays 之类复杂的数据类型。

下面的例子是定义一个符合数据文件 /sparksql/people.txt 类型的 case clasePerson ),然后将数据文件读入后隐式转换成 SchemaRDDpeople ,并将 peoplesqlContext 中注册成表 rddTable ,最后对表进行查询,找出年纪在 13-19 岁之间的人名。

第一步    上传测试数据

HDFS 中创建 /class6 目录,把配套资源 /data/class5/ people.txt 上传到该目录上

$hadoop fs -mkdir /class6

$hadoop fs -copyFromLocal /home/hadoop/upload/class6/people.* /class6

$hadoop fs -ls /

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第二步    定义 sqlContext 并引入包

//sqlContext 演示

scala>val sqlContext=new org.apache.spark.sql.SQLContext(sc)

scala>import sqlContext.createSchemaRDD

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第三步    定义 Person 类,读入数据并注册为临时表

//RDD1 演示

scala>case class Person(name:String,age:Int)

scala>val rddpeople=sc.textFile("hdfs://hadoop1:9000/class6/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))

scala>rddpeople.registerTempTable("rddTable")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第四步    在查询年纪在 13-19 岁之间的人员

scala>sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)

上面步骤均为 trnsform 未触发 action 动作,在该步骤中查询数据并打印触发了 action 动作,如下图所示:

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

通过监控页面,查看任务运行情况:

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.2.2  使用 applySchema 定义 RDD 演示

applySchema 方式比较复杂,通常有 3 步过程:

l 从源 RDD 创建 rowRDD

l 创建与 rowRDD 匹配的 Schema

l Schema 通过 applySchema 应用到 rowRDD

第一步    导入包创建 Schema

// 导入 SparkSQL 的数据类型和 Row

scala>import org.apache.spark.sql._

// 创建于数据结构匹配的 schema

scala>val schemaString = "name age"

scala>val schema =

StructType(

schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第二步    创建 rowRDD 并读入数据

// 创建 rowRDD

scala>val rowRDD = sc.textFile("hdfs://hadoop1:9000/class6/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim))

// applySchemaschema 应用到 rowRDD

scala>val rddpeople2 = sqlContext.applySchema(rowRDD, schema)

scala>rddpeople2.registerTempTable("rddTable2")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第三步    查询获取数据

scala>sqlContext.sql("SELECT name FROM rddTable2 WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

通过监控页面,查看任务运行情况:

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.2.3  parquet 演示

同样得, sqlContext 可以读取 parquet 文件,由于 parquet 文件中保留了 schema 的信息,所以不需要使用 case class 来隐式转换。 sqlContext 读入 parquet 文件后直接转换成 SchemaRDD ,也可以将 SchemaRDD 保存成 parquet 文件格式。

第一步    保存成 parquest 格式文件

// 把上面步骤中的 rddpeople 保存为 parquet 格式文件到 hdfs

scala>rddpeople.saveAsParquetFile("hdfs://hadoop1:9000/class6/people.parquet")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第二步    读入 parquest 格式文件,注册表 parquetTable

//parquet 演示

scala>val parquetpeople = sqlContext.parquetFile("hdfs://hadoop1:9000/class6/people.parquet")

scala>parquetpeople.registerTempTable("parquetTable")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第三步    查询年龄大于等于 25 岁的人名

scala>sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.2.4  json 演示

sparkSQL1.1.0 开始提供对 json 文件格式的支持,这意味着开发者可以使用更多的数据源,如鼎鼎大名的 NOSQL 数据库 MongDB 等。 sqlContext 可以从 jsonFilejsonRDD 获取 schema 信息,来构建 SchemaRDD ,注册成表后就可以使用。

l jsonFile - 加载 JSON 文件目录中的数据,文件的每一行是一个 JSON 对象

l jsonRdd - 从现有的 RDD 加载数据,其中 RDD 的每个元素包含一个 JSON 对象的字符串

第一步    上传测试数据

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用  

第二步    读取数据并注册 jsonTable

//json 演示

scala>val jsonpeople = sqlContext.jsonFile("hdfs://hadoop1:9000/class6/people.json")

jsonpeople.registerTempTable("jsonTable")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第三步    查询年龄大于等于 25 的人名

scala>sqlContext.sql("SELECT name FROM jsonTable WHERE age >= 25").map(t => "Name: " + t(0)).collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.2.5  sqlContext 中混合使用演示

sqlContexthiveContext 中来源于不同数据源的表在各自生命周期中可以混用,即 sqlContexthiveContext 之间表不能混合使用

//sqlContext 中来自 rdd 的表 rddTable 和来自 parquet 文件的表 parquetTable 混合使用

scala>sqlContext.sql("select a.name,a.age,b.age from rddTable a join parquetTable b on a.name=b.name").collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.3   hiveContext 演示

使用 hiveContext 之前首先要确认以下两点:

l 使用的 Spark 是支持 hive

l Hive 的配置文件 hive-site.xml 已经存在 conf 目录中

前者可以查看 lib 目录下是否存在以 datanucleus 开头的 3JAR 来确定,后者注意是否在 hive-site.xml 里配置了 uris 来访问 Hive Metastore

2.3.1  启动 hive

hadoop1 节点中使用如下命令启动 Hive

$nohup hive --service metastore > metastore.log 2>&1 &

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.3.2  SPARK_HOME/conf 目录下创建 hive-site.xml

  SPARK_HOME/conf 目录下创建 hive-site.xml 文件,修改配置后需要重新启动 Spark-Shell

【注】如果在第 6 课《 SparkSQL (二) --SparkSQL 简介》配置,

<configuration>

<property>

  <name>hive.metastore.uris</name>

<value>thrift://hadoop1:9083</value>

<description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>

</property>

</configuration>

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.3.3  查看数据库表

要使用 hiveContext ,需要先构建 hiveContext

scala>val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

然后就可以对 Hive 数据进行操作了,下面我们将使用 Hive 中的销售数据,首先切换数据库到 hive 并查看有几个表:

// 销售数据演示

scala>hiveContext.sql("use hive")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

scala>hiveContext.sql("show tables").collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.3.4  计算所有订单中每年的销售单数、销售总额

// 所有订单中每年的销售单数、销售总额

// 三个表连接后以 count(distinct a.ordernumber) 计销售单数, sum(b.amount) 计销售总额

scala>hiveContext.sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear").collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

结果如下:

[2004,1094,3265696]

[2005,3828,13247234]

[2006,3772,13670416]

[2007,4885,16711974]

[2008,4861,14670698]

[2009,2619,6322137]

[2010,94,210924]

通过监控页面,查看任务运行情况:

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.3.5  计算所有订单每年最大金额订单的销售额

第一步    实现分析

所有订单每年最大金额订单的销售额 :

1 、先求出每份订单的销售额以其发生时间

select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber

2 、以第一步的查询作为子表,和表 tbDate 连接,求出每年最大金额订单的销售额

select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d   on c.dateid=d.dateid group by c.theyear sort by c.theyear

第二步    实现 SQL 语句

scala>hiveContext.sql("select c.theyear,max(d.sumofamount) from tbDate c join (select a.dateid,a.ordernumber,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.dateid,a.ordernumber ) d   on c.dateid=d.dateid group by c.theyear sort by c.theyear").collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

结果如下:

[2010,13063]

[2004,23612]

[2005,38180]

[2006,36124]

[2007,159126]

[2008,55828]

[2009,25810]

第三步    监控任务运行情况

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.3.6  计算所有订单中每年最畅销货品

第一步    实现分析

所有订单中每年最畅销货品:

1 、求出每年每个货品的销售金额

scala>select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid

2 、求出每年单品销售的最大金额

scala>select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear

3 、求出每年与销售额最大相符的货品就是最畅销货品

scala>select distinct e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate   c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate   c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear

第二步    实现 SQL 语句

scala>hiveContext.sql("select distinct e.theyear,e.itemid,f.maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate   c on a.dateid=c.dateid group by c.theyear,b.itemid) e join (select d.theyear,max(d.sumofamount) as maxofamount from (select c.theyear,b.itemid,sum(b.amount) as sumofamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate   c on a.dateid=c.dateid group by c.theyear,b.itemid) d group by d.theyear) f on (e.theyear=f.theyear and e.sumofamount=f.maxofamount) order by e.theyear").collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

结果如下:

[2004,JY424420810101,53374]

[2005,24124118880102,56569]

[2006,JY425468460101,113684]

[2007,JY425468460101,70226]

[2008,E2628204040101,97981]

[2009,YL327439080102,30029]

[2010,SQ429425090101,4494]

第三步    监控任务运行情况

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.3.7  hiveContext 中混合使用演示

第一步    创建 hiveTable 从本地文件系统加载数据

// 创建一个 hiveTable 并将数据加载,注意 people.txt 第二列有空格,所以 agestring 类型

scala>hiveContext.sql("CREATE TABLE hiveTable(name string,age string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '/n' ")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

scala>hiveContext.sql("LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/people.txt' INTO TABLE hiveTable")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第二步    创建 parquet 表,从 HDFS 加载数据

// 创建一个源自 parquet 文件的表 parquetTable2 ,然后和 hiveTable 混合使用

scala>hiveContext.parquetFile("hdfs://hadoop1:9000/class6/people.parquet").registerTempTable("parquetTable2")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第三步    两个表混合使用

scala>hiveContext.sql("select a.name,a.age,b.age from hiveTable a join parquetTable2 b on a.name=b.name").collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.4   Cache 使用

sparkSQLcache 可以使用两种方法来实现:

l CacheTable() 方法

l CACHE TABLE 命令

千万不要先使用 cache SchemaRDD ,然后 registerAsTable ;使用 RDDcache() 将使用原生态的 cache ,而不是针对 SQL 优化后的内存列存储。

第一步    rddTable 表进行缓存

//cache 使用

scala>val sqlContext=new org.apache.spark.sql.SQLContext(sc)

scala>import sqlContext.createSchemaRDD

scala>case class Person(name:String,age:Int)

scala>val rddpeople=sc.textFile("hdfs://hadoop1:9000/class6/people.txt").map(_.split(",")).map(p=>Person(p(0),p(1).trim.toInt))

scala>rddpeople.registerTempTable("rddTable")

scala>sqlContext.cacheTable("rddTable")

scala>sqlContext.sql("SELECT name FROM rddTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

在监控界面上看到该表数据已经缓存

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第二步    parquetTable 表进行缓存

scala>val parquetpeople = sqlContext.parquetFile("hdfs://hadoop1:9000/class6/people.parquet")

scala>parquetpeople.registerTempTable("parquetTable")

scala>sqlContext.sql("CACHE TABLE parquetTable")

scala>sqlContext.sql("SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

在监控界面上看到该表数据已经缓存

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第三步    解除缓存

//uncache 使用

scala>sqlContext.uncacheTable("rddTable")

scala>sqlContext.sql("UNCACHE TABLE parquetTable")

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

2.5   DSL 演示

SparkSQL 除了支持 HiveQLSQL-92 语法外,还支持 DSLDomain Specific Language )。在 DSL 中,使用 Scala 符号 '+ 标示符表示基础表中的列, Sparkexecution engine 会将这些标示符隐式转换成表达式。另外可以在 API 中找到很多 DSL 相关的方法,如 where()select()limit() 等等,详细资料可以查看 Catalyst 模块中的 DSL 子模块,下面为其中定义几种常用方法:

//DSL 演示

scala>import sqlContext._

scala>val teenagers_dsl = rddpeople.where('age >= 10).where('age <= 19).select('name)

scala>teenagers_dsl.map(t => "Name: " + t(0)).collect().foreach(println)

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3 Spark 综合应用

Spark 之所以万人瞩目,除了内存计算还有其 ALL-IN-ONE 的特性,实现了 One stack rule them all 。下面简单模拟了几个综合应用场景,不仅使用了 sparkSQL ,还使用了其他 Spark 组件:

l SQL On Spark :使用 sqlContext 查询年纪大于等于 10 岁的人名

l Hive On Spark :使用了 hiveContext 计算每年销售额

l 店铺分类,根据销售额对店铺分类,使用 sparkSQLMLLib 聚类算法

l PageRank ,计算最有价值的网页,使用 sparkSQLGraphXPageRank 算法

以下实验采用 IntelliJ IDEA 调试代码,最后生成 LearnSpark.jar ,然后使用 spark-submit 提交给集群运行。

3.1   SQL On Spark

3.1.1  实现代码

src->main->scala 下创建 class6 包,在该包中添加 SQLOnSpark 对象文件,具体代码如下:

import org.apache.spark.rdd.RDD

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.SQLContext

case class Person(name: String, age: Int)

object SQLOnSpark {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("SQLOnSpark")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

import sqlContext._

val people: RDD[Person] = sc.textFile("hdfs://hadoop1:9000/class6/people.txt")

.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))

people.registerTempTable("people")

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 10 and age <= 19")

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

sc.stop()

}

}

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.1.2  IDEA 本地运行

先对该代码进行编译,然后运行该程序,需要注意的是在 IDEA 中需要在 SparkConf 添加 setMaster("local") 设置为本地运行。运行时可以通过运行窗口进行观察:

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

打印运行结果

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.1.3  生成打包文件

【注】 可以参见第 3 课《 Spark 编程模型(下) --IDEA 搭建及实战》进行打包

第一步    配置打包信息

在项目结构界面中选择 "Artifacts" ,在右边操作界面选择绿色 "+" 号,选择添加 JAR 包的 "From modules with dependencies" 方式,出现如下界面,在该界面中选择主函数入口为 SQLOnSpark

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第二步    填写该 JAR 包名称和调整输出内容

打包路径为 /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar

【注意】的是默认情况下 "Output Layout" 会附带 Scala 相关的类包,由于运行环境已经有 Scala 相关类包,所以在这里去除这些包只保留项目的输出内容

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第三步    输出打包文件

点击菜单 Build->Build Artifacts ,弹出选择动作,选择 Build 或者 Rebuild 动作

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第四步    复制打包文件到 Spark 根目录下

cd /home/hadoop/IdeaProjects/out/artifacts/LearnSpark_jar

cp LearnSpark.jar /app/hadoop/spark-1.1.0/

ll /app/hadoop/spark-1.1.0/

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.1.4  运行查看结果

通过如下命令调用打包中的 SQLOnSpark 方法,运行结果如下:

cd /app/hadoop/spark-1.1.0

bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLOnSpark --executor-memory 1g LearnSpark.jar

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.2   Hive On Spark

3.2.1  实现代码

class6 包中添加 HiveOnSpark 对象文件,具体代码如下:

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.hive.HiveContext

object HiveOnSpark {

case class Record(key: Int, value: String)

def main(args: Array[String]) {

val sparkConf = new SparkConf().setAppName("HiveOnSpark")

val sc = new SparkContext(sparkConf)

val hiveContext = new HiveContext(sc)

import hiveContext._

sql("use hive")

sql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber join tbDate c on a.dateid=c.dateid group by c.theyear order by c.theyear")

.collect().foreach(println)

sc.stop()

}

}

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.2.2  生成打包文件

按照 3.1.3SQL On Spark 方法进行打包

3.2.3  运行查看结果

【注】 需要启动 Hive 服务,参见 2.3.1

通过如下命令调用打包中的 SQLOnSpark 方法,运行结果如下:

cd /app/hadoop/spark-1.1.0

bin/spark-submit --master spark://hadoop1:7077 --class class6.HiveOnSpark --executor-memory 1g LearnSpark.jar

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

通过监控页面看到名为 HiveOnSpark 的作业运行情况:

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.3   店铺分类

分类在实际应用中非常普遍,比如对客户进行分类、对店铺进行分类等等,对不同类别采取不同的策略,可以有效的降低企业的营运成本、增加收入。机器学习中的聚类就是一种根据不同的特征数据,结合用户指定的类别数量,将数据分成几个类的方法。下面举个简单的例子,按照销售数量和销售金额这两个特征数据,进行聚类,分出 3 个等级的店铺。

3.3.1  实现代码

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.catalyst.expressions.Row

import org.apache.spark.{SparkConf, SparkContext}

import org.apache.spark.sql.hive.HiveContext

import org.apache.spark.mllib.clustering.KMeans

import org.apache.spark.mllib.linalg.Vectors

object SQLMLlib {

def main(args: Array[String]) {

    // 屏蔽不必要的日志显示在终端上

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // 设置运行环境

val sparkConf = new SparkConf().setAppName("SQLMLlib")

val sc = new SparkContext(sparkConf)

val hiveContext = new HiveContext(sc)

    // 使用 sparksql 查出每个店的销售数量和金额

hiveContext.sql("use hive")

hiveContext.sql("SET spark.sql.shuffle.partitions=20")

val sqldata = hiveContext.sql("select a.locationid, sum(b.qty) totalqty,sum(b.amount) totalamount from tbStock a join tbStockDetail b on a.ordernumber=b.ordernumber group by a.locationid")

    // 将查询数据转换成向量

val parsedData = sqldata.map {

case Row(_, totalqty, totalamount) =>

val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)

Vectors.dense(features)

}

    // 对数据集聚类, 3 个类, 20 次迭代,形成数据模型

    // 注意这里会使用设置的 partition20

val numClusters = 3

val numIterations = 20

val model = KMeans.train(parsedData, numClusters, numIterations)

    // 用模型对读入的数据进行分类,并输出

    // 由于 partition 没设置,输出为 200 个小文件,可以使用 bin/hdfs dfs -getmerge 合并下载到本地

val result2 = sqldata.map {

case Row(locationid, totalqty, totalamount) =>

val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)

val linevectore = Vectors.dense(features)

val prediction = model.predict(linevectore)

locationid + " " + totalqty + " " + totalamount + " " + prediction

}.saveAsTextFile(args(0))

sc.stop()

}

}

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.3.2  生成打包文件

按照 3.1.3SQL On Spark 方法进行打包

3.3.3  运行查看结果

通过如下命令调用打包中的 SQLOnSpark 方法:

cd /app/hadoop/spark-1.1.0

bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLMLlib --executor-memory 1g LearnSpark.jar /class6/output1

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

运行过程,可以发现聚类过程都是使用 20partition

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

查看运行结果,分为 20 个文件存放在 HDFS

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

使用 getmerge 将结果转到本地文件,并查看结果:

cd /home/hadoop/upload

hdfs dfs -getmerge /class6/output1 result.txt

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

最后使用 R 做示意图,用 3 种不同的颜色表示不同的类别。

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.4   PageRank

PageRank, 即网页排名,又称网页级别、 Google 左侧排名或佩奇排名,是 Google 创始人拉里·佩奇和谢尔盖·布林于 1997 年构建早期的搜索系统原型时提出的链接分析算法。目前很多重要的链接分析算法都是在 PageRank 算法基础上衍生出来的。 PageRankGoogle 用于用来标识网页的等级 / 重要性的一种方法,是 Google 用来衡量一个网站的好坏的唯一标准。在揉合了诸如 Title 标识和 Keywords 标识等所有其它因素之后, Google 通过 PageRank 来调整结果,使那些更具“等级 / 重要性”的网页在搜索结果中令网站排名获得提升,从而提高搜索结果的相关性和质量。

Spark GraphX 引入了 google 公司的图处理引擎 pregel ,可以方便的实现 PageRank 的计算。

3.4.1  创建表

下面实例采用的数据是 wiki 数据中含有 Berkeley 标题的网页之间连接关系,数据为两个文件: graphx-wiki-vertices.txtgraphx-wiki-edges.txt ,可以分别用于图计算的顶点和边。把这两个文件上传到本地文件系统 /home/hadoop/upload/class6 目录中(注:这两个文件可以从该系列附属资源 /data/class6 中获取)

第一步    上传数据

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第二步    启动 SparkSQL

参见第 6 课《 SparkSQL (一) --SparkSQL 简介》 3.2.3 启动 SparkSQL

$cd /app/hadoop/spark-1.1.0

$bin/spark-sql --master spark://hadoop1:7077 --executor-memory 1g

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

第三步    定义表并加载数据

创建 verticesedges 两个表并加载数据:

spark-sql>show databases;

spark-sql>use hive;

spark-sql>CREATE TABLE vertices(ID BigInt,Title String) ROW FORMAT DELIMITED FIELDS TERMINATED BY '/t' LINES TERMINATED BY '/n'; LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/graphx-wiki-vertices.txt' INTO TABLE vertices;

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

spark-sql>CREATE TABLE edges(SRCID BigInt,DISTID BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY '/t' LINES TERMINATED BY '/n'; LOAD DATA LOCAL INPATH '/home/hadoop/upload/class6/graphx-wiki-edges.txt' INTO TABLE edges;

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

查看创建结果

spark-sql>show tables;

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.4.2  实现代码

import org.apache.log4j.{Level, Logger}

import org.apache.spark.sql.hive.HiveContext

import org.apache.spark.{SparkContext, SparkConf}

import org.apache.spark.graphx._

import org.apache.spark.sql.catalyst.expressions.Row

object SQLGraphX {

def main(args: Array[String]) {

    // 屏蔽日志

Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // 设置运行环境

val sparkConf = new SparkConf().setAppName("PageRank")

val sc = new SparkContext(sparkConf)

val hiveContext = new HiveContext(sc)

    // 使用 sparksql 查出每个店的销售数量和金额

hiveContext.sql("use hive")

  val verticesdata = hiveContext.sql("select id, title from vertices")

val edgesdata = hiveContext.sql("select srcid,distid from edges")

    // 装载顶点和边

val vertices = verticesdata.map { case Row(id, title) => (id.toString.toLong, title.toString)}

  val edges = edgesdata.map { case Row(srcid, distid) => Edge(srcid.toString.toLong, distid.toString.toLong, 0)}

    // 构建图

val graph = Graph(vertices, edges, "").persist()

    //pageRank 算法里面的时候使用了 cache() ,故前面 persist 的时候只能使用 MEMORY_ONLY

println("**********************************************************")

    println("PageRank 计算,获取最有价值的数据 ")

println("**********************************************************")

val prGraph = graph.pageRank(0.001).cache()

val titleAndPrGraph = graph.outerJoinVertices(prGraph.vertices) {

(v, title, rank) => (rank.getOrElse(0.0), title)

}

titleAndPrGraph.vertices.top(10) {

Ordering.by((entry: (VertexId, (Double, String))) => entry._2._1)

}.foreach(t => println(t._2._2 + ": " + t._2._1))

sc.stop()

}

}

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.4.3  生成打包文件

按照 3.1.3SQL On Spark 方法进行打包

3.4.4  运行查看结果

通过如下命令调用打包中的 SQLOnSpark 方法:

cd /app/hadoop/spark-1.1.0

bin/spark-submit --master spark://hadoop1:7077 --class class6.SQLGraphX --executor-memory 1g LearnSpark.jar

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

运行结果:

Spark入门实战系列--6.SparkSQL(下)--Spark实战应用

3.5   小结

在现实数据处理过程中,这种涉及多个系统处理的场景很多。通常各个系统之间的数据通过磁盘落地再交给下一个处理系统进行处理。对于 Spark 来说,通过多个组件的配合,可以以流水线的方式来处理数据。从上面的代码可以看出,程序除了最后有磁盘落地外,都是在内存中计算的。避免了多个系统中交互数据的落地过程,提高了效率。这才是 spark 生态系统真正强大之处: One stack rule them all 。另外 sparkSQL+sparkStreaming 可以架构当前非常热门的 Lambda 架构体系,为 CEP 提供解决方案。也正是如此强大,才吸引了广大开源爱好者的目光,促进了 Spark 生态的高速发展。

正文到此结束
Loading...