Reactor-Netty 版本:
<dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> <version>0.8.10.RELEASE</version> </dependency> 复制代码
示例程序:
public class TcpServerApplication { public static void main(String[] args) { DisposableServer server = TcpServer .create() .host("127.0.0.1") .port(8080) .handle((inbound, outbound) -> inbound.receive().asString().log().then() ) .bindNow(); server.onDispose() .block(); } } public class TcpClientApplication { public static void main(String[] args) throws InterruptedException { TcpClient client = TcpClient.create() // 1 TcpClientConnect .host("127.0.0.1") // 2 TcpClientBootstrap .port(8080) // 3 TcpClientBootstrap .handle((inbound, outbound) -> outbound.sendString(Mono.just("Hello World!")).then()); // 4 TcpClientDoOn client.connectNow(); // 5 Connection Thread.sleep(3000); } } 复制代码
TcpServerApplication 输出结果:
[ INFO] (reactor-tcp-nio-2) onSubscribe(FluxHandle.HandleSubscriber) [ INFO] (reactor-tcp-nio-2) request(unbounded) [ INFO] (reactor-tcp-nio-2) onNext(Hello World!) [ INFO] (reactor-tcp-nio-2) cancel() 复制代码
基本逻辑是:Server 端绑定 8080 端口并监听请求;Client 端连接上端口后发送字符串 Hello World!
;Server 端口收到请求后打印出来。
下面进行具体源码分析。
###TcpClient.create()
public static TcpClient create() { return create(TcpResources.get()); } /** * 最终返回的是 TcpClientConnect * 从入参可知,TcpClientConnect 关注的是连接管理 ConnectionProvider */ public static TcpClient create(ConnectionProvider provider) { return new TcpClientConnect(provider); } public class TcpResources implements ConnectionProvider, LoopResources { final ConnectionProvider defaultProvider; final LoopResources defaultLoops; protected TcpResources(LoopResources defaultLoops, ConnectionProvider defaultProvider) { this.defaultLoops = defaultLoops; this.defaultProvider = defaultProvider; } /** * 该静态方法最终返回的是 TcpResources,包括: * ConnectionProvider: 管理连接 * LoopResources: 管理线程 */ public static TcpResources get() { // 如果不存在,那么创建 TcpResources;否则,直接返回 TcpResources return getOrCreate(tcpResources, null, null, ON_TCP_NEW, "tcp"); } 复制代码
/** * 1. 最终返回的是 TcpClientBootstrap * 2. TcpClientBootstrap 类有一个 bootstrapMapper, 是一个 Function: b -> TcpUtils.updateHost(b, host),关注两个地方:b 是一个 Bootstrap 对象,b 何时生成?Function 接口的 apply 方法什么时候被执行?可以看到 TcpClientBootstrap 类的 configure() 方法同时满足了上面 2 个地方,因此只需要关注该方法何时被调用即可。 */ public final TcpClient host(String host) { Objects.requireNonNull(host, "host"); return bootstrap(b -> TcpUtils.updateHost(b, host)); } public final TcpClient bootstrap(Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) { return new TcpClientBootstrap(this, bootstrapMapper); } final class TcpClientBootstrap extends TcpClientOperator { final Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper; TcpClientBootstrap(TcpClient client, Function<? super Bootstrap, ? extends Bootstrap> bootstrapMapper) { super(client); this.bootstrapMapper = Objects.requireNonNull(bootstrapMapper, "bootstrapMapper"); } @Override public Bootstrap configure() { return Objects.requireNonNull(bootstrapMapper.apply(source.configure()), "bootstrapMapper"); } } 复制代码
/** * 和 host(String host) 方法类似 */ public final TcpClient port(int port) { return bootstrap(b -> TcpUtils.updatePort(b, port)); } 复制代码
/** * 最终返回的是 TcpClientDoOn; * handler 的入参是 BiFunction,并且在 doOnConnected 方法中直接调用了 apply 方法; * BiFunction 返回的 Publisher 也直接调用了 subscribe 方法; * 因此,只需要关注 doOnConnected 方法的入参 Consumer 何时被调用即可 */ public final TcpClient handle(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) { Objects.requireNonNull(handler, "handler"); return doOnConnected(c -> { if (log.isDebugEnabled()) { log.debug(format(c.channel(), "Handler is being applied: {}"), handler); } Mono.fromDirect(handler.apply((NettyInbound) c, (NettyOutbound) c)) .subscribe(c.disposeSubscriber()); }); } public final TcpClient doOnConnected(Consumer<? super Connection> doOnConnected) { Objects.requireNonNull(doOnConnected, "doOnConnected"); return new TcpClientDoOn(this, null, doOnConnected, null); } final class TcpClientDoOn extends TcpClientOperator implements ConnectionObserver { final Consumer<? super Bootstrap> onConnect; // onConnected 即 handle 方法中调用的 doOnConnected 的 Consumer final Consumer<? super Connection> onConnected; final Consumer<? super Connection> onDisconnected; TcpClientDoOn(TcpClient client, @Nullable Consumer<? super Bootstrap> onConnect, @Nullable Consumer<? super Connection> onConnected, @Nullable Consumer<? super Connection> onDisconnected) { // 继承上一个 TcpClient super(client); this.onConnect = onConnect; this.onConnected = onConnected; this.onDisconnected = onDisconnected; } @Override public Bootstrap configure() { Bootstrap b = source.configure(); ConnectionObserver observer = BootstrapHandlers.connectionObserver(b); // 注意:这里设置了 ConnectionObserver,后面会讲到 BootstrapHandlers.connectionObserver(b, observer.then(this)); return b; } @Override public Mono<? extends Connection> connect(Bootstrap b) { if (onConnect != null) { return source.connect(b) .doOnSubscribe(s -> onConnect.accept(b)); } return source.connect(b); } @Override public void onStateChange(Connection connection, State newState) { // onConnected 在这里被调用,即 connection 状态改变时 if (onConnected != null && newState == State.CONFIGURED) { onConnected.accept(connection); return; } if (onDisconnected != null) { if (newState == State.DISCONNECTING) { connection.onDispose(() -> onDisconnected.accept(connection)); } else if (newState == State.RELEASED) { onDisconnected.accept(connection); } } } } 复制代码
// 设置超时 45s public final Connection connectNow() { return connectNow(Duration.ofSeconds(45)); } public final Connection connectNow(Duration timeout) { Objects.requireNonNull(timeout, "timeout"); try { // 这里 connect() 方法返回的是 Mono return Objects.requireNonNull(connect().block(timeout), "aborted"); } catch (IllegalStateException e) { ... } } // 返回的是 Mono public final Mono<? extends Connection> connect() { ... return connect(b); } // block 方法中直接开始订阅 public T block(Duration timeout) { BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>(); onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber)); return subscriber.blockingGet(timeout.toMillis(), TimeUnit.MILLISECONDS); } final T blockingGet(long timeout, TimeUnit unit) { ... if (getCount() != 0) { try { if (!await(timeout, unit)) { dispose(); // 超时取消订阅 throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit); } } catch (InterruptedException ex) { dispose(); RuntimeException re = Exceptions.propagate(ex); //this is ok, as re is always a new non-singleton instance re.addSuppressed(new Exception("#block has been interrupted")); throw re; } } ... } 复制代码
由以上分析可知,在最后的 connectNow() 方法中,才开始真正的订阅执行。下面继续分析 connect 方法。
public final Mono<? extends Connection> connect() { Bootstrap b; try { // 1. 获取默认的 Bootstrap b = configure(); } catch (Throwable t) { Exceptions.throwIfJvmFatal(t); return Mono.error(t); } // 2. connect(b) return connect(b); } public Bootstrap configure() { return DEFAULT_BOOTSTRAP.clone(); } static final Bootstrap DEFAULT_BOOTSTRAP = new Bootstrap().option(ChannelOption.AUTO_READ, false) .remoteAddress(InetSocketAddressUtil.createUnresolved(NetUtil.LOCALHOST.getHostAddress(), DEFAULT_PORT)); 复制代码
继续看 connect(Bootstrap b) 方法:
// 这是一个抽象方法,很多继承类都实现了该方法。根据之前的代码分析,首先调用的应该是 TcpClientDoOn 类 public abstract Mono<? extends Connection> connect(Bootstrap b); // TcpClientDoOn 类 public Mono<? extends Connection> connect(Bootstrap b) { if (onConnect != null) { return source.connect(b) .doOnSubscribe(s -> onConnect.accept(b)); } // 往上传递,source 代表上一个 TcpClient;最终传递到初始的 TcpClientConnect return source.connect(b); } // TcpClientConnect 类 final ConnectionProvider provider; public Mono<? extends Connection> connect(Bootstrap b) { // 填充 b 的属性 if (b.config() .group() == null) { TcpClientRunOn.configure(b, LoopResources.DEFAULT_NATIVE, TcpResources.get(), maxConnections != -1); } // 最终调用这个方法 return provider.acquire(b); } 复制代码
上面讲到 connect 方法最终调用的是 ConnectionProvider 类中的方法。ConnectionProvider 在之前的分析中出现过,即TcpResources.get() 方法返回的 TcpResources 对象中包含这个属性。
// 创建默认的 TcpResources static <T extends TcpResources> T create(@Nullable T previous, @Nullable LoopResources loops, @Nullable ConnectionProvider provider, String name, BiFunction<LoopResources, ConnectionProvider, T> onNew) { if (previous == null) { loops = loops == null ? LoopResources.create("reactor-" + name) : loops; // 创建 ConnectionProvider provider = provider == null ? ConnectionProvider.elastic(name) : provider; } else { loops = loops == null ? previous.defaultLoops : loops; provider = provider == null ? previous.defaultProvider : provider; } return onNew.apply(loops, provider); } } static ConnectionProvider elastic(String name) { // 这里的第 2 个入参 PoolFactory 又是一个函数式接口,因此对象的生成时间点在于何时调用 PoolFactory.newPool 方法; 生成的 ChannelPool 类型为 SimpleChannelPool。 return new PooledConnectionProvider(name, (bootstrap, handler, checker) -> new SimpleChannelPool(bootstrap, handler, checker, true, false)); } final class PooledConnectionProvider implements ConnectionProvider { interface PoolFactory { ChannelPool newPool(Bootstrap b, ChannelPoolHandler handler, ChannelHealthChecker checker); } final ConcurrentMap<PoolKey, Pool> channelPools; final String name; final PoolFactory poolFactory; final int maxConnections; PooledConnectionProvider(String name, PoolFactory poolFactory) { this.name = name; this.poolFactory = poolFactory; this.channelPools = PlatformDependent.newConcurrentHashMap(); this.maxConnections = -1; } ... } 复制代码
现在回到 provider.acquire(b) 方法,可以知道调用的是 PooledConnectionProvider 类中的方法,继续分析:
// Map 结构,每个 (remote address, handler) 组合都有一个连接池 final ConcurrentMap<PoolKey, Pool> channelPools; final String name; // 通过 poolFactory 生成 ChannelPool final PoolFactory poolFactory; final int maxConnections; /** * 主要作用是从连接池中获取连接 * 首先需要找到对应的连接池, 通过 channelPools.get(holder) * 如果不存在,那么创建新的连接池,并加入到 channelPools 中 * 最后调用 disposableAcquire(sink, obs, pool, false); */ public Mono<Connection> acquire(Bootstrap b) { return Mono.create(sink -> { Bootstrap bootstrap = b.clone(); // TODO ChannelOperations.OnSetup opsFactory = BootstrapHandlers.channelOperationFactory(bootstrap); // TODO: 连接生命周期的监听器 ConnectionObserver obs = BootstrapHandlers.connectionObserver(bootstrap); // 懒加载,这里需要设置 bootstrap 的 remote address(ip:port) NewConnectionProvider.convertLazyRemoteAddress(bootstrap); // 每个 (remote address, handler) 都有一个 Pool ChannelHandler handler = bootstrap.config().handler(); PoolKey holder = new PoolKey(bootstrap.config().remoteAddress(), handler != null ? handler.hashCode() : -1); Pool pool; for (; ; ) { // 直接获取 pool = channelPools.get(holder); if (pool != null) { break; } // 不存在则创建新的连接池 pool = new Pool(bootstrap, poolFactory, opsFactory); if (channelPools.putIfAbsent(holder, pool) == null) { if (log.isDebugEnabled()) { log.debug("Creating new client pool [{}] for {}", name, bootstrap.config() .remoteAddress()); } break; } // 关闭多创建的 pool pool.close(); } disposableAcquire(sink, obs, pool, false); }); } Pool(Bootstrap bootstrap, PoolFactory provider, ChannelOperations.OnSetup opsFactory) { this.bootstrap = bootstrap; this.opsFactory = opsFactory; // 创建新的连接池 this.pool = provider.newPool(bootstrap, this, this); this.defaultGroup = bootstrap.config() .group(); HEALTHY = defaultGroup.next() .newSucceededFuture(true); UNHEALTHY = defaultGroup.next() .newSucceededFuture(false); } 复制代码
继续 disposableAcquire 方法,
static void disposableAcquire(MonoSink<Connection> sink, ConnectionObserver obs, Pool pool, boolean retried) { // 获取 Channel Future<Channel> f = pool.acquire(); DisposableAcquire disposableAcquire = new DisposableAcquire(sink, f, pool, obs, retried); // 设置监听器, 该方法最终会调用 disposableAcquire.operationComplete() 方法,operationComplete() 方法会调用 disposableAcquire.run() f.addListener(disposableAcquire); sink.onCancel(disposableAcquire); } final static class DisposableAcquire implements Disposable, GenericFutureListener<Future<Channel>>, ConnectionObserver , Runnable { final Future<Channel> f; final MonoSink<Connection> sink; final Pool pool; final ConnectionObserver obs; final boolean retried; DisposableAcquire(MonoSink<Connection> sink, Future<Channel> future, Pool pool, ConnectionObserver obs, boolean retried) { this.f = future; this.pool = pool; this.sink = sink; this.obs = obs; this.retried = retried; } // 当连接的状态改变时,调用 obs.onStateChange;而这里的 obs 就是我们在 TcpClientDoOn.configure() 方法中设置的;所以一旦连接状态改变,就会调用 TcpClient.handle 中的方法 @Override public void onStateChange(Connection connection, State newState) { if (newState == State.CONFIGURED) { sink.success(connection); } obs.onStateChange(connection, newState); } ... } 复制代码
DisposableAcquire 是一个监听器,监听的是连接,即上面代码中的 Future f = pool.acquire()。那么这个 f 是什么类型呢?之前的代码分析中已经知道 pool 为 SimpleChannelPool 类型。
public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck, boolean releaseHealthCheck, boolean lastRecentUsed) { this.handler = checkNotNull(handler, "handler"); this.healthCheck = checkNotNull(healthCheck, "healthCheck"); this.releaseHealthCheck = releaseHealthCheck; // Clone the original Bootstrap as we want to set our own handler this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone(); this.bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { assert ch.eventLoop().inEventLoop(); // 当新建连接时,会调用该方法 handler.channelCreated(ch); } }); this.lastRecentUsed = lastRecentUsed; } } public void channelCreated(Channel ch) { inactiveConnections.incrementAndGet(); ... // 这里把 ch 包装成了一下, PooledConnection 这个类同时实现了 Connection 以及 ConnectionObserver 接口,也就是说既是一个 channel,又是一个 listener。后续如果 channel 的状态发生改变,会调用 PooledConnection 的 onStateChange 方法。 PooledConnection pooledConnection = new PooledConnection(ch, this); pooledConnection.bind(); Bootstrap bootstrap = this.bootstrap.clone(); BootstrapHandlers.finalizeHandler(bootstrap, opsFactory, pooledConnection); ch.pipeline() .addFirst(bootstrap.config() .handler()); } 复制代码
下面继续看 PooledConnection 的 onStateChange 方法。
public void onStateChange(Connection connection, State newState) { if (newState == State.DISCONNECTING) { ... } // 其他状态走这里 owner().onStateChange(connection, newState); } ConnectionObserver owner() { ConnectionObserver obs; for (;;) { obs = channel.attr(OWNER) .get(); if (obs == null) { obs = new PendingConnectionObserver(); } else { return obs; } // 设置 channel.attr(OWNER) 为新创建的 PendingConnectionObserver // 之后再次调用 own() 方法时直接返回该 PendingConnectionObserver if (channel.attr(OWNER) .compareAndSet(null, obs)) { return obs; } } } final static class PendingConnectionObserver implements ConnectionObserver { final Queue<Pending> pendingQueue = Queues.<Pending>unbounded(4).get(); @Override public void onUncaughtException(Connection connection, Throwable error) { pendingQueue.add(new Pending(connection, error, null)); } @Override public void onStateChange(Connection connection, State newState) { // 把状态变更放入了等待队列,其他什么都不做 pendingQueue.add(new Pending(connection, null, newState)); } static class Pending { final Connection connection; final Throwable error; final State state; Pending(Connection connection, @Nullable Throwable error, @Nullable State state) { this.connection = connection; this.error = error; this.state = state; } } } 复制代码
从上面代码可知,Channel 的状态变更最终放入了一个等待队列,缺少了通知各个监听器的调用。继续回到 DisposableAcquire 类,发现同时实现了 Runnable 接口。
final static class DisposableAcquire implements Disposable, GenericFutureListener<Future<Channel>>, ConnectionObserver , Runnable { final Future<Channel> f; final MonoSink<Connection> sink; final Pool pool; final ConnectionObserver obs; final boolean retried; @Override public void onStateChange(Connection connection, State newState) { if (newState == State.CONFIGURED) { sink.success(connection); } obs.onStateChange(connection, newState); } @Override public void run() { Channel c = f.getNow(); pool.activeConnections.incrementAndGet(); pool.inactiveConnections.decrementAndGet(); // 之前 owner() 方法设置了 PendingConnectionObserver ConnectionObserver current = c.attr(OWNER) .getAndSet(this); if (current instanceof PendingConnectionObserver) { PendingConnectionObserver pending = (PendingConnectionObserver)current; PendingConnectionObserver.Pending p; current = null; // 监听连接关闭 registerClose(c, pool); // 依次处理等待队列中的事件(连接状态变更) while((p = pending.pendingQueue.poll()) != null) { if (p.error != null) { onUncaughtException(p.connection, p.error); } else if (p.state != null) { // 通知各个监听器 onStateChange(p.connection, p.state); } } } else if (current == null) { registerClose(c, pool); } // TODO 什么情况会走这边? if (current != null) { Connection conn = Connection.from(c); if (log.isDebugEnabled()) { log.debug(format(c, "Channel acquired, now {} active connections and {} inactive connections"), pool.activeConnections, pool.inactiveConnections); } obs.onStateChange(conn, State.ACQUIRED); PooledConnection con = conn.as(PooledConnection.class); if (con != null) { ChannelOperations<?, ?> ops = pool.opsFactory.create(con, con, null); if (ops != null) { ops.bind(); obs.onStateChange(ops, State.CONFIGURED); sink.success(ops); } else { //already configured, just forward the connection sink.success(con); } } else { //already bound, just forward the connection sink.success(conn); } return; } //Connected, leave onStateChange forward the event if factory ... if (pool.opsFactory == ChannelOperations.OnSetup.empty()) { sink.success(Connection.from(c)); } } } 复制代码
至此,TcpClient 示例程序中的几行代码差不多就算是分析完了。