题图为美国尼米兹核动力航空母舰
大数据时代,我们常常面对海量数据而头疼。作为学统计出身的人,我们想折腾大数据但又不想学习Hadoop或者Java,我们更倾向于把精力放在建模和算法设计上,SparkR和Docker的完美结合,让R的计算直接从一架战斗机的当兵作战华丽转变为一个航空母舰战斗群!不仅仅简化了分布式计算的操作,还简化了安装部署的环节,我们只几乎不需要做什么改动就可以直接运用R中的data frame进行分布式的计算。
参考前文 打造大数据产品:Shiny的Spark之旅 ,我们可以知道,SparkR是一个为R提供了轻量级的Spark前端的R包。 SparkR提供了一个分布式的data frame数据结构,解决了 R中的data frame只能在单机中使用的瓶颈,它和R中的data frame 一样支持许多操作,比如select,filter,aggregate等等。(类似dplyr包中的功能)这很好的解决了R的大数据级瓶颈问题。 SparkR也支持分布式的 机器学习 算法,比如使用MLib机器学习库。
参考前文 打造数据产品的快速原型:Shiny的Docker之旅 ,我们也可以知道,Docker是一种类似于虚拟机的技术,主要解决标准化快速部署的问题,在Docker中安装的软件和主机中的软件可以完全隔离,并通过Daocloud或者hub.docker.com等云服务快速建立Docker仓库,快速复用Docker镜像。Docker已经不仅仅是DevOps人员手中的神器了,每一个开发者都应该学会如何使用Docker。
SparkR的精髓在于分布式计算,而Docker的精髓在于标准容器的拓展性,SparkR和Docker的组合充分结合了二者各自的优点,将分布式应用底层化繁为简,为高层计算直接暴露接口,给科学计算节省了大量时间。
本文将通过Docker讲解如何快速部署SparkR-RStudio容器,并通过一些简单的机器学习例子展示如何使用这个航母级别的组合拳。
由于国内的镜像质量不够高,国外的镜像下载速度比较慢,出于试验的考虑,建议大家可以尝试使用 Daocloud的镜像加速服务 。
首先,我们需要在Daocloud注册一个账号,然后选择镜像加速,根据 指示 选择主机并安装Docker和Daocloud加速器。
感谢 vinicius85 在GitHub上的开源贡献 ,为我们已经做好了 Spark1.6+R+RStduio的镜像,我们利用daocloud加速拉取镜像。
daopullvinicius85/spark-rstudio
以daemon形式运行容器,暴露Rstudio-server默认的8787端口, 并持久化docker内的/srv目录下的所有文件作为通讯。
dockerrun -d -v /home/docker:/srv -p 8787:8787 --namesparkrstudiovinicius85/sparkr-rstudio
参考前文 R语言工程化实践:RStudio Server环境快速配置教程
docker exec -d sparkrstudio bash
命令表示以daemon形式执行容器中的shell脚本
我们设置一下RStudio-Server的账号密码
dockerexec -d sparkrstudiobashadduserharryzhu # 设置新用户名 dockerexec -d sparkrstudiobashpasswdharryzhu # 设置该用户的密码
ifconfig
命令可以查看到Docker当前的IP地址,透过这个IP,我们可以访问到RStudio-Server。
比如:
查看资源占用情况
dockerstatssparkrstudio
CONTAINER CPU % MEMUSAGE / LIMIT MEM % NET I/O BLOCK I/O sparkrstudio 4.50% 481.3 MB / 5.039 GB 9.55% 133.6 kB / 117.4 kB 3.252 MB / 135.2 kB
出于演示的考虑,这里引用并稍微改进了 tcosta 完成的一个逻辑回归的例子 :
使用SparkR之前,我们需要确定,我们的容器内存要在2G以上,如果用AWS的乞丐版套装,马上就会报内存不足的错误。
Errorin sparkR.init(master = "local") : JVMis not readyafter 10 seconds
如果内存不足,可以退出docker并且在虚拟机中重新提高docker的内存和cpu的配置。
# 配置环境变量 Sys.setenv(SPARK_HOME="/opt/spark-1.6.0-bin-hadoop2.6") .libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) Sys.setenv(JAVA_HOME="/usr/lib/jvm/java-8-oracle/") # 加载 SparkR包 library(SparkR) # 初始化RRD sc <- sparkR.init(master = "local") sqlContext <- sparkRSQL.init(sc) # 创建DataFrame mtcarsDF <- createDataFrame(sqlContext, mtcars) head(mtcarsDF)
mpgcyldisp hpdrat wt qsecvsamgearcarb 1 21.0 6 160 110 3.90 2.620 16.46 0 1 4 4 2 21.0 6 160 110 3.90 2.875 17.02 0 1 4 4 3 22.8 4 108 93 3.85 2.320 18.61 1 1 4 1 4 21.4 6 258 110 3.08 3.215 19.44 1 0 3 1 5 18.7 8 360 175 3.15 3.440 17.02 0 0 3 2 6 18.1 6 225 105 2.76 3.460 20.22 1 0 3 1
model <- glm(vs ~ mpg + disp + hp + wt , data = mtcarsDF, family = "binomial")# 逻辑回归 # model <- glm(vs ~ mpg + disp + hp + wt , data = mtcarsDF, family = "gaussian")# 线性回归 predictions <- predict(model, newData = mtcarsDF ) modelPrediction <- select(predictions, "vs", "prediction") head(modelPrediction)
vsprediction 1 0 0.58006945 2 0 0.64060709 3 1 0.72468718 4 1 0.47803842 5 0 0.06070972 6 1 0.54994276
# error变量: 观测值和预测值的差值 modelPrediction$error <- abs(modelPrediction$vs - modelPrediction$prediction) # modelPrediction 现在对 SQLContext 是可见的 registerTempTable(modelPrediction, "modelPrediction") num_errors <- sql(sqlContext, "SELECT count(error) FROM modelPrediction WHERE error = 1") total_errors <- sql(sqlContext, "SELECT count(error) FROM modelPrediction") # 模型错误率 training_acc <- collect(num_errors) / collect(total_errors) training_acc
<spanclass="hljs-title"><spanclass="pln">_c0</span></span> <spanclass="hljs-number"><spanclass="lit">1</span></span> <spanclass="hljs-number"><spanclass="lit">0</span></span>
作为分享主义者(sharism),本人所有互联网发布的图文均遵从CC版权,转载请保留作者信息并注明作者 Harry Zhu 的 FinanceR专栏: https://segmentfault.com/blog/harryprince ,如果涉及源代码请注明GitHub地址: https://github.com/harryprince 。微信号: harryzhustudio 商业使用请联系作者。