毕玄老师发表了一篇公众号文章:来测试下你的Java编程能力,本系列文章为其中问题的个人解答。
第一个问题:
基于BIO实现的Server端,当建立了100个连接时,会有多少个线程?如果基于NIO,又会是多少个线程? 为什么?
说实话,如果面试被问到这个问题,也不敢保证能完全答对。那么就回炉重造一下吧。
package com.xetlab.javatest.question1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.Charset; public class ServerMain1 { private static final Logger logger = LoggerFactory.getLogger(ServerMain1.class); public static void main(String[] args) { logger.info("0.主线程启动"); try { //服务端初始化,在9999端口监听 ServerSocket serverSocket = new ServerSocket(9999); while (true) { //等待客户端连接,如果没有连接就阻塞当前线程 Socket clientSocket = serverSocket.accept(); logger.info("1.客户端 {}:{} 已连接", clientSocket.getInetAddress().getHostAddress(), clientSocket.getPort()); //向客户端发消息 logger.info("2.向客户端发欢迎消息"); clientSocket.getOutputStream().write("你好,请报上名来!".getBytes("UTF8")); clientSocket.getOutputStream().flush(); //从客户端读取消息 StringBuffer msgBuf = new StringBuffer(); byte[] byteBuf = new byte[1024]; clientSocket.getInputStream().read(byteBuf); msgBuf.append(new String(byteBuf, "UTF8")); logger.info("5.收到客户端消息:{}", msgBuf); //向客户端发消息 logger.info("6.向客户端发退出消息"); clientSocket.getOutputStream().write(String.format("退下,%s!", msgBuf.toString()).getBytes(Charset.forName("UTF8"))); clientSocket.getOutputStream().flush(); } } catch (IOException e) { logger.error("server error", e); System.exit(1); } } }
package com.xetlab.javatest.question1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.Socket; public class ClientMain1 { private static final Logger logger = LoggerFactory.getLogger(ClientMain1.class); public static void main(String[] args) { try { Socket socket = new Socket("127.0.0.1", 9999); while (true) { StringBuffer msgBuf = new StringBuffer(); byte[] byteBuf = new byte[1024]; socket.getInputStream().read(byteBuf); msgBuf.append(new String(byteBuf, "UTF8")); logger.info("3.收到服务端消息:{}", msgBuf); logger.info("4.向服务端发送名字消息"); socket.getOutputStream().write("Mr Nobody.".getBytes("UTF8")); socket.getOutputStream().flush(); msgBuf = new StringBuffer(); byteBuf = new byte[1024]; socket.getInputStream().read(byteBuf); msgBuf.append(new String(byteBuf, "UTF8")); logger.info("7.收到服务端消息:{}", msgBuf); if (msgBuf.toString().startsWith("退下")) { socket.close(); logger.info("8.客户端退出"); break; } } } catch (IOException e) { logger.error("client error", e); System.exit(1); } } }
2019-03-23 23:36:39,480 [INFO] com.xetlab.javatest.question1.ServerMain1 [main] - 0.主线程启动 2019-03-23 23:36:44,883 [INFO] com.xetlab.javatest.question1.ServerMain1 [main] - 1.客户端 127.0.0.1:7473 已连接 2019-03-23 23:36:44,884 [INFO] com.xetlab.javatest.question1.ServerMain1 [main] - 2.向客户端发欢迎消息 2019-03-23 23:36:44,888 [INFO] com.xetlab.javatest.question1.ClientMain1 [main] - 3.收到服务端消息:你好,请报上名来! 2019-03-23 23:36:44,891 [INFO] com.xetlab.javatest.question1.ClientMain1 [main] - 4.向服务端发送名字消息 2019-03-23 23:36:44,891 [INFO] com.xetlab.javatest.question1.ServerMain1 [main] - 5.收到客户端消息:Mr Nobody. 2019-03-23 23:36:44,892 [INFO] com.xetlab.javatest.question1.ServerMain1 [main] - 6.向客户端发退出消息 2019-03-23 23:36:44,892 [INFO] com.xetlab.javatest.question1.ClientMain1 [main] - 7.收到服务端消息:退下,Mr Nobody. 2019-03-23 23:36:44,892 [INFO] com.xetlab.javatest.question1.ClientMain1 [main] - 8.客户端退出
如果我们按上面的方式实现Server端,答案会是:BIO Server端,一个线程就够了。我们来分析下这种实现方式的优缺点。
一次只能服务一个客户端,别的客户端只能等待,具体表现是:如果同时启动两个慢客户端,那么两个客户端的底层TCP连接是建立好的,先启动的客户端会先得到服务,但后启动的那个客户端会在读取数据时一直被阻塞,如下所示(windows):
netstat -ano|find “9999”
TCP 127.0.0.1:9999 127.0.0.1:29712 ESTABLISHED 16996 TCP 127.0.0.1:9999 127.0.0.1:29740 ESTABLISHED 16996
服务端输出
2019-03-24 10:47:48,881 [INFO] com.xetlab.javatest.question1.ServerMain1 [main] - 0.主线程启动 2019-03-24 10:47:52,549 [INFO] com.xetlab.javatest.question1.ServerMain1 [main] - 1.客户端 127.0.0.1:29712 已连接 2019-03-24 10:47:52,550 [INFO] com.xetlab.javatest.question1.ServerMain1 [main] - 2.向客户端发欢迎消息
客户端1收到消息后,休眠
2019-03-24 10:47:52,555 [INFO] com.xetlab.javatest.question1.ClientMain1 [main] - 3.收到服务端消息:你好,请报上名来!
客户端2
//客户端2在此处被阻塞 socket.getInputStream().read(byteBuf);
因此这种方式实现的Server端,只能用于入门示例,不能用于生产环境。另外BIO全称是Blocking IO,即阻塞式IO,这个BIO体现在哪呢?体现在这两处:
//1.当客户端没发消息过来时,此时服务端读取消息时就会阻塞 //2.当读取的数据较多时,线程没有阻塞,但是读取数据的耗时会挺久 clientSocket.getInputStream().read(bytes); //当给客户端发送的数据较多时,这里线程没有阻塞,但是写数据的耗时会挺久 clientSocket.getOutputStream().write(bytes);
package com.xetlab.javatest.question1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.Charset; public class ServerMain2 { private static final Logger logger = LoggerFactory.getLogger(ServerMain2.class); public static void main(String[] args) { logger.info("0.主线程启动"); try { //服务端初始化,在9999端口监听 ServerSocket serverSocket = new ServerSocket(9999); while (true) { //等待客户端连接,如果没有连接就阻塞当前线程 Socket clientSocket = serverSocket.accept(); String clientId = String.format("%s:%s", clientSocket.getInetAddress().getHostAddress(), clientSocket.getPort()); logger.info("1.客户端 {} 已连接", clientId); new Thread(new Handler(clientSocket), clientId).start(); } } catch (IOException e) { logger.error("server error", e); System.exit(1); } } static class Handler implements Runnable { private Socket clientSocket; public Handler(Socket clientSocket) { this.clientSocket = clientSocket; } public void run() { try { //向客户端发消息 logger.info("2.向客户端发欢迎消息"); clientSocket.getOutputStream().write("你好,请报上名来!".getBytes("UTF8")); clientSocket.getOutputStream().flush(); //从客户端读取消息 StringBuffer msgBuf = new StringBuffer(); byte[] byteBuf = new byte[1024]; clientSocket.getInputStream().read(byteBuf); msgBuf.append(new String(byteBuf, "UTF8")); logger.info("5.收到客户端消息:{}", msgBuf); //向客户端发消息 logger.info("6.向客户端发退出消息"); clientSocket.getOutputStream().write(String.format("退下,%s!", msgBuf.toString()).getBytes(Charset.forName("UTF8"))); clientSocket.getOutputStream().flush(); } catch (IOException e) { logger.error("io error", e); } } } }
客户端保持不变,只是把其中一个在回复名字前故意休眠很久,另一个保持正常。此时各端的输出如下:
服务端
2019-03-24 12:50:56,514 [INFO] com.xetlab.javatest.question1.ServerMain2 [main] - 0.主线程启动 2019-03-24 12:51:02,613 [INFO] com.xetlab.javatest.question1.ServerMain2 [main] - 1.客户端 127.0.0.1:44334 已连接 2019-03-24 12:51:02,613 [INFO] com.xetlab.javatest.question1.ServerMain2 [127.0.0.1:44334] - 2.向客户端发欢迎消息 2019-03-24 12:51:08,331 [INFO] com.xetlab.javatest.question1.ServerMain2 [main] - 1.客户端 127.0.0.1:44347 已连接 2019-03-24 12:51:08,331 [INFO] com.xetlab.javatest.question1.ServerMain2 [127.0.0.1:44347] - 2.向客户端发欢迎消息 2019-03-24 12:51:08,339 [INFO] com.xetlab.javatest.question1.ServerMain2 [127.0.0.1:44347] - 5.收到客户端消息:Mr Nobody. 2019-03-24 12:51:08,339 [INFO] com.xetlab.javatest.question1.ServerMain2 [127.0.0.1:44347] - 6.向客户端发退出消息
慢客户端先连接,收到消息后,休眠
2019-03-24 12:51:02,619 [INFO] com.xetlab.javatest.question1.ClientMain1 [main] - 3.收到服务端消息:你好,请报上名来!
正常客户端后连接
2019-03-24 12:51:08,336 [INFO] com.xetlab.javatest.question1.ClientMain2 [main] - 3.收到服务端消息:你好,请报上名来! 2019-03-24 12:51:08,338 [INFO] com.xetlab.javatest.question1.ClientMain2 [main] - 4.向服务端发送名字消息 2019-03-24 12:51:08,339 [INFO] com.xetlab.javatest.question1.ClientMain2 [main] - 7.收到服务端消息:退下,Mr Nobody. 2019-03-24 12:51:08,340 [INFO] com.xetlab.javatest.question1.ClientMain2 [main] - 8.客户端退出
可以看到,引入多线程后,每个线程服务一个客户端,可以同时服务100个连接了,如果这样实现Server端,IO还是BIO,线程数需要101个,一个线程用于接受客户端连接,100个线程用于服务客户端。同样来分析下优缺点。
编写多线程任务时,可以把执行任务的逻辑使用Runnable接口来实现,这样任务可以直接放到Thread线程对象里执行,也可以提交到线程池中去执行。
有没有可能同时具备方式一和二的优点呢,具体来说就是,一个线程同时服务N个客户端?Yes,NIO就可以!那什么是NIO?NIO即New IO,更多时候我们是看成Non blocking IO,就是非阻塞IO。
具体NIO如何实现一个线程服务N个客户端,在深入代码细节前,我们先理一理。
回顾上面的BIO实现,我们知道有这几个点会阻塞或者响应慢:
由于会阻塞或者响应慢BIO用了不同的线程去分别处理,如果可以只由一个线程去负责检查是否有客户端连接,客户端的数据是否可读,是否可以往客户端写数据,当有对应的事件已经准备好时,再由于当前线程去处理相应的任务,那就完美了。
NIO里有个对象是Selector,这个Selector就是用于注册事件,并检查事件是否已准备好。现在来看下具体代码。
package com.xetlab.javatest.question1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; public class ServerMain3 { private static final Logger logger = LoggerFactory.getLogger(ServerMain3.class); public static void main(String[] args) { logger.info("0.主线程启动"); try { Map<SocketChannel, Queue> msgQueueMap = new ConcurrentHashMap<SocketChannel, Queue>(); //创建channel管理器,用于注册channel的事件 Selector selector = Selector.open(); //服务端初始化,在9999端口监听,保留BIO初始化方式用于参照 //ServerSocket serverSocket = new ServerSocket(9999); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //设置非阻塞 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(9999)); //注册可accept事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { //NIO仅有的一个阻塞方法,当有注册的事件产生时,才会返回 selector.select(); //产生事件的事件源列表 Set<SelectionKey> readyKeys = selector.selectedKeys(); Iterator<SelectionKey> keyItr = readyKeys.iterator(); while (keyItr.hasNext()) { SelectionKey readyKey = keyItr.next(); keyItr.remove(); if (readyKey.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) readyKey.channel(); //接受客户端 SocketChannel clientChannel = serverChannel.accept(); String clientId = String.format("%s:%s", clientChannel.socket().getInetAddress().getHostAddress(), clientChannel.socket().getPort()); logger.info("1.客户端 {} 已连接", clientId); msgQueueMap.put(clientChannel, new ArrayBlockingQueue(100)); logger.info("2.向客户端发欢迎消息"); //NIO发消息先放到消息队列里,等可写时再发 msgQueueMap.get(clientChannel).add("你好,请报上名来!"); //设置非阻塞 clientChannel.configureBlocking(false); //注册可读和可写事件 clientChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } else if (readyKey.isReadable()) { SocketChannel clientChannel = (SocketChannel) readyKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int bytesRead = clientChannel.read(byteBuffer); if (bytesRead <= 0) { continue; } byteBuffer.flip(); byte[] msgByte = new byte[bytesRead]; byteBuffer.get(msgByte); final String clientName = new String(msgByte, "UTF8"); logger.info("5.收到客户端消息:{}", clientName); msgQueueMap.get(clientChannel).add(String.format("退下!%s", clientName)); } else if (readyKey.isWritable()) { SocketChannel clientChannel = (SocketChannel) readyKey.channel(); Queue<String> msgQueue = msgQueueMap.get(clientChannel); String msg = msgQueue.poll(); if (msg != null) { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.put(msg.getBytes("UTF8")); byteBuffer.flip(); clientChannel.write(byteBuffer); logger.info("6.向客户端发退出消息"); } } } } } catch (IOException e) { logger.error("server error", e); System.exit(1); } } }
上面我们用NIO实现了和原来BIO一模一样的逻辑,NIO确实是只用一个线程高效的解决了问题,但是代码看起来复杂多了。不过我们用伪代码总结一下,会简单一点:
NIO中,由于是单线程,不能在连接就绪,读写就绪之后的事件处理逻辑执行耗时操作,那样将会让服务性能急剧下降,正确方法应该是把耗时的逻辑放在独立的线程中去执行,或放到专门的worker线程池中执行。
https://github.com/huangyemin/javatest https://gitee.com/huangyemin/javatest