转载

Azkarra Streams简介:Apache Kafka Streams的第一个微框架

Kafka Streams是一个功能强大的库,用于在Apache Kafka之上构建复杂的流应用程序。随着时间的推移,以及经过多个项目之后,我们发现自己正在编写相同的代码来在生产环境中运行和与Kafka Streams应用程序交互。

我们坚信,基于Kafka Streams的简单微服务的开发应该花费几天而不是几周的时间,而最少的功能才能安全地投入生产。

因此,我们决定构建自己的框架,以简化Kafka Streams应用程序的开发和操作。

今天,我们很高兴宣布推出 Azkarra Streams ,这是一个新的开源微型Java框架,可让您集中精力编写Kafka Streams拓扑代码,而不是执行它们所需的样板代码。

主要特征

Azkarra Streams提供了一组功能,可以快速调试和构建可用于生产环境的Kafka Streams应用程序。其中包括:

  • Kafka Streams实例的生命周期管理(不再需要KafkaStreams#start())。
  • 轻松外部化拓扑和Kafka Streams配置(使用 Typesafe Config )。
  • 嵌入式HTTP服务器,用于查询状态存储(Undertow)。
  • HTTP端点,用于监视流应用程序指标(例如:JSON,Prometheus)。
  • 用于拓扑可视化的Web UI。
  • 使用SSL或基本身份验证的加密和身份验证。
  • 等等。

为什么选择Azkarra

在编写第一个Azkarra应用程序之前,让我们花一些时间描述标准Kafka Streams应用程序的不同部分,以便更好地了解Azkarra的好处。

为此,我们将使用著名的单词计数示例,该示例可在 Kafka Streams 官方 文档中找到 。

首先,我们必须声明并构建一个Topology。

StreamsBuilder builder = <b>new</b> StreamsBuilder();
KStream<String, String> textLines = builder
 .stream(<font>"streams-plaintext-input"</font><font>);

 textLines.flatMapValues(value ->
  Arrays.asList(value.toLowerCase().split(</font><font>"//W+"</font><font>))
 )
.groupBy((key, value) -> value)
.count(Materialized.as(</font><font>"WordCount"</font><font>))
.toStream()
.to(
  </font><font>"streams-wordcount-output"</font><font>,       
  Produced.with(Serdes.String(), Serdes.Long())
);
Topology topology = builder.build();
</font>

定义应用配置:

Properties props = <b>new</b> Properties();
props.put(APPLICATION_ID_CONFIG, <font>"streams-word-count"</font><font>);
props.put(BOOTSTRAP_SERVERS_CONFIG, </font><font>"localhost:9092"</font><font>);
props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
</font>

创建KafkaStreams 实例:

<b>final</b> KafkaStreams streams = <b>new</b> KafkaStreams(topology, props);

最后,我们必须管理Kafka Streams应用程序的运行部分。这意味着,启动KafkaStreams实例并使用关闭挂钩管理应用程序的干净关闭。

<b>final</b> CountDownLatch latch = <b>new</b> CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(
<b>new</b> Thread(<font>"streams-shutdown-hook"</font><font>) {
  @Override
  <b>public</b> <b>void</b> run() {
    streams.close();
    latch.countDown();
  }
});

  <b>try</b> {
     streams.start();
     latch.await();
  } <b>catch</b> (Throwable e) {
     System.exit(1);
  }
  System.exit(0);
</font>

这是完整的代码: GitHub Gist 。

当然,流应用程序很少那么简单。例如,您将需要添加错误处理并监视Kafka Streams实例的状态。此外,您可能希望使用交互式查询等查询内部商店。

最后但并非最不重要的一点是,有时您必须处理一些问题(例如: https :  //issues.apache.org/jira/browse/KAFKA-7380 )。

但是现实是,作为开发人员,您应该始终将开发工作重点放在拓扑的定义和优化上。原因很简单-这是为您的业务创造价值的部分。

Azkarra的第一步

我们希望解决的第一个方面是将构建Topology与执行之间的关注点分离。确实,我们认为创建和启动新KafkaStreams实例不应由开发人员直接管理。

因此,让我们使用Azkarra API重写WordCount示例。

首先,我们将使用Azkarra Streams Maven原型创建一个简单的项目结构。您可以运行以下命令:

$ mvn archetype:generate -DarchetypeGroupId=io.streamthoughts /
-DarchetypeArtifactId=azkarra-quickstart-java /
-DarchetypeVersion=0.3 /
-DgroupId=azkarra.streams /
-DartifactId=azkarra-getting-started /
-Dversion=1.0-SNAPSHOT /
-Dpackage=azkarra /
-DinteractiveMode=false

在pom.xml已经包含Azkarra流和卡夫卡流的依赖关系:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>io.streamthoughts</groupId>
        <artifactId>azkarra-streams</artifactId>
        <version>0.3</version>
    </dependency>
</dependencies>

使用您喜欢的IDE或编辑器,打开Maven项目。创建一个新文件src/main/java/azkarra,该文件具有基本的java main(String[] args)方法和如下的拓扑定义:

<b>package</b> azkarra;
@AzkarraStreamsApplication
<b>public</b> <b>class</b> SimpleStreamsApp {

    <b>public</b> <b>static</b> <b>void</b> main(String[] args) {
        AzkarraApplication.run(SimpleStreamsApp.<b>class</b>, args);
    }

    @Component
    <b>public</b> <b>static</b> <b>class</b> WordCountTopology implements TopologyProvider {

        @Override
        <b>public</b> Topology get() {
            StreamsBuilder builder = <b>new</b> StreamsBuilder();
            KStream<String, String> textLines = 
                          builder.stream(<font>"streams-plaintext-input"</font><font>);

            textLines
                .flatMapValues(value ->
                    Arrays.asList(value.toLowerCase().split(</font><font>"//W+"</font><font>))
                )
                .groupBy((key, value) -> value)
                .count(Materialized.as(</font><font>"WordCount"</font><font>))
                .toStream()
                .to(
                   </font><font>"streams-wordcount-output"</font><font>,
                   Produced.with(Serdes.String(), Serdes.Long())
                );
            <b>return</b> builder.build();
        }

        @Override
        <b>public</b> String version() {
            <b>return</b> </font><font>"1.0"</font><font>;
        }
    }
}
</font>

如您所见,我们仅实现了一个名为的接口以TopologyProvider构建Topology。Azkarra Streams负责自动配置和启动所谓的KafkaStreams实例。

接下来,我们必须配置我们的应用程序。我们将创建一个名为一个简单的文件application.conf中src/main/resources/application.conf包含以下代码:

注意:您也可以将文件保留在项目中。

azkarra {
  context {
    streams {
      bootstrap.servers = <font>"localhost:9092"</font><font>
      <b>default</b>.key.serde = </font><font>"org.apache.kafka.common.serialization.Serdes$StringSerde"</font><font>
      <b>default</b>.value.serde = </font><font>"org.apache.kafka.common.serialization.Serdes$StringSerde"</font><font>
    }
  }
}
</font>

恭喜你!您使用Azkarra编了写第一个流应用程序。

在Docker上运行您的应用

为了执行我们的应用程序,我们必须首先启动一个Kafka集群。为此,我们将使用 Confluent.Inc 维护的官方 Kafka Docker映像 。

要启动单节点Kafka群集,请运行docker-compose.yml项目中包含的文件。

$ cd azkarra-getting-started 
$ docker -compose up -d

然后,创建拓扑使用的两个主题(源,接收器)。为此,您可以运行提供的脚本:

$ chmod u + x quickstart-create-wordcount-topics.sh 
$ ./ quickstart-create-wordcount-topics.sh

最后,我们将打包并运行Maven项目:

$ mvn clean <b>package</b> && java -jar target / azkarra-quickstart-java-0.3.jar

要验证,您的流应用程序正在运行,请检查运行状况终结点:

$ curl -sX GET'http :<font><i>// localhost:8080 / health' | grep'UP'</i></font><font>
</font>

最后,让我们向Kafka主题发送一些消息streams-plaintext-input:

$ docker exec -it azkarra-cp-broker /usr/bin/kafka-console-producer --topic streams-plaintext-input --broker-list kafka:9092
Azkarra Streams
WordCount
I Heart Logs   
Kafka Streams
Making Sense of Stream Processing
消费

streams-wordcount-output 的输出:

$ docker exec -it azkarra-cp-broker /usr/bin/kafka-console-consumer --from-beginning --property print.key=<b>true</b> --property key.separator=<font>"-"</font><font> --topic streams-wordcount-output --bootstrap-server kafka:9092
</font>

嵌入式HTTP服务器

Azkarra的主要功能之一是嵌入式Web服务器,它公开了用于管理和监视本地流应用程序的端点

例如,您可以列出本地运行的流实例(即:在JVM中执行的实例)。

$ curl -sX GET http:<font><i>//localhost:8080/api/v1/streams | jq .</i></font><font>
<p>[
 </font><font>"word-count-topology-1–0"</font><font>
]
</font>

获得指定流信息:

$ curl -sX GET http:<font><i>//localhost:8080/api/v1/streams/word-count-topology-1-0/ | jq . </i></font><font>
{
 </font><font>"id"</font><font>: </font><font>"word-count-topology-1–0"</font><font>,
 </font><font>"since"</font><font>: </font><font>"2019–11–26T13:48:17.35+01:00[Europe/Paris]"</font><font>,
 </font><font>"name"</font><font>: </font><font>"WordCountTopology"</font><font>,
 </font><font>"version"</font><font>: </font><font>"1.0"</font><font>,
 </font><font>"state"</font><font>: {
 </font><font>"state"</font><font>: </font><font>"RUNNING"</font><font>,
 </font><font>"since"</font><font>: </font><font>"2019–11–26T13:48:18.772+01:00[Europe/Paris]"</font><font>
 }
}
</font>

最后输出流应用到Prometheus格式:

$ curl -sX GET ‘http:<font><i>//localhost:8080/api/v1/streams/word-count-topology-1-0/metrics?format=prometheus'</i></font><font>
# HELP streams_incoming_byte_rate The number of incoming bytes per second
# TYPE streams_incoming_byte_rate counter
streams_incoming_byte_rate{group=”admin-client-node-metrics”,id=”word-count-topology-1–0</font><font>",client_id=”word-count-topology-1–0-e04c076a-7d46–4bdc-9bc2–14d181370762-admin”,node_id=”node — 1"</font><font>,} 0.0
# HELP streams_incoming_byte_total The total number of incoming bytes
# TYPE streams_incoming_byte_total counter
streams_incoming_byte_total{group=”admin-client-node-metrics”,id=”word-count-topology-1–0</font><font>",client_id=”word-count-topology-1–0-e04c076a-7d46–4bdc-9bc2–14d181370762-admin”,node_id=”node-1"</font><font>,} 1066.0
</font>

Azkarra WebUI

Azkarra Streams的另一个很酷的功能是: http:// localhost:8080/ui 上提供了默认的嵌入式Web UI,可用于管理流应用程序。

Azkarra WebUI最初是为了促进开发而设计的,但很快发展成为一个小型管理界面。

例如,您可以停止,使用“可用操作”按钮重新启动流应用程序,浏览指标,配置等。

Azkarra WebUI还附带了用于流拓扑的简单DAG表示。

互动查询

最后,Kafka Streams具有强大的机制来查询stream应用程序实现的状态。通常,作为开发人员,我们构建HTTP端点以使用公共Kafka Streams API公开这些状态。

Azkarra Streams为此提供了一个默认终结点,可以通过Azkarra WebUI直接访问该终结点。

原文  https://www.jdon.com/53430
正文到此结束
Loading...