Node.js是一套JavaScript框架,其核心诉求在于利用非阻塞I/O以及异步式事件驱动处理模型实现服务器端应用程序的高性能运行。
当客户需要处理规模庞大且复杂性较高的数据时,Node.js能够提供一套以原生方式支持JSON数据结构的运行时环境。Python及Ruby等编程语言都拥有面向JSON数据的优秀支持能力,但Node.js在处理包含列表及数组的结构时显得尤为得心应手。Node.js还提供一套高性能且具备可扩展能力的备用方案,能够以原生方式将JSON数据作为对象加以处理。目前大家已经可以获得专门针对Node.js的AWS SDK( http://aws.amazon.com/sdkfornodejs ),其允许我们将Node.js应用程序与AWS服务加以紧密整合。
在今天的文章中,大家将了解到如何在Amazon Elastic MapReduce(简称Amazon EMR)当中安装Node.js、如何构建一款Node.js应用程序并将其与Hadoop Streaming并发处理架构相整合、外加如何在AWS之上部署并运行我们的Node.js MapReduce应用程序。要了解更多与Amazon EMR以及Hive项目相关之细节信息,请大家 点击此处 查看专题教程。
本文假定大家已经熟知与Hadoop、Hive以及Amazon EMR相关的专业知识。
在本文当中,我们将使用来自Twitter的数据,这部分数据中包含由推文、转发、回复以及直发信息所构成的复杂信息体系。每个文件中都包含有单一Twitter事务数据块,我们需要将其内容写入至Amazon Simple Storage Service(简称Amazon S3)并随时加以读取。我们希望针对Hive的实际需求对数据进行转换,旨在对转发率、每秒推文数量以及每用户直发信息数量等指标进行汇总。
我们的数据表现出一套由大量Twitter用户所构成的复杂交互图谱,其中包括推文转发与回复内容,如下图所示:
为了进行数据发现,我们可以利用Hive配合JsonSerde(可参看 http://aws.amazon.com/articles/2854 )对数据进行调查。不过由于这套图谱非常复杂而且存在自我指涉,因此大部分Hive JsonSerde无法将这部分数据显示为表格。能够在Node.js中进行数据处理,我们能够更轻松地利用简单语法实现数据图谱导航。
我们可以利用以下bootstrap操作将Node.js安装在Amazon EMR集群当中:
---bootstrap-actions Path=s3://github-emr-bootstrap-actions/node/install-nodejs.sh,Name=InstallNode.js
(如果大家此前从未接触过bootstrap操作,请 点击此处 查阅Amazon EMR提供的说明文档。)
现在Node.js已经被安装在Amazon EMR集群之上,而Hadoop Streaming也得到正确配置,接下来我们需要运行自己的Node.js应用程序以完成映射与归约操作。利用Hadoop Streaming处理架构,我们能够为流任务指定所需要使用的映射与归约代码。
与其它Hadoop Streaming兼容语言一样,我们必须从Amazon S3数据存储或者HDFS文件系统当中利用标准输入(即stdin)方式实现数据读取。在Node.js当中,stdin能够利用处理全局对象的方式获得可访问能力(参见 http://nodejs.org/api/process.html )。这就使我们得以访问多种控制机制,进而通过管理输入与输出数据流对数据进行读取与写入,例如process.stdin与process.stdout。
我们的MapReduce程度必须执行五项主要函数以实现从Hadoop Streaming以及输出结果中读取数据。
在默认情况下,process.stdin输入通道会处于暂停状态,且不触发任何事件。在将其启用之前,我们必须首先配置所需的字符集编码。对于非多字节字符集而言,我们可以使用UTF-8。因此,我们映射或者归约方案的主体流程应该从这里开始:
process.stdin.setEncoding('utf8');
或者,我们也可以使用UTF-16实现多字节支持。在此之后,我们必须启用stdin对事件进行恢复以及触发:
process.stdin.resume();
当我们的Node.js应用程序使用该stdin.data事件时,process.stdin通道会就此发出通知——stdin.data事件会在一定数量的数据可用于读取时被触发。我们的Node.js应用程序必须对这部分数据进行缓存处理以备后续使用,因为事件所提供的每一个数据块可能都仅仅属于标准输入内容中全部可用数据的一小部分。由于我们此前通过配置让Hadoop Streaming使用非分割式FileInputFormat,因此我们会在单一映射器中获取到完整的JSON数据,并能够将该文件作为整体加以处理。有鉴于此,我们可以通过以下代码将数据块缓存于data事件当中:
var line = ‘’; // fires on every block of data read from stdin process.stdin.on('data', function(chunk) { // chunk and emit on newline lines = chunk.split("/n") if (lines.length > 0) { // append the first chunk to the existing buffer line += lines[0] if (lines.length > 1) { // emit the current buffer emitter.emit(lineEvent,line); // go through the rest of the lines and emit them, buffering the last for (i=1;i<lines.length; i++) { if (i<lines.length) { emitter.emit(lineEvent,lines[i]); } else { line = lines[i]; } } } } });
上述操作会将全部数据块附加至行变量处,并在每一次发现新的换行符时触发“lineReady”事件。
在全部来自stdin的数据被读取完成后,该流程将触发stdin.end事件。我们已经将全部数据收集到行缓冲区当中,这样我们只需要利用以下代码刷新最后一行数据:
// fires when stdin is completed being read process.stdin.on('end', function() { emitter.emit(lineEvent,line); });
每当新的内容行准备就绪时,我们都将利用以下代码将其排序至一个JSON对象当中:
try { obj = JSON.parse(line); } catch (err) { process.stderr.write('Error Processing Line ' + line + '/n'); process.stderr.write(err); return; }
我们可以选择把复杂的JSON数据简化为普通输出结果,以供Hive JsonSerde进行加载,或者选择生成CSV或者XML数据来代替。
对于某些特定的MapReduce操作类型,我们需要确保其归约器能够获取到归属于特定类型的全部数据。为了实现这一目标,我们必须指定一个键值,并保证Hadoop在调用该归约器之前会首先对输出结果进行分类。在进行文本内容处理时,我们会利用由/t标签开头的字符串来表示这个值。
要执行存储或者移除方面的数据写入操作,我们需要利用process.stdout.write()向stdout实施写入。
Amazon EMR利用命令行语法调用的方式运行映射器与归约器,例如“./mapper.js”。因此,我们需要确保我们所构建的Node.js模块能够通过命令行实现调用。为达成这一目标,我们在映射器或者归约器文件的开头处添加一条标准“shebang”命令,这样它就能调用Node.js并运行脚本内容:
#!/usr/bin/env node
接下来,大家可以通过命令行调用的方式测试自己的映射器代码了(以下示例假定代码位于名为Mapper.js的文件当中):
./mapper.js < input-file-path
在编写了自己的映射器与归约器之后,接下来我们将其传输至Amazon S3当中,而后利用Amazon EMR针对部分输入数据运行MapReduce。
以下示例讲解了如何利用Amazon EMR命令行执行各个步骤(参见 http://aws.amazon.com/developertools/2264 ),不过大家也可以在Amazon EMR控制台(参见 console.aws.amazon.com/elasticmapreduce )或者Amazon EMR API(参见 http://docs.aws.amazon.com/ElasticMapReduce/latest/API/Welcome.html?r=8857 )中利用命令实现同样的效果。我们将展示如何以自动方式利用AWS命令行工具运行该应用程序,但大家完全可以使用AWS Web Console或者AWS Data Pipeline完成同样的工作。我们可以使用--create-cluster命令启动一套新的Amazon EMR集群,同时利用以下代码启动该集群并捃行我们的Node.js bootstrap操作:
aws emr create-cluster --ami-version 3.3.1 --enable-debugging --visible-to-all-users --name MyNodeJsMapReduceCluster --instance-groups InstanceCount=2,InstanceGroupType=CORE,InstanceType=m3.xlarge InstanceCount=1,InstanceGroupType=MASTER,InstanceType=m3.xlarge -- no-auto-terminate --enable-debugging --log-uri s3:///logs -- bootstrap-actions Path=s3://github-emr-bootstrap- actions/node/install-nodejs.sh,Name=InstallNode.js --ec2-attributes KeyName=<my key pair>
这样我们就创建了一套始终启用的集群,其中包含配备3.3.1 AMI、双核心节点以及一个主节点,全部采用m3.xlarge实例类型。以上代码同时为指定存储桶设定了调试与日志记录机制,并通过bootstrap操作完成了Node.js的启动时安装。除此之外,代码中还使用了Amazon EC2密钥对,从而将SSH安全机制引入该Hadoop集群。
接下来,我们将添加Hadoop Streaming流程,旨在处理自己的输入数据。在以下代码中,大家需要把<my cluster ID>替换为自己的实际集群ID:
aws emr add-steps --cluster-id <my cluster ID> --steps Name=NodeJSStreamProcess,Type=Streaming
我们通过创建一套文件系统参考(利用—files参数)添加自己的映射器与归约器JavaScript文件,而后将该基础文件名通过-mapper与-reducer进行引用:
Args=--files,"s3://<path to mapper>/mapper.js/,s3://<path to reducer>/reducer.js",-mapper,mapper.js,-reducer,reducer.js
而后,我们添加该输入与输出文件的位置:
-input,s3://<path-to-input-files>,-output,s3://<path-to-output-files>
这样一来,我们就获得了如下完整命令行调用代码:
aws emr add-steps --cluster-id <my cluster ID> --steps Name=NodeJSStreamProcess,Type=Streaming,Args=--files, "s3://<path to mapper>/mapper.js/,s3://<path to reducer>/reducer.js", -input,s3://<path-to-input-files>,-output,s3://< path-to-output-files>,-mapper,mapper.js,-reducer,reducer.js
大家只需要提供相应的时间量用于该流程运行,Hadoop集群不再需要其它迭代实现数据生成。请确保在运行上述示例时,大家在执行完成后及时关闭自己的集群。我们可以利用以下Amazon EMR命令完成集群关闭操作:
aws emr terminate-clusters --cluster-ids <my cluster ID>
Node.js能够在实现MapReduce应用程序快速执行效果的同时,利用简洁的原生语法对高复杂性JSON数据加以处理。通过Amazon EMR配置选项,大家可以轻松运行基于Node.js的应用程序,并随时间推移或者输入数据量的增加提升其规模。
以下MapReduce程序旨在以天为单位将推文内容输出为高复杂性JSON结构化数据。在本示例中,Twitter数据由DataSift(来自datasift.com)负责收集。我们去掉了其中的某些特殊字符,例如换行符与制表符,并将推文created_at字段输出为键。归约器随后根据日期对这部分数据加以排序,并输出推文的整体数量。
#!/usr/bin/env node var events = require('events'); var emitter = new events.EventEmitter(); var line = ''; var lineEvent = 'line'; var dataReady = 'dataReady'; //移除全部控制字符,从而保证输出结果由纯文本内容构成 String.prototype.escape = function() { return this.replace('/n', '//n').replace('/'', '///'').replace('/"', '//"') .replace('/&', '//&').replace('/r', '//r').replace('/t', '//t') .replace('/b', '//b').replace('/f', '//f'); } //为此附加一套数组 Array.prototype.appendArray = function(arr) { this.push.apply(this, arr); } //数据完成后,将其写入至必要的输出通道 emitter.on(dataReady, function(arr) { var dateComponents = arr[9].split(' '); var d = [dateComponents[1],dateComponents[2],dateComponents[3]].join(' '); var interaction = { key_date : d, content: { objectId : arr[0], hash : arr[1], id : arr[2], author_id : arr[3], author_avatar : arr[4], author_link : arr[5], author_name : arr[6], author_username : arr[7], content : arr[8], created_at : arr[9], link : arr[10], schema_version : arr[11], source : arr[12] } }; process.stdout.write(interaction.key_date + '/t' + JSON.stringify(interaction) + '/n'); }); //通过捕捉到的输入数据生成一个JSON对象 //而后生成所需的输出结果 emitter.on(lineEvent, function(l) { var obj; //通过input事件创建该JSON对象 //如果无法创建,则丢弃该项目 // // TODO在此生成一个例外以代替? if (!line || line == '') { return; } try { obj = JSON.parse(line); } catch (err) { process.stderr.write('Error Processing Line ' + line + '/n'); process.stderr.write(err); return; } //为每个交互对象生成一个输出结果组 for ( var i = 0; i < obj.interactions.length; i++) { //根据语法创建几个便捷对象 var int = obj.interactions[i]; var a = int.interaction.author; //提取我们需要保留的对象模型内容 var output = [ obj.id, obj.hash, int.interaction.id, a.id, a.avatar, a.link, a.name, a.username, int.interaction.content.escape(), int.interaction.created_at, int.interaction.link, int.interaction.schema.version, int.interaction.source ]; //当输出数组完成后触发事件 emitter.emit(dataReady, output); } }); //作用于每一次由stdin引发的数据块读取操作 process.stdin.on('data', function(chunk) { //新行中汇总并执行 lines = chunk.split("/n") if (lines.length > 0) { //将第一套数据块添加至现有缓冲区中 line += lines[0] if (lines.length > 1) { //执行当前缓冲内容 emitter.emit(lineEvent,line); //推进行内剩余内容并加以执行,将最新内容纳入缓冲区 for (i=1; i<lines.length; i++) { if (i < lines.length) { emitter.emit(lineEvent,lines[i]); } else { line = lines[i]; } } } } }); //当stdin读取操作完成后触发 process.stdin.on('end', function() { emitter.emit(lineEvent,line); }); //设置STDIN编码 process.stdin.setEncoding('utf8'); //恢复STDIN——默认暂停 process.stdin.resume();
#!/usr/bin/env node var events = require('events'); var emitter = new events.EventEmitter(); var remaining = ''; var lineReady = 'lineReady'; var dataReady = 'dataReady'; var interactionSummary = { day : '', count : 0 }; //移除全部控制字符,从而保证输出结果由纯文本内容构成 String.prototype.escape = function() { return this.replace('/n', '//n').replace('/'', '///'').replace('/"', '//"') .replace('/&', '//&').replace('/r', '//r').replace('/t', '//t') .replace('/b', '//b').replace('/f', '//f'); } //为此附加一套数组 Array.prototype.appendArray = function(arr) { this.push.apply(this, arr); } //数据完成后,将其写入至必要的输出通道 emitter.on(dataReady, function(o) { if (o) { process.stdout.write(JSON.stringify(o) + '/n'); } }); //通过捕捉到的输入数据生成一个JSON对象 //而后生成所需的输出结果 emitter.on(lineReady,function(data) { if (!data || data == '') { // null数据可能是一个关闭事件,意味着数据已经处理完毕 emitter.emit(dataReady, interactionSummary); return; } try { obj = JSON.parse(data.split('/t')[1]); } catch (err) { process.stderr.write('Error Processing Line ' + data + '/n') process.stderr.write(err); return; } if (interactionSummary.day == '') { interactionSummary.day = obj.key_date; interactionSummary.count = 1; } else { if (obj.key_date != interactionSummary.day) { //数组削减完成后触发事件 emitter.emit(dataReady, interactionSummary); interactionSummary.day = obj.key_date; interactionSummary.count = 1; } else { interactionSummary.count += 1; } } }); //作用于每一个从stdin处进行读取的数据块 process.stdin.on('data', function(chunk) { var capture = chunk.split('/n'); for (var i=0;i<capture.length; i++) { if (i==0) { emitter.emit(lineReady,remaining + capture[i]); } else if (i<capture.length-1) { emitter.emit(lineReady,capture[i]); } else { remaining = capture[i]; } } }); //当stdin读取操作完成后触发 process.stdin.on('end', function() { emitter.emit(lineReady,remaining); }); //恢复STDIN——默认为暂停 process.stdin.resume(); //设置STDIN编码 process.stdin.setEncoding('utf8');
aws emr create-cluster --ami-version 3.3.1 --enable-debugging --visible-to-all-users --name MyNodeJsMapReduceCluster --instance-groups InstanceCount=2,InstanceGroupType=CORE,InstanceType=m3.xlarge InstanceCount=1,InstanceGroupType=MASTER,InstanceType=m3.xlarge -- no-auto-terminate --enable-debugging --log-uri s3://<log bucket>/EMR/logs --bootstrap-actions Path=s3://github-emr- bootstrap-actions/node/install-nodejs.sh,Name=InstallNode.js -- service-role EMR_DefaultRole --ec2-attributes KeyName=<my key pair>,InstanceProfile=EMR_EC2_DefaultRole
aws emr add- steps --cluster-id --steps Name=NodeJSStreamProcess,Type=Streaming,Args=--files,"s3://github- aws-big-data-blog/aws-blog-nodejs-on-emr/scripts/sample-mapper.js /,s3://github-aws-big-data-blog/aws-blog-nodejs-on- emr/scripts/sample-reducer.js",-input,s3://github-aws-big-data- blog/aws-blog-nodejs-on-emr/sample/tweets,-output,s3://<my output bucket>/node_sample,-mapper,mapper.js,-reducer,reducer.js
在上述代码中,<my output bucket>应被替代为我们希望创建输出结果的目标存储桶名称。执行完成后,预配置的输出存储桶及路径内将出现多个文件,其中包含整理得出的示例数据集中单一一天内出现的推文数量:
{"day":"14 Feb 2013","count":1071}
如果大家愿意提出一点意见或者建议,请在下方的评论栏中与我们分享。
原文链接: http://blogs.aws.amazon.com/bigdata/post/TxVX5RCSD785H6/Node-js-Streaming-MapReduce-with-Amazon-EMR
感谢Raymond Zhao对本文的审校。
给InfoQ中文站投稿或者参与内容翻译工作,请邮件至editors@cn.infoq.com。也欢迎大家通过新浪微博(@InfoQ,@丁晓昀),微信(微信号: InfoQChina )关注我们,并与我们的编辑和其他读者朋友交流(欢迎加入InfoQ读者交流群 )。