本示例接上一个案例,其中有发送消息的案例,这里只是消费者举例,我们将从Tweets my-kafka-streams-topic中读取流,创建一个以#标签为值的新中间流,将其转换存都KTable,包含每个#标签的数量,然后将其发布到topic my-kafka-streams-out2。这是一个Ktable。
我们向kafkaprocessingplanet模块添加了一个新的REST端点。我们将阅读同一主题my-kafka-streams-topic,并将其进行转换,实现主题标签作为key和计数作为值,发布消息到my-kafka-streams-hashtagcount-output。我们假定消息中始终有一个#号标签,并且它在消息的末尾出现,只是为了使事情简单。
Kafka属性与第一个示例相同,不同之处在于我们确实提供了key的序列化/反序列化的配置,因为我们将发布一个以String作为key的主题。否则,您将遇到ClassCastException。
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
首先,我们将创建一个新标签流,该流将仅包含#主题标签作为值。这可以通过以下方式实现:
KStream<Object, String> hashTags = source.flatMapValues(new ValueMapper<String, Iterable>() { @Override public Iterable apply(String value) { return Arrays.asList(value.substring(value.indexOf("#"))); } });
为了简洁起见,我们将其转换为lambda:
KStream<Object, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#"))));
我们有一个流,其中没有键key和#主题标签作为值。为了知道某个标签在流中出现多少次,我们需要每个标签的记录。因此,我们创建一个KGroupedStream带有标签作为键key和标签为值。将KeyValueMapper用于创建keyy。我们将原始键和值作为输入参数提供给apply方法,并将新键作为返回String。
KGroupedStream<String, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#")))) .groupBy(new KeyValueMapper<Object, String, String>() { @Override public String apply(Object key, String value) { return value; } });
同样,我们将其转换为lambda:
KGroupedStream<String, String> hashTags = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#")))) .groupBy((key, value) -> value)
接下来,我们调用的KGroupedStream的count方法,将为KTable counts我们创建一个标签,标签为键,计数为值。
KTable<String, Long> counts = source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#")))) .groupBy((key, value) -> value) .count();
现在我们唯一需要做的就是创建一个流,并将其发布到主题my-kafka-streams-hashtagcount-output。
source.flatMapValues((ValueMapper<String, Iterable>) value -> Arrays.asList(value.substring(value.indexOf("#")))) .groupBy((key, value) -> value) .count() .toStream() .to("my-kafka-streams-hashtagcount-output", Produced.with(Serdes.String(), Serdes.Long()));
最后,我们需要添加以下内容以启动流:
final Topology topology = builder.build(); streams2 = new KafkaStreams(topology, props); streams2.start();
输出主题必须配置为启用日志压缩。请参阅 官方文档 或 简短说明 以了解日志压缩。日志压缩可确保将多个具有相同键的记录发布到某个主题时,仅保留最新版本。仅最后一个版本有用,如果未启用日志压缩,则旧版本将消耗不必要的资源。启用日志压缩后,Kafka将清除较旧的版本。有了这些信息,我们可以创建输出主题:
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic my-kafka-streams-hashtagcount-output --config "cleanup.policy=compact"
测试所有内容了。像以前一样启动Producer和Processor Spring Boot应用程序。
过调用URL http:// localhost:8081 / sendMessages / 将消息从生产者发送到my-kafka-streams-topic;通过调用URL http:// localhost:8082 / startProcessing2 /来 启动流处理器。
使用一些额外的属性启动主题的使用者,以便将数据正确打印到控制台。我们注意到每隔30秒就会有新结果打印到控制台。
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-kafka-streams-hashtagcount-output --property print.key=true --value-deserializer=org.apache.kafka.common.serialization.LongDeserializer #italy 85 #latin 84 #cicero 100 #caesar 87
除了使用count方法之外,我们还可以使用以下方法:
count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
在这种情况下,结果将存储在可以查询的状态存储中。如何查询这些状态存储可以在 这里 找到。
帖子中使用的资源当然可以在 GitHub上找到 。