public class NBTimeServer { public static void main(String[] args) { try { Selector acceptSelector = SelectorProvider.provider().openSelector(); //创建一个新的server socket,设置为非阻塞模式 ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); // 绑定server sokcet到本机和对应的端口 InetAddress lh = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(lh, 8900); ssc.socket().bind(isa); //通过selector注册server socket,这里即告诉selector,当accept发生的时候,socket会被放在reday队列 SelectionKey acceptKey = ssc.register(acceptSelector, SelectionKey.OP_ACCEPT); int keysAdded = 0; // 当任何一个注册事件发生的时候,select就会返回 while ((keysAdded = acceptSelector.select()) > 0) { // 获取已经准备好的selectorkey Set readyKeys = acceptSelector.selectedKeys(); Iterator i = readyKeys.iterator(); while (i.hasNext()) { SelectionKey sk = (SelectionKey)i.next(); i.remove(); ServerSocketChannel nextReady = (ServerSocketChannel)sk.channel(); Socket s = nextReady.accept().socket(); PrintWriter out = new PrintWriter(s.getOutputStream(), true); Date now = new Date(); out.println(now); out.close(); } } } catch(Exception e) { e.printStackTrace(); } } } 复制代码
SelectorProvider提供的所有provider都是同一个对象。如果没有,它会通过 AccessController.doPrivileged
来给获取provider的代码最高的权限,执行逻辑是:
以solaris的实现为例,创建的provider会根据操作系统的版本和操作系统的名字分别创建不同的实例
if ("SunOS".equals(osname)) { return new sun.nio.ch.DevPollSelectorProvider(); } if("Linux".equals(osname)){ if (major > 2 || (major == 2 && minor >= 6)) { return new sun.nio.ch.EPollSelectorProvider(); } } return new sun.nio.ch.PollSelectorProvider(); //默认返回 复制代码
代码存在缩减,只取核心
类之间的关系如下
下面只关注Epoll和Poll
拿到provider之后,开始执行openSelector,获取真正的selector。
对于poll,返回的实例是PollSelectorImpl,对于Epoll返回的实例则是EpollSelectorImpl。
file descriptor :unix设计哲学就是一切都是文件,它可能是一个网络连接、一个终端等等。它本身就是一个数值,在系统中会维护文件描述符和它对应文件的一个指针,从而找到对应的文件操作
long pipeFds = IOUtil.makePipe(false); fd0 = (int) (pipeFds >>> 32); // >>> 表示无符号右移,最高位补0,这里即获取读文件描述符 fd1 = (int) pipeFds; //截掉了高位,存储的是读文件描述符 复制代码
IOUtil针对不同的操作系统有不同的实现,以solaris为例,它的实现在IOUtil.c中,主要实现即通过Linux pipe方法和Linux fcntl方法 (代码有删减)
int fd[2]; if (pipe(fd) < 0) // 获取读和写的文件符 if ((configureBlocking(fd[0], JNI_FALSE) < 0) //标注为非阻塞 || (configureBlocking(fd[1], JNI_FALSE) < 0)) return ((jlong) fd[0] << 32) | (jlong) fd[1]; //读的文件描述符放在高位,写的文件描述符放在低位 复制代码
configureBlocking本身的实现在IOUtil.c中
static int configureBlocking(int fd, jboolean blocking) //设置为非阻塞状态 { int flags = fcntl(fd, F_GETFL); int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK); return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags); } 复制代码
pipe实际是创建了一个进程间通信的单向数据管道,参数中的fd[0]表示管道读取端的结尾,fd[1]表示管道写端的结尾;fcntl则主要是根据第二个参数,如源码中的F_GETFL和F_SETFL,对第一个参数执行对应的操作;
pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); 复制代码
int epfd = (*epoll_create_func)(256);
而epoll_create_func在Java_sun_nio_ch_EPollArrayWrapper_init执行的时候已经是执行了初始化,对应的是Linux epoll_create ,返回既是一个epoll实例,它实质也是一个文件描述符 epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create"); epoll_ctl_func = (epoll_ctl_t) dlsym(RTLD_DEFAULT, "epoll_ctl"); epoll_wait_func = (epoll_wait_t) dlsym(RTLD_DEFAULT, "epoll_wait"); 复制代码
NUM_EPOLLEVENTS * SIZE_EPOLLEVENT
,其中的NUM_EPOLLEVENTS则是去的文件描述符限制和8192相比的最小值 Math.min(fdLimit(), 8192);
详见 Linux getrlimit , 实质是AllocatedNativeObject
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
,即把fd0注册到epfd上,将epfd上的EPOLLIN事件关联到fd0上,详见Linux epoll_ctl pollWrapper = new PollArrayWrapper(INIT_CAP); //初始为10 pollWrapper.initInterrupt(fd0, fd1); 复制代码
pollArray:它的大小为 (10+1)*SIZE_POLLFD
(SIZE_POLLFD取值为8), 实质是AllocatedNativeObject
NativeObject是用来操作本地内存的一个代理,所有的操作通过Unsafe来实现,它本身是一个单例
它还是会去获取系统级别的provider,由于已经在拿selector的时候初始化,不再新建。同样会通过PollSelectorProvider或者是EPollSelectorProvider来开启服务端的socket的channel,而二者的实现均是通过父类SelectorProviderImpl,创建一个ServerSocketChannelImpl实例
channel:代表与硬件、文件、网络socket或者是程序组件等能够进行一些I/O操作(读和写)的实体的连接
Closeable:是关闭与流相关的系统资源
AutoCloseable:从1.7开始的支持的语法糖try-with-resources结构,实现自动关闭资源
SelectableChannel:支持通过selector复用的Channel,提供对channel的注册,返回对应的SelectionKey,可以工作在阻塞(默认)和非阻塞模式下
NetworkChannel:对应网络socket的channel,提供将socket绑定到本机地址的bind方法
fd是使用IOUtil.newFD创建,创建过程如下:
Native方法 Net.socket0
Net.scoket0 方法对应的实现为 Net.c中的Java_sun_nio_ch_Net_socket0
,从头文件的引入 #include <sys/socket.h> 可以看到,socket0的内部很多实现都依赖于操作系统本身,操作系统不一样,就会有不同的调用结果。关键实现如下
fd = socket(domain, type, 0); setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,sizeof(arg)) 复制代码
在IOUtil.c中存在方法 Java_sun_nio_ch_IOUtil_setfdVal,它就是调用JNI的方法将获取的值存入到java对象FileDescriptor中取
FileDescriptor的实例是用来表示1个打开的文件,或者是一个打开的socket或者类似的字节源
fdVal的赋值则是使用创建好的fd调用JNI中的 (*env)->GetIntField(env, fdo, fd_fdID);
实现
本质是通过ServerSocketAdaptor创建一个实例返回
ServerSocket本质是一个对SocketImpl的包装类,相关的请求处理都是由impl来处理
SocksSocketImpl是按照SOCKS协议的TCP socket实现,而PlainSocketImpl则是一个‘平凡’的socket实现,它不对防火墙或者代理做任何的突破。
SocketImpl是所有实现socket的父抽象类,用来创建客户端和服务端的socket
Socket类是两台机器之间通信的端点,端点(endpoint)指的是 服务IP和它的端口,它的实际操作还是由SocketImpl来实现。
SOCKS4(SOCKets缩写)是一个网络协议,它主要负责在防火墙上中继TCP会话,以便应用用户能够透过防火墙进行访问。它主要定义了两个操作:CONNECT和BIND。
SOCKS5相对于SOCKS4做了功能扩展,支持UDP、IPV6、鉴定的支持
ServerSocketChannelImpl的bind方法。
public InetSocketAddress(int port) { this(InetAddress.anyLocalAddress(), port); } 复制代码
//将传入的java对象的InetAddress和端口转换为结构体:sockaddr_in或者sockaddr_in6 NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *)&sa, &sa_len, preferIPv6); rv = NET_Bind(fdval(env, fdo), (struct sockaddr *)&sa, sa_len); 复制代码
bind对于windows系统和linux系统有不同的实现,以Linux为例,它实际执行的就是Linux bind,所做的操作就是把指定的地址(SocketAddress)分配给socket文件描述符, 对于Hello world的实现来说就是它的字段fd
注册事件在实质上就是维护新建channel的文件描述符和SelectionKey的关系,就实现上而言, Poll用的是数组,Epoll用的是HashMap
合法的操作为SelectionKey.OP_READ、SelectionKey.OP_WRITE、SelectionKey.OP_CONNECT
根据是Poll还是Epoll有不同的实现。select的实质就是去获取poll和epoll的结果,然后更新自身维护的selector结构对应的状态
在非阻塞模式下,accept会立马返回
Linux accept实际上就是从监听状态的socketfd的连接等待队列中获取第一个连接请求,然后新建一个socket返回。
这里新建的SocketChannelImpl,而之前使用的是ServerSocketChannelImpl。区别在于 SocketChannelImpl支持读写数据,而ServerSocketChannelImpl则更多的用于等待连接的到来,充当服务端
接下来,获取的socket方式同第3步中新建socket
outpusStream通过 Channels.newOutputStream
新建,它会持有accept处新建的SocketChannelImpl,它实际上就是新建OutputStream并重写它的write方法
printWriter的print经过BufferWriter到OutputStreamWriter,再到它的StreamEncoder到它的方法 writeBytes
执行 out.write(bb.array(), bb.arrayOffset() + pos, rem);
即socket中重写的write方法,它的主要实现是调用 Channels.writeFully
,然后调用Channel自己的 SocketChannelImpl.write
方法,它核心在于 n = IOUtil.write(fd, buf, -1, nd, writeLock);
static int write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd, Object lock) throws IOException { //判断是否是直接内存 if (src instanceof DirectBuffer) return writeFromNativeBuffer(fd, src, position, nd, lock); // Substitute a native buffer int pos = src.position(); int lim = src.limit(); assert (pos <= lim); int rem = (pos <= lim ? lim - pos : 0); //申请一个DirectBuffer,即通过ByteBuffer.allocateDirect来申请直接内存; ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); try { bb.put(src); bb.flip(); // Do not update src until we see how many bytes were written src.position(pos); //写数据,实际上执行的是FileDispatcherImpl的Native方法writ0 int n = writeFromNativeBuffer(fd, bb, position, nd, lock); if (n > 0) { // now update src src.position(pos + n); } return n; } finally { Util.offerFirstTemporaryDirectBuffer(bb); } } 复制代码
可以看到这里有一段从JVM的Buffer拷贝到NativeBuffer中,也就是说 NIO的数据写肯定是从直接内存发送出去的
,如果本身不是直接内存则会经过一次内存拷贝。
JNIEXPORT jint JNICALL Java_sun_nio_ch_FileDispatcherImpl_write0(JNIEnv *env, jclass clazz, jobject fdo, jlong address, jint len) { jint fd = fdval(env, fdo); void *buf = (void *)jlong_to_ptr(address); return convertReturnVal(env, write(fd, buf, len), JNI_FALSE); } 复制代码
最终的写可以看到用的就是Linux write
SelectionKey会持有各自操作系统下的SelectorImpl对象,对于PollSelectorImpl的channel注册内部实际是通过数组存储了文件描述符和Selector的关系,EpollSelectorImpl的channel注册则是内部用的HashMap存储文件描述符和Selector的关系。当读取到事件的时候,就通过轮询的方式拿到所有准备好的事件返回,一个个的处理
它依赖于操作系统本身,对于windows/mac/linux均有不同的版本实现。这里以Liunx为例,它实际上就是个使用Linux的一系列方法,比如 read/write/accept等,操作文件描述符
socket本身只是获取通信的服务和端口的一个实现类,对于服务的连接,是通过自身的属性来处理。而这个属性impl实际也就是对SOCKS协议的实现。来提供连接和绑定服务。
public class TimeServer { private static Charset charset = Charset.forName("US-ASCII"); private static CharsetEncoder encoder = charset.newEncoder(); public static void main(String[] args) throws IOException { ServerSocketChannel ssc = ServerSocketChannel.open(); InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(), 8013); ssc.socket().bind(isa); for (;;) { SocketChannel sc = ssc.accept(); try { String now = new Date().toString(); sc.write(encoder.encode(CharBuffer.wrap(now + "/r/n"))); System.out.println(sc.socket().getInetAddress() + " : " + now); sc.close(); } finally { // Make sure we close the channel (and hence the socket) sc.close(); } } } } 复制代码
它与NIO的区别主要区别在于在于,NIO通过configureBlocking设置为false,会把它自身的fd设置为非阻塞,而阻塞IO则没有,默认阻塞。
public class TimeQuery { // Charset and decoder for US-ASCII private static Charset charset = Charset.forName("US-ASCII"); private static CharsetDecoder decoder = charset.newDecoder(); // Direct byte buffer for reading private static ByteBuffer dbuf = ByteBuffer.allocateDirect(1024); public static void main(String[] args) { try { InetSocketAddress isa = new InetSocketAddress(InetAddress.getLocalHost(), 8900); SocketChannel sc = null; try { // Connect sc = SocketChannel.open(); sc.connect(isa); // Read the time from the remote host. For simplicity we assume // that the time comes back to us in a single packet, so that we // only need to read once. dbuf.clear(); sc.read(dbuf); // Print the remote address and the received time dbuf.flip(); CharBuffer cb = decoder.decode(dbuf); System.out.print(isa + " : " + cb); } finally { // Make sure we close the channel (and hence the socket) if (sc != null) sc.close(); } } catch (IOException x) { System.err.println( x); } } } 复制代码
真实的执行实际上也就是Linux connect和 Linux read
jdk 7 源码地址
NIO服务端 源码地址
IO服务端 源码地址
客户端 源码地址
如何读open jdk native 源码
java JNI简介