为了从主题中读取无限制的数据流,我们需要创建一个小型应用程序,以发送无限制的数据流。我们模拟一条Tweet流,在Tweet末尾恰好有一个标签。每秒都会在该主题上发布一条消息。Tweets始终包含相同的消息(Lorem ipsum…),主题标签是从5个主题标签的固定列表中随机选择的。
@RestController public class KafkaProducerController { private static final String loremIpsum = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor " + "incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco " + "laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit " + "esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa " + "qui officia deserunt mollit anim id est laborum."; private static final String[] hashTags = {"latin", "italy", "roman", "caesar", "cicero"}; private Random randomNumber = new Random(); private String randomMessage; @RequestMapping("/sendMessages/") public void sendMessages() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); try { while (true) { // Every second send a message try { Thread.sleep(1000); } catch (InterruptedException e) {} randomMessage = loremIpsum + " #" + hashTags[randomNumber.nextInt(hashTags.length)]; producer.send(new ProducerRecord<String, String>("my-kafka-streams-topic", null, randomMessage)); } } finally { producer.close(); } } }
变量loremIpsum是固定消息,变量hashTags是主题标签的固定列表。在该sendMessages方法中,我们首先定义一些Kafka属性。在第一个try块中,无休止的循环每秒向主题发送一条带有随机主题标签(变量randomMessage)的消息。my-kafka-streams-topic。我们只为该主题提供一个值,而不是一个键,因为该主题中的键与我们无关。
最后要做的是在Kafka中创建主题。我们通过Kafka的bin目录中的kafka-topics.sh 脚本来执行此操作 :
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-topic
可以通过调用URL http:// localhost:8081 / sendMessages / 来启动数据流。
现在我们已经做好了所有准备,现在该开始第一个流媒体示例了。我们将从中读取Tweets my-kafka-streams-topic,使用主题标签过滤Tweets #latin并将其发布到主题my-kafka-streams-out1。
我们创建一个Spring Boot应用程序(mykafkaprocessingplanet多模块Maven项目中的模块),并将以下依赖项添加到pom中:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency> </dependencies>
可以看出,除了kafka-streams依赖之外,我们还必须添加kafka-clients依赖。如果不添加最后一个依赖项,则在创建时会遇到以下错误KStream:
java.lang.NoSuchMethodError: org.apache.kafka.clients.admin.AdminClientConfig
与生产者一样,我们确保可以在调用URL之后开始处理。这URL的KafkaProcessingController:
private KafkaStreams streams1; @RequestMapping("/startProcessing1/") public void startProcessing1() { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "processor1"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); //魔术关键行 builder.stream("my-kafka-streams-topic").filter((key, value) -> ((String) value).endsWith("#latin")).to("my-kafka-streams-out1"); final Topology topology = builder.build(); streams1 = new KafkaStreams(topology, props); streams1.start(); }
与往常一样,首先我们需要定义属性以连接到主题。注意,我们没有覆盖默认键Serializer / Deserializer,因为我们没有为主题中的消息提供键。注意魔术关键行,我们从my-kafka-streams-topic中读取流,过滤以结“#latin”结尾的消息,并将过滤后的消息发送到主题my-kafka-streams-out1
最后要做的是创建输出主题:
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-out1
是时候把所有东西放在一起了。通过从mykafkaproducerplanet目录调用以下命令来启动生产者:
$ mvn spring-boot:run
执行以下命令来启动Consumer :
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-streams-out1 --from-beginning
通过调用URL http:// localhost:8081 / sendMessages / 将消息从生产者发送到my-kafka-streams-topic,通过调用URL http:// localhost:8082 / startProcessing1 / 实现流转发处理;可以通过调用URL http:// localhost:8082 / stop 来停止流处理。
帖子中使用的资源当然可以在 GitHub上找到 。点击标题见原文