转载

使用Spark RandomForest找出影响比较大的App

0. 项目背景:

我厂开发了一个App,反应还不错,在app store 上面的好几个区都能拿到工具类的1-3名。 但是在运营上面,一直貌似不够精细。楼主尝试使用机器学习的方法找到对我们影响比较大的App.

所谓“影响比较大”,是指:有哪些App会①带来新用户、②留住老用户、③导致流失用户。

先说说结果: 这是一个比较失败的项目, 因为最后算法的运算结果跟瞎猜没有区别。免得各位看到最后太过失望。。。

源码可以从百度网盘下载:https://pan.baidu.com/s/1gfjzwsj

数据没法奉献出来,抱歉~

1. 数据来源

App在某些时刻可以收集用户安装了哪些App,以此作为分析的数据基础。

同时我们在后端用的是Cassandra做数据存储(这一点之前提到过了 《Cassandra秀一下肌肉》 )。数据类似时序数据库那种样子:

使用Spark RandomForest找出影响比较大的App

2. 采用的解决方案与建模:

做数据分析跟机器学习,一直用的是python stack,比如sklearn + pandas + numpy/scipy. 但是目前这一套单机已经扛不住了,毕竟有近千万级别的用户。最后采用的是比较火的Spark MLlib做机器学习。在背景部分提到,我们有三种类型的用户: 新用户、老用户、流失用户。 因此我们的模型很简单:

(1) 不同的App 作为不同的Feature,一个App就是一个feature,某个用户在某个维度的值不是0就是1

(2) 使用Spark Mllib之中的算法做分类。 即:这是一个三分类的问题

3. 重要步骤:

因为使用的是Spark 2.0.2, 因此一直想尝试使用Datasets API 做机器学习。这真的掉入了一个非常大的坑。 当时好不容易用OneHotEncoder把name列做成one-hot 的形式之后,完全不知道如何将其group 并合并起来。 最后还是彻底放弃了这种方式,采用的是比较原始的RDD的方式。

(1) 首先过滤出比较合理的App

所谓“比较合理”,是指使用的人数不会太多、也不会太少的App。

Map<String, String> options = Maps.newHashMap();
options = Maps.newHashMap();
options.put("keyspace", Constants.MAC_KEYSPACE);
options.put("table", Constants.TB_DR_APP_USER_HIST_ITEMS);
Dataset<Row> ds = ss.read().format("org.apache.spark.sql.cassandra").options(options).load();
 
ds = ds.select("name", "user_uuid");
 
// 取出人数在[100W, 900W] 之间的数据
Dataset<Row> countDS = ds.select("name")
    .groupBy("name")
    .count()
    .filter("count > 1000000 and count <= 9000000");

总用户约1KW, 那么如果一个App的使用人数小于<10% 或者 >90% , 个人认为这些App的区分度都不会太好。

这样出来,只有差不多150个App。 比App的总量小了非常非常多。 这相当于人工对数据进行降维。

因为这一步的结果在调试的时候比较常用,因此直接把结果存成文件,方便后面调用了。调用方法

Map<String, Integer> appIdxMap = getAppIndexMap();

(2) 将用户App表与用户状态表进行join

这一步用的也还是Datasets的jion api。

Dataset<Row> rows = 
    getUserApp(ss).join(getUserStatus(ss), "user_uuid");
 
//              show result:
// +--------------------+--------------------+----+
// |           user_uuid|                name|type|
// +--------------------+--------------------+----+
// |ABBD4B69-CAF1-528...|Office 2011 14.5....|   0|
// |665F7637-653B-542...|Microsoft® Silver...|  -1|
// |A7BA07A7-C71B-52F...|                OS X|   0|
// |52173699-DE47-54D...|Office 2011 14.6....|  -1|
// |8A5AA631-3A7E-51C...|Oracle VM VirtualBox|   0|
  • getUserApp 就是把用户装了哪些App找出来,
  • getUserStatus 根据user event 给用户打一个tag: loss / retention / new .

(3)  [ 核心 ] 转换成RDD并使用map-reduce的思想构建数据matrix

JavaRDD<LabeledPoint> dataList = rows.toJavaRDD()
 .mapToPair(new PairFunction<Row, String, String>() {
 // 将row映射成一个二维元组, key为uuid+type, val = app name
 @Override
 public Tuple2<String, String> call(Rowrow) throws Exception {
 String key = row.getString(0) + "#" + row.getInt(2); // uuid+type合并成key
 String val = row.getString(1); // app name
 return new Tuple2<String, String>(key, val);
 }
 })
 .groupByKey()
 .map(new Function<Tuple2<String,Iterable<String>>, Tuple2<String, double[]>>() {
 // 手动的将app name interable 变成一个 double[], 即手动进行one-hot encode
 @Override
 public Tuple2<String, double[]> call(Tuple2<String, Iterable<String>> tuple) throws Exception {
 double[] zeros = new double[appIdxMap.size()]; // 构建一个全是0的数组
 for(String appName : tuple._2()) {
 if(appIdxMap.containsKey(appName)) {
 int curIdx = appIdxMap.get(appName);
 zeros[curIdx] = 1.0;
 }
 }
 
 String type = tuple._1().split("#")[1];
 
 return new Tuple2<String, double[]>(type, zeros);
 }
 })
 .filter(tuple -> DoubleStream.of(tuple._2()).sum() > 0) // 仅仅只是demo jdk8是如何sum的
 .map(tuple -> new LabeledPoint( // 喂给RF的需要是一个JavaRDD<LabeledPoint>
 Double.parseDouble(tuple._1().split("#")[1]), 
 Vectors.dense(tuple._2())));

在一些我觉得比较关键的地方都加了注释,有问题请留言

(4) 终于来到机器学习最fancy的调用算法啦

不过这一步也是最无趣的一步,因为到了这里之后,基本上就是直接把官网代码copy过来即可。 copy地址: https://spark.apache.org/docs/2.0.2/mllib-ensembles.html#classification

唯一不太一样的就是需要设置一下具体的参数:

Integer numTrees = 1000;
String featureSubsetStrategy = "auto"; // Let the algorithm choose.
String impurity = "gini";
Integer maxDepth = 5;
Integer maxBins = 32;
Integer seed = 12345;

需要特别说明一下categoricalFeaturesInfo。 根据官方说明: 如果不设置,也可以,效果也还行。 如果设置了一般来说会更好一些。 像我们这里的情况,每一个都是0/1值,因此设置起来也比较简单:

HashMap<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
for(int i = 0; i < appIdxMap.size(); i++) {
 categoricalFeaturesInfo.put(i, 2); // 都是1/0
}

最后的结果如一开始所说的那样,效果非常不好。 应该是我们选取的方向不太对。因为把其中约200W Sample数据导出成文件,使用sklearn来处理,效果也是一样。

不过还有一些事情需要进一步去研究,比如:

  1. 在sklearn之中,打印class matrix report 非常简单,在Spark之中我还不太知道如何做
  2. 前期的处理还是非常麻烦,能否有比较方便的做法?
    因为Spark 不接受String类型的值作为feature或者是label,需要先自行修改。 这一点跟sklearn 非常有差距

本文为原创文章,转载请注明 出处

原文链接:http://www.flyml.net/2017/01/05/find-most-impect-app-via-spark-randomforest/

原文  http://www.flyml.net/2017/01/05/find-most-impect-app-via-spark-randomforest/
正文到此结束
Loading...