先说下写这个的目的,其实是好奇,dubbo是怎么实现同步转异步的,然后了解到,其依赖了请求中携带的请求id来完成这个连接复用;然后我又发现,redisson这个redis客户端,底层也是用的netty,那就比较好奇了:netty是异步的,上层是同步的,要拿结果的,同时呢,redis协议也不可能按照redisson的要求,在请求和响应里携带请求id,那,它是怎么实现同步转异步的呢,异步结果回来后,又是怎么把结果对应上的呢?
对redisson debug调试了long long time之后(你们知道的,多线程不好调试),大概理清了思路,基本就是:连接池 的思路。比如,我要访问redis:
其实问题的关键是,第二步的promise传递,要设置为channel的一个attribute,不然的话,响应回来后,也不知道把响应给谁。
理清了redisson的基本思路后,我想到了很早之前,面试oppo,二面的面试官就问了我一个问题:写过类似代理的中间件没有?(因为当时面试的是中间件部门)
然后我说没有,然后基本就凉了。
其实,中间件最主要的要求,尤其是代理这种,一方面接收请求,一方面还得作为客户端去发起请求,发起请求这一步,很容易变成性能瓶颈,不少实现里,这一步都是直接使用http client这类同步请求的工具(也是支持异步的,只是同步更常见),所以我也一直想写一个netty这种异步的客户端,同时还能同步转异步的,不能同步转异步,应用场景就比较受限了。
源码给懒得看文字的同学:
https://gitee.com/ckl111/pooled-netty-http-client.git
扯了这么多,我说下我这个http client的思路,和上面那个redisson的差不多,我这边的场景也是作为一个中间件,要访问的后端服务就几个,比如要访问http://192.168.19.102:8080下的若干服务,我这边是启动时候,就会去建一个连接池(直接配置commons pool2的池化参数,我这里配置的是,2个连接),连接池好了后,netty 的channel已经是ok的了,如下所示:
这每一个长连接,是包在我们的一个核心的数据结构里的,叫NettyClient。
核心的属性,其实主要下面两个:
//要连接的host和端口 private HostAndPortConfig config; /** * 当前使用的channel */ Channel channel;
构造函数如下:
public NettyClient(HostAndPortConfig config) { this.config = config; } @Data @AllArgsConstructor @NoArgsConstructor public class HostAndPortConfig { private String host; private Integer port; }
够简单吧,先不考虑连接池,最开始测试的时候,我就是这样,直接new对象的。
public static void main(String[] args) { HostAndPortConfig config = new HostAndPortConfig("192.168.19.102", 8080); NettyClient client = new NettyClient(config); client.initConnection(); NettyHttpResponse response = client.doPost("http://192.168.19.102:8080/BOL_WebService/xxxxx.do", JSONObject.toJSONString(new Object())); if (response == null) { return; } System.out.println(response.getBody()); }
上面的测试代码,new完对象后,开始初始化连接。
public void initConnection() { log.info("initConnection starts..."); Bootstrap bootstrap; //1.创建netty所需的bootstrap配置 bootstrap = createBootstrap(config); //2.发起连接 ChannelFuture future = bootstrap.connect(config.getHost(), config.getPort()); log.info("current thread:{}", Thread.currentThread().getName()); //3.等待连接成功 boolean ret = future.awaitUninterruptibly(2000, MILLISECONDS); boolean bIsSuccess = ret && future.isSuccess(); if (!bIsSuccess) { //4.不成功抛异常 bIsConnectionOk = false; log.error("host config:{}",config); throw new RuntimeException("连接失败"); } //5.走到这里,说明成功了,新的channle赋值给field cleanOldChannelAndCancelReconnect(future, channel); bIsConnectionOk = true; }
这里初始化连接是直接同步等待的,如果失败,直接抛异常。第5步里,主要是把新的channel赋值给当前对象的一个field,同时,关闭旧的channle之类的。
private void cleanOldChannelAndCancelReconnect(ChannelFuture future, Channel oldChannel) { /** * 连接成功,关闭旧的channel,再用新的channel赋值给field */ try { if (oldChannel != null) { try { log.info("Close old netty channel " + oldChannel); oldChannel.close(); } catch (Exception e) { log.error("e:{}", e); } } } finally { /** * 新channel覆盖field */ NettyClient.this.channel = future.channel(); NettyClient.this.bIsConnectionOk = true; log.info("connection is ok,new channel:{}", NettyClient.this.channel); if (NettyClient.this.scheduledFuture != null) { log.info("cancel scheduledFuture"); NettyClient.this.scheduledFuture.cancel(true); } } }
这里说下前面的bootstrap的构造,如下:
private Bootstrap createBootstrap(HostAndPortConfig config) { Bootstrap bootstrap = new Bootstrap() .channel(NioSocketChannel.class) .group(NIO_EVENT_LOOP_GROUP); bootstrap.handler(new CustomChannelInitializer(bootstrap, config, this)); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); return bootstrap; }
handler 链,主要在CustomChannelInitializer类中。
protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // http客户端编解码器,包括了客户端http请求编码,http响应的解码 pipeline.addLast(new HttpClientCodec()); // 把多个HTTP请求中的数据组装成一个 pipeline.addLast(new HttpObjectAggregator(65536)); // 用于处理大数据流 pipeline.addLast(new ChunkedWriteHandler()); /** * 重连handler */ pipeline.addLast(new ReconnectHandler(nettyClient)); /** * 发送业务数据前,进行json编码 */ pipeline.addLast(new HttpJsonRequestEncoder()); pipeline.addLast(new HttpResponseHandler()); }
其中,出站时(即客户端向外部write时),涉及的handler如下:
简单说下HttpJsonRequestEncoder,这个是我自定义的:
/** * http请求发送前,使用该编码器进行编码 * * 本来是打算在这里编码body为json,感觉没必要,直接上移到工具类了 */ public class HttpJsonRequestEncoder extends MessageToMessageEncoder<NettyHttpRequest> { final static String CHARSET_NAME = "UTF-8"; final static Charset UTF_8 = Charset.forName(CHARSET_NAME); @Override protected void encode(ChannelHandlerContext ctx, NettyHttpRequest nettyHttpRequest, List<Object> out) { // 1. 这个就是要最终传递出去的对象 FullHttpRequest request = null; if (nettyHttpRequest.getHttpMethod() == HttpMethod.POST) { ByteBuf encodeBuf = Unpooled.copiedBuffer((CharSequence) nettyHttpRequest.getBody(), UTF_8); request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, nettyHttpRequest.getUri(), encodeBuf); HttpUtil.setContentLength(request, encodeBuf.readableBytes()); } else if (nettyHttpRequest.getHttpMethod() == HttpMethod.GET) { request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, nettyHttpRequest.getUri()); } else { throw new RuntimeException(); } //2. 填充header populateHeaders(ctx, request); out.add(request); } private void populateHeaders(ChannelHandlerContext ctx, FullHttpRequest request) { /** * headers 设置 */ HttpHeaders headers = request.headers(); headers.set(HttpHeaderNames.HOST, ctx.channel().remoteAddress().toString().substring(1)); headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); headers.set(HttpHeaderNames.CONTENT_TYPE, "application/json"); /** * 设置我方可以接收的 */ headers.set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP.toString() + ',' + HttpHeaderValues.DEFLATE.toString()); headers.set(HttpHeaderNames.ACCEPT_CHARSET, "utf-8,ISO-8859-1;q=0.7,*;q=0.7"); headers.set(HttpHeaderNames.ACCEPT_LANGUAGE, "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"); headers.set(HttpHeaderNames.ACCEPT, "*/*"); /** * 设置agent */ headers.set(HttpHeaderNames.USER_AGENT, "Netty xml Http Client side"); } }
/** * http请求响应的处理器 */ @Slf4j public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception { String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8); NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s); // 1. NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get(); log.info("req url:{},params:{},resp:{}", nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(), nettyHttpRequestContext.getNettyHttpRequest().getBody(), nettyHttpResponse); // 2. Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise(); promise.setSuccess(nettyHttpResponse); } }
说完了netty client,我们再说说调用的过程:
public NettyHttpResponse doPost(String url, Object body) { NettyHttpRequest request = new NettyHttpRequest(url, body); return doHttpRequest(request); } private static final DefaultEventLoop NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP = new DefaultEventLoop(null, new NamedThreadFactory("NettyResponsePromiseNotify")); private NettyHttpResponse doHttpRequest(NettyHttpRequest request) { // 1 Promise<NettyHttpResponse> defaultPromise = NETTY_RESPONSE_PROMISE_NOTIFY_EVENT_LOOP.newPromise(); // 2 NettyHttpRequestContext context = new NettyHttpRequestContext(request, defaultPromise); channel.attr(CURRENT_REQ_BOUND_WITH_THE_CHANNEL).set(context); // 3 ChannelFuture channelFuture = channel.writeAndFlush(request); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { System.out.println(Thread.currentThread().getName() + " 请求发送完成"); } }); // 4 return get(defaultPromise); }
上面我已经标注了几个数字,分别讲一下:
第四步等待的get方法如下:
public <V> V get(Promise<V> future) { // 1. if (!future.isDone()) { CountDownLatch l = new CountDownLatch(1); future.addListener(new GenericFutureListener<Future<? super V>>() { @Override public void operationComplete(Future<? super V> future) throws Exception { log.info("received response,listener is invoked"); if (future.isDone()) { // 2 // promise的线程池,会回调该listener l.countDown(); } } }); boolean interrupted = false; if (!future.isDone()) { try { // 3 l.await(4, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("e:{}", e); interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } //4 if (future.isSuccess()) { return future.getNow(); } log.error("wait result time out "); return null; }
前面我们提到了,在response的handler中:
/** * http请求响应的处理器 */ @Slf4j public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> { @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception { String s = fullHttpResponse.content().toString(CharsetUtil.UTF_8); NettyHttpResponse nettyHttpResponse = NettyHttpResponse.successResponse(s); // 1. NettyHttpRequestContext nettyHttpRequestContext = (NettyHttpRequestContext) ctx.channel().attr(NettyClient.CURRENT_REQ_BOUND_WITH_THE_CHANNEL).get(); log.info("req url:{},params:{},resp:{}", nettyHttpRequestContext.getNettyHttpRequest().getFullUrl(), nettyHttpRequestContext.getNettyHttpRequest().getBody(), nettyHttpResponse); // 2. Promise<NettyHttpResponse> promise = nettyHttpRequestContext.getDefaultPromise(); promise.setSuccess(nettyHttpResponse); } }
其中,2处,修改promise,此时就会回调前面说的那个listenr,打开闭锁,主线程也因此得以继续执行:
public <V> V get(Promise<V> future) { if (!future.isDone()) { CountDownLatch l = new CountDownLatch(1); future.addListener(new GenericFutureListener<Future<? super V>>() { @Override public void operationComplete(Future<? super V> future) throws Exception { log.info("received response,listener is invoked"); if (future.isDone()) { // io线程会回调该listener l.countDown(); } } }); ..... }
本篇的大致思路差不多就是这样了,主要逻辑在于同步转异步那一块。
还有些没讲到的,后面再讲,大概还有2个部分。
代码我放在:
https://gitee.com/ckl111/pooled-netty-http-client.git