本文主要研究一下nacos-sdk-go的PushReceiver
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
type PushReceiver struct { port int host string hostReactor *HostReactor }
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
func NewPushRecevier(hostReactor *HostReactor) *PushReceiver { pr := PushReceiver{ hostReactor: hostReactor, } go pr.startServer() return &pr }
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
func (us *PushReceiver) startServer() { var conn *net.UDPConn for i := 0; i < 3; i++ { r := rand.New(rand.NewSource(time.Now().UnixNano())) port := r.Intn(1000) + 54951 us.port = port conn1, ok := us.tryListen() if ok { conn = conn1 log.Println("[INFO] udp server start, port: " + strconv.Itoa(port)) break } if !ok && i == 2 { log.Panicf("failed to start udp server after trying 3 times.") //os.Exit(1) //It is weird dangerous to invoke the os.Exit() as a Middleware. } } defer conn.Close() for { us.handleClient(conn) } }
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
func (us *PushReceiver) tryListen() (*net.UDPConn, bool) { addr, err := net.ResolveUDPAddr("udp", us.host+":"+strconv.Itoa(us.port)) if err != nil { log.Printf("[ERROR]: Can't resolve address,err: %s /n", err.Error()) return nil, false } conn, err := net.ListenUDP("udp", addr) if err != nil { log.Printf("Error listening %s:%d,err:%s /n", us.host, us.port, err.Error()) return nil, false } return conn, true }
nacos-sdk-go-v0.3.2/clients/naming_client/push_receiver.go
func (us *PushReceiver) handleClient(conn *net.UDPConn) { data := make([]byte, 4024) n, remoteAddr, err := conn.ReadFromUDP(data) if err != nil { log.Printf("[ERROR]:failed to read UDP msg because of %s /n", err.Error()) return } s := utils.TryDecompressData(data[:n]) log.Println("[INFO] receive push: "+s+" from: ", remoteAddr) var pushData PushData err1 := json.Unmarshal([]byte(s), &pushData) if err1 != nil { log.Printf("[ERROR] failed to process push data.err:%s /n", err1.Error()) return } ack := make(map[string]string) if pushData.PushType == "dom" || pushData.PushType == "service" { us.hostReactor.ProcessServiceJson(pushData.Data) ack["type"] = "push-ack" ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10) ack["data"] = "" } else if pushData.PushType == "dump" { ack["type"] = "dump-ack" ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10) ack["data"] = utils.ToJsonString(us.hostReactor.serviceInfoMap) } else { ack["type"] = "unknow-ack" ack["lastRefTime"] = strconv.FormatInt(pushData.LastRefTime, 10) ack["data"] = "" } bs, _ := json.Marshal(ack) conn.WriteToUDP(bs, remoteAddr) }
PushReceiver定义了port、host、hostReactor属性；它提供了NewPushRecevier构造函数，以及startServer、tryListen、handleClient等方法