本文主要研究一下nacos-sdk-go的BeatReactor
nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go
type BeatReactor struct { beatMap cache.ConcurrentMap serviceProxy NamingProxy clientBeatInterval int64 beatThreadCount int beatThreadSemaphore *nsema.Semaphore beatRecordMap cache.ConcurrentMap }
nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go
func NewBeatReactor(serviceProxy NamingProxy, clientBeatInterval int64) BeatReactor { br := BeatReactor{} if clientBeatInterval <= 0 { clientBeatInterval = 5 * 1000 } br.beatMap = cache.NewConcurrentMap() br.serviceProxy = serviceProxy br.clientBeatInterval = clientBeatInterval br.beatThreadCount = Default_Beat_Thread_Num br.beatRecordMap = cache.NewConcurrentMap() br.beatThreadSemaphore = nsema.NewSemaphore(br.beatThreadCount) return br }
nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go
func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo model.BeatInfo) { log.Printf("[INFO] adding beat: <%s> to beat map./n", utils.ToJsonString(beatInfo)) k := buildKey(serviceName, beatInfo.Ip, beatInfo.Port) br.beatMap.Set(k, &beatInfo) go br.sendInstanceBeat(k, &beatInfo) }
nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go
func (br *BeatReactor) RemoveBeatInfo(serviceName string, ip string, port uint64) { log.Printf("[INFO] remove beat: %s@%s:%d from beat map./n", serviceName, ip, port) k := buildKey(serviceName, ip, port) data, exist := br.beatMap.Get(k) if exist { beatInfo := data.(*model.BeatInfo) beatInfo.Stopped = true } br.beatMap.Remove(k) }
nacos-sdk-go-v0.3.2/clients/naming_client/beat_reactor.go
func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *model.BeatInfo) { for { br.beatThreadSemaphore.Acquire() //如果当前实例注销,则进行停止心跳 if beatInfo.Stopped { log.Printf("[INFO] intance[%s] stop heartBeating/n", k) br.beatThreadSemaphore.Release() return } //进行心跳通信 beatInterval, err := br.serviceProxy.SendBeat(*beatInfo) if err != nil { log.Printf("[ERROR]:beat to server return error:%s /n", err.Error()) br.beatThreadSemaphore.Release() t := time.NewTimer(beatInfo.Period) <-t.C continue } if beatInterval > 0 { beatInfo.Period = time.Duration(time.Millisecond.Nanoseconds() * beatInterval) } br.beatRecordMap.Set(k, utils.CurrentMillis()) br.beatThreadSemaphore.Release() t := time.NewTimer(beatInfo.Period) <-t.C } }
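BeatReactor定义了beatMap、serviceProxy、clientBeatInterval、beatThreadCount、beatThreadSemaphore、beatRecordMap属性;它提供了NewBeatReactor、AddBeatInfo、RemoveBeatInfo方法。其中AddBeatInfo将beatInfo存入beatMap并启动一个goroutine执行sendInstanceBeat;sendInstanceBeat在循环中先获取beatThreadSemaphore,再通过serviceProxy.SendBeat发送心跳,成功后更新beatRecordMap,释放信号量并按beatInfo.Period休眠;RemoveBeatInfo将对应beatInfo的Stopped置为true并从beatMap中移除,心跳goroutine在下一轮循环检测到Stopped后退出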