从 GitHub 下载 Messaging Toolkit v4.0.0 Kafka 0.9 Alpha Release 并进行解压。
在 Bluemix 上创建一个 Message Hub 服务(参阅创建 Message Hub 服务实例)。
在 Bluemix 上创建一个 Streaming Analytics 服务(参阅 Bluemix Streaming Analytics 简介中的“Finding the service(查找服务)”部分
回页首
在 Message Hub 中,消息通过主题来传输。生产者编写各种主题的消息,而消费者可以读取这些主题。
在 Bluemix 的 Message Hub 仪表板上提供了管理主题的工具。点击加号图标创建一个新主题,名字为“sampleTopic”,并保持。
图 1.在 Message Hub 上创建主题
回页首
首选,将以下的 jar 文件添加到消息工具的 opt/download 目录下:
jackson-core-2.5.4.jar
messagehub.login-1.0.0.jar
在使用该工具包之前,我们需要告诉 Streams Studio 可以在哪里找到该工具包。
右键单击“Toolkit Locations(工具包位置)”并选择“Add Toolkit Location…(添加工具包位置…)”,即可在 Streams Explorer 中添加另一个工具包
图 2.添加工具包位置
通过“Directory…(目录…)”选择工具包位置,并按下 OK(确定)。您选择的位置将出现在工具包位置列表中。
图 3.指定工具包的位置
回页首
现在我们将使用 KafkaSSLSample Streams 应用来生成和使用消息。
将 KafkaSSLSample 从所下载工具包的目录(samples/KafkaSSLSample)中导出到 Streams Studio。此时,即可成功构建应用。Streams 应用导入指令,可在“Importing SPL projects(导入 SPL 项目)”中找到。
我们还需要告知应用从哪里能够找到我们的 Message Hub 服务。
导航至 Bluemix 上 Message Hub 的控制台,点击“Service credentials”标签页,注意 kafka_sasl、user 和 password 这几项
图 4.Service credentials
更改 KafkaSSLSample/etc/consumer.properties 和 KafkaSSLSample/etc/producer.properties 文件中的粗体行,并将 Message Hub 服务凭证中以逗号隔开的 kafka_brokers_sasl 列表替换为 broker.host.1:2181,broker.host.2:2181,broker.host.3:2181。
清单1.KafkaSSLSample/etc/consumer.properties
bootstrap.servers=broker.host.1:2181,broker.host.2:2181,broker.host.3:2181 serializer.class=kafka.serializer.StringEncoder group.id=mygroup key.deserializer=org.apache.kafka.common.serialization.StringDeserializer value.deserializer=org.apache.kafka.common.serialization.StringDeserializer group.id=test; partition.assignment.strategy=roundrobin client.id=myAPIKey security.protocol=SSL ssl.protocol=TLSv1.2 ssl.enabled.protocols=TLSv1.2 ssl.truststore.type=JKS ssl.endpoint.identification.algorithm=HTTPS
清单2.KafkaSSLSample/etc/producer.properties
bootstrap.servers=broker.host.1:2181,broker.host.2:2181,broker.host.3:2181 acks=0 key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer client.id=myAPIKey security.protocol=SSL ssl.protocol=TLSv1.2 ssl.enabled.protocols=TLSv1.2 ssl.truststore.type=JKS ssl.endpoint.identification.algorithm=HTTPS
注意: Message Hub 开始支持 SASL 加密方式,SASL 加密的端口有所改变。所以,在 Service Credentials 中可能没有 kafka_brokers_sasl 这一项内容。这时候,需要使用 kafka_brokers 来代替。
同时,还需要为 Message Hub 提供验证信息。在下面 KafkaSASLSample/etc/jaas.conf 文件的粗体信息中,分别替换 myusername 和 mypassword 为 Message Hub Service credentials 的 user 和 password 信息:
>清单 3.KafkaSASLSample/etc/jaas.conf
KafkaClient { com.ibm.messagehub.login.MessageHubLoginModule required serviceName="kafka" username="myusername" password="mypassword"; };
重要提示:在保存 producer.properties 和 consumer.properties 后,必须清理工作区域:Select Project(选择项目) -> Clean(清理)…,然后按下 OK(确认)。
KafkaSSLSample Streams 应用包含有消息发送和接收逻辑。
图 5.Kafka 消息收发逻辑图
Streams 图形 (OutputStream -> KafkaSinkOp) 中的“生产者”部分,可使用 KafkaProducer 运算符将消息发送至名为“sampleTopic”的主题(每 0.2 秒发送一次)。
“消费者”部分 (KafkaStream -> SinkOp) 可使用 KafkaConsumer 运算符对信息进行检索并打印至控制台。
构建 KafkaSSLSample 的 Distributed Build,如此便可在您的 Bluemix Streaming Analytics 服务上运行 Distributed Build。
如果您希望在 Streams Studio 中本地运行 KafkaSSLSample,请参阅 Lanuching a Main composite(启动主复合)。
若要查看本地运行的 Streams 图形,请参阅 Viewing the instance graph for your application(查看应用的实例图)。
回页首
通过构建 KafkaSSLSample,可在您的工作区目录中创建一个名为 workspace/KafkaSSLSample/output/com.ibm.streamsx.messaging.sample.kafka.KafkaSSLSample/Distributed/com.ibm.streamsx.messaging.sample.kafka.KafkaSSLSample.sab 的文件。它包括了所有 Streaming Analytics 服务运行 Streams 应用的信息。
您可使用 Streaming Analytics 控制台上传该文件。
点击 Bluemix 中的 Streaming Analytics 服务仪表盘,并单击“Launch(启动)”启动 Streams 工作台。
单击控制台右上角“play icon(播放图标)”下拉菜单中的“Submit job(提交任务)”选项
图 6.提交任务信息按钮
浏览您构建的 com.ibm.streamsx.messaging.sample.kafka.KafkaSSLSample.sab 文件,并单击 Submit(提交)。
图 7.提交任务信息
如果 Streams 控制台的图形视图显示:所有的运算符均处于健康状态(绿色圆圈),而且在每个 Stream 约达到 5 元组/秒的流动量,则表示 Streams 应用运行正常。
图 8.任务运行状态
您还可以在 Streams 日志中查看由 SinkOp 打印的消息。
导航至最左边的 Streams 控制台日志查看器。
站看导航树,并高亮带有 SinkOp 运算符的 PE。
选择“Console Log(控制台日志)”选项卡。
单击“Load console messages(负载控制台消息)”。
图9.日志信息
回页首
本文介绍了 Streams 如何使用消息工具包中的运算符发送和接收消息。这种功能如果与其他服务结合使用,将会产生更好的效果。通过 Message Hub,Streams 将能够与其他同样与 Message Hub 通讯的服务或应用进行通讯。