在这篇文章中,我们将介绍如何使用Spring Cloud Stream和Kafka构建实时流式微服务应用程序。本示例项目演示了如何使用事件驱动的体系结构,Spring Boot,Spring Cloud Stream,Apache Kafka和Lombok构建 实时流 应用程序。
在本教程中,我们开发一个简单的基于Spring Boot的Greetings微服务,功能包括:
在 这里 找到源代码
Spring Cloud Stream是一个基于Spring Boot用于构建消息驱动的微服务的框架。
Kafka是一个最初由LinkedIn开发的、流行的高性能和水平可扩展的消息传递平台。
从 这里 下载Kafka 并解开它:
> tar -xzf kafka_2.11-1.0.0.tgz
> cd kafka_2.11-1.0.0
启动Zookeeper和Kafka
在Windows上:
> bin/windows/zookeeper-server-start.bat config/zookeeper.properties
> bin/windows/kafka-server-start.bat config/server.properties
在Linux或Mac上:
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
如果Kafka未运行且计算机从休眠状态唤醒后无法启动,请删除该<TMP_DIR>/kafka-logs文件夹,然后再次启动Kafka。
Lombok是一个Java框架,可以在代码中自动生成getter,setter, toString(), builders, loggers, 等方法代码。
转到 https://start.spring.io 以创建Maven项目:
注意pom.xml文件中的maven依赖项:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <!-- 还要在IDE中安装Lombok插件 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- 热重新加载 - 在应用程序运行时更改代码后,在IntelliJ中按Ctrl + F9 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency>
public interface GreetingsStreams { String INPUT = "greetings-in"; String OUTPUT = "greetings-out"; @Input(INPUT) SubscribableChannel inboundGreetings(); @Output(OUTPUT) MessageChannel outboundGreetings(); }
为了使我们的应用程序能够与Kafka通信,我们需要定义一个出站流来将消息写入Kafka主题,并使用入站流来读取来自Kafka主题的消息。
Spring Cloud提供了一种方便的方法:可以简单地创建为每个流定义单独的方法接口。
inboundGreetings()方法定义要从Kafka读取的入站流,并且outboundGreetings()方法定义要写入Kafka的出站流。
在运行时,Spring将创建一个基于Java代理的GreetingsStreams接口实现,可以在代码中的任何位置注入Spring Bean来访问我们的两个流。
我们的下一步是将Spring Cloud Stream绑定到GreetingsStreams接口中的流。这可以通过使用以下代码创建一个StreamsConfig类来完成:
@EnableBinding(GreetingsStreams.class) public class StreamsConfig { }
使用传递接口的@EnableBinding注释来完成绑定到流GreetingsService(见下文)。
默认情况下,配置属性存储在src/main/resources/application.properties文件中。
但是,我更喜欢使用YAML格式,因为它不那么详细,并且允许在同一文件中保留公共和特定于环境的属性。
现在,让我们重命名application.properties到application.yaml和粘贴下面的配置片断到文件:
spring: cloud: stream: kafka: binder: brokers: localhost:9092 bindings: greetings-in: destination: greetings contentType: application/json greetings-out: destination: greetings contentType: application/json
上面的配置属性配置了要连接的Kafka服务器的地址,以及我们在代码中用于入站和出站流的Kafka主题。他们都必须使用相同的Kafka主题greetings!contentType属性告诉Spring Cloud Stream在流中发送/接收我们的消息对象String。
Greetings使用下面的代码创建一个简单的类,代表我们读取的消息对象并写入greetingsKafka主题:
ort lombok.Builder; import lombok.Getter; import lombok.Setter; import lombok.ToString; @Getter @Setter @ToString @Builder public class Greetings { private long timestamp; private String message; public Greetings(long timestamp, String message) { this.timestamp = timestamp; this.message = message; } public Greetings() { } }
请注意,由于使用了Lombok注释,该类没有任何getter和setter方法。该@ToString会生成一个toString()使用类的字段和方法,@Builder的注释将允许我们创建Greetings使用流利的建设者builder (见下文)的对象。
让我们使用下面的代码创建GreetingsService类,该代码将Greetings对象写入greetingsKafka主题:
@Service @Slf4j public class GreetingsService { private final GreetingsStreams greetingsStreams; public GreetingsService(GreetingsStreams greetingsStreams) { this.greetingsStreams = greetingsStreams; } public void sendGreeting(final Greetings greetings) { log.info("Sending greetings {}", greetings); MessageChannel messageChannel = greetingsStreams.outboundGreetings(); messageChannel.send(MessageBuilder .withPayload(greetings) .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) .build()); } }
@Service注释将配置该类作为一个Spring bean,并通过构造依赖将GreetingsStreams注入GreetingsService。
@Slf4j注释表示我们可以使用SLF4J日志。
在sendGreeting()方法中,我们使用注入的GreetingsStream对象来发送由Greetings对象表示的消息。
现在我们将创建一个REST API端点,它将触发使用GreetingsServiceSpring Bean 向Kafka发送消息:
@RestController public class GreetingsController { private final GreetingsService greetingsService; public GreetingsController(GreetingsService greetingsService) { this.greetingsService = greetingsService; } @GetMapping("/greetings") @ResponseStatus(HttpStatus.ACCEPTED) public void greetings(@RequestParam("message") String message) { Greetings greetings = Greetings.builder() .message(message) .timestamp(System.currentTimeMillis()) .build(); greetingsService.sendGreeting(greetings); } }
greetings()方法定义了一个HTTP GET /greetings端点,该端点接受请求参数message并将其传递给GreetingsService的sendGreeting()方法
让我们创建一个com.kaviddiss.streamkafka.service.GreetingsListener类来监听Kafka主题greetings上的消息并将它们记录在控制台上:
@Component @Slf4j public class GreetingsListener { @StreamListener(GreetingsStreams.INPUT) public void handleGreetings(@Payload Greetings greetings) { log.info("Received greetings: {}", greetings); } }
GreetingsListener有一个方法handleGreetings(),Spring Cloud Stream将监听Kafka主题Greetings上的每个新消息对象greetings。这要归功于为handleGreetings()方法配置的注释@StreamListener。
拼图的最后一部分是由Spring Initializer自动生成的StreamKafkaApplication类:
@SpringBootApplication public class CloudbusstreamkafkaApplication { public static void main(String[] args) { SpringApplication.run(CloudbusstreamkafkaApplication.class, args); } }
无需在此处进行任何更改。您可以从IDE运行此类作为Java应用程序,也可以使用Spring Boot Maven插件从命令行运行该应用程序:
> mvn spring - boot:run
应用程序运行后,在浏览器中转到 http://localhost:8080/greetings?message=hello 并检查控制台:
(timestamp=1531643278270, message=hello)
如果需要输入用户名和密码,用户名是user,密码在控制台using generated security password可以找到,或者使用元注解排除SecurityAutoConfiguration.class
@SpringBootApplication(exclude = {SecurityAutoConfiguration.class })
在 这里 找到源代码
spring cloud专题