这篇文章应该是Netty专栏的倒数第二篇文章了,下一篇就是对整个Netty的总结。本篇文章主要讲Netty的编码器和解码器,他们两个定义和作用根据他们的名字很快就能了解。这里我们就开始分析吧。
在Netty中对于服务端来说收到的是一个 二进制数据流 ,然后解码器的作用就是将这个数据流通过一定的 解码规则 将这个二进制流解码成一个个 ByteBuf ,然后仍给业务做处理。在本章中我们会了解到netty是怎样把这个解码过程抽象出来的,还有Netty中有哪些现成的解码器。以此那么这一章节就主要是围绕 解码器基类 和对 常见解码器分析 两个部分来分析。
Netty的底层解码器都是基于一个叫做 ByteToMessageDecoder 的类来实现的。这个类的解码过程大概可以分为两步, 首先 是累加字节流,netty会通过一个ByteBuf这么一个累加器把所有读进来的字节流累加到当前的累加器中, 然后 第二过程就是子类的decode方法进行解析, 最后 将解析到的ByteBuf向下传播。那么这里我们就一步步分析一下源码,首先是累加字节流,我们先看看源码,如下:
/**
* ByteToMessageDecoder.java 236行起
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
复制代码
至于为什么从这个函数开始,我们在Pipeline那一章节分析过,这里就不过多介绍了。这里首先是判断收到的msg是不是一个ByteBuf对象,如果不是就直接传播,如果是才会进行解码操作。首先是下面这段代码的逻辑,源码如下:
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
复制代码
首先会判断是不是第一次,如果是第一次就将接收到的msg赋值给 cumulation 对象,反之就将去调用cumulate这个方法,将读进来的数据和当前累加器里的数据进行累加。那么这里我们先来看看,这个cumulation是一个怎样的对象。我们看源码,如下:
/**
* ByteToMessageDecoder.java 133行起
*/
private Cumulator cumulator = MERGE_CUMULATOR;
/**
* ByteToMessageDecoder.java 75行起
*/
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
//
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
复制代码
这里可以看到这个 cumulation 是一个 MERGE_CUMULATOR 的对象,而 MERGE_CUMULATOR 又是一个 Cumulator 对象,这个对象中有一个 cumulate() 方法,这个方法就是一个累加方法。我们分析一下这个方法,首先有个条件判断,这个条件判断的是当前的ByteBuf是否能容纳传进来的Bytebuf,如果不能容纳就需要扩容也就是调用 expandCumulation 这个方法。反之就不需要扩容,将要累加的ByteBuf添加到ByteBuf中即可。接下来就是第二步也就是下面这行代码,如下:
callDecode(ctx, cumulation, out); 复制代码
这个方法的意思就是将 cumulation 累加器中的数据调用子类解码函数,解析成一个一个对象,存储在out这个类似于list的类中。然后我们跟进这个代码看看,如下:
/**
* ByteToMessageDecoder.java 390行起
*/
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
复制代码
这个方法就是一个while循环,只要累加器中有数据这个循环就会进行下去。然后 第一个if 中包裹的代码含义是,判断out中是否有对象了,如果有了就将这个对象向下传播,当然这里第一次进入是不会被调用的。然后下面我们会通过 oldInputLength 这个变量来记录可读的一个长度,然后调用 decode() 方法,这个方法是一个抽象方法让子类来实现。 然后下面有一个 outSize == out.size() 的判断,这个判断是表示通过调用 decode() 方法是否解析出了对象,然后再判断是否已经读了数据,如果读了数据但是对象没有增加,证明这个对象的数据还没传完,所以就得等到下一次解析了,最后就是一个异常的抛出。然后我们又回到先前最后一步,将解析的对象继续向下传播,具体源码如下:
/**
* ByteToMessageDecoder.java 253行起
*/
finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
复制代码
这里最重要的是最后几步,首先用size几率out中解析的对象个数,后面会调用 fireChannelRead() 将解析的对象向下传播。然后我们跟进这个方法看看,具体源码如下:
/**
* ByteToMessageDecoder.java 291行起
*/
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
复制代码
这里就是将我们解析出的对象一个一个的向下传播。
这里我们选用一个Netty中比较简单的解码器基于 固定长度 的解码器,来分析一下解码的过程。这个解码器是 FixedLengthFrameDecoder 这个类实现的,这里我们来看看其 decode() 方法,具体源码如下:
/**
* FixedLengthFrameDecoder.java 56行起
*/
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
复制代码
我们可以看到这个方法其实很简单,就是调用 decode() 方法解析出对象然后添加到out中。那么我们接下来看看这个 decode() 方法,具体源码如下:
/**
* FixedLengthFrameDecoder.java 72行起
*/
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
复制代码
这个方法其实也很简单,首先判断累加器中的可读数据长度是否小于固定长度,若是的话表示不能读取完整的对象则返回空,反之读取固定长度的数据返回。
基于分隔符的解码器在Netty中也比较常见,他是在一个叫做 DelimiterBasedFrameDecoder 的类中实现的,这里我们还是关注他的 decode() 方法,具体源码如下:
/**
* DelimiterBasedFrameDecoder.java 230行起
*/
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ByteBuf frame;
if (discardingTooLongFrame) {
// We've just finished discarding a very large frame.
// Go back to the initial state.
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;
if (failFast) {
fail(tooLongFrameLength);
}
}
} else {
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
复制代码
他的 decode() 方法最终会调用到上面这个方法上,这里我们来分析分析。这里的一个过程就是先判断我们传进来的分隔符是不是行处理器的分隔符具体的将就是是不是 "/r/n" ,如果是的话就直接调用行解码器来处理,netty中的行解码器是在 LineBasedFrameDecoder 这个类中实现的,这里就不过多介绍了。然后我们看下面这段代码干了啥,具体源码如下:
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
复制代码
这段代码的逻辑就是,遍历里面的数据找到我们传进来的分隔符集合中距离读指针最接近的分隔符。最后剩下的代码就是一个解码的步骤了,我们先看看这段代码,如下:
if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ByteBuf frame;
if (discardingTooLongFrame) {
// We've just finished discarding a very large frame.
// Go back to the initial state.
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;
if (failFast) {
fail(tooLongFrameLength);
}
}
} else {
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
复制代码
这里首先是判断,是否找到分隔符。如果找到了分割符,就执行下面的代码逻辑。首先判断当前的解码器是否处于 discardingTooLongFrame 状态,具体的讲就是说上次解码的时候发现了超长帧,被抛弃过。 如果是就将这个状态修改过来,标志为不是抛弃过超长帧,这个时候将整个帧丢弃掉,然后如果不是failfast状态,抛出异常,这个怎么理解呢,可以理解为上次在读取的时候发现了超长帧,但是由于设置了不立即抛出异常,而是等读完整个帧数据才抛出异常,这个时候既然发现了分隔符,该到抛出异常的时候了,最后return null表明此次分帧是 失败状态 。如果发现此次的帧数据超过最大帧的长度,直接抛出异常最后就是如果跳过分隔符,就直接跳过,负责就和分隔符和帧的实际数据一块返回。 如果发现当前的解码器不是处于 discardingTooLongFrame 状态,当前buffer里面的可读数据又比最大帧要大,我们就将解码器标记为 discardingTooLongFrame 状态,并设置这个超长帧的大小,如果是failfast状态,就立马抛出异常,也就是说我们发现了超长帧了,所以我们立马抛出异常如果发现当前的解码器已经处于 discardingTooLongFrame 状态,我们别无他方,只能修改下 tooLongFrameLength 的长度,然后听天由命,等待下次解码操作这个时候如果一旦发现了超长帧,都return null,含义就是说此次解码是 无效 的。 在Netty中还有很多的解码器,例如上面我们谈到到行解码器,还有基于长度域解码器的编码器,这里我就不过多的介绍了。
这一节我主要分析一些Netty是怎样编码的,及将一个对象如何变成字节流最终写到socket底层的。这里其实他这里面主要有两个 Handler 处理,首先是一个 biz 的Handler里面主要是调用 writeAndFlush() 将对象写入字节流中,然后在最后有一个 encoder 的Handler对对象进行编码,那么我们下面我们根据这两个部分来进行分析。
为了分析这一小节,我添加了下面一段代码,如下:
public class BizHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//...
User user = new User(19, "zhangsan");
ctx.channel().writeAndFlush(user);
}
}
复制代码
这里我们就跟进一下 writeAndFlush() 这个方法,具体源码如下:
/**
* AbstractChannel.java 295行起
*/
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
/**
* DefaultChannelPipeline.java 1030行起
*/
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
复制代码
有上面可以知道,这个对象会通过pipeline的方法进行传递,首先会调用 tail 这个尾结点的writeAndFlush方法,那么我们具体的跟进这个方法看看,具体源码如下:
/**
* AbstractChannelHandlerContext.java 840行起
*/
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
/**
* AbstractChannelHandlerContext.java 793行起
*/
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
write(msg, true, promise);
return promise;
}
复制代码
这里我们可以看到最终会调用到 AbstractChannelHandlerContext 中的 writeAndFlush 方法,而这个方法中其实最后会调用到一个 write 方法,这里我们继续跟进这个方法,具体源码如下:
/**
* AbstractChannelHandlerContext.java 819行起
*/
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
复制代码
上面这段代码的逻辑其实先前我们是分析过的,这里就不再过多的分析了,下面我们分析一些 invokeWriteAndFlush() 这个方法,因为我们最终会调用到这个方法中,下面我们看源码,如下:
/**
* AbstractChannelHandlerContext.java 810行起
*/
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
复制代码
这里按照我们传进来的参数会调用 invokeWrite0() 和 invokeFlush0() 这两个方法。这两个方法的作用是逐个调用ChannelHandler的 write 和 flush 方法,这里我们逐个来分析,首先是 invokeWrite0() 这个方法,具体源码如下:
/**
* AbstractChannelHandlerContext.java 747行起
*/
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
复制代码
这里的代码就很清晰了,先前我们分析Pipeline的时候分析过如果Handler不覆盖write方法,那么这个对象就会一直在Pipeline中传播,这里一定会传到我们定义的一个编码器中也就是继承 MessageToByteEncoder 这个类的Handler,这个Handler继承了write方法,这部分下一小节再讲,这里我们继续回到前面 invokeFlush0() 这个方法,这里我们继续看看这个方法的源码吧,具体如下:
/**
* AbstractChannelHandlerContext.java 785行起
*/
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
复制代码
这里的代码逻辑和上面的差不多,也是只要Handler不覆盖write方法,那么这个对象就会一直在Pipeline中传播。一般的Handler都不会覆盖这个方法的,那么最终会传到 head 这个Handler中的。
为了这一小节的分析我也写了如下的一段代码,具体源码如下:
public class Encoder extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {
byte[] bytes = user.getName().getBytes();
out.writeInt(4 + bytes.length);
out.writeInt(user.getAge());
out.writeBytes(bytes);
}
}
复制代码
其实这一小节我们最主要的就是分析 MessageToByteEncoder 中的write方法,这个方法的源码具体如下:
/**
* MessageToByteEncoder.java 98行起
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
复制代码
这个方法大体的可以分为以下步骤, 匹配对象 、 分配内存 、 编码实现 、 释放对象 、 传播数据 和 释放内存 。下面我们具体分析,首先是 匹配对象 ,这个在源码中就是if的判断语句 acceptOutboundMessage() 这个方法,具体的我们可以看看器源码,如下:
/**
* MessageToByteEncoder.java 94行起
*/
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
/**
* TypeParameterMatcher.java 170行起
*/
@Override
public boolean match(Object msg) {
return type.isInstance(msg);
}
复制代码
可以看出这里其实调用的是一个 isInstance 方法,这个方法比较的两个参数,一个是继承 MessageToByteEncoder 解码器的泛型参数,另一个就是我们先前在调用 writeAndFlush() 方法传进来的对象,如果这两个对象是同种类型就可以处理。然后下面是内存分配对应的源码就是,如下:
buf = allocateBuffer(ctx, cast, preferDirect); 复制代码
这里就是一个 内存分配 ,在上一篇文章我详细的分析了,这里我就不过多的分析了,这里注意的是默认分配的是堆内内存。然后紧接着就是编码实现了,这里其实就是 encode() 这个方法,这个方法就是一个抽象方法,等着子类去实现,也就是我们先前贴出来那段代码,如下:
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf out) throws Exception {
byte[] bytes = user.getName().getBytes();
out.writeInt(4 + bytes.length);
out.writeInt(user.getAge());
out.writeBytes(bytes);
}
复制代码
然后接下来就是 释放对象 ,这里释放的对象就还是我们传进来的对象,在代码中的具体代码体现就是 ReferenceCountUtil.release(cast) 这个方法。然后紧跟着就是 传播数据 ,在方法中也就是 ctx.write(buf, promise) 这一行代码,这个会把编码好的byteBuf向前传播,最终会传到head节点中。最后就是 释放对象 ,也就是在最后的finally中如果buf不为空则调用 release() 方法。
由我们上一小节所讲到的,数据编码后最终会传到Head节点的write方法中,在这个方法中会把数据写到buffer队列中。这里他最终会调用到如下代码,如下:
/**
* DefaultChannelPipeline.java 1289行起
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
复制代码
那么我们仅需跟进这个write方法,如下:
/**
* AbstractChannel.java 781行起
*/
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
复制代码
这段代码最主要的逻辑就是 try-catch 中的代码,首先我们看看 filterOutboundMessage() 这个方法的源码如下:
/**
* AbstractNioByteChannel.java 245行起
*/
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) {
return msg;
}
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
复制代码
这段代码的逻辑,就是首先判断传进来的对象是不是 堆外内存 ,如果是直接返回回去。反之如果不是,就调用 newDirectBuffer() 这个方法将堆内内存转换成堆外内存,返回回去。下面一个重要的就是把这个数据插入写队列中,就是调用 addMessage() ,这个方法。我们具体来看看这个方法,具体源码如下:
/**
* ChannelOutboundBuffer.java 115行起
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(size, false);
}
复制代码
这里的代码逻辑其实很简单,首先根据传进来的ByteBuf创建一个 Entry 对象,然后将这个对象插入链表尾部,也就是 tailEntry 指针指向他,另外的就是注意还维持了两个 flushed 指针。当然最后会调用一个 incrementPendingOutboundBytes() 方法,设置写状态,具体源码如下:
/**
* ChannelOutboundBuffer.java 172行起
*/
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
复制代码
这里的代码逻辑就是,将我们传进来的字节长度加到已经有的字节长度上,然后比较总和是否超过channel配置的最大长度,这里默认的是64个字节,如果超过了就去调用 setUnwritable() 这个方法,这个方法的作用就是通知其他Handler不能再写了。我们可以看看这个方法的源码,具体如下:
/**
* ChannelOutboundBuffer.java 562行起
*/
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
复制代码
这里采用一个自旋锁,加上一个compareAndSet操作,然后传播一个 fireChannelWritabilityChanged 事件,这个事件通过channel传播,就可以传播到其他ChannelHandler里面,这样就相当于通知他们不能写了。
紧跟着上面小节我们将字节流写入队列中后,需要刷新队列。这里他最终会调用到如下代码,如下:
/**
* DefaultChannelPipeline.java 1295行起
*/
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
复制代码
这里我们继续跟进这个flush方法,如下:
/**
* AbstractChannel.java 813行起
*/
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
复制代码
这里的代码首先是一个断言,然后拿到缓冲区。下面的是非常重要的两行代码,调用 addFlush() 这个方法的作用就是添加刷新标志并设置写状态,这里我们先分析这个方法,具体源码如下:
/**
* ChannelOutboundBuffer.java 138行起
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++;
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
复制代码
这个函数主要就是对 flushed 指针的一些操作,这里不做过多分析,然后主要的是 decrementPendingOutboundBytes() 这个方法,我们具体的看看这个方法的源码,如下:
/**
* ChannelOutboundBuffer.java 191行起
*/
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
复制代码
这个方法和我们上一小节讲的方法相反,首先减掉刷新的字节数,现在剩下的也就是缓冲区里实际的字节数,然后拿着这个数值去和32字节进行比较,如果小于那么就将通知其他ChannelHandler可以写了,这里我们就把addFlush这个方法分析了一遍,然后我们就看看 flush0() 这个方法,具体源码如下:
/**
* AbstractChannel.java 826行起
*/
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
outboundBuffer.failFlushed(t, true);
}
} finally {
inFlush0 = false;
}
}
复制代码
这上面的代码挺多的,但是重要的是 doWrite() 这个方法,具体的我们跟进这个方法看看,源码如下:
/**
* AbstractNioByteChannel.java 156行起
*/
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
// Break the loop and so incompleteWrite(...) is called.
break;
}
}
......
}
复制代码
这里面最重要的也是for循环里面的代码,首先调用**current()**这个这个方法就拿到缓存队列中的第一个对象 Entry 并返回他的 msg ,这我们只分析msg是ByteBuf的情况。首先判断这个ByteBuf中是否有可写的,没有可写的就退出本次循环,继续下一次。紧接着下面会调用一个 getWriteSpinCount() 拿到自旋锁的次数,这里默认是16,另外使用自旋锁的原因是在并发编程中使用可以提高内存的使用率,增加写入量。然后下面是一个关键,调用 doWriteBytes() 方法,他就是调用jdk底层进行自旋写。最后如果写完了会调用 remove() 方法,我们可以具体的看看这个方法,具体源码如下:
/**
* ChannelOutboundBuffer.java 246行起
*/
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
int size = e.pendingSize;
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
ReferenceCountUtil.safeRelease(msg);
safeSuccess(promise);
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
e.recycle();
return true;
}
复制代码
首先他会拿到 flushedEntry ,然后把这个对象清除,另外的就是一些指针的操作这里就不过多的分析了。
本篇文章主要是分析Netty中编解码相关的代码,在Netty中其实已经封装了大多数的编解码器,除了上面文章中讲到的那些还有包括HTTP等编解码器,其实这些完全能满足我们日常的开发需求了,本篇文章我只是分析了大概的流程,下来我还会认真的分析的。另外这是这个系列的倒数第二篇文章了,最后一篇我打算对Netty做个总结。