本文主要研究一下netty的ResourceLeakDetector
2019-04-02 15:23:17.026 ERROR 1 --- [reactor-http-epoll-2] io.netty.util.ResourceLeakDetector : LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: #1: io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) java.base/java.lang.Thread.run(Thread.java:835) #2: io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670) io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:801) io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:601) io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:227) io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) java.base/java.lang.Thread.run(Thread.java:835) #3: io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670) io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:801) io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:581) io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:227) io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) java.base/java.lang.Thread.run(Thread.java:835) #4: io.netty.buffer.AdvancedLeakAwareByteBuf.forEachByte(AdvancedLeakAwareByteBuf.java:670) io.netty.handler.codec.http.HttpObjectDecoder$HeaderParser.parse(HttpObjectDecoder.java:801) io.netty.handler.codec.http.HttpObjectDecoder$LineParser.parse(HttpObjectDecoder.java:850) io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:208) io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) java.base/java.lang.Thread.run(Thread.java:835) #5: io.netty.buffer.AdvancedLeakAwareByteBuf.getUnsignedByte(AdvancedLeakAwareByteBuf.java:160) io.netty.handler.codec.http.HttpObjectDecoder.skipControlCharacters(HttpObjectDecoder.java:566) io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:202) io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:202) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) java.base/java.lang.Thread.run(Thread.java:835) #6: Hint: 'reactor.left.httpCodec' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) java.base/java.lang.Thread.run(Thread.java:835) #7: Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) java.base/java.lang.Thread.run(Thread.java:835) Created at: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:339) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:77) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:784) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328) io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905) java.base/java.lang.Thread.run(Thread.java:835) : 9 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit.
netty-common-4.1.33.Final-sources.jar!/io/netty/util/ResourceLeakDetector.java
public class ResourceLeakDetector<T> { private static final String PROP_LEVEL_OLD = "io.netty.leakDetectionLevel"; private static final String PROP_LEVEL = "io.netty.leakDetection.level"; private static final Level DEFAULT_LEVEL = Level.SIMPLE; private static final String PROP_TARGET_RECORDS = "io.netty.leakDetection.targetRecords"; private static final int DEFAULT_TARGET_RECORDS = 4; private static final String PROP_SAMPLING_INTERVAL = "io.netty.leakDetection.samplingInterval"; // There is a minor performance benefit in TLR if this is a power of 2. private static final int DEFAULT_SAMPLING_INTERVAL = 128; private static final int TARGET_RECORDS; static final int SAMPLING_INTERVAL; /** * Represents the level of resource leak detection. */ public enum Level { /** * Disables resource leak detection. */ DISABLED, /** * Enables simplistic sampling resource leak detection which reports there is a leak or not, * at the cost of small overhead (default). */ SIMPLE, /** * Enables advanced sampling resource leak detection which reports where the leaked object was accessed * recently at the cost of high overhead. */ ADVANCED, /** * Enables paranoid resource leak detection which reports where the leaked object was accessed recently, * at the cost of the highest possible overhead (for testing purposes only). */ PARANOID; /** * Returns level based on string value. Accepts also string that represents ordinal number of enum. * * @param levelStr - level string : DISABLED, SIMPLE, ADVANCED, PARANOID. Ignores case. * @return corresponding level or SIMPLE level in case of no match. */ static Level parseLevel(String levelStr) { String trimmedLevelStr = levelStr.trim(); for (Level l : values()) { if (trimmedLevelStr.equalsIgnoreCase(l.name()) || trimmedLevelStr.equals(String.valueOf(l.ordinal()))) { return l; } } return DEFAULT_LEVEL; } } private static Level level; private static final InternalLogger logger = InternalLoggerFactory.getInstance(ResourceLeakDetector.class); static { final boolean disabled; if (SystemPropertyUtil.get("io.netty.noResourceLeakDetection") != null) { disabled = SystemPropertyUtil.getBoolean("io.netty.noResourceLeakDetection", false); logger.debug("-Dio.netty.noResourceLeakDetection: {}", disabled); logger.warn( "-Dio.netty.noResourceLeakDetection is deprecated. Use '-D{}={}' instead.", PROP_LEVEL, DEFAULT_LEVEL.name().toLowerCase()); } else { disabled = false; } Level defaultLevel = disabled? Level.DISABLED : DEFAULT_LEVEL; // First read old property name String levelStr = SystemPropertyUtil.get(PROP_LEVEL_OLD, defaultLevel.name()); // If new property name is present, use it levelStr = SystemPropertyUtil.get(PROP_LEVEL, levelStr); Level level = Level.parseLevel(levelStr); TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS); SAMPLING_INTERVAL = SystemPropertyUtil.getInt(PROP_SAMPLING_INTERVAL, DEFAULT_SAMPLING_INTERVAL); ResourceLeakDetector.level = level; if (logger.isDebugEnabled()) { logger.debug("-D{}: {}", PROP_LEVEL, level.name().toLowerCase()); logger.debug("-D{}: {}", PROP_TARGET_RECORDS, TARGET_RECORDS); } } /** * @deprecated Use {@link #setLevel(Level)} instead. */ @Deprecated public static void setEnabled(boolean enabled) { setLevel(enabled? Level.SIMPLE : Level.DISABLED); } /** * Returns {@code true} if resource leak detection is enabled. */ public static boolean isEnabled() { return getLevel().ordinal() > Level.DISABLED.ordinal(); } /** * Sets the resource leak detection level. */ public static void setLevel(Level level) { if (level == null) { throw new NullPointerException("level"); } ResourceLeakDetector.level = level; } /** * Returns the current resource leak detection level. */ public static Level getLevel() { return level; } /** the collection of active resources */ private final Set<DefaultResourceLeak<?>> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>()); private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>(); private final ConcurrentMap<String, Boolean> reportedLeaks = PlatformDependent.newConcurrentHashMap(); private final String resourceType; private final int samplingInterval; //...... /** * Creates a new {@link ResourceLeakTracker} which is expected to be closed via * {@link ResourceLeakTracker#close(Object)} when the related resource is deallocated. * * @return the {@link ResourceLeakTracker} or {@code null} */ @SuppressWarnings("unchecked") public final ResourceLeakTracker<T> track(T obj) { return track0(obj); } private DefaultResourceLeak track0(T obj) { Level level = ResourceLeakDetector.level; if (level == Level.DISABLED) { return null; } if (level.ordinal() < Level.PARANOID.ordinal()) { if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) { reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks); } return null; } reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks); } private void reportLeak() { if (!logger.isErrorEnabled()) { clearRefQueue(); return; } // Detect and report previous leaks. for (;;) { @SuppressWarnings("unchecked") DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll(); if (ref == null) { break; } if (!ref.dispose()) { continue; } String records = ref.toString(); if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) { if (records.isEmpty()) { reportUntracedLeak(resourceType); } else { reportTracedLeak(resourceType, records); } } } } /** * This method is called when a traced leak is detected. It can be overridden for tracking how many times leaks * have been detected. */ protected void reportTracedLeak(String resourceType, String records) { logger.error( "LEAK: {}.release() was not called before it's garbage-collected. " + "See http://netty.io/wiki/reference-counted-objects.html for more information.{}", resourceType, records); } /** * This method is called when an untraced leak is detected. It can be overridden for tracking how many times leaks * have been detected. */ protected void reportUntracedLeak(String resourceType) { logger.error("LEAK: {}.release() was not called before it's garbage-collected. " + "Enable advanced leak reporting to find out where the leak occurred. " + "To enable advanced leak reporting, " + "specify the JVM option '-D{}={}' or call {}.setLevel() " + "See http://netty.io/wiki/reference-counted-objects.html for more information.", resourceType, PROP_LEVEL, Level.ADVANCED.name().toLowerCase(), simpleClassName(this)); } //...... }
io.netty.leakDetection.targetRecords
)及SAMPLING_INTERVAL( io.netty.leakDetection.samplingInterval
)两个属性,其中targetRecords默认为4,samplingInterval默认为128 PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0
)是否调用reportLeak并创建DefaultResourceLeak;reportLeak方法有个for循环,不断从refQueue取DefaultResourceLeak,然后调用reportUntracedLeak或者reportTracedLeak进行error netty-common-4.1.33.Final-sources.jar!/io/netty/util/ResourceLeakDetector.java
private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak { @SuppressWarnings("unchecked") // generics and updaters do not mix. private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<?>, Record> headUpdater = (AtomicReferenceFieldUpdater) AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class, "head"); @SuppressWarnings("unchecked") // generics and updaters do not mix. private static final AtomicIntegerFieldUpdater<DefaultResourceLeak<?>> droppedRecordsUpdater = (AtomicIntegerFieldUpdater) AtomicIntegerFieldUpdater.newUpdater(DefaultResourceLeak.class, "droppedRecords"); @SuppressWarnings("unused") private volatile Record head; @SuppressWarnings("unused") private volatile int droppedRecords; private final Set<DefaultResourceLeak<?>> allLeaks; private final int trackedHash; DefaultResourceLeak( Object referent, ReferenceQueue<Object> refQueue, Set<DefaultResourceLeak<?>> allLeaks) { super(referent, refQueue); assert referent != null; // Store the hash of the tracked object to later assert it in the close(...) method. // It's important that we not store a reference to the referent as this would disallow it from // be collected via the WeakReference. trackedHash = System.identityHashCode(referent); allLeaks.add(this); // Create a new Record so we always have the creation stacktrace included. headUpdater.set(this, new Record(Record.BOTTOM)); this.allLeaks = allLeaks; } @Override public void record() { record0(null); } @Override public void record(Object hint) { record0(hint); } /** * This method works by exponentially backing off as more records are present in the stack. Each record has a * 1 / 2^n chance of dropping the top most record and replacing it with itself. This has a number of convenient * properties: * * <ol> * <li> The current record is always recorded. This is due to the compare and swap dropping the top most * record, rather than the to-be-pushed record. * <li> The very last access will always be recorded. This comes as a property of 1. * <li> It is possible to retain more records than the target, based upon the probability distribution. * <li> It is easy to keep a precise record of the number of elements in the stack, since each element has to * know how tall the stack is. * </ol> * * In this particular implementation, there are also some advantages. A thread local random is used to decide * if something should be recorded. This means that if there is a deterministic access pattern, it is now * possible to see what other accesses occur, rather than always dropping them. Second, after * {@link #TARGET_RECORDS} accesses, backoff occurs. This matches typical access patterns, * where there are either a high number of accesses (i.e. a cached buffer), or low (an ephemeral buffer), but * not many in between. * * The use of atomics avoids serializing a high number of accesses, when most of the records will be thrown * away. High contention only happens when there are very few existing records, which is only likely when the * object isn't shared! If this is a problem, the loop can be aborted and the record dropped, because another * thread won the race. */ private void record0(Object hint) { // Check TARGET_RECORDS > 0 here to avoid similar check before remove from and add to lastRecords if (TARGET_RECORDS > 0) { Record oldHead; Record prevHead; Record newHead; boolean dropped; do { if ((prevHead = oldHead = headUpdater.get(this)) == null) { // already closed. return; } final int numElements = oldHead.pos + 1; if (numElements >= TARGET_RECORDS) { final int backOffFactor = Math.min(numElements - TARGET_RECORDS, 30); if (dropped = PlatformDependent.threadLocalRandom().nextInt(1 << backOffFactor) != 0) { prevHead = oldHead.next; } } else { dropped = false; } newHead = hint != null ? new Record(prevHead, hint) : new Record(prevHead); } while (!headUpdater.compareAndSet(this, oldHead, newHead)); if (dropped) { droppedRecordsUpdater.incrementAndGet(this); } } } boolean dispose() { clear(); return allLeaks.remove(this); } @Override public boolean close() { if (allLeaks.remove(this)) { // Call clear so the reference is not even enqueued. clear(); headUpdater.set(this, null); return true; } return false; } @Override public boolean close(T trackedObject) { // Ensure that the object that was tracked is the same as the one that was passed to close(...). assert trackedHash == System.identityHashCode(trackedObject); try { return close(); } finally { // This method will do `synchronized(trackedObject)` and we should be sure this will not cause deadlock. // It should not, because somewhere up the callstack should be a (successful) `trackedObject.release`, // therefore it is unreasonable that anyone else, anywhere, is holding a lock on the trackedObject. // (Unreasonable but possible, unfortunately.) reachabilityFence0(trackedObject); } } /** * Ensures that the object referenced by the given reference remains * <a href="package-summary.html#reachability"><em>strongly reachable</em></a>, * regardless of any prior actions of the program that might otherwise cause * the object to become unreachable; thus, the referenced object is not * reclaimable by garbage collection at least until after the invocation of * this method. * * <p> Recent versions of the JDK have a nasty habit of prematurely deciding objects are unreachable. * see: https://stackoverflow.com/questions/26642153/finalize-called-on-strongly-reachable-object-in-java-8 * The Java 9 method Reference.reachabilityFence offers a solution to this problem. * * <p> This method is always implemented as a synchronization on {@code ref}, not as * {@code Reference.reachabilityFence} for consistency across platforms and to allow building on JDK 6-8. * <b>It is the caller's responsibility to ensure that this synchronization will not cause deadlock.</b> * * @param ref the reference. If {@code null}, this method has no effect. * @see java.lang.ref.Reference#reachabilityFence */ private static void reachabilityFence0(Object ref) { if (ref != null) { // Empty synchronized is ok: https://stackoverflow.com/a/31933260/1151521 synchronized (ref) { } } } @Override public String toString() { Record oldHead = headUpdater.getAndSet(this, null); if (oldHead == null) { // Already closed return EMPTY_STRING; } final int dropped = droppedRecordsUpdater.get(this); int duped = 0; int present = oldHead.pos + 1; // Guess about 2 kilobytes per stack trace StringBuilder buf = new StringBuilder(present * 2048).append(NEWLINE); buf.append("Recent access records: ").append(NEWLINE); int i = 1; Set<String> seen = new HashSet<String>(present); for (; oldHead != Record.BOTTOM; oldHead = oldHead.next) { String s = oldHead.toString(); if (seen.add(s)) { if (oldHead.next == Record.BOTTOM) { buf.append("Created at:").append(NEWLINE).append(s); } else { buf.append('#').append(i++).append(':').append(NEWLINE).append(s); } } else { duped++; } } if (duped > 0) { buf.append(": ") .append(duped) .append(" leak records were discarded because they were duplicates") .append(NEWLINE); } if (dropped > 0) { buf.append(": ") .append(dropped) .append(" leak records were discarded because the leak record count is targeted to ") .append(TARGET_RECORDS) .append(". Use system property ") .append(PROP_TARGET_RECORDS) .append(" to increase the limit.") .append(NEWLINE); } buf.setLength(buf.length() - NEWLINE.length()); return buf.toString(); } }
定义了record、close方法
)接口;record方法内部调用的是record0方法,它会更新newHead为新的Record;close方法会移除allLeaks,allLeaks由ResourceLeakDetector创建DefaultResourceLeak时传入,每创建一个DefaultResourceLeak,DefaultResourceLeak会把自己加入到allLeaks中 netty-netty-4.1.33.Final/buffer/src/main/java/io/netty/buffer/SimpleLeakAwareByteBuf.java
class SimpleLeakAwareByteBuf extends WrappedByteBuf { /** * This object's is associated with the {@link ResourceLeakTracker}. When {@link ResourceLeakTracker#close(Object)} * is called this object will be used as the argument. It is also assumed that this object is used when * {@link ResourceLeakDetector#track(Object)} is called to create {@link #leak}. */ private final ByteBuf trackedByteBuf; final ResourceLeakTracker<ByteBuf> leak; SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) { super(wrapped); this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf"); this.leak = ObjectUtil.checkNotNull(leak, "leak"); } SimpleLeakAwareByteBuf(ByteBuf wrapped, ResourceLeakTracker<ByteBuf> leak) { this(wrapped, wrapped, leak); } //...... @Override public boolean release() { if (super.release()) { closeLeak(); return true; } return false; } @Override public boolean release(int decrement) { if (super.release(decrement)) { closeLeak(); return true; } return false; } private void closeLeak() { // Close the ResourceLeakTracker with the tracked ByteBuf as argument. This must be the same that was used when // calling DefaultResourceLeak.track(...). boolean closed = leak.close(trackedByteBuf); assert closed; } private ByteBuf unwrappedDerived(ByteBuf derived) { // We only need to unwrap SwappedByteBuf implementations as these will be the only ones that may end up in // the AbstractLeakAwareByteBuf implementations beside slices / duplicates and "real" buffers. ByteBuf unwrappedDerived = unwrapSwapped(derived); if (unwrappedDerived instanceof AbstractPooledDerivedByteBuf) { // Update the parent to point to this buffer so we correctly close the ResourceLeakTracker. ((AbstractPooledDerivedByteBuf) unwrappedDerived).parent(this); ResourceLeakTracker<ByteBuf> newLeak = AbstractByteBuf.leakDetector.track(derived); if (newLeak == null) { // No leak detection, just return the derived buffer. return derived; } return newLeakAwareByteBuf(derived, newLeak); } return newSharedLeakAwareByteBuf(derived); } //...... }
netty-netty-4.1.33.Final/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareByteBuf.java
final class AdvancedLeakAwareByteBuf extends SimpleLeakAwareByteBuf { private static final String PROP_ACQUIRE_AND_RELEASE_ONLY = "io.netty.leakDetection.acquireAndReleaseOnly"; private static final boolean ACQUIRE_AND_RELEASE_ONLY; private static final InternalLogger logger = InternalLoggerFactory.getInstance(AdvancedLeakAwareByteBuf.class); static { ACQUIRE_AND_RELEASE_ONLY = SystemPropertyUtil.getBoolean(PROP_ACQUIRE_AND_RELEASE_ONLY, false); if (logger.isDebugEnabled()) { logger.debug("-D{}: {}", PROP_ACQUIRE_AND_RELEASE_ONLY, ACQUIRE_AND_RELEASE_ONLY); } ResourceLeakDetector.addExclusions( AdvancedLeakAwareByteBuf.class, "touch", "recordLeakNonRefCountingOperation"); } AdvancedLeakAwareByteBuf(ByteBuf buf, ResourceLeakTracker<ByteBuf> leak) { super(buf, leak); } AdvancedLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) { super(wrapped, trackedByteBuf, leak); } static void recordLeakNonRefCountingOperation(ResourceLeakTracker<ByteBuf> leak) { if (!ACQUIRE_AND_RELEASE_ONLY) { leak.record(); } } //...... @Override public ByteBuf order(ByteOrder endianness) { recordLeakNonRefCountingOperation(leak); return super.order(endianness); } @Override public ByteBuf slice() { recordLeakNonRefCountingOperation(leak); return super.slice(); } @Override public ByteBuf slice(int index, int length) { recordLeakNonRefCountingOperation(leak); return super.slice(index, length); } //...... @Override public ByteBuf retain() { leak.record(); return super.retain(); } @Override public ByteBuf retain(int increment) { leak.record(); return super.retain(increment); } @Override public boolean release() { leak.record(); return super.release(); } @Override public boolean release(int decrement) { leak.record(); return super.release(decrement); } @Override public ByteBuf touch() { leak.record(); return this; } @Override public ByteBuf touch(Object hint) { leak.record(hint); return this; } //...... }
io.netty.leakDetection.targetRecords
)及SAMPLING_INTERVAL( io.netty.leakDetection.samplingInterval
)两个属性,其中targetRecords默认为4,samplingInterval默认为128 PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0
)是否调用reportLeak并创建DefaultResourceLeak;reportLeak方法有个for循环,不断从refQueue取DefaultResourceLeak,然后调用reportUntracedLeak或者reportTracedLeak进行error 定义了record、close方法
)接口;record方法内部调用的是record0方法,它会更新newHead为新的Record;close方法会移除allLeaks,allLeaks由ResourceLeakDetector创建DefaultResourceLeak时传入,每创建一个DefaultResourceLeak,DefaultResourceLeak会把自己加入到allLeaks中