void Reactor::handle_events(){ // 通过同步事件多路选择器提供的select()方法监听网络事件 select(handlers); // 处理网络事件 for(h in handlers){ h.handle_event(); } } // 在主程序中启动事件循环 while (true) { handle_events(); }
public class Echo { public static void main(String[] args) { // 事件处理器 EchoServerHandler serverHandler = new EchoServerHandler(); // boss线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); // worker线程组 EventLoopGroup workerGroup = new NioEventLoopGroup(1); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(serverHandler); } }); // 绑定端口号 ChannelFuture future = bootstrap.bind(9090).sync(); future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 终止worker线程组 workerGroup.shutdownGracefully(); // 终止boss线程组 bossGroup.shutdownGracefully(); } } } // Socket连接处理器 class EchoServerHandler extends ChannelInboundHandlerAdapter { // 处理读事件 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg); } // 处理读完成事件 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } // 处理异常事件 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }