本文使用Consul做服务发现,使用gRPC处理RPC(Remote Procedure Call,即远程过程调用)。gRPC是Google开源的一个高性能、通用的RPC框架,基于HTTP/2和protobuf设计开发。它是一个跨编程语言的RPC框架,client和server可以采用不同的编程语言开发。
本文使用的注记说明:
funcA()-->funcB()-->funcC()
       -->funcD()-->funcE()
funcC()-->funcF()
       -->funcG()
上面的函数调用关系为:funcA按序调用了funcB和funcD,funcB调用了funcC,funcD直接调用了funcE, funcC调用了funcF和funcG。
RPC接口通过protobuf定义,使用的是proto3版本。
syntax = "proto3";

// The package name must match the generated Go code, whose service
// descriptor uses ServiceName "HelloService_proto.HelloService".
package HelloService_proto;

// The request message containing the user's name and two integers.
message HelloRequest {
  string name = 1;
  int32 num1 = 2;
  int32 num2 = 3;
}

// The response message.
message HelloResponse {
  string message = 1;
  int32 result = 2;
}

// Service definition.
service HelloService {
  rpc SayHello(HelloRequest) returns (HelloResponse);
}
// register stores a service description on the gRPC server
// (excerpt; "..." marks code omitted in this article).
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	...
	srv := &service{
		server: ss,
		md:     make(map[string]*MethodDesc),
		...
	}
	// Save each MethodDesc of the service being registered into the md map
	// of the newly created service, keyed by method name.
	for i := range sd.Methods {
		d := &sd.Methods[i]
		srv.md[d.MethodName] = d
	}
	...
	// Save the newly created service object into the server's m map, keyed
	// by the fully qualified service name.
	s.m[sd.ServiceName] = srv
}

// RegisterHelloServiceServer registers srv as the implementation of
// HelloService on s, using the generated descriptor _HelloService_serviceDesc.
func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) {
	s.RegisterService(&_HelloService_serviceDesc, srv)
}

// _HelloService_serviceDesc describes HelloService to the gRPC runtime: its
// fully qualified name, its single method SayHello (listed under Methods;
// Streams is empty) and the handler that dispatches calls to it.
var _HelloService_serviceDesc = grpc.ServiceDesc{
	ServiceName: "HelloService_proto.HelloService",
	HandlerType: (*HelloServiceServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "SayHello",
			Handler:    _HelloService_SayHello_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "HelloService.proto",
}
调用Serve来为client提供服务。
Serve()-->Accept()接受client连接
       -->新创建一个goroutine来处理建立的连接-->handleRawConn()
handleRawConn()-->newHTTP2Transport()-->NewServerTransport()-->newHTTP2Server(),其中newHTTP2Server()会与client完成HTTP/2握手,然后创建一个goroutine专门用于发送数据。
               -->serveStreams()-->HandleStreams()-->operateHeaders()-->handleStream(),handleStream()会从接收到的stream中取出service和method名称,然后从server结构对应的map表中找出method handler。
// handleStream dispatches an incoming stream to the registered method handler
// (excerpt; "..." marks code omitted in this article). The stream carries the
// full method name "/<service>/<method>", which is split at its last '/'.
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	// Get the stream's full method name and drop the leading '/'.
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	// NOTE(review): pos is -1 when sm contains no '/'; the omitted code is
	// presumably where that malformed name is rejected before slicing — confirm.
	...
	// Service name: everything before the last '/'.
	service := sm[:pos]
	// Method name: everything after the last '/'.
	method := sm[pos+1:]
	// Look up the service object by name in the gRPC server's m map.
	srv, knownService := s.m[service]
	if knownService {
		// Then look up the MethodDesc (which holds the method handler) in the
		// service's md map and process the call as a unary RPC.
		if md, ok := srv.md[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		...
	}
	...
	// Reached when nothing above returned: reply with an Unimplemented status
	// (errDesc is built in the omitted code).
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		...
	}
	...
}
processUnaryRPC()-->NewContextWithServerTransportStream()创建一个context
                 -->Handler()实际就是调用SayHello
                 -->sendResponse()将执行结果发送给client
sendResponse()-->Write()-->put()-->executeAndPut()将数据存入controlBuffer的list中;如果consumer(即newHTTP2Server创建的那个专门发送数据的goroutine)正在等待,则通知它继续调用get取出数据并发送出去。
// google.golang.org/grpc/internal/transport/controlbuf.go func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) { var wakeUp bool c.mu.Lock() ... if c.consumerWaiting { wakeUp = true c.consumerWaiting = false } //将数据加入list中 c.list.enqueue(it) c.mu.Unlock() if wakeUp { select { //通知consumer取数据 case c.ch <- struct{}{}: default: } } return true, nil } // google.golang.org/grpc/internal/transport/controlbuf.go func (c *controlBuffer) get(block bool) (interface{}, error) { for { ... if !c.list.isEmpty() { //从list中取数据 h := c.list.dequeue() c.mu.Unlock() return h, nil } c.consumerWaiting = true select { //consumer等待producer生产数据 case <-c.ch: case <-c.done: c.finish() return nil, ErrConnClosing } } }
7.模拟service故障及恢复
faultSimulator每隔15s调用GracefulStop停止所有正在运行的server,以此模拟service发生故障。
func (hssMonitor *hsServerMonitor) faultSimulator() { t := time.NewTicker(15 * time.Second) for { select { case <-t.C: fmt.Println("time out! Stop servers!") for _, s := range hssMonitor.hsServers { s.gServer.GracefulStop() fmt.Printf("server %s:%d graceful stopped!/n", s.info.Addr, s.info.Port) } ... } } }
通过helloServiceServerMonitor来监控server状态,若发生失败退出,则重新启动一个新的server。
// gRPCwithConsul/example/server/server.go

// startNewServer creates a helloServiceServer whose service info uses port
// hsPort, appends it to the monitored list and starts it in a new goroutine.
// NOTE(review): hsServers is appended to without a lock; if this can run
// concurrently with faultSimulator or helloServiceServerMonitor (which read
// hsServers), the slice access needs synchronization — confirm.
func (hssMonitor *hsServerMonitor) startNewServer(hsPort int) {
	hsServer := newHelloServiceServer(hsPort)
	hssMonitor.hsServers = append(hssMonitor.hsServers, hsServer)
	go startHelloServiceServer(hsServer)
}

// gRPCwithConsul/example/server/server.go

// helloServiceServerMonitor watches the first serverNum servers and, when a
// server signals a fault on its ch channel, restarts it on a new port with a
// fresh gRPC server (excerpt; "..." marks code omitted in this article;
// serverNum and port are declared outside the visible code).
func (hssMonitor *hsServerMonitor) helloServiceServerMonitor() {
	...
	for {
		for i := 0; i < serverNum; i++ {
			// NOTE(review): no default case is visible in this select; unless
			// the omitted code adds one, each iteration blocks on server i's
			// channel instead of polling all servers — confirm.
			select {
			// hello service server fault happened, start new server as recovery
			case <-hssMonitor.hsServers[i].ch:
				// Change the port to simulate a change of the service address;
				// clients calling the RPC interface keep working.
				port[i] += 5
				hssMonitor.hsServers[i].info.Port = port[i]
				// A new gRPC server is needed after stopping the old one;
				// otherwise registration fails with the gRPC error:
				// Server.RegisterService after Server.Serve for "HelloService_proto.HelloService"
				s := grpc.NewServer()
				hssMonitor.hsServers[i].gServer = s
				// start new server
				go startHelloServiceServer(hssMonitor.hsServers[i])
				...
			}
		}
		time.Sleep(10 * time.Millisecond)
	}
}
// gRPCwithConsul/serviceDiscovery/consulResolver.go func (r *consulResolver) start() // gRPCwithConsul/serviceDiscovery/consulResolver.go func (crb *consulResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) // gRPCwithConsul/example/HelloService_proto/HelloService.pb.go func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) // gRPCwithConsul/example/HelloService_proto/HelloService.pb.go func RegisterHelloServiceServer(s *grpc.Server, srv HelloServiceServer) // gRPCwithConsul/serviceDiscovery/consulRegister.go func CreateConsulRegisterClient(csAddr string) error // gRPCwithConsul/serviceDiscovery/consulRegister.go func (csr *consulServiceRegister) registerServiceToConsul(info ServiceInfo) error // gRPCwithConsul/example/server/server.go func newHelloServiceServer(hsPort int) *helloServiceServer // google.golang.org/grpc/resolver_conn_wrapper.go func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) // google.golang.org/grpc/resolver_conn_wrapper.go func (ccr *ccResolverWrapper) UpdateState(s resolver.State) // google.golang.org/grpc/clientconn.go func (cc *ClientConn) updateResolverState(s resolver.State) error // grpc/grpc-go/balancer/roundrobin/roundrobin.go func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) // google.golang.org/grpc/server.go func (s *Server) Serve(lis net.Listener) error // google.golang.org/grpc/server.go func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport // google.golang.org/grpc/server.go func (s *Server) serveStreams(st transport.ServerTransport) // google.golang.org/grpc/internal/transport/http2_server.go func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) // 
google.golang.org/grpc/internal/transport/http2_server.go func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error
google.golang.org/grpc/resolver.go GRPCLB源码解释 const ( // Backend indicates the address is for a backend server. Backend AddressType = iota // GRPCLB indicates the address is for a grpclb load balancer. GRPCLB ) func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) google.golang.org/grpc/balancer_conn_wrappers.go { ... //通过通道发送给watcher goroutine去更新service地址 ccb.ccUpdateCh <- ccs } func (ccb *ccBalancerWrapper) watcher() google.golang.org/grpc/balancer_conn_wrappers.go { ... case s := <-ccb.ccUpdateCh: ... if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { ub.UpdateClientConnState(*s) } ... } func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) { google.golang.org/grpc/balancer/base/balancer.go ... for _, a := range s.ResolverState.Addresses { addrsSet[a] = struct{}{} if _, ok := b.subConns[a]; !ok { // a is a new address (not existing in b.subConns). sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck}) ... b.subConns[a] = sc b.scStates[sc] = connectivity.Idle sc.Connect() } } //在通过consul获取service最新地址后,检查subConns中旧的地址是否失效 for a, sc := range b.subConns { // a was removed by resolver. //如果最新地址集合中不包含地址a,则代表a已失效 if _, ok := addrsSet[a]; !ok { b.cc.RemoveSubConn(sc) delete(b.subConns, a) } } } //roundrobin实现 //grpc/grpc-go/balancer/roundrobin/roundrobin.go func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { p.mu.Lock() sc := p.subConns[p.next] //轮流选择可用连接使用 p.next = (p.next + 1) % len(p.subConns) p.mu.Unlock() return sc, nil, nil }
// resolveServiceFromConsul asks Consul for the instances of crb.serviceName
// and converts each one into a gRPC resolver.Address of the form
// "address:port".
func (crb *consulResolverBuilder) resolveServiceFromConsul() ([]resolver.Address, error) {
	// Call the Consul health API to get the address information of the service.
	// NOTE(review): the third argument (true) is presumably Consul's
	// passingOnly flag, so only instances with passing checks are returned —
	// confirm against the consul api docs.
	serviceEntries, _, err := crb.client.Health().Service(crb.serviceName, "", true, &consulapi.QueryOptions{})
	if err != nil {
		fmt.Println("call consul Health API failed, ", err)
		return nil, err
	}
	addrs := make([]resolver.Address, 0)
	for _, serviceEntry := range serviceEntries {
		// Package each address as a resolver.Address for the return value.
		// NOTE(review): "%s:%d" does not bracket IPv6 literals;
		// net.JoinHostPort would.
		address := resolver.Address{Addr: fmt.Sprintf("%s:%d", serviceEntry.Service.Address, serviceEntry.Service.Port)}
		addrs = append(addrs, address)
	}
	return addrs, nil
}

// csMonitor re-resolves the service from Consul every 500ms, or immediately
// when a value arrives on cr.rnCh, and pushes the addresses to gRPC through
// cr.cc.UpdateState (excerpt; "..." marks code omitted in this article).
func (crb *consulResolverBuilder) csMonitor(cr *consulResolver) {
	t := time.NewTicker(500 * time.Millisecond)
	// Get service addresses from consul every 500 Millisecond and update them to gRPC.
	// NOTE(review): no exit path and no t.Stop() are visible in this loop.
	for {
		select {
		case <-t.C:
		// resolve now
		case <-cr.rnCh:
		}
		addrs, err := crb.resolveServiceFromConsul()
		...
		cr.cc.UpdateState(resolver.State{Addresses: addrs})
	}
}

// gRPCwithConsul/example/HelloService_proto/HelloService.pb.go

// SayHello is the generated client stub: it invokes the remote method
// "/HelloService_proto.HelloService/SayHello" over c.cc and returns the
// response, or nil and the error.
func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) {
	out := new(HelloResponse)
	err := c.cc.Invoke(ctx, "/HelloService_proto.HelloService/SayHello", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// gRPCwithConsul/serviceDiscovery/consulRegister.go

// registerServiceToConsul registers info as a Consul service, registers a TTL
// health check for it, and starts a goroutine that periodically refreshes
// that check (excerpt; "..." marks code omitted in this article).
// NOTE(review): ccMonitor and ch are not declared in the visible code; they
// presumably come from package scope / the omitted lines — confirm.
func (csr *consulServiceRegister) registerServiceToConsul(info ServiceInfo) error {
	serviceId := getServiceId(info.ServiceName, info.Addr, info.Port)
	asg := &consulapi.AgentServiceRegistration{
		ID:      serviceId,
		Name:    info.ServiceName,
		Tags:    []string{info.ServiceName},
		Port:    info.Port,
		Address: info.Addr,
	}
	// register service to consul server
	err := ccMonitor.client.Agent().ServiceRegister(asg)
	...
	// Register a health check for the service with the Consul server; its TTL
	// is info.CheckInterval seconds and it starts in the passing state. The
	// check ID is serviceId, the same ID later passed to UpdateTTL.
	asCheck := consulapi.AgentServiceCheck{TTL: fmt.Sprintf("%ds", info.CheckInterval), Status: consulapi.HealthPassing}
	err = ccMonitor.client.Agent().CheckRegister(
		&consulapi.AgentCheckRegistration{
			ID:                serviceId,
			Name:              info.ServiceName,
			ServiceID:         serviceId,
			AgentServiceCheck: asCheck})
	...
	// start a goroutine to update health status to consul server
	// NOTE(review): the parameter is unnamed, so the ch passed in cannot be
	// read through it; the ticker is never stopped in the visible code; and
	// the goroutine assigns to this function's err, possibly after it has
	// returned — a local err inside the goroutine would be cleaner.
	go func(<-chan struct{}) {
		t := time.NewTicker(info.UpdateInterval)
		for {
			select {
			case <-t.C:
				...
			}
			// Report the service's status (asCheck.Status, i.e. passing) to
			// the Consul server, refreshing the TTL check.
			err = ccMonitor.client.Agent().UpdateTTL(serviceId, "", asCheck.Status)
			...
		}
	}(ch)
	return nil
}

// gRPCwithConsul/example/server/server.go

// newHelloServiceServer builds, but does not start, a helloServiceServer: a
// fresh gRPC server plus the service info for "HelloService" at ip:hsPort
// (ip is declared outside this excerpt), with UpdateInterval 5s and
// CheckInterval 20, which registerServiceToConsul uses as the TTL in seconds.
func newHelloServiceServer(hsPort int) *helloServiceServer {
	// Create a gRPC server.
	s := grpc.NewServer()
	info := &serviceDiscovery.ServiceInfo{
		Addr:           ip,
		Port:           hsPort,
		ServiceName:    "HelloService",
		UpdateInterval: 5 * time.Second,
		CheckInterval:  20}
	// Buffered with capacity 1 — presumably so the server goroutine can
	// signal a fault without blocking; confirm.
	ch := make(chan struct{}, 1)
	// Create a helloServiceServer.
	hsServer := &helloServiceServer{
		info:    info,
		gServer: s,
		ch:      ch}
	return hsServer
}
https://github.com/GrassInWind2019/gRPCwithConsul
如果想修改RPC接口,也就是修改HelloService.proto文件,
修改完后需要利用protoc工具重新生成HelloService.pb.go,
可使用如下命令:
protoc.exe --plugin=protoc-gen-go=$GOPATH/bin/protoc-gen-go.exe --go_out=plugins=grpc:. --proto_path . HelloService.proto
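一定要带上--go_out=plugins=grpc,否则生成的go文件会缺少gRPC相关的代码,编译时会报错,例如找不到HelloService_proto.RegisterHelloServiceServer。注意:plugins=grpc仅适用于旧版protoc-gen-go(github.com/golang/protobuf);新版protoc-gen-go(google.golang.org/protobuf)已不再支持该参数,需改用protoc-gen-go-grpc插件并通过--go-grpc_out生成gRPC代码。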