不同场景的模式和示例
MapReduce 处理为处理和构建不同类型的查询创建了一整套新范例和结构。然而,要最充分地利用 Hadoop,意味着要编写合适的 MapReduce 查询来处理信息。本文介绍许多不同的场景,其中包含如何开发不同类型的查询的食谱式示例。
高级文本处理
处理文本是 MapReduce 流程的一种常见用法,因为文本处理相对复杂且是处理器资源密集的处理。基本的字数统计常常用于演示 Haddoop 处理大量文本和基本汇总大体内容的能力。
要获得字数,将文本从一个输入文件中拆分(使用一个基本的 string tokenizer)为各个包含计数的单词,并使用一个 Reduce 来计算每个单词的数量。例如,从短语 the quick brown fox jumps over the lazy dog 中,Map 阶段生成清单 1 中的输出。
清单 1. Map 阶段的输出
the, 1
quick, 1
brown, 1
fox, 1
jumps, 1
over, 1
the, 1
lazy, 1
dog, 1
Reduce 阶段然后合计每个惟一的单词出现的次数,得到清单 2 中所示的输出。
清单 2. Reduce 阶段的输出
the, 2
quick, 1
brown, 1
fox, 1
jumps, 1
over, 1
lazy, 1
dog, 1
尽管此方法适用于基本的字数统计,但您常常希望识别重要的短语或单词的出现。例如,获取 Amazon 上对不同影片和视频的评论。
使用来自 Stanford University 大数据项目的信息,您可以下载影片评论数据(参见 参考资料)。该数据包含(Amazon 上报告的)原始评论的评分和有用性,如清单 3 中所示。
清单 3. 下载影片评论数据
product/productId: B003AI2VGA
review/userId: A3QYDL5CDNYN66
review/profileName: abra "a devoted reader"
review/helpfulness: 0/0
review/score: 2.0
review/time: 1229040000
review/summary: Pretty pointless fictionalization
review/text: The murders in Juarez are real. This movie is a badly acted
fantasy of revenge and holy intercession. If there is a good movie about
Juarez, I don't know what it is, but it is not this one.
请注意,尽管评论者给影片打了 2 分(1 为最差,5 为最好),但评论内容将此影片描述为一部非常差的影片。我们需要一个置信度评分,以便能够了解所给的评分与实际的评论是否彼此匹配。
许多工具可用于执行高级启发式分析,但基本的处理可使用一个简单的索引或正则表达式来实现。然后,我们可统计正面和负面正则表达式匹配数来获得一部影片的分数。
图 1. 统计正面和负面正则表达式匹配数来获得一部影片的分数
该图显示了如何从原始数据的单词分数来获得影片分数
对于 Map 部分,统计影片评论中各个单词或短语的数量,为正面和负面评价提供单个计数。Map 操作从产品评论中统计影片的分数,Reduce 操作然后按产品 ID 汇总这些分数,以提供正面或负面的评分。因此 Map 类似于清单 4。
清单 4. 为正面和负面评论提供单个计数的 Map 函数
// List of positive words/phrases
static String[] pwords = {"good","excellent","brilliant movie"};
// List of negative words/phrases
static String[] nwords = {"poor","bad","unwatchable"};
int count = 0;
for (String word : pwords) {
String REGEX = "//b" + word + "//b";
Pattern p = Pattern.compile(REGEX);
Matcher m = p.matcher(INPUT);
while(m.find()) {
count++;
}
for (String word : nwords) {
String REGEX = "//b" + word + "//b";
Pattern p = Pattern.compile(REGEX);
Matcher m = p.matcher(INPUT);
while(m.find()) {
count--;
}
}
output.collect(productId, count);
Reduce 然后可像传统的内容求和那样计算。
清单 5. 按产品 ID 对正面和负面评论求和的 Reduce 函数
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key,
Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
结果是评论的置信度分数。可以扩展单词列表来包含您想要匹配的短语。
读取和写入 JSON 数据
JSON 已成为一种实用的数据交换格式。它的实用性一定程度上源于它的简单性质和结构,以及在如此多的语言和环境中解析的轻松性。
在解析传入的 JSON 数据时,最常见的格式是每个符号输入行一条 JSON 记录。
清单 6. 每个符号输入行一条 JSON 记录
{ "productId" : "B003AI2VGA", "score": 2.0, "text" : """}
{ "productId" : "B007BI4DAT", "score": 3.4, "text" : """}
{ "productId" : "B006AI2FDH", "score": 4.1, "text" : """}
此代码可通过使用合适的类(比如 GSON)将传入的字符串转换为 JSON 对象来轻松解析。将此方法用于 GSON 时,您将需要去序列化到一个预先确定的类中。
清单 7. 去序列化到一个预先确定的类中
class amazonRank {
private String productId;
private float score;
private String text;
amazonRank() {
}
}
解析传入的文本,如下所示。
清单 8. 解析传入的文本
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
try {
amazonRank rank = gson.fromJson(value.toString(),amazonRank.class);...
要写入 JSON 数据,可执行相反的操作。创建您想要与 MapReduce 定义内的 JSON 输出匹配的输出类,然后使用 GSON 类将此转换为此结构的一种 JSON 表示。
清单 9. 写入 JSON 数据
class recipeRecord {
private String recipe;
private String recipetext;
private int recipeid;
private float calories;
private float fat;
private float weight;
recipeRecord() {
}
}
现在您可在输出期间填充对象的一个实例,将它转换为单条 JSON 记录。
清单 10. 在输出期间填充对象的一个实例
recipeNutrition recipe = new recipeRecord();
recipe.recipeid = key.toString();
recipe.calories = sum;
Gson json = new Gson();output.collect(key, new Text(json.toJson(recipe)));
如果您要在 Hadoop 处理作业中使用一个第三方库,请确保将库 JAR 文件与 MapReduce 代码包含在一起:$ jar -cvf recipenutrition.jar -C recipenutrition/* google-gson/gson.jar。
尽管在 Hadoop MapReduce 处理器之外,但另一种替代方案是使用 Jaql,它将直接解析并处理 JSON 数据。
合并数据集
一个 MapReduce 作业中通常执行 3 种类型的合并:
组合多个具有相同结构的文件的内容。
组合多个您想要组合的具有类似结构的文件的内容。
联接来自多个来源的与一个特定 ID 或关键词相关的数据。
第一个选项最好,在典型的 MapReduce 作业外部处理,因为它可使用 Hadoop Distributed File System (HDFS) getmerge 操作或某个类似操作完成。此操作接受单个目录作为内容并输出到一个指定文件。例如,$ hadoop fs -getmerge srcfiles megafile 将 srcfiles 目录中的所有文件合并到一个文件中:megafile。
合并类似文件
要合并类似但不等同的文件,主要问题在于如何识别输入时使用的格式以及如何指定输出的格式。例如,给定文件 name, phone, count 和第二个文件 name, email, phone, count,您要负责确定哪个文件是正确的并执行 Map 来生成所需的结构。对于更复杂的记录,您可能需要在 Map 阶段对包含和不包含空值的字段执行更复杂的合并,以生成信息。
事实上,Hadoop 不是此过程的理想选择,除非您还将它作为简化、统计或化简信息的一个机会。也就是说,您识别传入记录的数量,有哪些可能的格式,并在您想要选择的字段上执行 Reduce。
联接
尽管有一些潜在的解决方案来执行联接,但它们常常依赖于以一种结构化方式处理信息,然后使用此结构确定对输出信息做什么。
举例而言,给定两条不同的信息线索(比如电子邮件地址、发送的电子邮件数量,以及接收的电子邮件地址数量),目的在于将数据合并到一种输出格式中。这是输入文件:email, sent-count 和 email, received-count。输出应为此格式:email, sent-count, received-count。
处理传入的文件并以不同方式输出内容,以便可以不同方式访问和生成文件和数据。然后依靠 Reduce 函数来执行化简。在大多数情况中,这将是一个多阶段过程:
一个阶段处理 “已发送的” 电子邮件,以 email, fake#sent 形式输出信息
注意:我们使用伪前缀来调整顺序,以便数据可按伪前缀来核对,而不按收到的前缀来核对。此做法允许数据按虚假、暗含的顺序联接。
一个阶段处理 “已发送的” 电子邮件,以 email, received 形式输出信息。
在 Map 函数读取文件时,它生成一些行。
清单 11. 生成行
dev@null.org,0#sent
dev@null.org, received
Map 识别输入记录并输出一个带一个键的统一版本。输出并生成 sent#received 结构来处理内容,确定该值应合并在一起还是汇总为一个单纯收到的值。
清单 12. 输出一个带一个键的统一版本
int sent = 0;
int received = 0;
for (Text val : values) {
String strVal = val.toString();
buf.append(strVal).append(",");
if (strVal.contains("#")) {
String[] tokens = strVal.split("#");
// If the content contains a hash, assume it's sent and received
int recvthis = Integer.parseInt(tokens[0]);
int sentthis = Integer.parseInt(tokens[1]);
received = received + Integer.parseInt(recvthis);
sent = sent _ sentthis;
} else {
// Otherwise, it's just the received value
received = received + Integer.parseInt(strVal);
}
}
context.write(key, IntWritable(sendReplyCount), new IntWritable(receiveReplyCount));
在此情况下,我们依赖于 Hadoop 本身内的化简来按该键简化输出数据(在此情况下,该键为电子邮件地址),简化为我们需要的信息。因为该信息是以电子邮件为键,所以记录可以电子邮件为键来轻松地合并。
使用键的技巧
请记住,MapReduce 过程的一些方面可为我们所用。在本质上,MapReduce 是一个两阶段过程:
Map 阶段访问数据,挑选您需要的信息,然后输出该信息,使用一个键和关联的信息。
Reduce 阶段使用通用的键将映射的数据合并、汇总或统计为一种更简单的形式,从而简化数据。
键是一个重要的概念,因为它可用于以不同方式格式化和汇总数据。例如,如果您计划化简有关国家和城市人口的数据,可以仅输出一个键来按国家化简或汇总数据。
清单 13. 仅输出一个键
France
United Kingdom
USA
要按国家和城市汇总,键是二者的复合版本。
清单 14. 键是国家和城市的复合版本
France#Paris
France#Lyon
France#Grenoble
United Kingdom#Birmingham
United Kingdom#London
这是一个基本的技巧,可在处理某些类型的数据时为我们所用(例如具有一个共同键的材料),因为我们可使用它模拟伪联接。此技巧在组合博客文章(拥有一个 blogpostid 以便于识别)和博客评论(拥有一个 blogpostid 和 blogcommentid)时也很有用。
要化简输出(例如统计博客和评论中的字数),我们首先通过 Map 处理博客文章和博客评论,但我们输出一个通用的 ID。
清单 15. 化简输出
blogpostid,the,quick,brown,fox
blogpostid#blogcommentid,jumps,over,the,lazy,dog
这会明显地使用两个键,将信息输出为两个不同的信息行。我们也可反转这一关系。我们可通过向每个单词添加评论 ID,从评论中针对 blogpostid 来识别单词。
清单 16. 反转关系
blogpostid,the,quick,brown,fox,jumps#blogcommentid,over#blogcommentid,
the#blogcommentid,lazy#blogcommentid,dog#blogcommentid
在处理期间,我们可通过查看 ID 而获知该单词是否附加到博客文章,以及它是否按该格式附加到博客文章或评论。
模拟传统的数据库操作
Hadoop 在真正意义上不是一个真正的数据库,这一定程度上是因为我们无法逐行执行更新、删除或插入。尽管这在许多情况下不是问题(您可对要处理的活动数据执行转储和加载),但有时您不希望导出并重新加载数据。
一种避免导出并重新加载数据的技巧是,创建一个变更文件,其中包含来自原始转储文件的一个差异列表。现在我们暂时忽略从 SQL 或其他数据库生成这些数据的过程。只要数据有一个惟一 ID,我们就可将它用作键,就可利用该键。下面来看一个类似于清单 17 的源文件。
清单 17. 源文件
1,London
2,Paris,
3,New York
假设有一个类似于清单 18 的变更文件。
清单 18. 变更文件
1,DELETE
2,UPDATE,Munich
4,INSERT,Tokyo
最终得出两个文件经过解析的合并结果,如清单 19 所示。
清单 19. 源文件和变更文件的合并
2,Munich
3,New York
4,Tokyo
我们如何通过 Hadoop 实现这样一种合并?
使用 Hadoop 实现此合并的一种方式是,处理当前数据并将它转换为插入数据(因为它们都是插入目标文件中的新数据),然后将 UDPATE 操作转换为新数据的 DELETE 和 INSERT 操作。事实上,使用变更文件,通过将它修改为清单 20 中的内容更容易实现此目的。
清单 20. 通过 Hadoop 实现合并
1,DELETE
2,DELETE
2,INSERT,Munich
4,INSERT,Tokyo
问题在于,我们无法对两个文件进行物理合并,但我们可相应地处理它们。如果它是一个原始的 INSERT 或 DELETE,我们会输出一个带有计数器的键。如果它是创建新插入数据的 UPDATE 操作,我们想要一个不会化简的不同的键,所以我们生成一个类似清单 21 的间隙 (interstitial) 文件。
清单 21. 生成间隙文件
1,1,London
2,1,Paris,
3,1,New York
1,-1,London
2,-1,Paris
2#NEW,Munich
4#NEW,1,Tokyo
在 Reduce 期间,我们汇总每个惟一键的计数器的内容,生成清单 22。
清单 22. 汇总每个惟一键的计数器的内容
1,0,London
2,0,Paris,
3,1,New York
2#NEW,1,Munich
4#NEW,1,Tokyo
我们然后可通过一个辅助 MapReduce 函数运行内容,使用清单 23 中所示的基本结构。
清单 23. 通过一个辅助 MapReduce 函数运行内容
map:
if (key contains #NEW):
emit(row)
if (count >0 ):
emit(row)
辅助 MapReduce 会得到预期输出,如清单 24 中所示。
清单 24. 辅助 MapReduce 函数的预期输出
3,1,New York
2,Munich
4,1,Tokyo
图 2 演示了这个首先格式化和化简、然后简化输出的两阶段过程。
图 2. 格式化、化简和简化输出的两阶段过程
原始数据在 Map 和 Reduce 阶段中得到化简和映射
这个过程需要比传统数据库中更多的工作,但它所提供解决方案需要的对不断更新的数据的交换简单得多。
结束语
本文介绍了许多使用 MapReduce 查询的不同场景。您看到了这些查询在处理各种数据上的强大功能,您现在应能够在自己的 MapReduce 解决方案中利用这些示例了。