The Whale Pirates团队, 热衷Docker技术及应用的研究与探索。
开发者
· 李文权@nevermosby
· 林箐@summerQLin
前不久Mesosphere公司刚刚开源了他的基于Mesos的商业版Solution——DC/OS。 作为非常关注Docker世界里前沿科技的The Whale Pirates,当然想要用它来make fun。
我们原来计划在DCOS上搭建一套基于Docker Spark大数据计算平台,业务上实现一个推荐系统。可是人算不如天算,花了一定时间搭建的DCOS平台,突然master和agent节点被黑,重新搭建DCOS比较花时间,导致没有充分的时间在DCOS平台上完全实践我们的作品。
然后,我们就反思了,自动化搭建DCOS对使用者来说,很有必要。然后最后的48小时里,我们开始写Automation。
作品:DCOS Spark Elasticsearch Alipay G2 = Data Visualization for Big data
背景
Docker registry支持把push, pull的events发送给Webhook,这个被称作 registry notification, event数据包含用户及操作相关的信息,比如说event发生的时间,用户,action(pull/push),所操作的docker repository的路径,manifest等等。这些信息给用户行为分析提供了丰富的素材。此项目的目的就是分析历史数据,对用户即将pull的docker image进行预测,从而给用户推荐他可能会感兴趣的image。
项目repo地址
Elasticsearch与Spark的整合
Spark提供快速和大型数据处理的引擎,elasticsearch是性能优功能全的搜索引擎。以下是本项目涉及到的spark的一些功能,实现了读取数据,分析数据,储存结果的流程。
· MLlib , a scalable machine learning library,
· Elasticsearch for Apache Hadoop , an opensource, standalone, selfcontained, small library that allows Hadoop jobs to interact with Elasticsearch,
· Spark Streaming , a library to build scalable faulttolerant streaming applications.
架构设计
图片显示了spark与elasticsearch整合的流程:
(1)>(2). registry 发出实时的JSON形式的event数据,数据保存在NoSQL DB里面,同时发送给es。
(2)>(3). spark job,通过org.elasticsearch.spark以及相应的search query,获取数据,使用Spark MLlib’s ALS推荐算法进行计算。
(4)>(5). 给出每个user推荐的docker repository,并且将结果保存到es,供展示模块调用读取。步骤(5)中,当一个用户pull了一个logstash的image,我们的推荐系统可能就会推荐elasticsearch, kibana,logstash, logspout, spark等image,因为用户pull过elasticsearch的image,也会pull其他相关的一些image,这些image常常被组合起来使用,所以推荐系统会给出这些推荐建议。
Machine Learning: The Alternating Least Squares Recommender Algorithm
· 建立ALS模型
//10, 5, 0.01, 1.0 is hyperparameter values
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
· 评估评价质量:选取一部分作为train data,另外一部分用来计算AUC值,即这个推荐是好的推荐大于差的推荐的概率。
import org.apache.spark.rdd._
def areaUnderCurve(
positiveData: RDD[Rating],
bAllItemIDs: Broadcast[Array[Int]],
predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = {
...
}
val allData = buildRatings(rawUserArtistData, bArtistAlias) 1
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()
val allItemIDs = allData.map(_.product).distinct().collect() 2
val bAllItemIDs = sc.broadcast(allItemIDs)
val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)
val auc = areaUnderCurve(cvData, bAllItemIDs, model.predict)
数据可视化(Data Visualization)
这是我们的Demo地址: Demo ,展示了以下数据特点:
· 单个用户的pull docker image的历史数据
· 根据Spark分析这个用户Pull Image的历史数据,结合pull了相同image的其他用户的pull image数据,推荐给这个用户一些他可能感兴趣的image。image上面的数字代表推荐的这个image被其他人pull的次数,代表了这个image的热门度。
如何运行
运行环境的需求
· Spark on Mesos
· Elastic on Mesos
· Prepare events data, the data we collected from our corp.
运行命令
mvn package
· 本地运行
spark‐submit ‐‐class com.cloudera.datascience.recommender.RunRecommender ‐‐master local ‐‐driver‐memory 6g
· DCOS上运行
dcos spark run ‐‐verbose ‐‐submit‐args='‐‐driver‐memory 6g ‐‐class <jarfile>.jar <elasticsearch node:port>
Not completed
· Spark结合Elasticsearch,实时分析数据流
Spark Streaming, which is capable to process about 400,000 records per node per second for simple aggregations on small records, significantly outperforms other popular streaming systems.
This is mainly because Spark Streaming groups messages in small batches into a sequence of Spark RDDs (Resilient Distributed DataSets) using a sliding window. These RDDs can then be manipulated using normal Spark operations.