转载

golang微服务框架go-micro 入门笔记2.3 micro工具之消息接收和发布

本章节阐述micro消息订阅和发布相关内容

阅读本文前你可能需要进行如下知识储备

  • golang分布式微服务框架go-micro 入门笔记1:搭建go-micro环境,
  • golang微服务框架go-micro 入门笔记2.1 micro工具之micro api
  • golang微服务框架go-micro 入门笔记2.2 micro工具之micro web

broker代理

微服务之间需要通过broker来传递消息,go-micro支持http/nats/memory三种broker,其中http是默认的broker。

同时,go-micro以强大的插件形式,提供如下几种常见的broker。

$ls

gocloud/  googlepubsub/  grpc/  kafka/  mqtt/  nats/  nsq/  proxy/  rabbitmq/  redis/  snssqs/  sqs/  stan/  stomp/

http

HTTP Broker 是基于HTTP的异步broker,源代码在 github.com/micro/go-micro@v1.9.1/broker/broker.go 中,默认DefaultBroker为http

var (
    DefaultBroker Broker = newHttpBroker()
)

httpbroker实际上就是一个结构体

type httpBroker struct {

    id      string  //微服务ID
    address string //主机地址
    opts    Options //一些配置
    mux *http.ServeMux  //通过这个监听其他端发送的http请求
    c *http.Client  //通过这个发送请求到其他端
    r registry.Registry 
    sync.RWMutex
    subscribers map[string][]*httpSubscriber  //订阅
    running     bool
    exit        chan chan error
    // offline message inbox
    mtx   sync.RWMutex 
    inbox map[string][][]byte   //数据缓存

}

通过 http.Client 发送请求,通过 http.ServeMux 实现请求监听,通过 inbox 存储数据

redis

redis初始化代码如下

//main.go

//初始化URL格式redis://密码@主机:端口/

b := redis.NewBroker(
        broker.Addrs("redis://user:secret@localhost:6379/"),
        )

//初始化

b.Init()

//连接

b.Connect()

// 新建service

service := grpc.NewService(
        micro.Name("go.micro.web.config"),
        micro.Version("latest"),
        micro.Broker(b),

    )

//初始化service
service.Init()
//启动,运行,监听
service.Run()

启动应用程序需要指定broker为redis

go run main.go --broker=redis

grpc 初始化

初始化过程如下

//main.go

import (
    "github.com/micro/go-plugins/broker/grpc"
)



// 建立连接

b := grpc.NewBroker()
b.Init()
b.Connect()

// 订阅事件
sub, _ := b.Subscribe("events")
defer sub.Unsubscribe()

// 发布事件
b.Publish("events", &broker.Message{

    Headers: map[string]string{"type": "event"},

    Body: []byte(`an event`),

})

启动应用程序需要指定broker为grpc

go run main.go --broker=grpc

rabbitmq 初始化

初始化过程如下

//main.go

import (

    "github.com/micro/go-plugins/broker/grpc"

)

b := rabbitmq.NewBroker(

        broker.Addrs("amqp://用户名:密码@主机host:端口port"),

    )



    b.Init()

    b.Connect()

启动应用程序需要指定broker为rabbitmq

go run main.go plugin.go --broker=rabbitmq

mqtt

初始化过程如下

//main.go

import (

    "github.com/micro/go-micro"

    "github.com/micro/go-plugins/broker/mqtt"

)

func main() {

    service := micro.NewService(

        micro.Name("my.service"),

        micro.Broker(mqtt.NewBroker()),

    )

    //...

}

启动应用程序需要指定broker为mqtt

go run main.go plugin.go --broker=mqtt

其他

其他可以阅读代码

$GOPATH/src/github.com/micro/go-plugins/broker

消息订阅和发布

通过micro.RegisterSubscriber实现消息订阅

消息订阅主要API接口如下,第一个参数标识消息主题,第二个参数表示服务实例。

// Register Struct as Subscriber

micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), new(subscriber.Testsrv))

// Register Function as Subscriber

micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), subscriber.Handler)

重点注意第三个参数,第三个参数是处理函数,可以是函数,也可以是实现了

func Handler(ctx context.Context, msg *testsrv.Message) error 方法的结构体,micro内部会根据参数类型自动适配。结构体中可以实现多个 func Handler(ctx context.Context, msg *testsrv.Message) error 类型方法

通过broker.Subscribe实现订阅

Broker提供如下接口

type Broker interface {

    Init(...Option) error

    Options() Options

    Address() string

    Connect() error

    Disconnect() error

    Publish(topic string, m *Message, opts ...PublishOption) error

    Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

    String() string

}
  • Subscribe 订阅事件,topic代表主题,h事件处理函数

  • Publish 发布事件

消息处理函数Handler 定义

在上述涉及到处理函数handler,具体含义如下

type Handler func(Event) error

// Event is given to a subscription handler for processing

type Event interface {

    Topic() string

    Message() *Message

    Ack() error

}



type Message struct {

    Header map[string]string

    Body   []byte

}

通过broker.Publish实现发布

举例如下

// 建立连接

b := grpc.NewBroker()

b.Init()

b.Connect()
// 订阅事件

sub, _ := b.Subscribe("events")

defer sub.Unsubscribe()

// 发布事件

b.Publish("events", &broker.Message{

    Headers: map[string]string{"type": "event"},

    Body: []byte(`an event`),

})

通过micro publish实现发布

举例如下

micro publish "go.micro.web.config" "hello"

实战和代码

效果

下载代码 broker.zip 解压到 techidea8.com/microapp/broker 下运行,效果图忑

golang微服务框架go-micro 入门笔记2.3 micro工具之消息接收和发布

效果

  • 发布消息需要注意json格式字符串
micro publish go.micro.srv.broker "{/"say/":/"这是测试消息/"}"

获得代码

关注公众号回复 micro-broker 即可获得

golang微服务框架go-micro 入门笔记2.3 micro工具之消息接收和发布

公众号betaidea

原文  https://studygolang.com/articles/23194
正文到此结束
Loading...