接上个部分 (Dubbo 客户端调用链路过程分析) 讲到了客户端发送请求的过程,这个部分我们分析服务端接收请求并发送响应的过程。
在分析 服务暴露 的过程中,provider启动netty服务端的时候(NettyServer.doOpen),会在在ChannelPipeline链中加入了4个ChannelHandler。
- NettyCodecAdapter.InternalEncoder:编码器 - NettyCodecAdapter.InternalDecoder:解码器 - IdleStateHandler:心跳处理器 - NettyServerHandler:请求处理器 复制代码
接下来们将其拆分为处理请求和响应结果进行分析。
解码过程不做具体分析
NettyCodecAdapter.InternalDecoder.decode -->DubboCountCodec.decode -->ExchangeCodec.decode -->DubboCodec.decodeBody 复制代码
经过解码得到请求对象Request。
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try { handler.received(channel, msg); } finally { NettyChannel.removeChannelIfDisconnected(ctx.channel()); } } 复制代码
装饰角色,维护了closed状态
public void received(Channel ch, Object msg) throws RemotingException { // 如果通道已经关闭,则直接返回 if (closed) { return; } handler.received(ch, msg); } 复制代码
对多消息的处理。
@Override public void received(Channel channel, Object message) throws RemotingException { //如果消息是MultiMessage类型的,做下类型转换 if (message instanceof MultiMessage) { MultiMessage list = (MultiMessage) message; 遍历发送 for (Object obj : list) { handler.received(channel, obj); } } else { handler.received(channel, message); } } 复制代码
对心跳事件做了处理。 如果不是心跳请求,那么接下去走到AllChannelHandler的received。否则直接回复响应,不再继续往下走。
public void received(Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); //是否属于心跳的请求 if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(Response.HEARTBEAT_EVENT); channel.send(res); } return; } //是否属于心跳的响应 if (isHeartbeatResponse(message)) { return; } handler.received(channel, message); } 复制代码
这里io事件的派发策略和客户端接收响应结果逻辑一样,这里不再赘述。
将接收到的消息分发到线程池,线程池名称为:DubboServerHandler-10.204.246.187:20880-thread-。
提交给线程的任务是:ChannelEventRunnable
public void received(Channel channel, Object message) throws RemotingException { //获取处理线程 ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //提交线程失败,可以确定是线程池满了,需要将该提示响应给客户端 if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } 复制代码
接下来的逻辑则是利用线程池去针对请求做了异步处理
处理不同状态的请求,仅仅只是一个中转,针对不同状态交给指定的方法处理。 RECEIVED、CONNECTED、DISCONNECTED、SENT、CAUGHT
这里我们关注RECEIVED的处理链路。
public class ChannelEventRunnable implements Runnable { @Override public void run() { if (state == ChannelState.RECEIVED) { handler.received(channel, message); } else { switch (state) { case CONNECTED: handler.connected(channel); break; case DISCONNECTED: try { handler.disconnected(channel); break; case SENT: handler.sent(channel, message); break; case CAUGHT: handler.caught(channel, exception); break; default: } } } } 复制代码
这里的解码主要是针对message对象是Decodeable对象的处理。前面以及解码为Request对象了,因此这里不会执行任何逻辑。
解码之后继续执行下一个handler的receive方法。
public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } if (message instanceof Request) { decode(((Request) message).getData()); } if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message); } 复制代码
处理Request请求,并异步返回Response,这里也是provider返回响应的入口
该方法分3个过程来看:
1、分请求类型进行处理:根据messgage的类型做不同的处理,例如正常的Request请求、Resopnse、telnet命令请求。
2、处理Request:继续向后执行(交给DubboProtocol的reply),得到异步调用结果。
3、返回Response:将Request的调用id封装到Response,然后利用异步回调将结果封装到 Response 对象中,同时利用channel.send(res)方法将该结果发送给客户端(关于发送响应结果的过程会在下个部分进行分析);
@Override public void received(Channel channel, Object message) throws RemotingException { //设置时间戳 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); //获取通道 final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; // 处理事件 if (request.isEvent()) { handlerEvent(channel, request); } else { // 双向通信 if (request.isTwoWay()) { handleRequest(exchangeChannel, request); } else { //如果是单向通信,仅向后调用指定服务即可,无需返回调用结果 handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } 复制代码
//处理请求 void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { //封装resopnse(把requests的id封装到resopnse了) Response res = new Response(req.getId(), req.getVersion()); //如果请求被破坏了, 响应异常 if (req.isBroken()) { ... ... res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return; } // 正常,取得请求数据,也就是 RpcInvocation 对象 Object msg = req.getData(); try { //继续向下调用 返回一个future CompletionStage<Object> future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null) { //设置调用结果状态为成功 res.setStatus(Response.OK); res.setResult(appResult); } else { //如果服务调用有异常,则设置结果状态码为服务错误 res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } // 发送该响应 channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } finally { // HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } } 复制代码
1、获取invoker; 关于获取invoker主要过程是:
2、进入invoker调用链;
3、返回执行结果CompletableFuture;
public class DubboProtocol extends AbstractProtocol { private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException { Invocation inv = (Invocation) message; //获取invoker Invoker<?> invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.completionFuture().thenApply(Function.identity()); } } } 复制代码
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { ... ... String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY)); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); } return exporter.getInvoker(); } 复制代码
这里大概梳理下调用链路:
ProtocolFilterWrapper.CallbackRegistrationInvoker.invoke ->EchoFilter.invoke ->ClassLoaderFilter.invoke ->GenericFilter.invoke ->ContextFilter.invoke ->TraceFilter.invoke ->TimeoutFilter.invoke ->MonitorFilter.invoke ->ExceptionFilter.invoke ->InvokerWrapper.invoke ->DelegateProviderMetaDataInvoker.invoke ->AbstractProxyInvoker.invoke 复制代码
关于经过的Filter这里不做说明,这里重点看下AbstractProxyInvoker.invoke方法:
public Result invoke(Invocation invocation) throws RpcException { try { //实际就是获取到代理类,然后调用对应的服务方法 Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); //把返回结果用CompletableFuture包裹 CompletableFuture<Object> future = wrapWithFuture(value, invocation); 封装AsyncRpcResult AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation); future.whenComplete((obj, t) -> { AppResponse result = new AppResponse(); if (t != null) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } asyncRpcResult.complete(result); }); return asyncRpcResult; } catch (InvocationTargetException e) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } } 复制代码
主要做了三件事情:
如果服务接口返回的就是CompletableFuture对象,则直接返回( Provider端异步执行 ),否则把服务接口同步返回的结果封装到CompletableFuture返回出去。
private CompletableFuture<Object> wrapWithFuture (Object value, Invocation invocation) { if (RpcContext.getContext().isAsyncStarted()) { return ((AsyncContextImpl)(RpcContext.getContext().getAsyncContext())).getInternalFuture(); } else if (value instanceof CompletableFuture) { return (CompletableFuture<Object>) value; } return CompletableFuture.completedFuture(value); } 复制代码
到这里服务端处理客户请求过程分析完成,接下来看如何将上一步得到的Response响应给客户端。
这里我用一个时序图表示从HeaderExchangeHandler到服务接口最终执行的调用链路:
在上一部分HeaderExchangeHandler.received的接收过程中,我们知道在得到结果后会将其发送给客户端,因此我们从HeaderExchangeChannel.send方法开始分析
@Override public void send(Object message, boolean sent) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!"); } if (message instanceof Request || message instanceof Response || message instanceof String) { channel.send(message, sent); } else { Request request = new Request(); request.setVersion(Version.getProtocolVersion()); request.setTwoWay(false); request.setData(message); channel.send(request, sent); } } 复制代码
这里通过调用netty的api向channel中异步写入结果。
public void send(Object message, boolean sent) throws RemotingException { // whether the channel is closed super.send(message, sent); boolean success = true; int timeout = 0; try { ChannelFuture future = channel.writeAndFlush(message); if (sent) { // wait timeout ms timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.cause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } } 复制代码
接着则会经过NettyServerHandler的处理器以及编码器。NettyServerHandler.write方法将结果发送给客户端,后面的链路比较简单这里不做分析。
1、首先服务端通过Netty接收到请求之后经过解码后派发给业务线程池(DubboServerHandler-ip:port-thread-)。这里是IO线程到业务线程的一次异步。
2、(==如果服务接口返回的是CompletableFuture==)则会异步将CompletableFuture的结果封装到AsyncRpcResult。
3、(==如果服务接口返回的是CompletableFuture==)AsyncRpcResult再异步执行channel.send();
4、channel.send()发送结果也是一次异步。
欲知更多,欢迎访问: silence.work/