本文主要研究一下dubbo-go的DubboInvoker
dubbo-go-v1.4.2/protocol/invoker.go
// Extension - Invoker type Invoker interface { common.Node Invoke(context.Context, Invocation) Result } ///////////////////////////// // base invoker ///////////////////////////// // BaseInvoker ... type BaseInvoker struct { url common.URL available bool destroyed bool } // NewBaseInvoker ... func NewBaseInvoker(url common.URL) *BaseInvoker { return &BaseInvoker{ url: url, available: true, destroyed: false, } } // GetUrl ... func (bi *BaseInvoker) GetUrl() common.URL { return bi.url } // IsAvailable ... func (bi *BaseInvoker) IsAvailable() bool { return bi.available } // IsDestroyed ... func (bi *BaseInvoker) IsDestroyed() bool { return bi.destroyed } // Invoke ... func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result { return &RPCResult{} } // Destroy ... func (bi *BaseInvoker) Destroy() { logger.Infof("Destroy invoker: %s", bi.GetUrl().String()) bi.destroyed = true bi.available = false }
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
var ( // ErrNoReply ... ErrNoReply = perrors.New("request need @response") ErrDestroyedInvoker = perrors.New("request Destroyed invoker") ) var ( attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY} ) // DubboInvoker ... type DubboInvoker struct { protocol.BaseInvoker client *Client quitOnce sync.Once // Used to record the number of requests. -1 represent this DubboInvoker is destroyed reqNum int64 }
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// NewDubboInvoker ... func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, reqNum: 0, } }
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// Invoke ... func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { var ( err error result protocol.RPCResult ) if di.reqNum < 0 { // Generally, the case will not happen, because the invoker has been removed // from the invoker list before destroy,so no new request will enter the destroyed invoker logger.Warnf("this dubboInvoker is destroyed") result.Err = ErrDestroyedInvoker return &result } atomic.AddInt64(&(di.reqNum), 1) defer atomic.AddInt64(&(di.reqNum), -1) inv := invocation.(*invocation_impl.RPCInvocation) for _, k := range attachmentKey { if v := di.GetUrl().GetParam(k, ""); len(v) > 0 { inv.SetAttachments(k, v) } } // put the ctx into attachment di.appendCtx(ctx, inv) url := di.GetUrl() // async async, err := strconv.ParseBool(inv.AttachmentsByKey(constant.ASYNC_KEY, "false")) if err != nil { logger.Errorf("ParseBool - error: %v", err) async = false } response := NewResponse(inv.Reply(), nil) if async { if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok { result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response) } else { result.Err = di.client.CallOneway(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments())) } } else { if inv.Reply() == nil { result.Err = ErrNoReply } else { result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response) } } if result.Err == nil { result.Rest = inv.Reply() result.Attrs = response.atta } logger.Debugf("result.Err: %v, result.Rest: %v", result.Err, result.Rest) return &result }
dubbo-go-v1.4.2/protocol/dubbo/dubbo_invoker.go
// Destroy ... func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { for { if di.reqNum == 0 { di.reqNum = -1 logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key()) di.BaseInvoker.Destroy() if di.client != nil { di.client.Close() di.client = nil } break } logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key()) time.Sleep(1 * time.Second) } }) }
Invoker定义了Invoke方法;BaseInvoker定义了url、available、destroyed属性;NewBaseInvoker方法实例化了BaseInvoker,其available为true,destroyed为false;Destroy方法设置available为false,destroyed为true;DubboInvoker的Invoke方法先通过atomic.AddInt64递增reqNum,之后遍历attachmentKey设置到invocation;之后读取constant.ASYNC_KEY属性,若async为true,则执行di.client.AsyncCall或di.client.CallOneway;若async为false则执行di.client.Call;最后返回result