不忘初心,砥砺前行
作者 | 陌无崖
转载请联系授权
API模式笔记告一段了,这篇文章总结了Micro中的消息订阅模式,在之前的系列文章中讲解了RabbitMQ消息队列,使用的是原生代码进行编写,在Micro中,其实为我们封装了RabbitMQ消息队列,不仅仅是RabbitMQ还有NATs作为消息中间件,称之为broker模式,在这篇文章中我们进行一个简单的发布与订阅模式的介绍。
首先建立如下目录
编写我们的main.go,首先,定义一个待发布的主题和一个broker,使用broker需要导入包。
1import "github.com/micro/go-micro/broker"
1var ( 2 // 定义一个待发布的主题 3 topic = "mu.micro.book.topic.payment.done" 4 b broker.Broker 5)
编写一个发布消息的函数如下,在代码中broker.Message是一个结构体,我们将要发送的消息需要存入该结构体中,最后使用broker.Publish发布消息。
1func pub() { 2 // 该Ticker包含一个通道字段,并会每隔时间段d就向该通道发送当时的时间。 3 // 它会调整时间间隔或者丢弃tick信息以适应反应慢的接收者。如果d<=0会panic。关闭该Ticker可以释放相关资源。 4 tick := time.NewTicker(time.Second) 5 i := 0 6 for range tick.C { 7 msg := &broker.Message{ 8 Header: map[string]string{ 9 "id": fmt.Sprintf("%d", i), 10 }, 11 Body: []byte(fmt.Sprintf("%d:%s", i, time.Now().String())), 12 } 13 log.Infof(broker.String()) 14 // 发布消息 15 if err := broker.Publish(topic, msg); err != nil { 16 log.Infof("[pub] Message publication failed: %v", err) 17 } else { 18 fmt.Println("[pub] Message published: ", string(msg.Body)) 19 } 20 i++ 21 } 22}
有发布消息的函数,肯定就有接收消息的函数如下,通过broker.Subscribe完成主题的订阅。其他的调用见名知义,不再详细介绍。
1func sub() { 2 // 订阅消息并接收 3 _, err := broker.Subscribe(topic, func(p broker.Event) error { 4 log.Info("[sub] Received Body: %s,Header: %s/n", string(p.Message().Body), p.Message().Header) 5 return nil 6 }) 7 if err != nil { 8 fmt.Println(err) 9 } 10}
编写我们的main函数,这里需要注意导入相关的包
1"github.com/micro/go-micro/config/cmd" 2"github.com/micro/go-micro/util/log"
使用Init进行初始化然后开始连接,开启两个协程进行收发消息,使用time.After()设置过期时间,当时间到时,并向当前通道发送事件,结束主线程。防止主线程提前结束。如果这一点不懂请移步 我的博客 看关于进程线程和Go协程总结这篇文章。
1func main() { 2 cmd.Init() 3 if err := broker.Init(); err != nil { 4 log.Fatalf("broker.Init() error: %v", err) 5 } 6 if err := broker.Connect(); err != nil { 7 log.Fatalf("broker.Connect() error: %v", err) 8 } 9 10 go pub() 11 go sub() 12 13 <-time.After(time.Second * 20) 14}
看到这里是不是觉得很简单,测试一番,结果如下
1PS F:/micolearn/day02/mico-broker/basic> go run main.go 22019/08/12 16:28:37 http 3[pub] Message published: 0:2019-08-12 16:28:37.388162 +0800 CST m=+1.024290401 42019/08/12 16:28:37 [sub] Received Body: %s,Header: %s 50:2019-08-12 16:28:37.388162 +0800 CST m=+1.024290401map[id:0] 62019/08/12 16:28:38 http 7[pub] Message published: 1:2019-08-12 16:28:38.3884518 +0800 CST m=+2.024580201 82019/08/12 16:28:38 [sub] Received Body: %s,Header: %s 91:2019-08-12 16:28:38.3884518 +0800 CST m=+2.024580201map[id:1] 102019/08/12 16:28:39 http 11[pub] Message published: 2:2019-08-12 16:28:39.3878092 +0800 CST m=+3.023937601 122019/08/12 16:28:39 [sub] Received Body: %s,Header: %s 132:2019-08-12 16:28:39.3878092 +0800 CST m=+3.023937601map[id:2] 142019/08/12 16:28:40 http 15[pub] Message published: 3:2019-08-12 16:28:40.3881043 +0800 CST m=+4.024232701 162019/08/12 16:28:40 [sub] Received Body: %s,Header: %s 173:2019-08-12 16:28:40.3881043 +0800 CST m=+4.024232701map[id:3] 182019/08/12 16:28:41 http 19[pub] Message published: 4:2019-08-12 16:28:41.3884312 +0800 CST m=+5.024559601 202019/08/12 16:28:41 [sub] Received Body: %s,Header: %s 214:2019-08-12 16:28:41.3884312 +0800 CST m=+5.024559601map[id:4] 222019/08/12 16:28:42 http 23[pub] Message published: 5:2019-08-12 16:28:42.3887561 +0800 CST m=+6.024884501 242019/08/12 16:28:42 [sub] Received Body: %s,Header: %s 255:2019-08-12 16:28:42.3887561 +0800 CST m=+6.024884501map[id:5] 262019/08/12 16:28:43 http 27[pub] Message published: 6:2019-08-12 16:28:43.3870869 +0800 CST m=+7.023215301 282019/08/12 16:28:43 [sub] Received Body: %s,Header: %s 296:2019-08-12 16:28:43.3870869 +0800 CST m=+7.023215301map[id:6] 302019/08/12 16:28:44 http 31[pub] Message published: 7:2019-08-12 16:28:44.3885307 +0800 CST m=+8.024659101 322019/08/12 16:28:44 [sub] Received Body: %s,Header: %s 337:2019-08-12 16:28:44.3885307 +0800 CST m=+8.024659101map[id:7] 342019/08/12 16:28:45 http 35[pub] Message published: 8:2019-08-12 16:28:45.3887328 +0800 CST m=+9.024861201 362019/08/12 16:28:45 [sub] Received Body: %s,Header: %s 378:2019-08-12 16:28:45.3887328 +0800 CST m=+9.024861201map[id:8] 382019/08/12 16:28:46 http 39[pub] Message published: 9:2019-08-12 16:28:46.3870944 +0800 CST m=+10.023222801 402019/08/12 16:28:46 [sub] Received Body: %s,Header: %s 419:2019-08-12 16:28:46.3870944 +0800 CST m=+10.023222801map[id:9] 422019/08/12 16:28:47 http 43[pub] Message published: 10:2019-08-12 16:28:47.3873887 +0800 CST m=+11.023517101 442019/08/12 16:28:47 [sub] Received Body: %s,Header: %s 4510:2019-08-12 16:28:47.3873887 +0800 CST m=+11.023517101map[id:10] 462019/08/12 16:28:48 http 47[pub] Message published: 11:2019-08-12 16:28:48.3887128 +0800 CST m=+12.024841201 482019/08/12 16:28:48 [sub] Received Body: %s,Header: %s 4911:2019-08-12 16:28:48.3887128 +0800 CST m=+12.024841201map[id:11] 502019/08/12 16:28:49 http 51[pub] Message published: 12:2019-08-12 16:28:49.3870651 +0800 CST m=+13.023193501 522019/08/12 16:28:49 [sub] Received Body: %s,Header: %s 5312:2019-08-12 16:28:49.3870651 +0800 CST m=+13.023193501map[id:12] 542019/08/12 16:28:50 http 55[pub] Message published: 13:2019-08-12 16:28:50.3873677 +0800 CST m=+14.023496101 562019/08/12 16:28:50 [sub] Received Body: %s,Header: %s 5713:2019-08-12 16:28:50.3873677 +0800 CST m=+14.023496101map[id:13] 582019/08/12 16:28:51 http 59[pub] Message published: 14:2019-08-12 16:28:51.3886879 +0800 CST m=+15.024816301 602019/08/12 16:28:51 [sub] Received Body: %s,Header: %s 6114:2019-08-12 16:28:51.3886879 +0800 CST m=+15.024816301map[id:14] 622019/08/12 16:28:52 http 63[pub] Message published: 15:2019-08-12 16:28:52.3870503 +0800 CST m=+16.023178701 642019/08/12 16:28:52 [sub] Received Body: %s,Header: %s 6515:2019-08-12 16:28:52.3870503 +0800 CST m=+16.023178701map[id:15] 662019/08/12 16:28:53 http 67[pub] Message published: 16:2019-08-12 16:28:53.3875243 +0800 CST m=+17.023652701 682019/08/12 16:28:53 [sub] Received Body: %s,Header: %s 6916:2019-08-12 16:28:53.3875243 +0800 CST m=+17.023652701map[id:16] 702019/08/12 16:28:54 http 71[pub] Message published: 17:2019-08-12 16:28:54.3885713 +0800 CST m=+18.024699701 722019/08/12 16:28:54 [sub] Received Body: %s,Header: %s 7317:2019-08-12 16:28:54.3885713 +0800 CST m=+18.024699701map[id:17] 742019/08/12 16:28:55 http 75[pub] Message published: 18:2019-08-12 16:28:55.3872265 +0800 CST m=+19.023354901 762019/08/12 16:28:55 [sub] Received Body: %s,Header: %s 7718:2019-08-12 16:28:55.3872265 +0800 CST m=+19.023354901map[id:18]
END
今日推荐阅读
RabbitMQ系列笔记广播模式和路由模式
RabbitMQ系列笔记入门篇RabbitMQ系列笔记work模式
RabbitMQ系列笔记work模式
protoc语法详解及结合grpc定义服务
Golang中Model的使用
基于Nginx和Consul构建高可用及自动发现的Docker服务架构
▼关注我,一起成长
主要分享 学习心得、笔记、随笔▼