网游少不了网络通信,不像写C++时自己造轮子,Java服务器使用Netty。Netty做了很多工作,使编写网络程序变得轻松简单。灵活利用这些基础设施,以实现我们的需求。
其中一个需求是自动重连。自动重连有两种应用场景:
在有多个服务器(比如LoginServer和GameServer等)时,这样就不用考虑服务器启动顺序。有需求就需要有解决方案,其实很简单,Netty已经提供,如下:
ctx.channel().eventLoop().schedule(() -> tryConnect(), reconnectInterval, TimeUnit.SECONDS);
tryConnect是实际执行连接的方法,后面两个参数表示每隔 reconnectInterval 秒重连一次即执行 tryConnect ,而对应上述两种应用场景的分别是connect失败和channel inactive时,详见后面代码。
自动重连解决后,还有一个问题是如何管理连接。Netty使用Channel来抽象一个连接,但实际开发时,通常逻辑上会有一个 会话(Session) 对象用来表示对端,可以在其上添加各种逻辑属性方法等,以及收发网络消息。这样一个Channel就需要对应一个Session,且方便互相索引。
首先考虑如何创建这个Session。
为了方便Netty使用和复用,我抽象了一个TcpServer/TcpClient类分别表示服务器和客户端。理想情况是 TcpServer和TcpClient合并为一个,不同行为由Session来决定。但因为Netty的服务器和客户端分别使用ServerBootstrap和Bootstrap,其分别包含bind和connect,这个想法未能实现。
Session有两种,ListenSession负责监听连接请求,TransmitSession负责传输数据。在实际应用中,有这么一种需求,比如GameServer主动连接LoginServer,这时GameServer即作为client端。在连接成功时,需要GameServer主动发个注册消息给LoginServer,LoginServer籍此得知是哪个服务器组。此时,GameServer可能同时会以Client身份连接另一个服务器比如Gateway而且同样要发消息。那么作为client端主动连接的TransmitSession最好细化,需要包含要连接的主机地址、端口和重连时间等信息,也需要在Active时发送不同消息,而Server端TransmitSession并不需要。所以设计上TransmitSession又分为ClientSession和ServerSession。SeverSession由TcpServer在建立连接时自动创建,而ListenSession和ClientSession则由使用者自行创建并交由TcpServer/TcpClient管理。
接口如下:
public abstract class ListenSession { private boolean working = false; private int localPort = 0; private int relistenInterval = 10; ... public abstract ServerSession createServerSession(); } public abstract class TransmitSession { protected Channel channel = null; protected boolean working = false; ... public abstract void onActive() throws Exception; public abstract void onInactive() throws Exception; public abstract void onException() throws Exception; public abstract void onReceive(Object data) throws Exception; public abstract void send(Object data); } public abstract class ClientSession extends TransmitSession { private String remoteHost = ""; private int remotePort = 0; private int reconnectInterval = 10; ... }
其次考虑如何管理Channel和Session的对应关系。除了使用一个类似HashMap/
综上,TcpServer示例如下:
public class TcpServer { private final AttributeKey<ListenSession> LISTENSESSIONKEY = AttributeKey.valueOf("LISTENSESSIONKEY"); private final AttributeKey<ServerSession> SERVERSESSIONKEY = AttributeKey.valueOf("SERVERSESSIONKEY"); private final ServerBootstrap bootstrap = new ServerBootstrap(); private EventLoopGroup bossGroup = null; private EventLoopGroup workerGroup = null; private ArrayList<ListenSession> listenSessions = new ArrayList<ListenSession>(); ... private void start() { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(4); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encode", new ObjectEncoder()); pipeline.addLast("decode", new ObjectDecoder(ClassResolvers.cacheDisabled(null))); pipeline.addLast(workerGroup, new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ListenSession listenSession = ctx.channel().parent().attr(LISTENSESSIONKEY).get(); ServerSession serverSession = listenSession.createServerSession(); ctx.channel().attr(SERVERSESSIONKEY).set(serverSession); serverSession.setChannel(ctx.channel()); serverSession.onActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ServerSession serverSession = ctx.channel().attr(SERVERSESSIONKEY).get(); serverSession.onInactive(); } ... } ... private void tryListen(ListenSession listenSession) { if (!listenSession.isWorking()) { return; } final int port = listenSession.getLocalPort(); final int interval = listenSession.getRelistenInterval(); ChannelFuture f = bootstrap.bind(port); f.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { f.channel().attr(LISTENSESSIONKEY).set(listenSession); } else { f.channel().eventLoop().schedule(() -> tryListen(listenSession), interval, TimeUnit.SECONDS); } } }); } }
如果监听失败则隔 interval 秒重试,新连接建立时创建ServerSession关联该Channel。
TcpClient的实现大同小异,不同点在于需要在Channel Inactive时执行重连:
public class TcpClient { private final AttributeKey<ClientSession> SESSIONKEY = AttributeKey.valueOf("SESSIONKEY"); private final Bootstrap bootstrap = new Bootstrap(); private EventLoopGroup workerGroup = null; private ArrayList<ClientSession> clientSessions = new ArrayList<ClientSession>(); ... private void start() { workerGroup = new NioEventLoopGroup(); bootstrap.group(workerGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encode", new ObjectEncoder()); pipeline.addLast("decode", new ObjectDecoder(ClassResolvers.cacheDisabled(null))); pipeline.addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ClientSession clientSession = ctx.channel().attr(SESSIONKEY).get(); clientSession.setChannel(ctx.channel()); clientSession.onActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ClientSession clientSession = ctx.channel().attr(SESSIONKEY).get(); clientSession.onInactive(); final int interval = clientSession.getReconnectInterval(); ctx.channel().eventLoop().schedule(() -> tryConnect(clientSession), interval, TimeUnit.SECONDS); } ... } ... private void tryConnect(ClientSession clientSession) { if (!clientSession.isWorking()) { return; } final String host = clientSession.getRemoteHost(); final int port = clientSession.getRemotePort(); final int interval = clientSession.getReconnectInterval(); ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { f.channel().attr(SESSIONKEY).set(clientSession); } else { f.channel().eventLoop().schedule(() -> tryConnect(clientSession), interval, TimeUnit.SECONDS); } } }); } }
如果需要监听多个端口或连接多个目的主机,只需要创建多个ClientSession/ListenSession即可。如:
private TcpServer tcpServer = new TcpServer(); private LSServer lsServer = new LSServer(); private LSClient lsClient = new LSClient(); lsServer.setLocalPort(30001); lsServer.setRelistenInterval(10); tcpServer.attach(lsServer); lsClient.setLocalPort(40001); lsClient.setRelistenInterval(10); tcpServer.attach(lsClient);
另外值得一提的是网上很多例子,都会在bind端口后,调用如下代码:
f.channel().closeFuture().sync();
这会阻塞当前线程,其实就是在当前线程做main loop。而实际游戏服务器中,通常main线程做逻辑线程,逻辑线程需要自己tick,也就是自定义main loop,我们在其中执行一些每帧更新的逻辑。所以并不需要上面这种方式。
公共库仓库: JMetazion
服务器示例仓库: JGameDemo
新建QQ交流群:330459037