大数据的批处理和实时处理模式已经存在了。但是,还没有一种模式可以允许我们实时批处理非独立数据。合作伙伴一旦收到操作指导,Expedia的营销团队需要分析相互依赖的数据集。现存的系统运行于一个本地Hadoop集群中,但是整个团队一直在努力达到内部的SLA(服务等级协议)。这些信息也是有时效的,更快地获取数据意味着给合作伙伴更好的操作指导。
从事于Expedia研究的Pariveda小组参与AWS的Solutions Architects(解决方案架构)研究来应对三个明显的挑战:如何在源数据可用后尽快传递分析结果;如何处理相互依赖但又在不同时间产生的数据集;如何管理不同时间达到的数据集之间的从属关系。
在本博客帖中,我将会描述Expedia,Pariveda和AWS团队是如何以 AWS Lambda , Amazon DynamoDB , Amazon EMR 和A mazon S3为 组成部分,找出独特方法来实时处理数据的。你将会学到在不管理任何基础设施的情况下如何实施一个相似的传递途径。
解除数据集间的相互依赖关系
我们需要的解决的一个问题是数据集间的相互依赖关系。我们的目标是为另一个系统提供统一,清洁的数据输入源以便进行更加详细的分析。为了创建这些数据输入源,每天来自多个合作伙伴和内部系统的不同种类的成百上千的数据集必须经过接收,聚合和查询。每一种数据,每一个合作伙伴的数据达到的时间都是不同的。这意味着数据处理过程需要持续,直到一个数据输入源所需的所有数据都到达。
下面概括的解决方案就是我们的成果。我们使用从属于一个S3桶的AWS Lambda来更新DynamoDB中定义的任务。任务定义包括名称,非独立文件的列表,它们的状态(已到达还是未到达)和在EMR中运行该任务所需的参数。一旦一个特定任务所需的所有文件都到达了,lambda函数就会更新任务队列,在EMR中启动一个集群。EMR将结果再推回到S3以便使用S3的应用程序在需要该结果时可以提取到它们。
配置任务
系统的核心时任务。任务对象保存了所有的信息,这些信息是确定数据依赖关系,依赖关系状态和待发生的处理结果所必需的。通过定义任务,你可以配置所有需要发生的work。从任务表中,可以轻松地看到所有的数据依赖关系。
建立S3事件与任务之间的映射
当我们从S3获取事件时,AWS Lambda中引发的事件只有关于S3中被修改的对象的上下文。从S3中直接获取的数据没有关于这个任务的任何信息。但在Node.js这条只给出了一行代码的信息中,我们确实得到了一条很珍贵的信息,那就是S3对象的新密钥。
var srcKey = event.Records[0].s3.object.key;
从这个新密钥,我们需要一种方式来获取任务信息。为了达到这个目的,我们创建了FileUnit表。这一表实际上完全改变了这个任务,将S3密钥作为打开表的范围密钥,而任务密钥作为数据负载。这使我们获取了源密钥,通过一次DynamoDB查询就可以算出我们拥有的任务。
function getTaskKey(srcKey, dateKey, callback) { // Create FileUnit query var path = decodeURIComponent(srcKey); console.log('checking for FileUnit {/nDate:' + dateKey +'/nFile:' + path + '/n}'); var fileUnitParams = { Key: {Date: {S: dateKey}, Filename: {S: path}}, TableName: 'FileUnit', AttributesToGet: ['Task'], ConsistentRead: false }; // Get Task key from FileUnit dynamodb.getItem(fileUnitParams, function (err, data) { if (err) { console.log('Error reading file unit: ' + err); // an error occurred callback(err); } else if(data === undefined || data.Item === undefined || data.Item.Task === undefined) { console.log('File Unit Key does not exist: Date: ' + dateKey + 'File: ' + path); callback('File Unit Key Does not exist'); } else { console.log('Found key: ' + data.Item); var taskKey = data.Item.Task.S; callback(null, path, taskKey); } }); }
从这里,我们可以更新任务,确定是否所有的依赖数据都已经到达,并启动亚马逊EMR。
产生的流程图
创建DynamoDB表
我们在DynamoDB中创建如下三个表:
Task表
针对Task表,我们使用 HashKey/RangeKey Primary Key 配置,以日期作为hash密钥,以TaskKey字符串作为范围密钥。这个表可以使用任何名称,只要在你所有的任务中,该名称是唯一的即可。而针对Expedia项目,从hash密钥的角度看,Date并不遵守时间系列数据的准则,所以你可以创建另一个可预测的hash密钥。但是如果你的任务是以天为基础重复的,那么Date是一个很好的hash密钥的选择,因为它便于后面的搜索。
这里只是一个样例条目:
{ "Date": "20150525", "input/inputFile1.txt": "NULL", "input/inputFile2.txt": "NULL", "input/inputFile3.txt": "NULL", "ScriptParameters": "['s3://us-west-2.elasticmapreduce/libs/hive/hive-script ', '--run-hive-script', '--hive-versions', '0.13.1', '--args','-f', 's3://YOUR_BUCKET_NAME/input/script/taskQuery.hql']", "TaskKey": "Task1", "TaskStatus": "New" }
在创建表格时,我们只需关心TaskKey和Date参数。但是输入文件(要注意这些文件在S3上的路径)和ScriptParameters对整个系统的运行是必需的。该任务在控制台创建。在实际操作中,这些配置信息应该在数据文件被加载前以设定的频率从某个文件中加载。
FileUnit表
FileUnit是对使用S3路径的Task表格的一个引用。它有三个属性:
在所有的实践中,Date并不是FileUnit表格的必须属性。实际上,如果你能避免这一属性而且不影响任务的运行,那会是更好的选择,但是这一属性却能更好地支持我们的描述。如果你的任务名并不是以天为基础重复,使用S3路径作为hash密钥,使用任务名称作为范围密钥会是更好的选择。这可以使你在根据hash密钥查询的同时,管理那些在多个任务之间相互依赖的数据集变得更简单。
Batch表
应该创建Batch表格,并以Date属性作为hash密钥,以Task属性为范围密钥。Task的值将会与Task表格中的TaskKey的值相同。为了便于查询,我们也为Batch表格添加了全局备用索引,并以日期为hash密钥,以ProcessingState为范围密钥。这帮助我们轻松地查询未处理的项目。
测试数据
为了测试,在Task表格中创建一个与上面类似的条目。确保使用属性名称所指定的输入路径,将它们的值设为NULL。下一步,获取这些输入路径,在FileUnit表格中创建条目。路径名称必须完全匹配Filename列的值(包括大小写)。FileUnit表中Task的值必须匹配任务的TaskKey值。要使用上面的Task样例,你将创建如下三个FileUnit条目:
表创建完毕,测试数据加载完毕后,我们可以实施AWS Lambda函数。
写AWS Lambda函数
代码框架
代码框架与上面展示的流程图很相似。
function processDataFiles(srcKey, date, completionCallback) { async.waterfall([ function (callback) { getTaskKey(srcKey, date, callback); }, function (path, taskKey, callback) { updateTask(path, date, taskKey, callback); }, function(taskItem, callback) { sendToQueue(taskItem, callback); }, function (callback) { getUnprocessedItems(date, callback); }, function(batchData, callback) { processBatch(batchData, callback); } ], function (err) { if (err) { console.error('Processing stopped: ' + err); completionCallback(err); } else { console.log('Success'); completionCallback(); }// Success }); }
查询DynamoDB表是简单的。查看前面“建立S3事件与任务之间的映射”部分的代码。更新任务表也是简单的,但是我们在调用函数来返回新值时引入了一个变种,因此我们可以检查其完整性。在这种方式下,我们只需为任务对DynamoDB做一次查询,而不是两次。
function updateTask(path, date, taskKey, updateTaskCallback) { var taskUpdateExpression = 'SET TaskStatus = :val1, #P = :val2' ; var expressionAttributeNames = {'#P': path}; var taskUpdateExpressionValues = { ':val1': {'S': 'FilesReceived'}, ':val2': {'S': 'Received'} }; var taskParams = { Key: {Date: {S: date}, TaskKey: {S: taskKey}}, TableName: 'Task', UpdateExpression: taskUpdateExpression, ExpressionAttributeNames: expressionAttributeNames, ExpressionAttributeValues: taskUpdateExpressionValues, ReturnValues: 'ALL_NEW' }; dynamodb.updateItem(taskParams, function(err, data){ if (err) { console.log('Error updating tasks' + err, err.stack); // an error occurred updateTaskCallback(err); } else { console.log("received: ", util.inspect(data, {depth:4})); updateTaskCallback(null, data.Attributes); } }); }//updateTask
我们将会粉饰代码,将已完成的项目添加到Batch表中因为该代码与上面的updateTask函数非常相似。但我们确实是使用putItem函数而非updateItem函数。
启动EMR
在EMR中启动任务是很简单的。我们启动一个EMR集群,然后添加一个工作流程步骤。有很多的配置代码,但是实质是简单的。你必须安装合适的应用来完成你的工作,在本案例中,你必须安装Hive,所以你在启动EMR时会看到工作流程步骤被添加到EMR中。
function startEMR(callback) { var installHiveArgs = [ 's3://' + fconst.zone_name + '.elasticmapreduce/libs/hive/hive-script', '--base-path', 's3://' + fconst.zone_name + '.elasticmapreduce/libs/hive/', '--install-hive', '--hive-versions', '0.13.1' ]; var installHiveStep = { Jar : scriptRunnerJar, Args : installHiveArgs }; var params = { Instances: { /* required */ HadoopVersion: '3.8.0', InstanceGroups: [ { InstanceCount: 1, /* required */ InstanceRole: 'MASTER', /* required */ InstanceType: 'm1.medium', /* required */ Market: 'ON_DEMAND', Name: 'Master' }, { InstanceCount: 1, /* required */ InstanceRole: 'CORE', /* required */ InstanceType: 'm1.medium', /* required */ Market: 'ON_DEMAND', Name: 'Core' } ] }, Name: "Near Realtime Big Data Test", AmiVersion: 'latest', LogUri: 's3://big-data-blog-logs/', ServiceRole: 'EMR_DefaultRole' , JobFlowRole: 'EMR_EC2_DefaultRole', Steps: [ { HadoopJarStep: installHiveStep, Name: 'Hive setup', ActionOnFailure: 'TERMINATE_CLUSTER' } ], VisibleToAllUsers: true }; console.log('Starting job: ' + params.Name); console.log(util.inspect(params, {depth:4})); emr.runJobFlow(params, callback); }
从这里,我们通过使用脚本参数调用addJobFlowSteps函数,将处理任务添加到工作流程中。有一个很小的转换步骤需要在这里进行。你可以在GitHub知识库中找到该转换代码。
部署AWS Lambda函数
为了在AWS Lambda函数中部署该应用,你需要:
1.从GitHub下载源代码。
2.使用 npm install工具来安 装 async 所依赖的部件。
3.在FunctionConstants.js文件中更新logsPath的值,使其指向某一个桶,并在你希望EMR放置日志文件的路径前加前缀。
4.将函数打包,并部署该函数,如本例或演练步骤中所示。
5.确保你的Lambda Execution IAM角色有以下权限:
a.在DynamoDB中–调用getItem,updateItem和putItem函数的权限
b.在EMR中–调用startJobFlow和addJobFlowItem函数的权限
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:*" ], "Resource": "arn:aws:logs:*:*:*" }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject" ], "Resource": "arn:aws:s3:::*" }, { "Action": [ "dynamodb:*", ], "Effect": "Allow", "Resource": "*" }, { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:RunJobFlow" ], "Resource": [ "*" ] }, { "Sid": "Stmt1432573611000", "Effect": "Allow", "Action": [ "iam:PassRole" ], "Resource": [ "arn:aws:iam::XXXXXXXXXXXX:role/EMR_DefaultRole" ] } ] }
在控制台上进行快速测试
确保已正确配置了所有参数,你可以在AWS控制台上打开AWS Lambda函数的Edit/Test页面,模拟添加到S3的新文件:
使用S3样本事件,修改s3部分和object部分的参数值,触发模拟任务中文件的事件。
{ "Records": [ { ... "s3": { "s3SchemaVersion": "1.0", "configurationId": "testConfigRule", "bucket": { "name": "YOUR_BUCKET_NAME", "ownerIdentity": { "principalId": "EXAMPLE" }, "arn": "arn:aws:s3:::YOUR_BUCKET_NAME" }, "object": { "key": "input/inputFile3.txt", "size": 1024, "eTag": "d41d8cd98f00b204e9800998ecf8427e" } } } ] }
在Execution结果窗口你应该可以看到消息Files processed successfully(消息处理成功)。
将S3桶事件发布到AWS Lambda函数
从控制台中,从Actions菜单中选择Add event source选项,将S3桶中的Object Created事件添加到AWS Lambda函数中,将关于事件源的信息设置为S3桶:
端对端测试
既然一切都已准备就绪,你可以在S3桶中创建新文件了。你应该看到与如下截屏所类似的信息:
如果EMR集群还未启动,你可以查看该功能所产生的CloudWatch日志,确定问题所在。你也可以通过控制台模拟所有的文件达到情况来确定功能收到错误信息的部位。
祝贺你,你已经建立了一个工作系统!
优化
有若干方法可以优化该系统,取决于你的使用场景。下面是一些优化案例。
一个文件对多个任务
如果多个任务以来同一个文件,你必须适当地调整FileUnit表,然后调整任务查询相关参数来处理结构的变化。你可以使用上面描述的格式(文件名作hash密钥)或者你可以保留格式但是将Task表中条目的值设为一组值而非一个值。
清扫任务
如果你的任务很小,你预期数据以某一频率到达,你可以调整批处理的任务大小到大于1 (该值是在配置文件中配置的)。如果你这样设置了,你可能想要添加一个清扫功能,使其以定时器规定的时间来清除可能还未运行的任务。以这种方式,你获取了效率,待处理的任务在运行前不用等待太长时间来满足批处理的个数限制。
原文链接: https://blogs.aws.amazon.com/bigdata/post/Tx1R28PXR3NAO1I/How-Expedia-Implemented-Near-Real-time-Analysis-of-Interdependent-Datasets
活动推荐: CSDN在线培训——新创公司用AWS搭建高扩展性架构
( 翻译/吕东梅 责编/王鑫贺 )
订阅“AWS中文技术社区”微信公众号,实时掌握AWS技术及产品消息!
AWS中文技术社区为广大开发者提供了一个Amazon Web Service技术交流平台 ,推送AWS最新资讯、技术视频、技术文档、精彩技术博文等相关精彩内容,更有AWS社区专家与您直接沟通交流!快加入AWS中文技术社区,更快更好的了解AWS云计算技术。