Epoll 是Linux内核的高性能、可扩展的I/O事件通知机制。
在linux2.5.44首次引入epoll,它设计的目的旨在取代既有的select、poll系统函数,让需要大量 操作文件描述符 的程序得以发挥更优异的性能(wikipedia example: 旧有的系统函数所花费的时间复杂度为O(n), epoll的时间复杂度 O(log n) )。epoll实现的功能与poll类似,都是监听多个文件描述符上的事件。
epoll底层是由 可配置的操作系统内核对象 建构而成,并以文件描述符(file descriptor)的形式呈现于 用户空间 (from wikipedia: 在操作系统中,虚拟内存通常会被分成 用户空间,与核心空间这两个区段 。这是存储器保护机制中的一环。 内核* 、核心扩展(kernel extensions)、以及驱动程序 *,运行在 核心空间 上。而其他的应用程序,则运行在用户空间上。所有运行在用户空间的应用程序,都被统称为用户级(userland))。
它是一个用来 管理 软件发出的 数据I/O 的一个程序,并将数据交由CPU和电脑其他电子组件处理,但是直接对硬件操作是非常复杂的,通常内核提供一种硬件抽象的方法来完成(由内核决定一个程序在什么时候对某部分硬件操作多长时间),通过这些方法来完成进程间通信和系统调用。
宏内核:
宏内核简单来说,首先定义了一个高阶的抽象接口,叫系统调用(System call))来实现操作系统的功能,例如进程管理,文件系统,和存储管理等等,这些功能由多个运行在内核态的程序来完成。
微内核:
微内核结构由硬件抽象层和系统调用组成;包括了创建一个系统必需的几个部分;如线程管理,地址空间和进程间通信等。微核的目标是将 系统服务的实现 和 系统的基本操作 规则 分离 开来。
linux就是使用的宏内核。因为它能够在运行时将模块调入执行,使扩充内核的功能变得更简单。
epoll 通过使用 红黑树 (RB-tree) 搜索 被监视的 文件描述符 (file descriptor)。
在 epoll 实例上 注册事件 时,epoll 会将该 事件添加到 epoll 实例的 红黑树 上并 注册一个回调函数 ,当 事件发生时 会将事件 添加到就绪链表 中。
int epoll_create(int size); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); 复制代码
向内核申请空间, 创建一个epoll的句柄 ,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。在最初的实现中,调用者通过 size 参数告知内核需要监听的文件描述符数量。如果监听的文件描述符数量超过 size, 则内核会自动扩容。而现在 size 已经没有这种语义了,但是调用者 调用时 size 依然必须大于 0,以保证后向兼容性 。需要注意的是,当 创建好epoll句柄 后,它就是会 占用一个fd值 ,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的。
向 epfd 对应的内核epoll 实例添加、修改或删除对 fd 上事件 event 的监听。 op 可以为 EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL 分别对应的是添加新的 事件 ,修改文件描述符上监听的事件类型,从实例上删除一个事件。如果 event 的 events 属性设置了 EPOLLET flag ,那么监听该事件的方式是 边缘触发 。
events可以是以下几个宏的集合:
例如:
struct epoll_event ev; //设置与要处理的事件相关的文件描述符 ev.data.fd=listenfd; //设置要处理的事件类型 ev.events=EPOLLIN|EPOLLET; //注册epoll事件 epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); 复制代码
Linux-2.6.19又引入了可以屏蔽指定信号的epoll_wait: epoll_pwait
当 timeout 为 0 时,epoll_wait 永远会 立即返回 。而 timeout 为 -1 时,epoll_wait 会 一直阻塞 直到任一已注册的事件变为就绪。当 timeout 为一正整数 时,epoll 会阻塞直到 计时结束 或已注册的 事件变为就绪 。因为内核调度延迟,阻塞的时间可能会略微超过 timeout (毫秒级)。
epoll文件描述符用完后,直接用 close 关闭,并且会 自动 从被侦听的文件描述符集合中删除
说了这么多原理,脑壳怕嗡嗡的吧,来看看实战清醒下~
如上知道:每次添加/修改/删除被侦听文件描述符都需要调用epoll_ctl,所以要尽量少地调用epoll_ctl,防止其所引来的开销抵消其带来的好处。有的时候,应用中可能存在大量的短连接(比如说Web服务器),epoll_ctl将被频繁地调用,可能成为这个系统的瓶颈。
传统的 select以及poll 的效率会因为在线人数的线形递增而导致呈二次乃至三次方的下降,这些直接导致了网络服务器可以支持的人数有了个比较明显的限制。这是因为他们有限的文件描述符和遍历所有的fd所带来的低效。
重点哦~
当你拥有一个 很大的socket集合 ,不过由于 网络延时 ,任一时间只有 部分 的 socket是“活跃”的 ,但是select/poll 每次调用都会线性扫描全部的集合 ,导致效率呈现 线性下降 。 epoll 不存在这个问题,它 只会对“活跃”的socket 进行操作---这是因为在内核实现中epoll是根据 每个fd上面的callback 函数实现的。那么, 只有“活跃”的socket才会主动的去调用 callback函数 ,其他idle(空闲)状态socket则不会,在这点上,epoll实现了一个“伪”AIO,因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。
int epfd = epoll_create(POLL_SIZE); struct epoll_event ev; struct epoll_event *events = NULL; nfds = epoll_wait(epfd, events, 20, 500); { for (n = 0; n < nfds; ++n) { if (events[n].data.fd == listener) { //如果是主socket的事件的话,则表示 //有新连接进入了,进行新连接的处理。 client = accept(listener, (structsockaddr *)&local, &addrlen); if (client < 0) { perror("accept"); continue; } setnonblocking(client); //将新连接置于非阻塞模式 ev.events = EPOLLIN | EPOLLET; //并且将新连接也加入EPOLL的监听队列。 //注意,这里的参数EPOLLIN|EPOLLET并没有设置对写socket的监听, //如果有写操作的话,这个时候epoll是不会返回事件的,如果要对写操作 //也监听的话,应该是EPOLLIN|EPOLLOUT|EPOLLET ev.data.fd = client; if (epoll_ctl(epfd, EPOLL_CTL_ADD, client, &ev) < 0) { //设置好event之后,将这个新的event通过epoll_ctl加入到epoll的监听队列里面, //这里用EPOLL_CTL_ADD来加一个新的epoll事件,通过EPOLL_CTL_DEL来减少一个 //epoll事件,通过EPOLL_CTL_MOD来改变一个事件的监听方式。 fprintf(stderr, "epollsetinsertionerror:fd=%d", client); return -1; } } else if(event[n].events & EPOLLIN) { //如果是已经连接的用户,并且收到数据, //那么进行读入 int sockfd_r; if ((sockfd_r = event[n].data.fd) < 0) continue; read(sockfd_r, buffer, MAXSIZE); //修改sockfd_r上要处理的事件为EPOLLOUT ev.data.fd = sockfd_r; ev.events = EPOLLOUT | EPOLLET; epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_r, &ev) } else if(event[n].events & EPOLLOUT) { //如果有数据发送 int sockfd_w = events[n].data.fd; write(sockfd_w, buffer, sizeof(buffer)); //修改sockfd_w上要处理的事件为EPOLLIN ev.data.fd = sockfd_w; ev.events = EPOLLIN | EPOLLET; epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd_w, &ev) } do_use_fd(events[n].data.fd); } } 复制代码
简单说下流程:
epoll在Java中怎么去调用的?
基础知识:
文件描述符:
Java NIO的世界中, Selector是中央控制器 , Buffer是承载数据的容器 ,而 Channel 可以说是最基础的门面,它是 本地 I/O 设备、 网络I/O 的通信桥梁 。
①先从最简单的ServerSocketChannel看起
ServerSocketChannel与ServerSocket一样是socket监听器,其主要区别前者可以运行在非阻塞模式下运行;
// 创建一个ServerSocketChannel,将会关联一个未绑定的ServerSocket public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); } 复制代码
ServerSocketChannel的创建也是依赖底层操作系统实现,其实现类主要是**ServerSocketChannelImpl****,**我们来看看其构造方法
ServerSocketChannelImpl(SelectorProvider var1) throws IOException { super(var1); // 创建一个文件操作符 this.fd = Net.serverSocket(true); // 得到文件操作符是索引 this.fdVal = IOUtil.fdVal(this.fd); this.state = 0; } 复制代码
新建一个ServerSocketChannelImpl其本质是在底层操作系统创建了一个fd(即文件描述符),相当于建立了一个用于网络通信的通道,调用socket的bind()方法绑定,通过accept()调用操作系统获取TCP连接
public SocketChannel accept() throws IOException { // 忽略一些校验及无关代码 .... SocketChannelImpl var2 = null; // var3的作用主要是说明当前的IO状态,主要有 /** * EOF = -1; * UNAVAILABLE = -2; * INTERRUPTED = -3; * UNSUPPORTED = -4; * THROWN = -5; * UNSUPPORTED_CASE = -6; */ int var3 = 0; // 这里本质也是用fd来获取连接 FileDescriptor var4 = new FileDescriptor(); // 用来存储TCP连接的地址信息 InetSocketAddress[] var5 = new InetSocketAddress[1]; try { // 这里设置了一个中断器,中断时会将连接关闭 this.begin(); // 这里当IO被中断时,会重新获取连接 do { var3 = this.accept(this.fd, var4, var5); } while(var3 == -3 && this.isOpen()); }finally { // 当连接被关闭且accept失败时或抛出AsynchronousCloseException this.end(var3 > 0); // 验证连接是可用的 assert IOStatus.check(var3); } if (var3 < 1) { return null; } { // 默认连接是阻塞的 IOUtil.configureBlocking(var4, true); // 创建一个SocketChannel的引用 var2 = new SocketChannelImpl(this.provider(), var4, var5[0]); // 下面是是否连接成功校验,这里忽略... return var2; } } // 依赖底层操作系统实现的accept0方法 private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException { return this.accept0(var1, var2, var3); } 复制代码
②SocketChannel
用于读写TCP通信的数据,相当于客户端
public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); } public SocketChannel openSocketChannel() throws IOException { return new SocketChannelImpl(this); } // State, increases monotonically private static final int ST_UNINITIALIZED = -1; private static final int ST_UNCONNECTED = 0; private static final int ST_PENDING = 1; private static final int ST_CONNECTED = 2; private static final int ST_KILLPENDING = 3; private static final int ST_KILLED = 4; private int state = ST_UNINITIALIZED; SocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); // 创建一个scoket通道,即fd(fd的作用可参考上面的描述) this.fd = Net.socket(true); // 得到该fd的索引 this.fdVal = IOUtil.fdVal(fd); // 设置为未连接 this.state = ST_UNCONNECTED; } 复制代码
// 代码均来自JDK1.8 部分代码 public boolean connect(SocketAddress var1) throws IOException { boolean var2 = false; // 读写都锁住 synchronized(this.readLock) { synchronized(this.writeLock) { /****状态检查,channel和address****/ // 判断channel是否open this.ensureOpenAndUnconnected(); InetSocketAddress var5 = Net.checkAddress(var1); SecurityManager var6 = System.getSecurityManager(); if (var6 != null) { var6.checkConnect(var5.getAddress().getHostAddress(), var5.getPort()); } boolean var10000; /****连接建立****/ // 阻塞状态变更的锁也锁住 synchronized(this.blockingLock()) { int var8 = 0; try { try { this.begin(); // 如果当前socket未绑定本地端口,则尝试着判断和服务端是否能建立连接 synchronized(this.stateLock) { if (!this.isOpen()) { boolean var10 = false; return var10; } if (this.localAddress == null) { // 和远程建立连接后关闭连接 NetHooks.beforeTcpConnect(this.fd, var5.getAddress(), var5.getPort()); } this.readerThread = NativeThread.current(); } do { InetAddress var9 = var5.getAddress(); if (var9.isAnyLocalAddress()) { var9 = InetAddress.getLocalHost(); } // 建立连接 var8 = Net.connect(this.fd, var9, var5.getPort()); } while(var8 == -3 && this.isOpen()); synchronized(this.stateLock) { this.remoteAddress = var5; if (var8 <= 0) { if (!this.isBlocking()) { this.state = 1; } else { assert false; } } else { this.state = 2;// 连接成功 if (this.isOpen()) { this.localAddress = Net.localAddress(this.fd); } var10000 = true; return var10000; } } } var10000 = false; return var10000; } } } 复制代码
在建立在绑定地址之前,我们需要调用 NetHooks.beforeTcpBind ,这个方法是将fd转换为SDP(Sockets Direct Protocol,Java套接字直接协议) socket。SDP需要网卡支持InfiniBand高速网络通信技术,windows不支持该协议。
我们来看看在openjdk: src/solaris/classes/sun/net下的NetHooks.java
private static final Provider provider = new sun.net.sdp.SdpProvider(); public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException { provider.implBeforeTcpBind(fdObj, address, port); } public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException { provider.implBeforeTcpConnect(fdObj, address, port); } 复制代码
可以看到实际是调用的SdpProvider里的implBeforeTcpBind
@Override public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException { if (enabled) convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port); } // converts unbound TCP socket to a SDP socket if it matches the rules private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port) throws IOException { boolean matched = false; // 主要是先通过规则校验器判断入参是否符合,一般有PortRangeRule校验器 // 然后再执行将fd转换为socket for (Rule rule: rules) { if (rule.match(action, address, port)) { SdpSupport.convertSocket(fdObj); matched = true; break; } } } public static void convertSocket(FileDescriptor fd) throws IOException { ... //获取fd索引 int fdVal = fdAccess.get(fd); convert0(fdVal); } // convert0 JNIEXPORT void JNICALL Java_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd) { // create方法实际是通过socket(AF_INET_SDP, SOCK_STREAM, 0);方法得到一个socket int s = create(env); if (s >= 0) { socklen_t len; int arg, res; struct linger linger; /* copy socket options that are relevant to SDP */ len = sizeof(arg); // 重用TIME_WAIT的端口 if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0) setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len); len = sizeof(arg); // 紧急数据放入普通数据流 if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0) setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len); len = sizeof(linger); // 延迟关闭连接 if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0) setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len); // 将fd也引用到s所持有的通道 RESTARTABLE(dup2(s, fd), res); if (res < 0) JNU_ThrowIOExceptionWithLastError(env, "dup2"); // 执行close方法,关闭s这个引用 RESTARTABLE(close(s), res); } } 复制代码
public int read(ByteBuffer var1) throws IOException { // 省略一些判断 synchronized(this.readLock) { this.begin(); synchronized(this.stateLock) { do { // 通过IOUtil的读取fd的数据至buf // 这里的nd是SocketDispatcher,用于调用底层的read和write操作 var3 = IOUtil.read(this.fd, var1, -1L, nd); } while(var3 == -3 && this.isOpen()); // 这个方法主要是将UNAVAILABLE(原为-2)这个状态返回0,否则返回n var4 = IOStatus.normalize(var3); var20 = false; break label367; } this.readerCleanup(); assert IOStatus.check(var3); } } } } static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException { if (var1.isReadOnly()) { throw new IllegalArgumentException("Read-only buffer"); } else if (var1 instanceof DirectBuffer) { return readIntoNativeBuffer(var0, var1, var2, var4); } else { // 临时缓冲区,大小为buf的remain(limit - position),堆外内存,使用ByteBuffer.allocateDirect(size)分配 // Notes:这里分配后后面有个try-finally块会释放该部分内存 ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining()); int var7; try { // 将网络中的buf读进direct buffer int var6 = readIntoNativeBuffer(var0, var5, var2, var4); var5.flip();// 待读取 if (var6 > 0) { var1.put(var5);// 成功时写入 } var7 = var6; } finally { Util.offerFirstTemporaryDirectBuffer(var5); } return var7; } } private static int readIntoNativeBuffer(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException { // 忽略变量init if (var2 != -1L) { // pread方法只有在同步状态下才能使用 var9 = var4.pread(var0, ((DirectBuffer)var1).address() + (long)var5, var7, var2); } else { // 其调用SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法 var9 = var4.read(var0, ((DirectBuffer)var1).address() + (long)var5, var7); } if (var9 > 0) { var1.position(var5 + var9); } return var9; } } // 同样找到openjdk:src/solaris/native/sun/nio/ch //FileDispatcherImpl.c JNIEXPORT jint JNICALL Java_sun_nio_ch_FileDispatcherImpl_read0(JNIEnv *env, jclass clazz, jobject fdo, jlong address, jint len) { jint fd = fdval(env, fdo);// 获取fd索引 void *buf = (void *)jlong_to_ptr(address); // 调用底层read方法 return convertReturnVal(env, read(fd, buf, len), JNI_TRUE); } 复制代码
总结一下读取的过程
看完了前面的read,write整个执行流程基本一样,具体的细节参考如下
public int write(ByteBuffer var1) throws IOException { if (var1 == null) { throw new NullPointerException(); } else { synchronized(this.writeLock) { this.ensureWriteOpen(); this.begin(); synchronized(this.stateLock) { if (!this.isOpen()) { var5 = 0; var20 = false; break label310; } this.writerThread = NativeThread.current(); } do { // 通过IOUtil的读取fd的数据至buf // 这里的nd是SocketDispatcher,用于调用底层的read和write操作 var3 = IOUtil.write(this.fd, var1, -1L, nd); } while(var3 == -3 && this.isOpen()); var4 = IOStatus.normalize(var3); var20 = false; this.writerCleanup(); assert IOStatus.check(var3); return var4; } } } } static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException { if (var1 instanceof DirectBuffer) { return writeFromNativeBuffer(var0, var1, var2, var4); } else { ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7); int var10; try { // 这里的pos为buf初始的position,意思是将buf重置为最初的状态;因为目前还没有真实的写入到channel中 var8.put(var1); var8.flip(); var1.position(var5); // 调用 int var9 = writeFromNativeBuffer(var0, var8, var2, var4); if (var9 > 0) { var1.position(var5 + var9); } var10 = var9; } finally { Util.offerFirstTemporaryDirectBuffer(var8); } return var10; } } IOUtil.writeFromNativeBuffer(fd , buf , position , nd) { // ... 忽略一些获取buf变量的代码 int written = 0; if (position != -1) { // pread方法只有在同步状态下才能使用 written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position); } else { // 其调用SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法 written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem); } //.... } FileDispatcherImpl.write0 { // 调用底层的write方法写入 return convertReturnVal(env, write(fd, buf, len), JNI_FALSE); } } 复制代码
总结一下write的过程:
理解了前面的一些基础知识,接下来的部分就会涉及到Java是怎么样来使用epoll的。
Selector的作用是Java NIO中管理一组多路复用的 SelectableChannel 对象,并能够 识别 通道是否为诸如 读写事件做好准备 的组件 --Java doc
Selector的创建过程如下:
// 1.创建Selector Selector selector = Selector.open(); // 2.将Channel注册到选择器中 // ....... new channel的过程 .... //Notes:channel要注册到Selector上就必须是非阻塞的,所以FileChannel是不可以 //使用Selector的,因为FileChannel是阻塞的 channel.configureBlocking(false); // 第二个参数指定了我们对 Channel 的什么类型的事件感兴趣 SelectionKey key = channel.register(selector , SelectionKey.OP_READ); // 也可以使用或运算|来组合多个事件,例如 SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 不过值得注意的是,一个 Channel 仅仅可以被注册到一个 Selector 一次, // 如果将 Channel 注册到 Selector 多次, 那么其实就是相当于更新 SelectionKey //的 interest set. 复制代码
①一个Channel在Selector注册其代表的是一个SelectionKey事件,SelectionKey的类型包括:
②一个Selector内部维护了三组keys:
③Selector类中总共包含以下10个方法:
谈到Selector就不得不提SelectionKey,两者是紧密关联,配合使用的;如上文所示,往Channel注册Selector会返回一个SelectionKey对象, 这个对象包含了如下内容:
①****interest set可以通过SelectionKey类中的方法来获取和设置interes set
// 返回当前感兴趣的事件列表 int interestSet = key.interestOps(); // 也可通过interestSet判断其中包含的事件 boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT; boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT; boolean isInterestedInRead = interestSet & SelectionKey.OP_READ; boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE; // 可以通过interestOps(int ops)方法修改事件列表 key.interestOps(interestSet | SelectionKey.OP_WRITE); 复制代码
②ready set 当前Channel就绪的事件列表
int readySet = key.readyOps(); // 也可通过四个方法来分别判断不同事件是否就绪 key.isReadable(); //读事件是否就绪 key.isWritable(); //写事件是否就绪 key.isConnectable(); //客户端连接事件是否就绪 key.isAcceptable(); //服务端连接事件是否就绪 复制代码
③channel和selector 我们可以通过SelectionKey来获取当前的channel和selector
// 返回当前事件关联的通道,可转换的选项包括:`ServerSocketChannel`和`SocketChannel` Channel channel = key.channel(); //返回当前事件所关联的Selector对象 Selector selector = key.selector(); 复制代码
attached object 我们可以在selectionKey中附加一个对象,或者在注册时直接附加:
key.attach(theObject); Object attachedObj = key.attachment(); // 在注册时直接附加 SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject); 复制代码
万丈高楼平地起,基础知识差不多了,了解了这些,可以找一些nio demo或者netty demo练练手。接下来讲解本节比较重要的~epoll
前面多次提到了openjdk,seletor的具体实现肯定是跟操作系统有关的,我们一起来看看。
可以看到Selector的实现是SelectorImpl, 然后SelectorImpl又将职责委托给了具体的平台,比如图中的linux2.6 EpollSelectorImpl ,windows是 WindowsSelectorImpl ,MacOSX是 KQueueSelectorImpl
根据前面我们知道,Selector.open()可以得到一个Selector实例,怎么实现的呢?
// Selector.java public static Selector open() throws IOException { // 首先找到provider,然后再打开Selector return SelectorProvider.provider().openSelector(); } // java.nio.channels.spi.SelectorProvider public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; // 这里就是打开Selector的真正方法 provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } } 复制代码
在openjdk中,每个操作系统都有一个sun.nio.ch.DefaultSelectorProvider实现,以src solaris /classes/sun/nio/ch下的DefaultSelectorProvider为例:
/** * Returns the default SelectorProvider. */ public static SelectorProvider create() { // 获取OS名称 String osname = AccessController .doPrivileged(new GetPropertyAction("os.name")); // 根据名称来创建不同的Selctor if (osname.equals("SunOS")) return createProvider("sun.nio.ch.DevPollSelectorProvider"); if (osname.equals("Linux")) return createProvider("sun.nio.ch.EPollSelectorProvider"); return new sun.nio.ch.PollSelectorProvider(); } 复制代码
打开src solaris /classes/sun/nio/ch下的EPollSelectorProvider.java
public class EPollSelectorProvider extends SelectorProviderImpl { public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } public Channel inheritedChannel() throws IOException { return InheritedChannel.getChannel(); } } 复制代码
Linux平台就得到了最终的Selector实现:src solaris /classes/sun/nio/ch下的EPollSelectorImpl.java
来看看它实现的构造器:
EPollSelectorImpl(SelectorProvider sp) throws IOException { super(sp); // makePipe返回管道的2个文件描述符,编码在一个long类型的变量中 // 高32位代表读 低32位代表写 // 使用pipe为了实现Selector的wakeup逻辑 long pipeFds = IOUtil.makePipe(false); fd0 = (int) (pipeFds >>> 32); fd1 = (int) pipeFds; // 新建一个EPollArrayWrapper pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); fdToKey = new HashMap<>(); } 复制代码
/src/solaris/native/sun/nio/ch下的EPollArrayWrapper.c
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this) { /* * epoll_create expects a size as a hint to the kernel about how to * dimension internal structures. We can't predict the size in advance. */ int epfd = epoll_create(256); if (epfd < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed"); } return epfd; } 复制代码
①epoll_create在前面已经讲过了,这里就不再赘述了。
调用Selector.select(返回键的数量,可能是零)最后会委托给各个实现的doSelect方法,限于篇幅不贴出太详细的,这里看下EpollSelectorImpl的 doSelect 方法
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); //EPollArrayWrapper pollWrapper pollWrapper.poll(timeout);//重点在这里 } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys();// 后面会讲到 if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; } int poll(long timeout) throws IOException { updateRegistrations();// 这个代码在下面讲,涉及到epoo_ctl // 这个epollWait是不是有点熟悉呢? updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); for (int i=0; i<updated; i++) { if (getDescriptor(i) == incomingInterruptFD) { interruptedIndex = i; interrupted = true; break; } } return updated; 复制代码
看下EPollArrayWrapper.c
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) { struct epoll_event *events = jlong_to_ptr(address); int res; if (timeout <= 0) { /* Indefinite or no wait */ //系统调用等待内核事件 RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res); } else { /* Bounded wait; bounded restarts */ res = iepoll(epfd, events, numfds, timeout); } if (res < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed"); } return res; } 复制代码
可以看到在linux中Selector.select()其实是调用了epoll_wait
JDK中对于注册到Selector上的IO事件关系是使用 SelectionKey 来表示,代表了Channel感兴趣的事件,如 Read,Write,Connect,Accept .
调用**Selector.register()**时均会将事件存储到EpollArrayWrapper.java的成员变量eventsLow和eventsHigh中
// events for file descriptors with registration changes pending, indexed // by file descriptor and stored as bytes for efficiency reasons. For // file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at // least) then the update is stored in a map. // 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024 private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE]; // 超过数组长度的事件会缓存到这个map中,等待下次处理 private Map<Integer,Byte> eventsHigh; /** * Sets the pending update events for the given file descriptor. This * method has no effect if the update events is already set to KILLED, * unless {@code force} is {@code true}. */ private void setUpdateEvents(int fd, byte events, boolean force) { // 判断fd和数组长度 if (fd < MAX_UPDATE_ARRAY_SIZE) { if ((eventsLow[fd] != KILLED) || force) { eventsLow[fd] = events; } } else { Integer key = Integer.valueOf(fd); if (!isEventsHighKilled(key) || force) { eventsHigh.put(key, Byte.valueOf(events)); } } } /** * Returns the pending update events for the given file descriptor. */ private byte getUpdateEvents(int fd) { if (fd < MAX_UPDATE_ARRAY_SIZE) { return eventsLow[fd]; } else { Byte result = eventsHigh.get(Integer.valueOf(fd)); // result should never be null return result.byteValue(); } 复制代码
在上面poll代码中涉及到
int poll(long timeout) throws IOException { updateRegistrations();/ /** * Update the pending registrations. */ private void updateRegistrations() { synchronized (updateLock) { int j = 0; while (j < updateCount) { int fd = updateDescriptors[j]; // 从保存的eventsLow和eventsHigh里取出事件 short events = getUpdateEvents(fd); boolean isRegistered = registered.get(fd); int opcode = 0; if (events != KILLED) { if (isRegistered) { // 判断操作类型以传给epoll_ctl // 没有指定EPOLLET事件类型 opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; } else { opcode = (events != 0) ? EPOLL_CTL_ADD : 0; } if (opcode != 0) { // 熟悉的epoll_ctl epollCtl(epfd, opcode, fd, events); if (opcode == EPOLL_CTL_ADD) { registered.set(fd); } else if (opcode == EPOLL_CTL_DEL) { registered.clear(fd); } } } j++; } updateCount = 0; } private native void epollCtl(int epfd, int opcode, int fd, int events); 复制代码
可以看到epollCtl调用的native方法,我们进入EpollArrayWrapper.c
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) { struct epoll_event event; int res; event.events = events; event.data.fd = fd; // epoll_ctl这里就不用多说了吧 RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res); /* * A channel may be registered with several Selectors. When each Selector * is polled a EPOLL_CTL_DEL op will be inserted into its pending update * list to remove the file descriptor from epoll. The "last" Selector will * close the file descriptor which automatically unregisters it from each * epoll descriptor. To avoid costly synchronization between Selectors we * allow pending updates to be processed, ignoring errors. The errors are * harmless as the last update for the file descriptor is guaranteed to * be EPOLL_CTL_DEL. */ if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) { JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed"); } } 复制代码
在doSelect方法poll执行后,会更新EpollSelectorImpl.java里的 updateSelectedKeys,就是Selector里的三个set集合,具体可看前面。
/** *更新已被epoll选择fd的键。 *将就绪兴趣集添加到就绪队列。 */ private int updateSelectedKeys() { int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i<entries; i++) { int nextFD = pollWrapper.getDescriptor(i); SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); // ski is null in the case of an interrupt if (ski != null) { int rOps = pollWrapper.getEventOps(i); if (selectedKeys.contains(ski)) { if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; } } else { ski.channel.translateAndSetReadyOps(rOps, ski); if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { selectedKeys.add(ski); numKeysUpdated++; } } } } return numKeysUpdated; } 复制代码
总结
通过本文,你应该知道Channel、Selector基本原理和在Java中怎么使用Epoll的。 (包括更细节的fd与channel和socket之间的转换关系)掌握这些基础知识,再去看NIO、netty网络框架的源码可能就没有那么吃力了。在接下来的文章里我会跟进关于Netty的文章,毕竟这已成为分布式网络通信框架的主流了!
感谢
zh.wikipedia.org/wiki/Epoll 维基百科
baike.baidu.com/item/epoll/…
juejin.im/entry/5b515…
www.jianshu.com/p/f26f1eaa7…
来自:微信公众号(作者:汀雨笔记: mp.weixin.qq.com/s/G6TfGbc4U… ,著作权属于:本文和汀雨