转载

Streaming Analytics + Message Hub 入门介绍

设置

  • 从 GitHub 下载 Messaging Toolkit v4.0.0 Kafka 0.9 Alpha Release 并进行解压。

  • 在 Bluemix 上创建一个 Message Hub 服务(参阅创建 Message Hub 服务实例)。

  • 在 Bluemix 上创建一个 Streaming Analytics 服务(参阅 Bluemix Streaming Analytics 简介中的“Finding the service(查找服务)”部分

回页首

创建一个主题

在 Message Hub 中,消息通过主题来传输。生产者编写各种主题的消息,而消费者可以读取这些主题。

在 Bluemix 的 Message Hub 仪表板上提供了管理主题的工具。点击加号图标创建一个新主题,名字为“sampleTopic”,并保持。

图 1.在 Message Hub 上创建主题

Streaming Analytics + Message Hub 入门介绍

回页首

安装消息工具包

首选,将以下的 jar 文件添加到消息工具的 opt/download 目录下:

  • jackson-core-2.5.4.jar

  • messagehub.login-1.0.0.jar

在使用该工具包之前,我们需要告诉 Streams Studio 可以在哪里找到该工具包。

右键单击“Toolkit Locations(工具包位置)”并选择“Add Toolkit Location…(添加工具包位置…)”,即可在 Streams Explorer 中添加另一个工具包

图 2.添加工具包位置

Streaming Analytics + Message Hub 入门介绍

通过“Directory…(目录…)”选择工具包位置,并按下 OK(确定)。您选择的位置将出现在工具包位置列表中。

图 3.指定工具包的位置

Streaming Analytics + Message Hub 入门介绍

回页首

使用 Streams 生成和使用消息

现在我们将使用 KafkaSSLSample Streams 应用来生成和使用消息。

将 KafkaSSLSample 从所下载工具包的目录(samples/KafkaSSLSample)中导出到 Streams Studio。此时,即可成功构建应用。Streams 应用导入指令,可在“Importing SPL projects(导入 SPL 项目)”中找到。

我们还需要告知应用从哪里能够找到我们的 Message Hub 服务。

  1. 导航至 Bluemix 上 Message Hub 的控制台,点击“Service credentials”标签页,注意 kafka_sasl、user 和 password 这几项

图 4.Service credentials

Streaming Analytics + Message Hub 入门介绍
  1. 更改 KafkaSSLSample/etc/consumer.properties 和 KafkaSSLSample/etc/producer.properties 文件中的粗体行,并将 Message Hub 服务凭证中以逗号隔开的 kafka_brokers_sasl 列表替换为 broker.host.1:2181,broker.host.2:2181,broker.host.3:2181。

清单1.KafkaSSLSample/etc/consumer.properties

bootstrap.servers=broker.host.1:2181,broker.host.2:2181,broker.host.3:2181 serializer.class=kafka.serializer.StringEncoder group.id=mygroup key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer group.id=test; partition.assignment.strategy=roundrobin client.id=myAPIKey security.protocol=SSL ssl.protocol=TLSv1.2 ssl.enabled.protocols=TLSv1.2 ssl.truststore.type=JKS ssl.endpoint.identification.algorithm=HTTPS

清单2.KafkaSSLSample/etc/producer.properties

bootstrap.servers=broker.host.1:2181,broker.host.2:2181,broker.host.3:2181 acks=0 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer client.id=myAPIKey security.protocol=SSL ssl.protocol=TLSv1.2 ssl.enabled.protocols=TLSv1.2 ssl.truststore.type=JKS ssl.endpoint.identification.algorithm=HTTPS

注意: Message Hub 开始支持 SASL 加密方式,SASL 加密的端口有所改变。所以,在 Service Credentials 中可能没有 kafka_brokers_sasl 这一项内容。这时候,需要使用 kafka_brokers 来代替。

同时,还需要为 Message Hub 提供验证信息。在下面 KafkaSASLSample/etc/jaas.conf 文件的粗体信息中,分别替换 myusername 和 mypassword 为 Message Hub Service credentials 的 user 和 password 信息:

>清单 3.KafkaSASLSample/etc/jaas.conf

KafkaClient { com.ibm.messagehub.login.MessageHubLoginModule required serviceName="kafka" username="myusername" password="mypassword"; };

重要提示:在保存 producer.properties 和 consumer.properties 后,必须清理工作区域:Select Project(选择项目) -> Clean(清理)…,然后按下 OK(确认)。

KafkaSSLSample Streams 应用包含有消息发送和接收逻辑。

图 5.Kafka 消息收发逻辑图

Streaming Analytics + Message Hub 入门介绍
  • Streams 图形 (OutputStream -> KafkaSinkOp) 中的“生产者”部分,可使用 KafkaProducer 运算符将消息发送至名为“sampleTopic”的主题(每 0.2 秒发送一次)。

  • “消费者”部分 (KafkaStream -> SinkOp) 可使用 KafkaConsumer 运算符对信息进行检索并打印至控制台。

构建 KafkaSSLSample 的 Distributed Build,如此便可在您的 Bluemix Streaming Analytics 服务上运行 Distributed Build。

如果您希望在 Streams Studio 中本地运行 KafkaSSLSample,请参阅 Lanuching a Main composite(启动主复合)。

若要查看本地运行的 Streams 图形,请参阅 Viewing the instance graph for your application(查看应用的实例图)。

回页首

云中的 Streams 和 Message Hub

通过构建 KafkaSSLSample,可在您的工作区目录中创建一个名为 workspace/KafkaSSLSample/output/com.ibm.streamsx.messaging.sample.kafka.KafkaSSLSample/Distributed/com.ibm.streamsx.messaging.sample.kafka.KafkaSSLSample.sab 的文件。它包括了所有 Streaming Analytics 服务运行 Streams 应用的信息。

您可使用 Streaming Analytics 控制台上传该文件。

  1. 点击 Bluemix 中的 Streaming Analytics 服务仪表盘,并单击“Launch(启动)”启动 Streams 工作台。

  2. 单击控制台右上角“play icon(播放图标)”下拉菜单中的“Submit job(提交任务)”选项

图 6.提交任务信息按钮

Streaming Analytics + Message Hub 入门介绍
  1. 浏览您构建的 com.ibm.streamsx.messaging.sample.kafka.KafkaSSLSample.sab 文件,并单击 Submit(提交)。

图 7.提交任务信息

Streaming Analytics + Message Hub 入门介绍

如果 Streams 控制台的图形视图显示:所有的运算符均处于健康状态(绿色圆圈),而且在每个 Stream 约达到 5 元组/秒的流动量,则表示 Streams 应用运行正常。

图 8.任务运行状态

Streaming Analytics + Message Hub 入门介绍

您还可以在 Streams 日志中查看由 SinkOp 打印的消息。

  • 导航至最左边的 Streams 控制台日志查看器。

  • 站看导航树,并高亮带有 SinkOp 运算符的 PE。

  • 选择“Console Log(控制台日志)”选项卡。

  • 单击“Load console messages(负载控制台消息)”。

图9.日志信息

Streaming Analytics + Message Hub 入门介绍

回页首

结论

本文介绍了 Streams 如何使用消息工具包中的运算符发送和接收消息。这种功能如果与其他服务结合使用,将会产生更好的效果。通过 Message Hub,Streams 将能够与其他同样与 Message Hub 通讯的服务或应用进行通讯。

原文  http://www.ibm.com/developerworks/cn/cloud/blogs/streaming-analytics-message-hub/index.html?ca=drs-
正文到此结束
Loading...