diff --git a/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java
index c1f4bc2cac..9e67724c53 100644
--- a/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java
+++ b/buffer/src/main/java/io/netty/buffer/AbstractByteBufAllocator.java
@@ -30,6 +30,10 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
     static final int DEFAULT_MAX_COMPONENTS = 16;
     static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
 
+    static {
+        ResourceLeakDetector.addExclusions(AbstractByteBufAllocator.class, "toLeakAwareBuffer");
+    }
+
     protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
         ResourceLeakTracker<ByteBuf> leak;
         switch (ResourceLeakDetector.getLevel()) {
diff --git a/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareByteBuf.java b/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareByteBuf.java
index eb972093b6..4820017e34 100644
--- a/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareByteBuf.java
+++ b/buffer/src/main/java/io/netty/buffer/AdvancedLeakAwareByteBuf.java
@@ -16,6 +16,7 @@
 
 package io.netty.buffer;
 
+import io.netty.util.ResourceLeakDetector;
 import io.netty.util.ResourceLeakTracker;
 import io.netty.util.internal.SystemPropertyUtil;
 import io.netty.util.internal.logging.InternalLogger;
@@ -43,6 +44,9 @@ final class AdvancedLeakAwareByteBuf extends SimpleLeakAwareByteBuf {
         if (logger.isDebugEnabled()) {
             logger.debug("-D{}: {}", PROP_ACQUIRE_AND_RELEASE_ONLY, ACQUIRE_AND_RELEASE_ONLY);
         }
+
+        ResourceLeakDetector.addExclusions(
+                AdvancedLeakAwareByteBuf.class, "recordLeakNonRefCountingOperation");
     }
 
     AdvancedLeakAwareByteBuf(ByteBuf buf, ResourceLeakTracker<ByteBuf> leak) {
diff --git a/common/src/main/java/io/netty/util/ResourceLeakDetector.java b/common/src/main/java/io/netty/util/ResourceLeakDetector.java
index 16ffef6d05..33b878cf79 100644
--- a/common/src/main/java/io/netty/util/ResourceLeakDetector.java
+++ b/common/src/main/java/io/netty/util/ResourceLeakDetector.java
@@ -16,6 +16,7 @@
 
 package io.netty.util;
 
+import io.netty.util.internal.EmptyArrays;
 import io.netty.util.internal.PlatformDependent;
 import io.netty.util.internal.SystemPropertyUtil;
 import io.netty.util.internal.logging.InternalLogger;
@@ -23,9 +24,14 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
 
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
+import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static io.netty.util.internal.StringUtil.EMPTY_STRING;
 import static io.netty.util.internal.StringUtil.NEWLINE;
@@ -37,12 +43,10 @@ public class ResourceLeakDetector<T> {
     private static final String PROP_LEVEL = "io.netty.leakDetection.level";
     private static final Level DEFAULT_LEVEL = Level.SIMPLE;
 
-    private static final String PROP_MAX_RECORDS = "io.netty.leakDetection.maxRecords";
-    private static final int DEFAULT_MAX_RECORDS = 4;
-    private static final String PROP_MAX_SAMPLED_RECORDS = "io.netty.leakDetection.maxSampledRecords";
+    private static final String PROP_TARGET_RECORDS = "io.netty.leakDetection.targetRecords";
+    private static final int DEFAULT_TARGET_RECORDS = 4;
 
-    private static final int MAX_RECORDS;
-    private static final int MAX_SAMPLED_RECORDS;
+    private static final int TARGET_RECORDS;
 
     /**
      * Represents the level of resource leak detection.
@@ -110,18 +114,16 @@ public class ResourceLeakDetector<T> {
         levelStr = SystemPropertyUtil.get(PROP_LEVEL, levelStr);
         Level level = Level.parseLevel(levelStr);
 
-        MAX_RECORDS = SystemPropertyUtil.getInt(PROP_MAX_RECORDS, DEFAULT_MAX_RECORDS);
-        long maxRecordsSampled = SystemPropertyUtil.getLong(PROP_MAX_SAMPLED_RECORDS, MAX_RECORDS * 10L);
-        MAX_SAMPLED_RECORDS = Math.max((int) Math.min(Integer.MAX_VALUE, maxRecordsSampled), MAX_RECORDS);
+        TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS);
 
         ResourceLeakDetector.level = level;
         if (logger.isDebugEnabled()) {
             logger.debug("-D{}: {}", PROP_LEVEL, level.name().toLowerCase());
-            logger.debug("-D{}: {}", PROP_MAX_RECORDS, MAX_RECORDS);
-            logger.debug("-D{}: {}", PROP_MAX_SAMPLED_RECORDS, MAX_SAMPLED_RECORDS);
+            logger.debug("-D{}: {}", PROP_TARGET_RECORDS, TARGET_RECORDS);
         }
     }
 
+    // There is a minor performance benefit in TLR if this is a power of 2.
     static final int DEFAULT_SAMPLING_INTERVAL = 128;
 
     /**
@@ -157,7 +159,7 @@ public class ResourceLeakDetector<T> {
     }
 
     /** the collection of active resources */
-    private final ConcurrentMap<DefaultResourceLeak, LeakEntry> allLeaks = PlatformDependent.newConcurrentHashMap();
+    private final ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks = PlatformDependent.newConcurrentHashMap();
 
     private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
     private final ConcurrentMap<String, Boolean> reportedLeaks = PlatformDependent.newConcurrentHashMap();
@@ -251,12 +253,12 @@ public class ResourceLeakDetector<T> {
         if (level.ordinal() < Level.PARANOID.ordinal()) {
             if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
                 reportLeak();
-                return new DefaultResourceLeak(obj);
+                return new DefaultResourceLeak(obj, refQueue, allLeaks);
             }
             return null;
         }
         reportLeak();
-        return new DefaultResourceLeak(obj);
+        return new DefaultResourceLeak(obj, refQueue, allLeaks);
     }
 
     private void clearRefQueue() {
@@ -331,18 +333,31 @@ public class ResourceLeakDetector<T> {
     }
 
     @SuppressWarnings("deprecation")
-    private final class DefaultResourceLeak extends PhantomReference<Object> implements ResourceLeakTracker<T>,
-            ResourceLeak {
-        private final Record head;
+    private static final class DefaultResourceLeak<T>
+            extends PhantomReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
 
-        // This will be updated once a new Record will be created and added
-        private Record tail;
+        @SuppressWarnings("unchecked") // generics and updaters do not mix.
+        private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<?>, Record> headUpdater =
+                (AtomicReferenceFieldUpdater)
+                AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class, "head");
+        @SuppressWarnings("unchecked") // generics and updaters do not mix.
+        private static final AtomicIntegerFieldUpdater<DefaultResourceLeak<?>> droppedRecordsUpdater =
+                (AtomicIntegerFieldUpdater)
+                AtomicIntegerFieldUpdater.newUpdater(DefaultResourceLeak.class, "droppedRecords");
+
+        @SuppressWarnings("unused")
+        private volatile Record head;
+        @SuppressWarnings("unused")
+        private volatile int droppedRecords;
+
+        private final ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks;
         private final int trackedHash;
 
-        private int numRecords;
-        private int droppedRecords;
 
-        DefaultResourceLeak(Object referent) {
+        DefaultResourceLeak(
+                Object referent,
+                ReferenceQueue<Object> refQueue,
+                ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks) {
             super(referent, refQueue);
 
             assert referent != null;
@@ -351,8 +366,9 @@ public class ResourceLeakDetector<T> {
             // It's important that we not store a reference to the referent as this would disallow it from
             // be collected via the PhantomReference.
             trackedHash = System.identityHashCode(referent);
-            head = tail = getLevel().ordinal() >= Level.ADVANCED.ordinal() ? new Record() : null;
             allLeaks.put(this, LeakEntry.INSTANCE);
+            headUpdater.set(this, Record.BOTTOM);
+            this.allLeaks = allLeaks;
         }
 
         @Override
@@ -365,29 +381,46 @@ public class ResourceLeakDetector<T> {
             record0(hint);
         }
 
+        /**
+         * This method works by exponentially backing off as more records are present in the stack. Each record has a
+         * 1 / 2^n chance of dropping the top most record and replacing it with itself. This has a number of convenient
+         * properties. First, the first record is always recorded. Second, the very last access will always be
+         * recorded. Third, an arbitrary number of accesses can be accepted, rather than just the last few. Fourth,
+         * it is easy to keep a precise record of the number of elements in the stack, since each element has to know
+         * how tall the stack is.
+         *
+         * In this particular implementation, there are also some advantages. A thread local random is used to decide
+         * if something should be recorded. This means that if there is a deterministic access pattern, it is now
+         * possible to see what other accesses occur, rather than always dropping them. Second, there is roughly a
+         * linear ramp up to {@link #TARGET_RECORDS}, after which backoff occurs. This matches typical access patterns,
+         * where there are either a high number of accesses (i.e. a cached buffer), or low (an ephemeral buffer), but
+         * not many in between.
+         *
+         * The use of atomics avoids serializing a high number of accesses, when most of the records will be thrown
+         * away. High contention only happens when there are very few existing records, which is only likely when the
+         * object isn't shared! If this is a problem, the loop can be aborted and the record dropped, because another
+         * thread won the race.
+         */
         private void record0(Object hint) {
-            // Check MAX_RECORDS > 0 here to avoid similar check before remove from and add to lastRecords
-            if (head != null && MAX_RECORDS > 0) {
-                synchronized (head) {
-                    if (tail == null) {
-                        // already closed
+            // Check TARGET_RECORDS > 0 here to avoid similar check before remove from and add to lastRecords
+            if (TARGET_RECORDS > 0) {
+                Record oldHead;
+                Record prevHead;
+                Record newHead;
+                boolean dropped;
+                do {
+                    if ((prevHead = oldHead = headUpdater.get(this)) == null) {
+                        // already closed.
                         return;
                     }
-
-                    Record record = hint == null ? new Record() : new Record(hint);
-                    tail.next = record;
-                    tail = record;
-
-                    // Enforce a limit so our linked-list not grows too large and cause a GC storm later on when we
-                    // unlink it. The reason why we choose a different limit to MAX_RECORDS is that we will not handle
-                    // duplications here as its very expensive. We will filter these out when we actually
-                    // detected a leak.
-                    if (numRecords == MAX_SAMPLED_RECORDS) {
-                        head.next = head.next.next;
-                        droppedRecords++;
-                    } else {
-                        numRecords++;
+                    int numElements = oldHead.pos + 1;
+                    if (dropped = PlatformDependent.threadLocalRandom().nextInt(1 << numElements) >= TARGET_RECORDS) {
+                        prevHead = oldHead.next;
                     }
+                    newHead = hint != null ? new Record(prevHead, hint) : new Record(prevHead);
+                } while (!headUpdater.compareAndSet(this, oldHead, newHead));
+                if (dropped) {
+                    droppedRecordsUpdater.incrementAndGet(this);
                 }
             }
         }
@@ -403,15 +436,7 @@ public class ResourceLeakDetector<T> {
             if (allLeaks.remove(this, LeakEntry.INSTANCE)) {
                 // Call clear so the reference is not even enqueued.
                 clear();
-
-                if (head != null) {
-                    synchronized (head) {
-                        // Allow to GC all the records.
-                        head.next = null;
-                        numRecords = 0;
-                        tail = null;
-                    }
-                }
+                headUpdater.set(this, null);
                 return true;
             }
             return false;
@@ -431,107 +456,119 @@ public class ResourceLeakDetector<T> {
 
         @Override
         public String toString() {
-            if (head == null) {
+            Record oldHead;
+            do {
+                oldHead = headUpdater.get(this);
+            } while (!headUpdater.compareAndSet(this, oldHead, null));
+            if (oldHead == null) {
+                // Already closed
                 return EMPTY_STRING;
             }
 
-            final String creationRecord;
-            final String[] array;
-            int idx = 0;
-            String last = null;
-            final int dropped;
+            final int dropped = droppedRecordsUpdater.get(this);
+            int duped = 0;
 
-            synchronized (head) {
-                if (tail == null) {
-                    // Already closed
-                    return EMPTY_STRING;
-                }
-                dropped = droppedRecords;
-                creationRecord = head.toString();
-                array = new String[numRecords];
-                Record record = head.next;
-                while (record != null) {
-                    String recordStr = record.toString();
-                    if (last == null || !last.equals(recordStr)) {
-                        array[idx ++] = recordStr;
-                        last = recordStr;
+            int present = oldHead.pos + 1;
+            // Guess about 2 kilobytes per stack trace
+            StringBuilder buf = new StringBuilder(present * 2048).append(NEWLINE);
+            buf.append("Recent access records: ").append(NEWLINE);
+
+            int i = 1;
+            Set<String> seen = new HashSet<String>(present);
+            for (; oldHead != Record.BOTTOM; oldHead = oldHead.next) {
+                String s = oldHead.toString();
+                if (seen.add(s)) {
+                    if (oldHead.next == Record.BOTTOM) {
+                        buf.append("Created at:").append(NEWLINE).append(s);
+                    } else {
+                        buf.append('#').append(i++).append(':').append(NEWLINE).append(s);
                     }
-                    record = record.next;
+                } else {
+                    duped++;
                 }
             }
 
-            int removed = idx > MAX_RECORDS ? idx - MAX_RECORDS : 0;
-
-            StringBuilder buf = new StringBuilder(16384).append(NEWLINE);
-            if (removed > 0) {
-                buf.append("WARNING: ")
-                   .append(removed)
-                   .append(" leak records were discarded because the leak record count is limited to ")
-                   .append(MAX_RECORDS)
-                   .append(". Use system property ")
-                   .append(PROP_MAX_RECORDS)
-                   .append(" to increase the limit.")
-                   .append(NEWLINE);
+            if (duped > 0) {
+                buf.append(": ")
+                   .append(duped)
+                   .append(" leak records were discarded because they were duplicates")
+                   .append(NEWLINE);
             }
+
             if (dropped > 0) {
-                buf.append(dropped)
-                   .append(" leak records were not sampled because the leak record sample count is limited to ")
-                   .append(MAX_SAMPLED_RECORDS)
+                buf.append(": ")
+                   .append(dropped)
+                   .append(" leak records were discarded because the leak record count is targeted to ")
+                   .append(TARGET_RECORDS)
                    .append(". Use system property ")
-                   .append(PROP_MAX_SAMPLED_RECORDS)
+                   .append(PROP_TARGET_RECORDS)
                    .append(" to increase the limit.")
                    .append(NEWLINE);
             }
 
-            int records = idx - removed;
-            buf.append("Recent access records: ").append(records).append(NEWLINE);
-
-            if (records > 0) {
-                // The array may not be completely filled so we need to take this into account.
-                for (int i = records - 1; i >= 0; i --) {
-                    buf.append('#')
-                       .append(i + 1)
-                       .append(':')
-                       .append(NEWLINE)
-                       .append(array[i]);
-                }
-            }
-
-            buf.append("Created at:")
-               .append(NEWLINE)
-               .append(creationRecord);
-
             buf.setLength(buf.length() - NEWLINE.length());
-
             return buf.toString();
         }
     }
 
+    private static final AtomicReference<String[]> excludedMethods =
+            new AtomicReference<String[]>(EmptyArrays.EMPTY_STRINGS);
+
+    public static void addExclusions(Class clz, String ... methodNames) {
+        Set<String> nameSet = new HashSet<String>(Arrays.asList(methodNames));
+        // Use loop rather than lookup. This avoids knowing the parameters, and doesn't have to handle
+        // NoSuchMethodException.
+        for (Method method : clz.getDeclaredMethods()) {
+            if (nameSet.remove(method.getName()) && nameSet.isEmpty()) {
+                break;
+            }
+        }
+        if (!nameSet.isEmpty()) {
+            throw new IllegalArgumentException("Can't find '" + nameSet + "' in " + clz.getName());
+        }
+
+        String[] oldMethods;
+        String[] newMethods;
+        do {
+            oldMethods = excludedMethods.get();
+            newMethods = Arrays.copyOf(oldMethods, oldMethods.length + 2 * methodNames.length);
+            for (int i = 0; i < methodNames.length; i++) {
+                newMethods[oldMethods.length + i * 2] = clz.getName();
+                newMethods[oldMethods.length + i * 2 + 1] = methodNames[i];
            }
+        } while (!excludedMethods.compareAndSet(oldMethods, newMethods));
+    }
+
     private static final class Record extends Throwable {
-        private static final Set<String> STACK_TRACE_ELEMENT_EXCLUSIONS = new HashSet<String>();
-        static {
-            STACK_TRACE_ELEMENT_EXCLUSIONS.add("io.netty.util.ReferenceCountUtil.touch");
-            STACK_TRACE_ELEMENT_EXCLUSIONS.add("io.netty.buffer.AdvancedLeakAwareByteBuf.touch");
-            STACK_TRACE_ELEMENT_EXCLUSIONS.add("io.netty.buffer.AbstractByteBufAllocator.toLeakAwareBuffer");
-            STACK_TRACE_ELEMENT_EXCLUSIONS.add(
-                    "io.netty.buffer.AdvancedLeakAwareByteBuf.recordLeakNonRefCountingOperation");
-        }
+        private static final Record BOTTOM = new Record();
 
         private final String hintString;
-        Record next;
+        private final Record next;
+        private final int pos;
 
-        Record(Object hint) {
+        Record(Record next, Object hint) {
             // This needs to be generated even if toString() is never called as it may change later on.
             hintString = hint instanceof ResourceLeakHint ? ((ResourceLeakHint) hint).toHintString() : hint.toString();
+            this.next = next;
+            this.pos = next.pos + 1;
        }
 
-        Record() {
+        Record(Record next) {
             hintString = null;
+            this.next = next;
+            this.pos = next.pos + 1;
+        }
+
+        // Used to terminate the stack
+        private Record() {
+            hintString = null;
+            next = null;
+            pos = -1;
        }
 
         @Override
         public String toString() {
-            StringBuilder buf = new StringBuilder(4096);
+            StringBuilder buf = new StringBuilder(2048);
             if (hintString != null) {
                 buf.append("\tHint: ").append(hintString).append(NEWLINE);
             }
@@ -539,13 +576,17 @@ public class ResourceLeakDetector<T> {
 
             // Append the stack trace.
             StackTraceElement[] array = getStackTrace();
             // Skip the first three elements.
-            for (int i = 3; i < array.length; i++) {
+            out: for (int i = 3; i < array.length; i++) {
                 StackTraceElement element = array[i];
-                // Strip the noisy stack trace elements.
-                if (STACK_TRACE_ELEMENT_EXCLUSIONS.contains(element.getClassName() + '.' + element.getMethodName())) {
-                    continue;
+                String[] exclusions = excludedMethods.get();
+                for (int k = 0; k < exclusions.length; k += 2) {
+                    if (exclusions[k].equals(element.getClassName())
+                            && exclusions[k + 1].equals(element.getMethodName())) {
+                        continue out;
+                    }
                 }
+
                 buf.append('\t');
                 buf.append(element.toString());
                 buf.append(NEWLINE);
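
Reviewer note: the javadoc added above record0() describes the new sampling scheme in prose; the following self-contained sketch replays just the drop decision outside of Netty, for anyone who wants to see the behaviour in isolation. It is an illustration only, not part of the patch: the class name RecordSamplingSketch, the use of java.util.ArrayDeque in place of the immutable Record stack, and the hard-coded TARGET_RECORDS = 4 (the DEFAULT_TARGET_RECORDS value above, normally tuned with -Dio.netty.leakDetection.targetRecords now that the maxRecords/maxSampledRecords properties are gone) are all assumptions made for the example. Only the probability test mirrors the patch: nextInt(1 << numElements) >= TARGET_RECORDS, so the first record is always kept, the most recent access is always recorded, and growth beyond the target becomes exponentially unlikely.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch only (assumed names, not Netty code): replays the drop
// decision from DefaultResourceLeak.record0() against a plain mutable stack.
public final class RecordSamplingSketch {
    // Assumed to match DEFAULT_TARGET_RECORDS in the patch.
    private static final int TARGET_RECORDS = 4;

    public static void main(String[] args) {
        Deque<String> stack = new ArrayDeque<String>(); // stands in for the Record linked list
        int dropped = 0;
        for (int access = 0; access < 1000; access++) {
            int numElements = stack.size();
            // Same test as record0(): the taller the stack, the more likely the current head
            // is replaced by the new record instead of the stack growing by one element.
            // (The shift is capped here only to keep the demo well-defined for large sizes.)
            boolean drop = ThreadLocalRandom.current()
                    .nextInt(1 << Math.min(numElements, 30)) >= TARGET_RECORDS;
            if (drop) {
                stack.pop();   // mirrors "prevHead = oldHead.next"
                dropped++;     // mirrors droppedRecordsUpdater.incrementAndGet(this)
            }
            stack.push("access #" + access);
        }
        System.out.println("kept " + stack.size() + " records, dropped " + dropped);
    }
}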
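The two buffer-side hunks at the top of the diff show the intended calling pattern for the new ResourceLeakDetector.addExclusions(Class, String...) hook: a class whose methods would otherwise clutter every recorded stack trace registers those method names once, from a static initializer, and Record.toString() then skips the matching frames. Below is a minimal sketch of the same registration pattern; MyLeakAwareWrapper and recordAccess are invented names for illustration, not part of the patch. Note that addExclusions throws IllegalArgumentException if a named method is not declared on the class, so a typo fails fast at class initialization.

import io.netty.util.ResourceLeakDetector;

// Hypothetical example (assumed class and method names): repeats the registration
// pattern used above by AbstractByteBufAllocator and AdvancedLeakAwareByteBuf.
final class MyLeakAwareWrapper {

    static {
        // Frames for MyLeakAwareWrapper.recordAccess are stripped from every stack
        // trace captured by the leak detector, keeping reports focused on user code.
        ResourceLeakDetector.addExclusions(MyLeakAwareWrapper.class, "recordAccess");
    }

    static void recordAccess(Object wrapped) {
        // ... per-access bookkeeping (e.g. ReferenceCountUtil.touch(wrapped)) would go here ...
    }

    private MyLeakAwareWrapper() { }
}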