转载

检测实时数据流中的复杂事件

您是否希望对实时数据源的信息执行复杂事件检测,并在找到事件后快速采取行动?如果使用 IBM® Bluemix® 和 Streaming Analytics 服务,这可比您想象的要更容易。为了展示这有多容易,我构建了一个入门应用程序,它:

  • 在一个 Bluemix Node.js 应用程序中使用 Streaming Analytics 服务
  • 将一个数据流摄取到 Streaming Analytics 中
  • 识别数据流中的各种模式并检测事件
  • 将分析结果发送到该 Bluemix 应用程序

本教程介绍如何获取、运行和扩展这个名为 EventDetection 的入门应用程序。这是解决方案组件的概略图:

检测实时数据流中的复杂事件

我们通过 SDK for Node.js 运行时来实现 EventDetection 应用程序。该应用程序提供一个简单的 Web UI 来显示分析状态和结果。该 Node.js 应用程序与 Streaming Analytics 服务的一个实例绑定。并且它通过 Streaming Analytics REST API 控制该服务。

分析工作由一个IBM Streams 应用程序来执行,这个应用程序可对一个天气数据流进行事件检测。Node.js 应用程序提交了一个 Streams 应用程序包,用于将 Streams 应用程序部署到与该应用程序所绑定的 Streaming Analytics 实例。

现在您可使用过去数小时的 NOAA 数据。该应用程序首先分析这些过去的数据,然后拾取 NOAA 发布的实时数据。出于此原因,该应用程序会立即检测到大量事件,然后在新数据发布时零星地检测到事件。大多数 NOAA 气象站每小时发布新数据,但有些数据更新可能更频繁。

所分析的数据是美国国家海洋及大气管理局 (NOAA) 通过互联网提供的。将 Streams 应用程序部署到 Streaming Analytics 实例中后,它会持续地摄取和分析天气数据,直到该应用程序停止。

Streams 应用程序检测到事件时,会将这些事件发送到 Node.js 应用程序并显示在该应用程序的 Web UI 上。

您可修改入门应用程序的源代码,通过任何有趣的方式自定义或扩展它。

构建您的应用程序所需的准备工作

新的 developerWorks Premium 会员计划提供了一张访问强大的开发工具和资源的全通票,包括 500 篇通过 Safari Books Online 提供的顶级技术文章(超过 50 篇是专门针对 Web 开发人员的),最重要开发人员活动的大幅折扣,最新的 O'Reilly 大会的视频回放,等等。立即注册。

  • 一个 Bluemix 帐户 和一个 IBM Bluemix DevOps Services 帐户 ,二者都链接到您的 IBM ID。
  • Cloud Foundry cf 命令行接口
  • 基本了解Node.js

运行应用程序

获取代码

观看: 观看 IBM Bluemix 实战演示

第 1 步. 创建一个 Bluemix 应用程序并将其绑定到 Streaming Analytics 服务

  1. 登录 Bluemix。在仪表板中,单击 CREATE APP 按钮。系统询问要创建的应用程序类型时,选择 WEB
  2. 从目录内的可用运行时中选择 SDK for Node.js 并单击 CONTINUE 。输入一个喜欢的名称并单击 FINISH 。等待应用程序完成暂存。
  3. 返回到仪表板并单击应用程序名称。在应用程序概述页面上,单击 ADD A SERVICE OR API 按钮。
  4. 单击目录中 Data and Analytics 类别内的 Streaming Analytics 服务,然后单击 CREATE ,将它绑定到您的应用程序。出现提示时重新暂存该应用程序。

阅读: Bluemix Streaming Analytics 简介

阅读: Streaming Analytics 入门

第 2 步. 获取您自己的源代码副本

EventDetection 应用程序的代码存储在 Bluemix DevOps Services 上的 streamscloud | EventDetection 项目中。要获得您自己的代码副本,可单击本页上的 获取代码 按钮(位于构建您的应用程序所需的准备工作节的末尾),然后克隆该 Git 存储库或将该代码分解到您自己的 Bluemix DevOps Services 项目中。如果您不熟悉这些方法,可下载源代码的 ZIP 文件:

  1. 单击下载图标 检测实时数据流中的复杂事件
  2. 将该 ZIP 文件保存到本地并解压。
  3. 将其中包含已解压文件的目录重命名,使其与您在第 1 步中创建的应用程序名称相匹配。

