本文主要研究一下dubbo-go的HystrixFilter
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// HystrixFilter is the filter that routes invocations through hystrix
// commands. One instance serves either the consumer or the provider side.
type HystrixFilter struct {
	// COrP selects the side this filter works for: true for consumer.
	COrP bool

	// res holds compiled regular expressions keyed by method name. Invoke
	// matches invocation error messages against them to decide whether an
	// error is omitted from the circuit breaker.
	res map[string][]*regexp.Regexp

	// ifNewMap records which command names have already been configured.
	ifNewMap sync.Map
}
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// Invoke ... func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { cmdName := fmt.Sprintf("%s&method=%s", invoker.GetUrl().Key(), invocation.MethodName()) // Do the configuration if the circuit breaker is created for the first time if _, load := hf.ifNewMap.LoadOrStore(cmdName, true); !load { configLoadMutex.Lock() filterConf := getConfig(invoker.GetUrl().Service(), invocation.MethodName(), hf.COrP) for _, ptn := range filterConf.Error { reg, err := regexp.Compile(ptn) if err != nil { logger.Warnf("[Hystrix Filter]Errors occurred parsing error omit regexp: %s, %v", ptn, err) } else { if hf.res == nil { hf.res = make(map[string][]*regexp.Regexp) } hf.res[invocation.MethodName()] = append(hf.res[invocation.MethodName()], reg) } } hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{ Timeout: filterConf.Timeout, MaxConcurrentRequests: filterConf.MaxConcurrentRequests, SleepWindow: filterConf.SleepWindow, ErrorPercentThreshold: filterConf.ErrorPercentThreshold, RequestVolumeThreshold: filterConf.RequestVolumeThreshold, }) configLoadMutex.Unlock() } configLoadMutex.RLock() _, _, err := hystrix.GetCircuit(cmdName) configLoadMutex.RUnlock() if err != nil { logger.Errorf("[Hystrix Filter]Errors occurred getting circuit for %s , will invoke without hystrix, error is: ", cmdName, err) return invoker.Invoke(ctx, invocation) } logger.Infof("[Hystrix Filter]Using hystrix filter: %s", cmdName) var result protocol.Result _ = hystrix.Do(cmdName, func() error { result = invoker.Invoke(ctx, invocation) err := result.Error() if err != nil { result.SetError(NewHystrixFilterError(err, false)) for _, reg := range hf.res[invocation.MethodName()] { if reg.MatchString(err.Error()) { logger.Debugf("[Hystrix Filter]Error in invocation but omitted in circuit breaker: %v; %s", err, cmdName) return nil } } } return err }, func(err error) error { //Return error and if it is caused by hystrix logic, so that it can be 
handled by previous filters. _, ok := err.(hystrix.CircuitError) logger.Debugf("[Hystrix Filter]Hystrix health check counted, error is: %v, failed by hystrix: %v; %s", err, ok, cmdName) result = &protocol.RPCResult{} result.SetResult(nil) result.SetError(NewHystrixFilterError(err, ok)) return err }) return result }
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// OnResponse ... func (hf *HystrixFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { return result }
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// GetHystrixFilterConsumer ... func GetHystrixFilterConsumer() filter.Filter { //When first called, load the config in consumerConfigOnce.Do(func() { if err := initHystrixConfigConsumer(); err != nil { logger.Warnf("[Hystrix Filter]Config load failed for consumer, error is: %v , will use default", err) } }) return &HystrixFilter{COrP: true} }
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
// GetHystrixFilterProvider ... func GetHystrixFilterProvider() filter.Filter { providerConfigOnce.Do(func() { if err := initHystrixConfigProvider(); err != nil { logger.Warnf("[Hystrix Filter]Config load failed for provider, error is: %v , will use default", err) } }) return &HystrixFilter{COrP: false} }
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
func initHystrixConfigConsumer() error { if config.GetConsumerConfig().FilterConf == nil { return perrors.Errorf("no config for hystrix") } filterConfig := config.GetConsumerConfig().FilterConf.(map[interface{}]interface{})[HYSTRIX] if filterConfig == nil { return perrors.Errorf("no config for hystrix") } hystrixConfByte, err := yaml.Marshal(filterConfig) if err != nil { return err } err = yaml.Unmarshal(hystrixConfByte, confConsumer) if err != nil { return err } return nil }
dubbo-go-v1.4.2/filter/filter_impl/hystrix_filter.go
func initHystrixConfigProvider() error { if config.GetProviderConfig().FilterConf == nil { return perrors.Errorf("no config for hystrix") } filterConfig := config.GetConsumerConfig().FilterConf.(map[interface{}]interface{})[HYSTRIX] if filterConfig == nil { return perrors.Errorf("no config for hystrix") } hystrixConfByte, err := yaml.Marshal(filterConfig) if err != nil { return err } err = yaml.Unmarshal(hystrixConfByte, confProvider) if err != nil { return err } return nil }
HystrixFilter的Invoke方法首先通过hf.ifNewMap.LoadOrStore判断该cmdName的circuit breaker是否已经创建；如果尚未创建，则通过getConfig获取filterConf，将filterConf.Error中的每个正则表达式编译后追加到hf.res[invocation.MethodName()]，再通过hystrix.ConfigureCommand对该cmdName进行配置；随后通过hystrix.GetCircuit(cmdName)获取circuit（获取失败时直接调用invoker.Invoke，不经过hystrix），最后执行hystrix.Do，在其run函数里执行invoker.Invoke(ctx, invocation)