您是否希望对实时数据源的信息执行复杂事件检测,并在找到事件后快速采取行动?如果使用 IBM® Bluemix® 和 Streaming Analytics 服务,这可比您想象的要更容易。为了展示这有多容易,我构建了一个入门应用程序,它:
本教程介绍如何获取、运行和扩展这个名为 EventDetection 的入门应用程序。这是解决方案组件的概略图:
我们通过 SDK for Node.js 运行时来实现 EventDetection 应用程序。该应用程序提供一个简单的 Web UI 来显示分析状态和结果。该 Node.js 应用程序与 Streaming Analytics 服务的一个实例绑定。并且它通过 Streaming Analytics REST API 控制该服务。
分析工作由一个IBM Streams 应用程序来执行,这个应用程序可对一个天气数据流进行事件检测。Node.js 应用程序提交了一个 Streams 应用程序包,用于将 Streams 应用程序部署到与该应用程序所绑定的 Streaming Analytics 实例。
现在您可使用过去数小时的 NOAA 数据。该应用程序首先分析这些过去的数据,然后拾取 NOAA 发布的实时数据。出于此原因,该应用程序会立即检测到大量事件,然后在新数据发布时零星地检测到事件。大多数 NOAA 气象站每小时发布新数据,但有些数据更新可能更频繁。
所分析的数据是美国国家海洋及大气管理局 (NOAA) 通过互联网提供的。将 Streams 应用程序部署到 Streaming Analytics 实例中后,它会持续地摄取和分析天气数据,直到该应用程序停止。
Streams 应用程序检测到事件时,会将这些事件发送到 Node.js 应用程序并显示在该应用程序的 Web UI 上。
“ 您可修改入门应用程序的源代码,通过任何有趣的方式自定义或扩展它。 ”
新的 developerWorks Premium 会员计划提供了一张访问强大的开发工具和资源的全通票,包括 500 篇通过 Safari Books Online 提供的顶级技术文章(超过 50 篇是专门针对 Web 开发人员的),最重要开发人员活动的大幅折扣,最新的 O'Reilly 大会的视频回放,等等。立即注册。
cf
命令行接口 运行应用程序
获取代码
观看: 观看 IBM Bluemix 实战演示
阅读: Bluemix Streaming Analytics 简介
阅读: Streaming Analytics 入门
EventDetection 应用程序的代码存储在 Bluemix DevOps Services 上的 streamscloud | EventDetection 项目中。要获得您自己的代码副本,可单击本页上的 获取代码 按钮(位于构建您的应用程序所需的准备工作节的末尾),然后克隆该 Git 存储库或将该代码分解到您自己的 Bluemix DevOps Services 项目中。如果您不熟悉这些方法,可下载源代码的 ZIP 文件:
现在将按原样部署该入门应用程序,不更改任何代码,然后浏览 UI。在本教程后面,您有机会自定义该应用程序并重新部署它。
连接到 Cloud Foundry 的 Bluemix 实例:
cf api https://api.ng.bluemix.net
cf login
部署您的应用程序:
cf push <em>yourapp</em>
点击查看大图
关闭 [x]
Application Flow部分列出了该应用程序正在执行的步骤及其状态。
Event Types部分定义了应用程序正在检测的事件类型。
Application Results部分显示已检测出的事件。它还会显示当前报告的最高和最低温度。
点击查看大图
关闭 [x]
点击查看大图
关闭 [x]
在下面的屏幕截图中,每个连接显示当前元组速率为 0,因为天气数据的来源是 “爆发性” 的:不会频繁地发布新数据,所以在某些时间段没有数据可处理。要在该图中看到非零的元组速率,可重新启动 Bluemix 应用程序并立即切换到该图。在 Streams 应用程序处理初始的大量数据时,您至少会在几分钟内看到非零的元组速率。
最大化控制台右上角的 Streams Graph 窗格,以显示 Streams 应用程序的流程图。使用浏览器的缩放控件来查看整个流程图。该图显示了 Streams 应用程序的实时运行状态。可将鼠标悬停在该图上的操作符或它们之间的连接上,以获得更详细的信息。在下面的屏幕截图中,将鼠标悬停在该图中的一个连接上,会显示在该图中前两个操作符之间流动的元组数量等信息:点击查看大图
关闭 [x]
EventDetection 应用程序是一个完整但简单的应用程序,不需要任何自定义工作即可运行。要理解该应用程序,可查看它的代码:
去除无用代码,找出在何处执行这些步骤。本节的剩余部分将更详细地分析两个步骤。
阅读: Streaming Analytics 服务实例 REST API 参考资料
... // -----New form----- var form = new FormData(); // -----File part----- form.append('file', fs.createReadStream('EventDetection.sab'), { contentType: 'application/octet-stream' }); // -----JSON Part----- jsonObject = JSON.stringify({ "jobName" : "EventDetectionSample", "submissionParameters" : { "route" : app_uri, }, }); console.info('JSON object part: ' + jsonObject); var buffer = new Buffer(jsonObject); form.append('my_buffer', buffer, { contentType: 'application/json', // The line below is not an actual file. The name with the .json // extension is needed for the data in the buffer to be recognized // as json. "filename": "jparams.json" }); // -----POST Params----- var uri_string = sa_props.jobs_path + '?bundle_id=EventDetection.sab'; // -----SUBMIT POST----- var jsonPostRes = {}; form.submit({ protocol: 'https:', host: sa_props.rest_host, path: uri_string, headers: {'Authorization' : authbuf} }, function(err, res) { ...上述代码对 Streaming Analytics REST API 执行一次 HTTP
POST
,使用 form-data 包提交一个多部分表单: // POST handler for the events being sent back from the Streams application app.post('/', function(req, res){ status_step[4] = "Processing Events"; if (!cancelling) { console.info("In POST function"); var jsonString = req.body.jsonString; console.info("POST message is: " + jsonString); var payload = JSON.parse(jsonString); if (payload.eventType == 'MaxMin Temp') { // Max or min temperature change maxmin = payload; } else { // Regular event eventCount++; console.info("Event total = " + eventCount); // Add event to the array used by the web user interface events.push(new Event(eventCount, payload)); // Cancel the Streams job if we've reached the event target if (eventCount == eventTarget) { cancelling = true; console.info("EVENT TARGET REACHED..."); console.info("STREAMS JOB WILL BE CANCELLED."); finalCancel(jobNumber.toString()); } } } res.send(); });
使用的 Streams 应用程序是一个完整的 Streams 应用程序,不需要任何自定义工作即可运行。您下载(或克隆或分解)的源代码中包含了该应用程序的源代码及其预先构建的 .sab 文件。要理解 Streams 应用程序,可查看它的代码:
MatchRegex
的操作符,它可检测一个流中的一系列数据元组模式。代码注释说明了该操作符将检测的 M 形状模式的性质: // // The first complex event is called "M-shape". It triggers when the graph of the // temperature for a weather station form's an M shape over a period of time. // // Detecting M shape patterns in weather data is not that useful, but recognizing an M shape in // financial trading is valuable and is referred to as a "double-top" stock pattern. // // See http://hirzels.com/martin/papers/debs12-cep.pdf for more information on this complex event // detection method, the double-top pattern and other patterns. // stream<WeatherSummary weatherValues, rstring event> TempMEvent = MatchRegex(WeatherSummary) { param pattern : ". rise+ drop+ rise+ drop* deep"; partitionBy : stationCode; predicates : { rise = tempInF>First(tempInF) && tempInF>=Last(tempInF), drop = tempInF>=First(tempInF) && tempInF<Last(tempInF), deep = tempInF<First(tempInF) && tempInF<Last(tempInF) }; output TempMEvent : weatherValues=WeatherSummary, event="M-Shape Temp"; }
该操作符的声明使用正则表达式语法以及一组也在操作符声明中定义的谓词来定义您尝试检测的模式。该操作符根据气象站报告的值集来查找气象站温度的 M 形状。这个 MatchRegex
操作符使用之前在 SPL 代码中定义的 WeatherSummary
流,并生成一个名为 TempMEvent
的流。该操作符按气象站的 ID 将气象站的读数数据划为不同的分组,并维护检测每个气象站事件所必要的状态。
// // Send events to the application user interface by converting them to json and HTTPPost-ing // to the Node.js app // stream <rstring jsonString> JSONOutput = com.ibm.streamsx.json::TupleToJSON(OutputEvents) { } () as HttpEvents = HTTPPost(JSONOutput) { param headerContentType : "application/json"; url : ((rstring) getSubmissionTimeValue("route")); }前面代码段中的第一个操作符将一个流中的一个元组转换为 JSON 字符串。这个操作符使用之前在 SPL 代码中定义的一个名为
OutputEvents
的流并生成一个名为 JSONOutput
的流。下个操作符(名为 HTTPPost
)使用 JSONOutput
流并通过 HTTP POST
将该 JSON 字符串发送到 Node.js 应用程序的路径。 现在您已熟悉了这个入门应用程序,可修改该应用程序的源代码,通过任何有趣的方式自定义或扩展它:
event_target
变量的值从 3000
改为更大的数字。 MatchRegex
操作符,以检测您想寻找的一种模式。 MatchRegex
操作符后面的操作符,从而将新的事件类型发送回 Node.js 应用程序。 要修改该应用程序:
可在 Bluemix 中通过 Streaming Analytics 服务轻松地对实时数据流执行复杂事件检测。您在本教程中完成的应用程序可帮助您入门。您可更改想要分析的数据流,定义想要检测的事件,以及处理这些事件来实现自己的目的。
BLUEMIX SERVICES USED IN THIS TUTORIAL:
相关主题: 大数据和分析 Node.js