第 3 步. 部署应用程序并查看它

现在将按原样部署该入门应用程序,不更改任何代码,然后浏览 UI。在本教程后面,您有机会自定义该应用程序并重新部署它。

  1. 在操作系统命令行中,进入其中包含已解压应用程序的目录。
  2. 连接到 Cloud Foundry 的 Bluemix 实例:

    cf api https://api.ng.bluemix.net

    cf login

  3. 部署您的应用程序:

    cf push <em>yourapp</em>

  4. 在 Bluemix 中,单击应用程序的路径打开它。您会看到一个名为 Welcome to the Event Detection Sample Application! 的基础网页: 检测实时数据流中的复杂事件

    点击查看大图

    关闭 [x]

    检测实时数据流中的复杂事件

    Application Flow部分列出了该应用程序正在执行的步骤及其状态。

    Event Types部分定义了应用程序正在检测的事件类型。

    Application Results部分显示已检测出的事件。它还会显示当前报告的最高和最低温度。

第 4 步. 探索正在运行的 Streams 应用程序

  1. 返回到 Bluemix,调出 Streaming Analytics 服务的服务仪表板。您可通过多种方式访问该仪表板,包括单击应用程序中的 Streaming Analytics 图标: 检测实时数据流中的复杂事件 Streaming Analytics 仪表板(如这里所示)提供了控制实例的各种任务以及相关信息的链接: 检测实时数据流中的复杂事件

    点击查看大图

    关闭 [x]

    检测实时数据流中的复杂事件

    您也可以使用该仪表板启动 Streaming Analytics 控制台,控制台中显示了 Streaming Analytics 实例的信息。
  2. 单击仪表板上的 LAUNCH 按钮,显示 Streaming Analytics 控制台。在这个屏幕截图中,控制台显示了一个正在运行的作业 — 正在执行复杂事件检测的 Streams 应用程序: 检测实时数据流中的复杂事件

    点击查看大图

    关闭 [x]

    检测实时数据流中的复杂事件

  3. 在下面的屏幕截图中,每个连接显示当前元组速率为 0,因为天气数据的来源是 “爆发性” 的:不会频繁地发布新数据,所以在某些时间段没有数据可处理。要在该图中看到非零的元组速率,可重新启动 Bluemix 应用程序并立即切换到该图。在 Streams 应用程序处理初始的大量数据时,您至少会在几分钟内看到非零的元组速率。

    最大化控制台右上角的 Streams Graph 窗格,以显示 Streams 应用程序的流程图。使用浏览器的缩放控件来查看整个流程图。该图显示了 Streams 应用程序的实时运行状态。可将鼠标悬停在该图上的操作符或它们之间的连接上,以获得更详细的信息。在下面的屏幕截图中,将鼠标悬停在该图中的一个连接上,会显示在该图中前两个操作符之间流动的元组数量等信息: 检测实时数据流中的复杂事件

    点击查看大图

    关闭 [x]

    检测实时数据流中的复杂事件

第 5 步. 检查 Node.js 代码

EventDetection 应用程序是一个完整但简单的应用程序,不需要任何自定义工作即可运行。要理解该应用程序,可查看它的代码:

  1. 打开 app.js 文件,查看应用程序逻辑。app.js 中的代码围绕 6 个主要步骤进行组织:
    1. 提取在使用 Streaming Analytics REST API 时所需的环境信息。
    2. 检查 Streams 实例是否正在运行,如有必要,通过 Streaming Analytics REST API 启动该实例。
    3. 如果该实例已在运行,可检查 Streams 事件检测作业是否已在运行。如果一个作业正在运行,则取消它。
    4. 使用 Streaming Analytics REST API 将一个 Streams 应用程序包部署到 Streaming Analytics 服务。该应用程序包中包含的 Streams 应用程序可分析天气数据并执行事件检测。
    5. 处理 Streams 应用程序检测到的事件并将它们显示在这个网页上。
    6. 处理 3,000 个事件后取消与 Streams 应用程序对应的作业。

