服务器开发,高并发始终是一个不断追求的目标。若实现这个目标,将所有阻塞操作异步化是必不可少的。执行异步任务,最容易想到的是使用多线程,但线程不是多多益善,相反要控制在一定的数量。如何在多线程环境下,合理地进行任务派发,是这篇文章要讨论的。
就像在概述中描述的,单个游戏服务器进程主要分三层:网络、逻辑和数据存取,下面将具体阐述。
网络层使用Netty,将有若干worker线程收发网络消息,收到消息后如何处理呢?我见过一些服务器程序,直接在收发消息的回调函数里,执行逻辑,甚至进行数据存取,这显然是不合适的。网络线程应该专注于收发消息的操作。首先网络线程一般都是多线程,直接执行业务逻辑,即意味着业务逻辑在多线程环境下,数据同步等就成了必须解决的问题。其次一旦业务逻辑耗时较多,尤其进行数据存取操作,网络线程就会阻塞,这将严重影响网络通信进而降低并发。合适的做法是将网络消息放到逻辑线程队列,所有后续业务逻辑都交由逻辑线程处理。在Java中可以使用concurrent包提供的队列容器,比如:
private ConcurrentLinkedQueue<RequestAction> requestActionQueue = new ConcurrentLinkedQueue<RequestAction>();
逻辑线程依次从队列里取出消息并处理。这里有一个常见的困惑,逻辑线程该使用单线程还是多线程。事情不能一概而论,但大多数情况下,逻辑层是最耗CPU的,多线程并不能提高性能,并会带来数据同步等相关的问题。所以在不知道如何抉择时,单线程是更好的选择。
逻辑单线程,就意味着不应该有任何阻塞操作,而业务逻辑不可避免地要进行数据存取。异步任务派发就是为了解决诸如数据存取等阻塞操作需求,同时需要回调在任务结束后执行后续业务逻辑。先看一个具体示例:
public class RAUserLoginCL extends RequestAction { UserLoginCL req = null; @Override public void setRequest(Message msg) { req = (UserLoginCL) msg; } @Override public void execute() { final String username = req.username; final String password = req.password; TaskLoadUser task = new TaskLoadUser(session, req); User user = AppLogin.getLogicService().getUserManager().getUser(username); if (user != null) { task.setUserData(user.getUserData()); } AppLogin.getLogicService().getTaskManager().pushTask(task); } }
上面这段代码,即是用户登录消息的处理。 UserLoginCL 是登录消息包,网络IO线程收到该消息后,封装成 RequestAction 并push到逻辑线程消息队列。逻辑线程在逻辑循环中取到该消息,并调用 execute() 进行业务处理。这里将处理逻辑简化,简单说就是将请求信息再封装成 TaskLoadUser 任务,因为我们必须先要(从数据库)加载用户数据( UserData )到内存,才能进行密码验证等逻辑。
这个任务依然会首先放到一个当前逻辑线程的任务队列,然后等待执行。
public class TaskManager { private ConcurrentLinkedQueue<Task> taskQueue = new ConcurrentLinkedQueue<Task>(); private ConcurrentHashMap<Integer, Task> taskMap = new ConcurrentHashMap<Integer, Task>(); private AtomicInteger nextSeq = new AtomicInteger(1000); public void tick(long milliseconds) { while (!taskQueue.isEmpty()) { Task task = taskQueue.poll(); task.execute(); putTask(task); } Iterator<Map.Entry<Integer, Task>> iterator = taskMap.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<Integer, Task> entry = iterator.next(); Task task = entry.getValue(); task.tick(milliseconds); if (task.isFinished()) { task.onFinish(); iterator.remove(); continue; } if (task.isTimeout()) { task.onTimeout(); iterator.remove(); continue; } } } ...
看这个任务代码:
public class TaskLoadUser extends Task { private TransmitSession sessionIn = null; private UserLoginCL reqIn = null; private UserLoginLC rspOut = new UserLoginLC(); private volatile UserData userDataOut = null; public TaskLoadUser(TransmitSession sessionIn, UserLoginCL req) { this.sessionIn = sessionIn; this.reqIn = req; } @Override public void execute() { setResult(UserLoginLC.SUCCESS); incDesire(); if (userDataOut != null) { decDesire(); return; } DaoService.execute(() -> { do { final String username = reqIn.username; UserData userData = DbUser.getUserDataByName(username); // 用户不存在 if (userData == null) { setResult(UserLoginLC.ERROR_NOUSER); break; } userDataOut = userData; } while (false); decDesire(); }); } @Override public void onFinish() { if (result != UserLoginLC.SUCCESS) { rollback(); failResponse(); return; } succeedResponse(); } @Override public void onTimeout() { setResult(UserLoginLC.TIMEOUT); rollback(); failResponse(); } public void setUserData(UserData userData) { this.userDataOut = userData; } private void rollback() { // Noting to do } private void succeedResponse() { // 检查密码 if (!reqIn.password.equals(userDataOut.password)) { rspOut.result = UserLoginLC.ERROR_WRONGPWD; sessionIn.send(rspOut); return; } ... rspOut.result = (byte) result; rspOut.userId = userId; rspOut.token = token; sessionIn.send(rspOut); } private void failResponse() { rspOut.result = (byte) result; sessionIn.send(rspOut); } }
上面两段代码是异步任务的核心。逻辑线程取到这个任务后,调用 execute() ,它会通过数据存取层从数据库中加载数据。 execute() 执行后,逻辑线程仍然继续检测这个任务,直到其完成或超时。如果完成会调用 onFinish() 执行后续逻辑,加载可能成功或失败,分别对应 succeedResponse() 和 failResponse() ,如果执行超时则执行 onTimeout() 。
DaoService即对应数据存取层,实现上是一个线程池:
public class DaoService { public static final int IOTHREADNUMBER = Runtime.getRuntime().availableProcessors() * 2; private static final ExecutorService exeSvc = Executors.newFixedThreadPool(IOTHREADNUMBER); public static void execute(Runnable daoOperation) { exeSvc.execute(daoOperation); } }
这就实现了多线程异步数据存取。代码很简短,不了解的可以查阅下Java线程池相关知识。
逻辑线程使用数据存取层异步加载数据,加载完成与否,逻辑线程如何得知呢? incDesire() 和 decDesire() ,就是用来解决这个问题,其类似引用计数。我们可以把所有阻塞操作分解成一个个可以用多线程去异步执行的子任务,先调用 incDesire(N) 增加计数, N 即分解的子任务数,然后使用 DaoService.execute() 去依次并行执行这些子任务。每完成一个子任务调用一次 decDesire(1) 将计数减1,计数为0时,即表明所有异步子任务都已执行完毕。如过指定时间内计数仍然大于0,即超时。还有一个技巧是,每执行完一个子任务,设置一个数据或标志,那么在整个任务失败或超时时,就可以执行 rollback() 回退已完成的子任务。
这里有个地方需要注意下。即使只有一个子任务,也应该先调用一次 incDesire() 。否则,因为初始任务计数为0,异步任务是多线程执行的,可能子任务还在执行中,逻辑线程就已经检测到任务计数为0而错误认为任务已经完成了。
经过上述这些做法,消息收发、业务逻辑和数据存取实现了完全异步化,不再存在阻塞点。需要说明的是,异步任务并非是完美的,异步越多,响应延迟越大。高并发、低延时、易扩展,是一个需要综合权衡的目标。
公共库仓库: JMetazion
服务器示例仓库: JGameDemo
服务器架构QQ交流群:330459037