本文主要研究一下dubbo的ExecutionDispatcher
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionDispatcher.java
public class ExecutionDispatcher implements Dispatcher { public static final String NAME = "execution"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new ExecutionChannelHandler(handler, url); } }
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getExecutorService(); if (message instanceof Request) { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly, // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent // this scenario from happening, but a better solution should be considered later. if (t instanceof RejectedExecutionException) { Request request = (Request) message; if (request.isTwoWay()) { String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") thread pool is exhausted, detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event.", t); } } else { handler.received(channel, message); } } }
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceServerTest.java
public class PerformanceServerTest { private static final Logger logger = LoggerFactory.getLogger(PerformanceServerTest.class); private static ExchangeServer server = null; private static void restartServer(int times, int alive, int sleep) throws Exception { if (server != null && !server.isClosed()) { server.close(); Thread.sleep(100); } for (int i = 0; i < times; i++) { logger.info("restart times:" + i); server = statServer(); if (alive > 0) Thread.sleep(alive); server.close(); if (sleep > 0) Thread.sleep(sleep); } server = statServer(); } private static ExchangeServer statServer() throws Exception { final int port = PerformanceUtils.getIntProperty("port", 9911); final String transporter = PerformanceUtils.getProperty(Constants.TRANSPORTER_KEY, Constants.DEFAULT_TRANSPORTER); final String serialization = PerformanceUtils.getProperty(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION); final String threadpool = PerformanceUtils.getProperty(THREADPOOL_KEY, DEFAULT_THREADPOOL); final int threads = PerformanceUtils.getIntProperty(THREADS_KEY, DEFAULT_THREADS); final int iothreads = PerformanceUtils.getIntProperty(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS); final int buffer = PerformanceUtils.getIntProperty(BUFFER_KEY, DEFAULT_BUFFER_SIZE); final String channelHandler = PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME); // Start server ExchangeServer server = Exchangers.bind("exchange://0.0.0.0:" + port + "?transporter=" + transporter + "&serialization=" + serialization + "&threadpool=" + threadpool + "&threads=" + threads + "&iothreads=" + iothreads + "&buffer=" + buffer + "&channel.handler=" + channelHandler, new ExchangeHandlerAdapter() { public String telnet(Channel channel, String message) throws RemotingException { return "echo: " + message + "/r/ntelnet> "; } public CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException { if ("environment".equals(request)) { return CompletableFuture.completedFuture(PerformanceUtils.getEnvironment()); } if ("scene".equals(request)) { List<String> scene = new ArrayList<String>(); scene.add("Transporter: " + transporter); scene.add("Service Threads: " + threads); return CompletableFuture.completedFuture(scene); } return CompletableFuture.completedFuture(request); } }); return server; } private static ExchangeServer statTelnetServer(int port) throws Exception { // Start server ExchangeServer telnetserver = Exchangers.bind("exchange://0.0.0.0:" + port, new ExchangeHandlerAdapter() { public String telnet(Channel channel, String message) throws RemotingException { if (message.equals("help")) { return "support cmd: /r/n/tstart /r/n/tstop /r/n/tshutdown /r/n/trestart times [alive] [sleep] /r/ntelnet>"; } else if (message.equals("stop")) { logger.info("server closed:" + server); server.close(); return "stop server/r/ntelnet>"; } else if (message.startsWith("start")) { try { restartServer(0, 0, 0); } catch (Exception e) { e.printStackTrace(); } return "start server/r/ntelnet>"; } else if (message.startsWith("shutdown")) { System.exit(0); return "start server/r/ntelnet>"; } else if (message.startsWith("channels")) { return "server.getExchangeChannels():" + server.getExchangeChannels().size() + "/r/ntelnet>"; } else if (message.startsWith("restart ")) { //r times [sleep] r 10 or r 10 100 String[] args = message.split(" "); int times = Integer.parseInt(args[1]); int alive = args.length > 2 ? Integer.parseInt(args[2]) : 0; int sleep = args.length > 3 ? Integer.parseInt(args[3]) : 100; try { restartServer(times, alive, sleep); } catch (Exception e) { e.printStackTrace(); } return "restart server,times:" + times + " stop alive time: " + alive + ",sleep time: " + sleep + " usage:r times [alive] [sleep] /r/ntelnet>"; } else { return "echo: " + message + "/r/ntelnet> "; } } }); return telnetserver; } @Test public void testServer() throws Exception { // Read port from property if (PerformanceUtils.getProperty("port", null) == null) { logger.warn("Please set -Dport=9911"); return; } final int port = PerformanceUtils.getIntProperty("port", 9911); final boolean telnet = PerformanceUtils.getBooleanProperty("telnet", true); if (telnet) statTelnetServer(port + 1); server = statServer(); synchronized (PerformanceServerTest.class) { while (true) { try { PerformanceServerTest.class.wait(); } catch (InterruptedException e) { } } } } }
ExecutionChannelHandler继承了WrappedChannelHandler,其received方法判断message是否是Request类型,如果是则创建ChannelEventRunnable放到线程池里头执行,如果不是则直接执行handler.received