去除无用代码,找出在何处执行这些步骤。本节的剩余部分将更详细地分析两个步骤。

阅读: Streaming Analytics 服务实例 REST API 参考资料

  1. 查看步骤 1(d) — 将 Streams 应用程序包部署到 Streaming Analytics 服务的代码:
    ...       // -----New form-----       var form = new FormData();        // -----File part-----       form.append('file', fs.createReadStream('EventDetection.sab'), {         contentType: 'application/octet-stream'       });        // -----JSON Part-----        jsonObject = JSON.stringify({         "jobName" :  "EventDetectionSample",         "submissionParameters" :         {             "route" : app_uri,         },       });       console.info('JSON object part: ' + jsonObject);        var buffer = new Buffer(jsonObject);       form.append('my_buffer', buffer, {         contentType: 'application/json',         // The line below is not an actual file.  The name with the .json         // extension is needed for the data in the buffer to be recognized         // as json.         "filename": "jparams.json"       });        // -----POST Params-----       var uri_string = sa_props.jobs_path + '?bundle_id=EventDetection.sab';        // -----SUBMIT POST-----       var jsonPostRes = {};        form.submit({         protocol: 'https:',         host: sa_props.rest_host,         path: uri_string,         headers: {'Authorization' : authbuf}       }, function(err, res) {          ...
    上述代码对 Streaming Analytics REST API 执行一次 HTTP POST ,使用 form-data 包提交一个多部分表单:
    • 该表单的一部分是一个名为 EventDetection.sab 的文件。.sab 文件是一个 Streams 应用程序包,是在 Streams 开发环境中编译一个 Streams 应用程序的结果。EventDetection.sab 包含了执行事件检测的 Streams 应用程序。
    • 另一部分是一个 JSON 对象,它包含应用程序包部署方面的其他信息,比如为 Streaming Analytics 实例中运行的作业所提供的名称,以及此应用程序的 submission-time 参数。Streams 应用程序仅有一个 submission-time 参数。您将该路径传递给 Node.js 应用程序,以便 Streams 应用程序可在检测到事件时将其发回到 Node.js 应用程序。
  2. 现在查看步骤 1(e) 的代码,该代码处理 Streams 应用程序检测到的事件。此代码在 app.js 文件的开头附近,而不是在步骤序列中,因为只要从 Streams 应用程序中发送了一个事件,就会异步调用它。Streams 应用程序向 Node.js 应用程序执行一次 HTTP POST,将事件发送到应用程序。下面的代码处理事件的 JSON 载荷并执行适当的操作:
    // POST handler for the events being sent back from the Streams application app.post('/', function(req, res){     status_step[4] = "Processing Events";      if (!cancelling) {       console.info("In POST function");       var jsonString = req.body.jsonString;       console.info("POST message is: " + jsonString);       var payload = JSON.parse(jsonString);            if (payload.eventType == 'MaxMin Temp') {         // Max or min temperature change         maxmin = payload;       }       else {         // Regular event         eventCount++;         console.info("Event total = " + eventCount);          // Add event to the array used by the web user interface         events.push(new Event(eventCount, payload));               // Cancel the Streams job if we've reached the event target         if (eventCount == eventTarget) {           cancelling = true;           console.info("EVENT TARGET REACHED...");           console.info("STREAMS JOB WILL BE CANCELLED.");           finalCancel(jobNumber.toString());         }       }     }     res.send(); });

第 6 步. 检查 Streams 应用程序代码

