Apache Kafka当然是最常用的JMS代理,它有Apache Hadoop等分布式系统用于数据入口。与其他JMS代理相比,Apache Kafka的关键特性(从我的观点来看):
让我们看看如何将Apache Kafka与Spring Integration结合使用。我们将构建将消息生成到Kafka主题的简单演示。
卡夫卡安装和启动
您需要启动Apache ZooKeeper:
$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
启动Kafka:
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
创建一个主题topic:
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic
列出Kafka创建的主题:
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
基于Spring Integration构建Apache Kafka生成器
我们将构建简单的Spring Integration应用程序发布消息到Apache Kafka作为程序参数输入。
首先,让我们创建Spring Integration基础架构:
<<b>int</b>:channel id=<font>"inputToKafka"</font><font>> <<b>int</b>:queue/> </<b>int</b>:channel> <<b>int</b>-kafka:outbound-channel-adapter id=</font><font>"kafkaOutboundChannelAdapter"</font><font> kafka-producer-context-ref=</font><font>"kafkaProducerContext"</font><font> channel=</font><font>"inputToKafka"</font><font>> <<b>int</b>:poller fixed-delay=</font><font>"1000"</font><font> time-unit=</font><font>"MILLISECONDS"</font><font> receive-timeout=</font><font>"0"</font><font> task-executor=</font><font>"taskExecutor"</font><font>/> </<b>int</b>-kafka:outbound-channel-adapter> <task:executor id=</font><font>"taskExecutor"</font><font> pool-size=</font><font>"5"</font><font> keep-alive=</font><font>"120"</font><font> queue-capacity=</font><font>"500"</font><font>/> </font>
在这个xml配置,我们创建了队列通道 “inputToKafka”,我们将推送消息到这个通道。ID为“ kafkaOutboundChannelAdapter ”的Bean 是一个出站通道适配器,具有已定义的异步轮询,执行从“inputToKafka”通道读取消息并将其推送到Apache Kafka。
现在Apache Kafka生产者配置:
<bean id=<font>"kafkaStringSerializer"</font><font> <b>class</b>=</font><font>"org.apache.kafka.common.serialization.StringSerializer"</font><font> /> <<b>int</b>-kafka:producer-context id=</font><font>"kafkaProducerContext"</font><font>> <<b>int</b>-kafka:producer-configurations> <<b>int</b>-kafka:producer-configuration broker-list=</font><font>"localhost:9092"</font><font> topic=</font><font>"test_topic"</font><font> key-<b>class</b>-type=</font><font>"java.lang.String"</font><font> value-<b>class</b>-type=</font><font>"java.lang.String"</font><font> key-serializer=</font><font>"kafkaStringSerializer"</font><font> value-serializer=</font><font>"kafkaStringSerializer"</font><font> /> </<b>int</b>-kafka:producer-configurations> </<b>int</b>-kafka:producer-context> </font>
我们来看看生产者参数:
#Apache Kafka broker cluster. Cluster where we're going to be publishing messages. Let's go with <b>default</b> configuration. broker-list=<font>"localhost:9092"</font><font> </font>
#Name of topic <b>for</b> publishing messages. topic=<font>"test_topic"</font><font> #Type of the optional key associated with the message key-<b>class</b>-type=</font><font>"java.lang.String"</font><font> #Type of value sent in the message. value-<b>class</b>-type=</font><font>"java.lang.String"</font><font> #Reference to the key serializer, all keys and values has to be serialized before sending into Apache Kafka. key-serializer=</font><font>"kafkaStringSerializer"</font><font> #Same and key serializer. value-serializer=</font><font>"kafkaStringSerializer"</font><font> </font>
启动消息流
要启动消息流,也就是消息生产者,我们将简单地创建SpringBoot CommandLineRunner,将命令行参数传递到提到的队列通道中:
@Component @DependsOn(value=<font>"kafkaOutboundChannelAdapter"</font><font>) <b>public</b> <b>class</b> MessageRunner implements CommandLineRunner { @Resource(name = </font><font>"inputToKafka"</font><font>) <b>private</b> MessageChannel messageChannel; @Override <b>public</b> <b>void</b> run(String... args) throws Exception { <b>for</b> (String arg1 : args) { messageChannel.send( <b>new</b> GenericMessage<String>(arg1) ); } } } </font>
如何测试应用程序
在maven应用程序的根目录中,运行:
mvn clean install
成功编译后,运行:
java -jar target / demo-0.0.1-SNAPSHOT.jar Test1 Test2 Test3
这将创建作为参数传递的String消息,并将它们推送到Apache Kafka代理中的test_topic中。
现在,如果您运行Apache Kafka消费者:
$KAFKA_HOME/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning
会得到:
$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_topic --from-beginning Test1 Test2 Test3
太棒了,我们在Apache Kafka中创建了基于Spring Integration的简单消息生成器!