转载

spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)

spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)

本文采用Spring cloud本文为2.1.8RELEASE,version=Greenwich.SR3

本文基于前两篇文章 eureka-server、eureka-client、eureka-ribbon 和 spring-gateway 的实现。

参考

  • eureka-server
  • eureka-client
  • eureka-ribbon
  • spring-cloud-stream

概述

Spring Cloud Stream 是一个构建消息驱动微服务的框架。它通过使用 Spring Integration 来连接消息代理中间件以及实现消息事件驱动的微服务应用。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,并且引入了发布-订阅、消费组以及消息分区这三个核心概念。简单的说,Spring Cloud Stream 本质上就是整合了 Spring Boot 和 Spring Integration,实现了一套轻量级的消息驱动的微服务框架。

1. 创建 Spring Cloud Stream 应用: spring-cloud-stream

1.1 增加 pom 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

1.2 配置 application.yml 文件

spring:
  application:
    name: spring-cloud-stream
  cloud:
    stream:
      kafaka:
        binder:
          brokers: 192.168.10.196:9092 #kafaka服务地址
          zk-noeds: 192.168.10.196:2181 #zk服务地址
          auto-create-topics: true
      bindings:
        output:  #stream默认提供的output
          destination: stream-kafaka #消息发送到的目的地
          content-type: text/plain #消息发送格式,接收端不用指定格式,但是发送端要。

server:
  port: 1000

1.3 创建消息发送 Service: KafakaSendService

package com.mm.spring.cloud.springcloudstream.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class KafakaSendService {

    @Autowired
    private Source source;

    public void sendMsg(String msg) {
        source.output().send(MessageBuilder.withPayload(msg).build());
    }
}

1.4 创建调用消息 Controller: KafakaProducerController

这个 KafakaProducerController 是方便演示用

package com.mm.spring.cloud.springcloudstream.controller;

import com.mm.spring.cloud.springcloudstream.service.KafakaSendService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafakaProducerController {

    @Autowired
    private KafakaSendService kafakaSendService;

    @RequestMapping("/send/{msg}")
    public void send(@PathVariable String msg) {
        kafakaSendService.sendMsg(msg);
    }
}

1.5 修改 eureka-ribbon 应用

1.5.1 增加 pom 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

1.5.2 application.yml 增加配置

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.10.196:9092
          auto-create-topics: true
      bindings:
        input:
          destination: stream-kafaka

1.5.3 新增消息处理类: RecieceService

package spring.cloud.demo.eurekaribbon.service;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class RecieceService {

    @StreamListener(Sink.INPUT)
    public void recieve(Object payload) {
        System.out.println(payload);
    }
}

1.6 启动服务

按照顺序启动 eureka-servereureka-clienteureka-ribbonspring-cloud-stream 应用的服务,然后访问 http://localhost:1000/send/maomao , 然后查看 eureka-ribbon 的控制台可以看到,如下图显示:

spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)

证明 eureka-ribbon 已经接到了消息。

1.7 小结

至此,一个简单的 Spring Cloud Stream 的 kafaka 应用就搭建完成了,这个是方式是使用了 Stream 的默认 Source 和 Sink 方式。

总结

Spring Cloud Stream 涉及的内容还是挺多的,这里先简单实现,至此《spring cloud 2.x 版本小白系列教程》就终结了,写这个小白系列教程的目的就是为了大家能对 Spring cloud 有一个初步的认识和了解,后续我这边会更新 Spring cloud 进阶教程,大家敬请期待,感谢小伙伴们的支持,谢谢!!!

代码地址

gitHub 地址

<center><font color=red>《Srping Cloud 2.X小白教程》目录</font></center>

  • spring cloud 2.x 版本 Eureka Server 服务注册中心教程
  • spring cloud 2.x 版本 Eureka Client 服务提供者教程
  • spring cloud 2.x 版本 Ribbon 服务发现教程(内含集成 Hystrix 熔断机制) ")
  • spring cloud 2.x 版本 Feign 服务发现教程(内含集成 Hystrix 熔断机制) ")
  • spring cloud 2.x 版本 Zuul 路由网关教程
  • spring cloud 2.x 版本 config 分布式配置中心教程
  • spring cloud 2.x 版本 Hystrix Dashboard 断路器教程
  • spring cloud 2.x 版本 Gateway 路由网关教程
  • spring cloud 2.x 版本 Gateway 自定义过滤器教程
  • spring cloud 2.x 版本 Gateway 熔断、限流教程
  • spring cloud 2.x 版本 Gateway 动态路由教程
  • spring cloud 2.x 版本 Sleuth+Zipkin 分布式链路追踪
  • spring cloud 2.x 版本 Sleuth+Zipkin 分布式链路追踪补充内容(rabbitmq 日志收集)
  • spring cloud 2.x 版本 Spring Cloud Stream 消息驱动组件基础教程(kafaka 篇)
  • 写作不易,转载请注明出处,喜欢的小伙伴可以关注公众号查看更多喜欢的文章。
  • 联系方式:4272231@163.com
    spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)
原文  https://segmentfault.com/a/1190000021103723
正文到此结束
Loading...