什么是ByteBuf?
ByteBuf在Netty中充当着非常重要的角色;它是在数据传输中负责装载字节数据的一个容器;其内部结构和数组类似,初始化默认长度为256,默认最大长度为Integer.MAX_VALUE。
ByteBuf数据结构
* <pre> * +-------------------+------------------+------------------+ * | discardable bytes | readable bytes | writable bytes | * | | (CONTENT) | | * +-------------------+------------------+------------------+ * | | | | * 0 <= readerIndex <= writerIndex <= capacity * </pre>
ByteBuf字节缓冲区主要由 discardable
、 readable
、 writable
三种类型的字节组成的;
ByteBuf字节缓冲区可以操控 readerIndex
、 writerIndex
二个下标;这两个下标都是 单独维护
的
名词 | 解释 | 方法 |
---|---|---|
discardable bytes | 丢弃的字节;ByteBuf中已经读取的字节 | discardReadBytes(); |
readable bytes | 剩余的可读的字节 | |
writable bytes | 已经写入的字节 | |
readerIndex | 字节读指针(数组下标) | readerIndex() |
writerIndex | 字节写指针(数组下标) | writerIndex() |
ByteBuf中主要的类-UML图
ByteBuf是通过Unpooled来进行创建;默认长度为256,可自定义指定长度,最大长度为Integer.MAX_VALUE;
类型 | 解释 | 对应的字节缓冲区类 |
---|---|---|
Pooled | 池化; 简单的理解就是pooled拥有一个pool 池空间 (poolArea), 凡是创建过的字节缓冲区都会被缓存进去, 有新的连接需要字节缓冲区会先从缓存中 get , 取不到则在进行创建; |
1.PooledDirectByteBuf 2.PooledHeapByteBuf 3.PooledUnsafeDirectByteBuf 4.PooledUnsafeHeapByteBuf |
Unpooled | 非池化; 每次都会创建一个字节缓冲区 |
1.UnpooledDirectByteBuf 2.UnpooledHeapByteBuf 3.UnpooledUnsafeDirectByteBuf 4.UnpooledUnsafeHeapByteBuf |
类型 | 解释 | 特点 | 构造方法 |
---|---|---|---|
heapBuffer(常用) | 堆字节缓冲区; | 底层就是JVM的堆内存,只是IO读写需要从堆内存拷贝到内核中(类似之前学过的IO多路复用) | buffer(128) |
directBuffer(常用) | 直接内存字节缓冲区; | 直接存于操作系统内核空间(堆外内存) | directBuffer(256) |
每一种都有自己优势的地方,我们要根据实际的业务来灵活的运用;如果涉及到大量的文件操作建议使用directBuffer(搬来搬去确实挺耗性能);大部分业务还是推荐使用heapBuffer(heapBuffer,普通的业务搬来搬去相比在内核申请一块内存和释放内存来说要更加优)。
heapBuffer是基于堆内存来进行创建的,回收自然而然通过JVM的回收机制进行回收
可以通过DirectByteBuffer中的Cleaner来进行清除 或者依靠unsafe的释放内存(freeMemory方法)也可以进行回收
ByteBuf内存分配
ByteBuf的内存分配主要分为heap(堆内存)和direct(堆外内存);
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { // 是否支持unsafe return PlatformDependent.hasUnsafe() ? // 创建unsafe非池化堆字节缓冲区 new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : // 创建非池化堆字节缓冲区 new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity); }
// InstrumentedUnpooledUnsafeHeapByteBuf/InstrumentedUnpooledHeapByteBuf // 最终都是通过这个方法来创建分配内存;后面会讲讲unsafe和普通的非unsafe的区别 protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { // 设置最大容量 super(maxCapacity); // 检查内存分配类是否为空 checkNotNull(alloc, "alloc"); if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; // allocateArray初始化一个initialCapacity长度的字节数组 setArray(allocateArray(initialCapacity)); // 初始化读写索引为0 setIndex(0, 0); } // 初始化一个initialCapacity长度的字节数组 byte[] allocateArray(int initialCapacity) { return new byte[initialCapacity]; } // 初始化读写索引为0 @Override public ByteBuf setIndex(int readerIndex, int writerIndex) { if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) { throw new IndexOutOfBoundsException(String.format( "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))", readerIndex, writerIndex, capacity())); } setIndex0(readerIndex, writerIndex); return this; } final void setIndex0(int readerIndex, int writerIndex) { this.readerIndex = readerIndex; this.writerIndex = writerIndex; }
从源码可以得知,堆内存ByteBuf通过判断系统环境是否支持unsafe来判断是创建UnsafeHeapByteBuf还是heapByteBuf; 如果支持unsafe则返回 InstrumentedUnpooledUnsafeHeapByteBuf
实例,反之则返回 InstrumentedUnpooledHeapByteBuf
实例;但它们都是分配一个byte数组来进行存储字节数据。
unsafe和非unsafe创建的heapByteBuf区别在于获取数据;非unsafe获取数据直接是通过数组索引来进行获取的;而unsafe获取数据则是通过UNSAFE操控内存来获取;我们可以通过源码来看看
heapByteBuf获取数据
@Override public byte getByte(int index) { ensureAccessible(); return _getByte(index); } @Override protected byte _getByte(int index) { return HeapByteBufUtil.getByte(array, index); } // 直接返回数组对应索引的值 static byte getByte(byte[] memory, int index) { return memory[index]; }
unsafeHeapByteBuf获取数据
@Override public byte getByte(int index) { checkIndex(index); return _getByte(index); } @Override protected byte _getByte(int index) { return UnsafeByteBufUtil.getByte(array, index); } static byte getByte(byte[] array, int index) { return PlatformDependent.getByte(array, index); } public static byte getByte(byte[] data, int index) { return PlatformDependent0.getByte(data, index); } static byte getByte(byte[] data, int index) { // 通过unsafe来获取 return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index); }
// PlatformDependent检测运行环境的变量属性,比如java环境,unsafe是否支持等 @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { final ByteBuf buf; // 支持unsafe if (PlatformDependent.hasUnsafe()) { // 运行环境是否使用不清空的direct内存 buf = PlatformDependent.useDirectBufferNoCleaner() ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) : new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { // 创建非unsafe实例ByteBuf buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } return disableLeakDetector ? buf : toLeakAwareBuffer(buf); }
UnpooledUnsafeDirectByteBuf
实例,非unsafe返回 UnpooledDirectByteBuf
实例 // 创建unsafe direct字节缓冲区 protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { // 设置最大容量 super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; // allocateDirect创建DirectByteBuffer(java nio)分配内存 setByteBuffer(allocateDirect(initialCapacity), false); } // 非unsafe创建direct内存 protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); } this.alloc = alloc; setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); }
protected ByteBuffer allocateDirect(int initialCapacity) { return ByteBuffer.allocateDirect(initialCapacity); } /** * Allocates a new direct byte buffer. 分配一个新的直接内存字节缓冲区 * * <p> The new buffer's position will be zero, its limit will be its * capacity, its mark will be undefined, and each of its elements will be * initialized to zero. Whether or not it has a * {@link #hasArray backing array} is unspecified. * * @param capacity * The new buffer's capacity, in bytes * * @return The new byte buffer * * @throws IllegalArgumentException * If the <tt>capacity</tt> is a negative integer */ public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } // allocateDirect创建DirectByteBuffer(java nio) DirectByteBuffer(int cap) { // package-private // 设置文件描述, 位置等信息 super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap); long base = 0; try { // 通过unsafe类来分配内存 base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } // 实例化cleaner,用于后续回收 cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; }
unsafe设置ByteBuffer
/** * buffer 数据字节 * tryFree 尝试释放,默认为false */ final void setByteBuffer(ByteBuffer buffer, boolean tryFree) { if (tryFree) { // 全局buffer设置成旧buffer ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { // 释放旧缓冲区的内存 freeDirect(oldBuffer); } } } // 将当前传入的buffer设置成全局buffer this.buffer = buffer; // 记录内存地址 memoryAddress = PlatformDependent.directBufferAddress(buffer); // 将临时buff设置为null tmpNioBuf = null; // 记录容量大小 capacity = buffer.remaining(); } // 获取对象在内存中的地址 static long directBufferAddress(ByteBuffer buffer) { return getLong(buffer, ADDRESS_FIELD_OFFSET); } // 通过unsafe操控系统内存,获取对象在内存中的地址 private static long getLong(Object object, long fieldOffset) { return UNSAFE.getLong(object, fieldOffset); }
非unsafe设置ByteBuffer
// 非unsafe设置属性 private void setByteBuffer(ByteBuffer buffer) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { freeDirect(oldBuffer); } } this.buffer = buffer; tmpNioBuf = null; capacity = buffer.remaining(); }
根据源码,direct通过判断运行系统环境是否使用 useDirectBufferNoCleaner
来实例不同的ByteBufferedReader(unsafe和非unsafe),但是他们最终都是通过ByteBuffer来分配内存,底层都是通过在不同的ByteBuf实例中构建一个ByteBuffer来进行存储字节数据的(具体可以看看 UnpooledDirectByteBuf
的set方法)
@Override protected byte _getByte(int index) { // UnsafeByteBufUtil unsafe工具类获取 return UnsafeByteBufUtil.getByte(addr(index)); } // addr 获取索引的内存地址 long addr(int index) { return memoryAddress + index; } static byte getByte(long address) { return PlatformDependent.getByte(address); } public static byte getByte(long address) { return PlatformDependent0.getByte(address); } // 通过UNSAFE获取内存地址的值 static byte getByte(long address) { return UNSAFE.getByte(address); }
@Override public byte getByte(int index) { // 检查授权,及ByteBuffer对象是否还有引用 ensureAccessible(); return _getByte(index); } @Override protected byte _getByte(int index) { // 通过索引获取值 return buffer.get(index); }
每次我们再往字节缓冲区中写入数据的时候都会判断当前容量是否还能写入数据,当发现容量不够时,此时ByteBuf会总动进行扩容;当然我们也可以手动更改ByteBuf的容量;详细见代码分析。
public static void main(String[] args) { // 利用非池化Unpooled类创建字节缓冲区 ByteBuf byteBuf = Unpooled.buffer(2); System.out.println("initCapacity: " + byteBuf.capacity()); byteBuf.writeByte(66); byteBuf.writeByte(67); byteBuf.readBytes(1); System.out.println("readerIndex: " + byteBuf.readerIndex()); System.out.println("writerIndex: " + byteBuf.writerIndex()); // 丢弃已经阅读的字节 byteBuf.discardReadBytes(); byteBuf.writeByte(68); byteBuf.writeByte(69); System.out.println("readerIndex: " + byteBuf.readerIndex()); System.out.println("writerIndex: " + byteBuf.writerIndex()); System.out.println("capacity: " + byteBuf.capacity()); } // 运行结果 initCapacity: 2 readerIndex: 1 writerIndex: 2 readerIndex: 0 writerIndex: 3 capacity: 64
上面代码的操作步骤:初始化ByteBuf --- 写入数据 --- 读取数据 --- 丢弃数据 --- 再写入数据;
丢弃了一个字节数的数据又写入了2个字节数的数据,初始化容量的缓冲区明显不够发生了自动扩容,扩容后的容量:64;它是怎么进行扩容的呢?什么时候扩容的呢?看下源码
public ByteBuf writeByte(int value) { // 确保可以写入(判断是否容量够不够写入) ensureWritable0(1); // 设置写索引、存值 _setByte(writerIndex++, value); return this; } // minWritableBytes默认为1 因为writeByte每次只能写入一个字节数 final void ensureWritable0(int minWritableBytes) { // 检查是否还有占有权和是否还有引用 ensureAccessible(); // writableBytes() = capacity - writerIndex 剩余可写容量 if (minWritableBytes <= writableBytes()) { return; } // ByteBuf虽然支持自动扩容但是也有上限(Integer.MAX_VALUE) if (minWritableBytes > maxCapacity - writerIndex) { throw new IndexOutOfBoundsException(String.format( "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s", writerIndex, minWritableBytes, maxCapacity, this)); } // 开始进行扩容 newCapacity = writerIndex + minWritableBytes int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity); // 将新的容量写入到ByteBuf capacity(newCapacity); } /** * 计算新的容量 * minNewCapacity 写入的最小容量 * maxCapacity 最大容量及Integer.MAX_VALUE 2147483647 */ public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { if (minNewCapacity < 0) { throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expected: 0+)"); } if (minNewCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "minNewCapacity: %d (expected: not greater than maxCapacity(%d)", minNewCapacity, maxCapacity)); } // 4兆大小 4194304 final int threshold = CALCULATE_THRESHOLD; // 4 MiB page if (minNewCapacity == threshold) { return threshold; } // 如果超过了4兆, if (minNewCapacity > threshold) { // 新的容量扩容为超过的倍数的容量 int newCapacity = minNewCapacity / threshold * threshold; // 如果超过了最大的容量则直接设置为最大容量 if (newCapacity > maxCapacity - threshold) { newCapacity = maxCapacity; } else { newCapacity += threshold; } return newCapacity; } // 默认扩容大小为64 int newCapacity = 64; while (newCapacity < minNewCapacity) { // 左移一位 newCapacity = newCapacity*2 newCapacity <<= 1; } return Math.min(newCapacity, maxCapacity); }
从上面的源码可知,自动扩容在 4兆
的范围内变化的话,每次扩容都是 64 * 2的N字方
(N >= 1); 一旦 超过了4兆
则递增倍数为 (newCapacity / 4194304) * 4194304
即表示的是基于4兆增长的倍数。
读写
参考文献:
https://www.cnblogs.com/stateis0/p/9062152.html https://www.jianshu.com/p/1585e32cf6b4 https://blog.csdn.net/ZBylant/article/details/83037421