使用的 Streams 应用程序是一个完整的 Streams 应用程序,不需要任何自定义工作即可运行。您下载(或克隆或分解)的源代码中包含了该应用程序的源代码及其预先构建的 .sab 文件。要理解 Streams 应用程序,可查看它的代码:

  1. 打开 EventDetection.spl 文件(位于项目的 spl 子目录中)。该应用程序的源代码是用 SPL 编写的,这是一种面向数据流和数据处理操作符的语言。
  2. 跳过用于定位操作符声明的代码,并将其与您在 Streaming Analytic 控制台中看到的流程图相对比。本节的剩余部分将更详细地分析两个操作符。
  3. 检查可检测复杂事件的操作符的代码。下面的代码段给出了一个名为 MatchRegex 的操作符,它可检测一个流中的一系列数据元组模式。代码注释说明了该操作符将检测的 M 形状模式的性质:
    //   // The first complex event is called "M-shape".  It triggers when the graph of the    // temperature for a weather station form's an M shape over a period of time.   //   // Detecting M shape patterns in weather data is not that useful, but recognizing an M shape in   // financial trading is valuable and is referred to as a "double-top" stock pattern.   //   // See http://hirzels.com/martin/papers/debs12-cep.pdf for more information on this complex event   // detection method, the double-top pattern and other patterns.   //   stream<WeatherSummary weatherValues, rstring event> TempMEvent = MatchRegex(WeatherSummary)   {     param       pattern     : ". rise+ drop+ rise+ drop* deep";       partitionBy : stationCode;           predicates  : {  rise = tempInF>First(tempInF)  && tempInF>=Last(tempInF),  drop = tempInF>=First(tempInF) && tempInF<Last(tempInF),  deep = tempInF<First(tempInF) && tempInF<Last(tempInF) };     output       TempMEvent : weatherValues=WeatherSummary, event="M-Shape Temp";   }

    该操作符的声明使用正则表达式语法以及一组也在操作符声明中定义的谓词来定义您尝试检测的模式。该操作符根据气象站报告的值集来查找气象站温度的 M 形状。这个 MatchRegex 操作符使用之前在 SPL 代码中定义的 WeatherSummary 流,并生成一个名为 TempMEvent 的流。该操作符按气象站的 ID 将气象站的读数数据划为不同的分组,并维护检测每个气象站事件所必要的状态。

  4. 查看可将事件发回给 Node.js 应用程序的操作符序列(包含两个操作符):
    //   // Send events to the application user interface by converting them to json and HTTPPost-ing   // to the Node.js app   //   stream <rstring jsonString> JSONOutput = com.ibm.streamsx.json::TupleToJSON(OutputEvents)   {   }      () as HttpEvents = HTTPPost(JSONOutput) {     param       headerContentType : "application/json";       url : ((rstring) getSubmissionTimeValue("route"));   }
    前面代码段中的第一个操作符将一个流中的一个元组转换为 JSON 字符串。这个操作符使用之前在 SPL 代码中定义的一个名为 OutputEvents 的流并生成一个名为 JSONOutput 的流。下个操作符(名为 HTTPPost )使用 JSONOutput 流并通过 HTTP POST 将该 JSON 字符串发送到 Node.js 应用程序的路径。

第 7 步. 自定义或扩展该应用程序

现在您已熟悉了这个入门应用程序,可修改该应用程序的源代码,通过任何有趣的方式自定义或扩展它:

  • 为了让该应用程序运行更长的时间,可修改 app.js 中的 Node.js 代码,将 event_target 变量的值从 3000 改为更大的数字。
  • 要定义 Streams 应用程序可检查哪个新的复杂事件:
    1. 更新 SPL 代码,向工作流中添加另一个 MatchRegex 操作符,以检测您想寻找的一种模式。
    2. 更新 SPL 代码中新 MatchRegex 操作符后面的操作符,从而将新的事件类型发送回 Node.js 应用程序。

要修改该应用程序:

  1. 计划您的修改工作。
  2. 更改 Node.js 和/或 SPL 源代码,以反映您想要的自定义工作。
  3. 如果已修改 SPL 代码,则必须在 Streams 开发环境中重新编译它并将您下载的 .sab 文件替换为这个更新后的版本。
  4. 将修改后的应用程序部署(推送)到 Bluemix。

结束语

可在 Bluemix 中通过 Streaming Analytics 服务轻松地对实时数据流执行复杂事件检测。您在本教程中完成的应用程序可帮助您入门。您可更改想要分析的数据流,定义想要检测的事件,以及处理这些事件来实现自己的目的。

BLUEMIX SERVICES USED IN THIS TUTORIAL:

  • Streaming Analytics 让您能够在数据从实时数据来源到达时对数据进行摄取、分析、监视和关联。
  • SDK for Node.js 帮助您轻松地开发、部署和扩展服务器端 JavaScript 应用程序。

相关主题: 大数据和分析 Node.js

正文到此结束
Loading...