Apache® Spark™ 是用于内存中处理的一个开源集群计算框架。借助 Spark,分析应用程序的运行速度能够比目前市场上的同类可比技术快 100 倍。
“ 在本教程中,您将学习如何使用 iPython notebook 创建一个 Spark 实例,并使用 Spark API 在 Bluemix 上分析天气数据。 ”
在 2015 年 6 月,IBM 宣布了为开源 Apache Spark 项目提供支持,这使得 Spark 可充当 IBM Bluemix 云平台上的一项云服务。IBM 还根据 Spark 社区的一个机器学习许可发布了它的SystemML 软件。与此同时,IBM 还在旧金山开放了一个 Spark 技术中心,计划向一百多万名数据科学家和数据工程师传授 Spark 知识,帮助 3500 多名研发人员完成涉及 Spark 的项目工作。
在本教程中,您将学习如何使用 iPython notebook 创建一个 Spark 实例,并使用 Spark API 在 Bluemix 上分析天气数据。
这个分步示例将展示如何只通过用 Python 编写的几行代码轻松地在 IBM Bluemix 上分析数据。
您可以创建一个 iPython notebook、上传一个现有的 notebook 或者使用一个 Bluemix 示例。要创建一个新的 notebook:
2015.cs.gz
文件。在您喜爱的文本编辑器中打开得到的 .csv
文件,并将以下列标题作为首行添加到文件中: STATION,DATE,METRIC,VALUE,C5,C6,C7,C8
2015.csv
列包含一个天气站标识符、一个日期、一个将被收集的度量数据(降水量、每日最高温和最低温、观测时的温度、降雪、积雪深度,等等),列 C5 到 C8 包含其他一些信息。 .csv
文件上传到您的 notebook。将整个文件上传到 Object Storage 需要花费几分钟的时间。 接下来,使用 SparkContext 根据 2015.csv
文件创建一个 RDD。
def: set_hadoop_config(credentials prefix = "fs.swift.service." + credentials['name'] hconf = sc._jsc.hadoopConfiguration() hconf.set(prefix + ".auth.url", credentials['auth_url']+'/v2.0/tokens') hconf.set(prefix + ".auth.endpoint.prefix", "endpoints") hconf.set(prefix + ".tenant", credentials['project_id']) hconf.set(prefix + ".username", credentials['user_id']) hconf.set(prefix + ".password", credentials['password']) hconf.setInt(prefix + ".http.port", 8080) hconf.set(prefix + ".region", credentials['region']
In
的旁边。该数字表明了整个 notebook 中的代码单元格的执行顺序。在本例中,该数字是 [1]
。 .csv
文件的凭证会自动粘贴到其中。 credentials
行并运行该单元格。对于密钥的 'name'
,可以输入任何字符串作为该值。在这个示例中,使用的是 'keystone'
。 credentials = {} credentials['name'] = 'keystone' credentials['auth_url'] = 'https://xxxx.xxxxxxx.xxx' credentials['project_id'] = 'xxxxxxxxxxx' credentials['region'] = 'xxxxxxxxxxxx' credentials['user_id'] = 'xxxxxxxxxxxxxx' credentials['password'] = 'xxxxxxxxxxxx'
set_hadoop_config(credentials)
weather = sc.textFile("swift://notebooks.keystone/2015.csv")
count
的操作,强制高效完成其实例化: print "Total records in the 2015.csv dataset:", weather.count()
您可以对相同的 RDD 应用其他操作,以查看第一行数据。
print "The first row in the 2015.csv dataset:", weather.first()
结果应该如下所示
weatherParse = weather.map(lambda line : line.split(","))
Python 中的 lambda
符号用于创建未绑定到某个名称的匿名函数。此概念被用于上述代码中,以便将某个函数作为映射函数的参数进行传递。匿名函数接收来自 RDD weather
的每一行,并用逗号分隔它们。
因此,新的 RDD weatherParse
被定义为字符串列表的一个列表。并且每个列表中的字符串是该行的各个元素。
weatherParse.first()
weatherParse.first()[0]
weatherParse.first()[2]
METRIC
列等于 PRCP
。 weatherPrecp
包含一个队值 (v1, v2) 列表,其中 v1 是一个气象站标识符,v2 是该气象站的一次降水数据点(一天)。下面的表 1 展示了这种结构。 键 | 值 |
---|---|
气象站 1 | 值 1 |
气象站 2 | 值 2 |
气象站 1 | 值 3 |
气象站 2 | 值 4 |
气象站 3 | 值 5 |
... | ... |
键 | 值 |
---|---|
气象站 1 | (值 1,1) |
气象站 2 | (值 2,1) |
气象站 1 | (值 3,1) |
气象站 2 | (值 4,1) |
气象站 3 | (值 5,1) |
... | ... |
weatherPrecpCountByKey
。
# x[0] is the station # x[3] is the precipitation value weatherPrecpCountByKey = weatherPrecp.map(lambda x : (x[0], (int(x[3]), 1))) weatherPrecpCountByKey.first()
表 2 只是一个过渡。因为随后能够将表 2 的映射精简成表 3 所表示的形式。
键 | 值 |
---|---|
气象站 1 | (值 1 + 值 3,2) |
气象站 2 | (值 2 + 值 4,2) |
气象站 3 | (值 5,1) |
... | ... |
在该表中,通过用相应的计数除以值的总和,可以计算出每个气象站的平均降水量。使用来自 Spark API 的 reduceByKey
函数生成表 3。
weatherPrecpAddByKey = weatherPrecpCountByKey.reduceByKey(lambda v1,v2 : (v1[0]+v2[0], v1[1]+v2[1])) weatherPrecpAddByKey.first()
weatherPrecpAddByKey
RDD 来创建 weatherAverages
RDD,该函数将用总的读数除以总的降水量。 weatherAverages = weatherPrecpAddByKey.map(lambda k: (k[0], k[1][0] / float(k[1][1] ) ) ) weatherAverages.first()
for pair in weatherAverages.top(10): print "Station %s had average precipitations of %f" % (pair[0],pair[1])
precTop10=[] stationsTop10=[] for pair in weatherAverages.map(lambda (x,y) : (y,x)).top(10): precTop10.append(pair[0]) stationsTop10.append(pair[1]) print "Station %s had average precipitations of %f" % (pair[1],pair[0])
%matplotlib inline import numpy as np import matplotlib.pyplot as plt N = 10 index = np.arange(N) bar_width = 0.5 plt.bar(index, precTop10, bar_width, color='b') plt.xlabel('Stations') plt.ylabel('Precipitations') plt.title('10 stations with the highest average precipitation') plt.xticks(index + bar_width, stationsTop10, rotation=90) plt.show()
Apache Spark 是首次让全新功能完全可用于数据科学家、业务分析师和应用程序开发人员的下一代分布式数据处理引擎。Analytics for Apache Spark 通常与 IBM Bluemix 中的工具一起使用,让您可以快速启用 Apache Spark 的全部力量。本教程介绍了如何使用 iPython Notebook(它使用了 Spark API)分析来自真实世界的原始天气数据。您可以轻松地用这个示例作为 Bluemix 上的更多分析的基础。