TF-IDF (Term Frequency–Inverse Document Frequency) is a common weighting technique in information retrieval and text mining. Its core idea is: if a word or phrase occurs with a high frequency (TF) in one document but rarely occurs in other documents, it is considered to have good discriminating power between categories and is well suited for classification.
TF-IDF is simply TF multiplied by IDF. TF (Term Frequency) is the frequency with which a term occurs in a document, computed as:

$$\mathrm{TF}_{t,d} = \frac{n_{t,d}}{\sum_{k} n_{k,d}}$$

where $n_{t,d}$ is the number of occurrences of term $t$ in document $d$ and the denominator is the total number of terms in $d$.
IDF (Inverse Document Frequency) is the logarithm of the ratio between the total number of documents and the number of documents containing term $t$:

$$\mathrm{IDF}_{t} = \log\frac{N}{\sum_{d} I(t,d)}$$
Here $N$ is the total number of documents and $I(t,d)$ indicates whether document $d$ contains term $t$: it is 1 if it does and 0 otherwise. There is a problem, however: if a term appears in none of the documents, the denominator of the IDF formula becomes 0, so IDF has to be smoothed. The smoothed IDF is:

$$\mathrm{IDF}_{t} = \log\frac{N}{1+\sum_{d} I(t,d)}$$
The final TF-IDF value of term $t$ in document $d$ is then:

$$\mathrm{TF\text{-}IDF}_{t,d} = \mathrm{TF}_{t,d}\times\mathrm{IDF}_{t}$$
From this definition it is clear that the higher the frequency of a term within a document, and the fewer other documents it appears in, the higher its TF-IDF value. For example, suppose a corpus contains $N = 100$ papers, 20 of which contain the term 推荐系统 (recommender system). The first paper $d_1$ contains 200 technical terms in total, among which 推荐系统 appears 15 times. The TF-IDF value of 推荐系统 in $d_1$ is then:

$$\mathrm{TF\text{-}IDF} = \frac{15}{200}\times\log\frac{100}{1+20}\approx 0.117$$

(using the natural logarithm and the smoothed IDF).
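As a quick sanity check, the arithmetic above can be reproduced in a few lines of Scala. This is only a sketch of the worked example; the corpus counts are the hypothetical numbers used above, and the natural logarithm and smoothed IDF are assumed:

```scala
// Worked example: the term "推荐系统" in paper d1 (numbers from the example above)
val totalDocs    = 100.0   // N: total number of papers in the corpus
val docsWithTerm = 20.0    // papers that contain the term
val termCount    = 15.0    // occurrences of the term in d1
val wordsInDoc   = 200.0   // total number of terms in d1

val tf    = termCount / wordsInDoc                    // 15 / 200 = 0.075
val idf   = math.log(totalDocs / (1 + docsWithTerm))  // smoothed IDF, natural log
val tfidf = tf * idf
println(f"TF = $tf%.3f, IDF = $idf%.3f, TF-IDF = $tfidf%.3f")  // TF-IDF ≈ 0.117
```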
For a more detailed introduction to TF-IDF, see:

搜索引擎:文本分类——TF/IDF算法

For other hands-on applications of TF-IDF, see:

基于TF-IDF算法的短标题关键词提取
Note that Spark 2.x does not support Cartesian-product (cross join) operations on DataFrames by default; the feature has to be enabled when the SparkSession is created.
Create the SparkSession and set the log level:
```scala
// Spark 2.x does not allow Cartesian products by default;
// spark.sql.crossJoin.enabled=true turns the support on
val spark = SparkSession
  .builder()
  .appName("docSimCalWithTFIDF")
  .config("spark.sql.crossJoin.enabled", "true")
  .master("local[10]")
  .enableHiveSupport()
  .getOrCreate()
Logger.getRootLogger.setLevel(Level.WARN)
```
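The snippets below do not repeat their import statements. An import block along the following lines is assumed (in particular, SV is taken to be an alias for Spark ML's SparseVector, and the Breeze vector utilities are used in the similarity function further down); the exact imports are not shown in the original:

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.linalg.{SparseVector => SV}
import breeze.linalg.{norm, SparseVector}
```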
Here we take the three English sentences from the official sample code as the data set and tokenize them. (There are many Chinese word-segmentation packages for Spark, such as jieba, HanLP, Ansj, and FudanNLP; they are not covered here.)
```scala
val sentenceData = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
wordsData.show(10)
```
The output is:
```
+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|
|    1|I wish Java could...|[i, wish, java, c...|
|    2|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+
```
Call the official TF-IDF classes (HashingTF and IDF) to compute the feature vectors:
```scala
// setNumFeatures(5) sets the number of hash buckets to 5. Adjust it to your vocabulary
// size: the larger the value, the lower the probability that different words are hashed
// into the same bucket and the more accurate the result, at the cost of more memory.
val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(5)
val featurizedData = hashingTF.transform(wordsData)
featurizedData.show(10)

val idf = new IDF()
  .setInputCol("rawFeatures")
  .setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.show(10)
rescaledData.select("label", "features").show()
```
The output is:
```
+-----+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|
+-----+--------------------+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|(5,[0,2,4],[2.0,2...|
|    1|I wish Java could...|[i, wish, java, c...|(5,[0,2,3,4],[1.0...|
|    2|Logistic regressi...|[logistic, regres...|(5,[0,1,3,4],[1.0...|
+-----+--------------------+--------------------+--------------------+

+-----+--------------------+--------------------+--------------------+--------------------+
|label|            sentence|               words|         rawFeatures|            features|
+-----+--------------------+--------------------+--------------------+--------------------+
|    0|Hi I heard about ...|[hi, i, heard, ab...|(5,[0,2,4],[2.0,2...|(5,[0,2,4],[0.0,0...|
|    1|I wish Java could...|[i, wish, java, c...|(5,[0,2,3,4],[1.0...|(5,[0,2,3,4],[0.0...|
|    2|Logistic regressi...|[logistic, regres...|(5,[0,1,3,4],[1.0...|(5,[0,1,3,4],[0.0...|
+-----+--------------------+--------------------+--------------------+--------------------+

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(5,[0,2,4],[0.0,0...|
|    1|(5,[0,2,3,4],[0.0...|
|    2|(5,[0,1,3,4],[0.0...|
+-----+--------------------+
```
The rawFeatures and features columns use the SparseVector compressed representation: for example, (3,[0,1],[0.1,0.3]) denotes a vector of length 3 whose first and second entries are 0.1 and 0.3 and whose third entry is 0.
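To make the notation concrete, here is a minimal sketch (not part of the original text) that builds such a compressed vector by hand and expands it into its dense form:

```scala
import org.apache.spark.ml.linalg.Vectors

// (3,[0,1],[0.1,0.3]): length 3, indices 0 and 1 carry 0.1 and 0.3, the remaining entry is 0
val sparse = Vectors.sparse(3, Array(0, 1), Array(0.1, 0.3))
println(sparse)          // (3,[0,1],[0.1,0.3])
println(sparse.toDense)  // [0.1,0.3,0.0]
```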
For the computations that follow it is convenient to convert these into dense vectors. The conversion can be done directly on the DataFrame, or the DataFrame can first be converted to an RDD. Converting on the DataFrame with a custom UDF:
```scala
import spark.implicits._

// Parse the data: convert the SparseVector column to DenseVector directly on the DataFrame
val sparseVectorToDenseVector = udf { features: SV => features.toDense }
val df = rescaledData
  .select($"label", sparseVectorToDenseVector($"features"))
  .withColumn("tag", lit(1))
df.show(10)
```
The output is:
```
+------+--------------------+---+
|label1|           features1|tag|
+------+--------------------+---+
|     0|[0.0,0.0,0.575364...|  1|
|     1|[0.0,0.0,0.575364...|  1|
|     2|[0.0,0.6931471805...|  1|
+------+--------------------+---+
```
Converting to an RDD first and then transforming:
```scala
val selectedRDD = rescaledData.select("label", "features").rdd
  .map(l => (l.get(0).toString, l.getAs[SV](1).toDense))
selectedRDD.take(10).foreach(println)
```
The output is:
```
(0,[0.0,0.0,0.5753641449035617,0.0,0.0])
(1,[0.0,0.0,0.5753641449035617,0.28768207245178085,0.0])
(2,[0.0,0.6931471805599453,0.0,0.5753641449035617,0.0])
```
The conversion can of course also be done inside the similarity computation itself:
```scala
// Define the similarity UDF and compute pairwise similarities via a self-join on "tag"
import spark.implicits._

val df1 = rescaledData
  .select($"label".alias("id1"), $"features".alias("f1"))
  .withColumn("tag", lit(1))
val df2 = rescaledData
  .select($"label".alias("id2"), $"features".alias("f2"))
  .withColumn("tag", lit(1))

val simTwoDoc = udf { (f1: SV, f2: SV) => calTwoDocSim(f1, f2) }

val df = df1.join(df2, Seq("tag"), "inner")
  .where("id1 != id2")
  .withColumn("simscore", simTwoDoc(col("f1"), col("f2")))
  .where("simscore > 0.0")
  .select("id1", "id2", "simscore")
df.printSchema()
df.show(20)
```
The calTwoDocSim function is implemented as follows:
```scala
/**
  * @Author: GaoYangtuan
  * @Description: Compute the cosine similarity between two document vectors
  * @Thinkgamer: author of 《推荐系统开发实战》, maintainer of the 「搜索与推荐Wiki」
  *              WeChat account, algorithm engineer
  * @Param: [f1, f2]
  * @return: double
  **/
def calTwoDocSim(f1: SV, f2: SV): Double = {
  // Wrap the Spark vectors as Breeze sparse vectors, then compute cosine similarity
  val breeze1 = new SparseVector(f1.indices, f1.values, f1.size)
  val breeze2 = new SparseVector(f2.indices, f2.values, f2.size)
  val cosineSim = breeze1.dot(breeze2) / (norm(breeze1) * norm(breeze2))
  cosineSim
}
```
The printed result is:
```
root
 |-- id1: integer (nullable = false)
 |-- id2: integer (nullable = false)
 |-- simscore: double (nullable = false)

+---+---+------------------+
|id1|id2|          simscore|
+---+---+------------------+
|  0|  1|0.8944271909999159|
|  1|  0|0.8944271909999159|
|  1|  2|0.2856369296406274|
|  2|  1|0.2856369296406274|
+---+---+------------------+
```
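A quick standalone check of calTwoDocSim on two toy vectors (hypothetical values, not part of the pipeline above): parallel vectors should score roughly 1.0 and vectors with no overlapping terms should score 0.0.

```scala
// Toy sparse vectors for a sanity check of the cosine similarity
val a = new SV(3, Array(0, 1), Array(1.0, 2.0))
val b = new SV(3, Array(0, 1), Array(2.0, 4.0))  // parallel to a
val c = new SV(3, Array(2), Array(5.0))          // no overlap with a

println(calTwoDocSim(a, b))  // ~1.0
println(calTwoDocSim(a, c))  // 0.0
```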
Finally, sort and save the results:
```scala
val sortAndSlice = udf { simids: Seq[Row] =>
  simids.map { case Row(id2: Int, simscore: Double) => (id2, simscore) }
    .sortBy(_._2)
    .reverse
    .slice(0, 100)
    .map(e => e._1 + ":" + e._2.formatted("%.3f"))
    .mkString(",")
}

val result = df
  .groupBy($"id1")
  .agg(collect_list(struct($"id2", $"simscore")).as("simids"))
  .withColumn("simids", sortAndSlice(sort_array($"simids", asc = false)))
result.show(10)
result.coalesce(1).write.format("parquet").mode("overwrite").save("data/tfidf")
```
The printed result is:
```
+---+---------------+
|id1|         simids|
+---+---------------+
|  1|0:0.894,2:0.286|
|  2|        1:0.286|
|  0|        1:0.894|
+---+---------------+
```
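To verify what was written out, the Parquet file can be read back in the same session; a minimal sketch (data/tfidf is the path used in the save call above):

```scala
val saved = spark.read.parquet("data/tfidf")
saved.printSchema()
saved.show(10, truncate = false)
```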