Motivation: Resource Leak Detector (RLD) tries to helpfully indicate where an object was last accessed and report the accesses in the case the object was not cleaned up. It handles lightly used objects well, but drops all but the last few accesses.

Configuring this is tough because there is split between highly shared (and accessed) objects and lightly accessed objects.

Modification:
There are a number of changes here.  In relative order of importance:

API / Functionality changes:
* Max records and max sample records are gone.  Only "target" records, the number of records tries to retain is exposed.
* Records are sampled based on the number of already stored records.  The likelihood of recording a new sample is `2^(-n)`, where `n` is the number of currently stored elements.
* Records are stored in a concurrent stack structure rather than a list.  This avoids a head and tail.  Since the stack is only read once, there is no need to maintain head and tail pointers
* The properties of this imply that the very first and very last access are always recorded.  When deciding to sample, the top element is replaced rather than pushed.
* Samples that happen between the first and last accesses now have a chance of being recorded.  Previously only the final few were kept.
* Sampling is no longer deterministic.  Previously, a deterministic access pattern meant that you could conceivably always miss some access points.
* Sampling has a linear ramp for low values and and exponentially backs off roughly equal to 2^n.  This means that for 1,000,000 accesses, about 20 will actually be kept.  I have an elegant proof for this which is too large to fit in this commit message.

Code changes:
* All locks are gone.  Because sampling rarely needs to do a write, there is almost 0 contention.  The dropped records counter is slightly contentious, but this could be removed or changed to a LongAdder.  This was not done because of memory concerns.
* Stack trace exclusion is done outside of RLD.  Classes can opt to remove some of their methods.
* Stack trace exclusion is faster, since it uses String.equals, often getting a pointer compare due to interning.  Previously it used contains()
* Leak printing is outputted fairly differently.  I tried to preserve as much of the original formatting as possible, but some things didn't make sense to keep.

Result:
More useful leak reporting.

Faster:
```
Before:
Benchmark                                           (recordTimes)   Mode  Cnt       Score      Error  Units
ResourceLeakDetectorRecordBenchmark.record                      8  thrpt   20  136293.404 ± 7669.454  ops/s
ResourceLeakDetectorRecordBenchmark.record                     16  thrpt   20   72805.720 ± 3710.864  ops/s
ResourceLeakDetectorRecordBenchmark.recordWithHint              8  thrpt   20  139131.215 ± 4882.751  ops/s
ResourceLeakDetectorRecordBenchmark.recordWithHint             16  thrpt   20   74146.313 ± 4999.246  ops/s

After:
Benchmark                                           (recordTimes)   Mode  Cnt       Score      Error  Units
ResourceLeakDetectorRecordBenchmark.record                      8  thrpt   20  155281.969 ± 5301.399  ops/s
ResourceLeakDetectorRecordBenchmark.record                     16  thrpt   20   77866.239 ± 3821.054  ops/s
ResourceLeakDetectorRecordBenchmark.recordWithHint              8  thrpt   20  153360.036 ± 8611.353  ops/s
ResourceLeakDetectorRecordBenchmark.recordWithHint             16  thrpt   20   78670.804 ± 2399.149  ops/s
```
This commit is contained in:
Carl Mastrangelo 2017-09-18 17:46:39 -07:00
parent 9bcf31977c
commit 16b1dbdf92
4 changed files with 174 additions and 121 deletions

View File

@ -30,6 +30,10 @@ public abstract class AbstractByteBufAllocator implements ByteBufAllocator {
static final int DEFAULT_MAX_COMPONENTS = 16;
static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
static {
ResourceLeakDetector.addExclusions(AbstractByteBufAllocator.class, "toLeakAwareBuffer");
}
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
ResourceLeakTracker<ByteBuf> leak;
switch (ResourceLeakDetector.getLevel()) {

View File

@ -17,6 +17,7 @@
package io.netty.buffer;
import io.netty.util.ByteProcessor;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakTracker;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -45,6 +46,9 @@ final class AdvancedLeakAwareByteBuf extends SimpleLeakAwareByteBuf {
if (logger.isDebugEnabled()) {
logger.debug("-D{}: {}", PROP_ACQUIRE_AND_RELEASE_ONLY, ACQUIRE_AND_RELEASE_ONLY);
}
ResourceLeakDetector.addExclusions(
AdvancedLeakAwareByteBuf.class, "touch", "recordLeakNonRefCountingOperation");
}
AdvancedLeakAwareByteBuf(ByteBuf buf, ResourceLeakTracker<ByteBuf> leak) {

View File

@ -26,6 +26,10 @@ public final class ReferenceCountUtil {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ReferenceCountUtil.class);
static {
ResourceLeakDetector.addExclusions(ReferenceCountUtil.class, "touch");
}
/**
* Try to call {@link ReferenceCounted#retain()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.

View File

@ -16,6 +16,7 @@
package io.netty.util;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -23,9 +24,14 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import static io.netty.util.internal.StringUtil.EMPTY_STRING;
import static io.netty.util.internal.StringUtil.NEWLINE;
@ -37,12 +43,10 @@ public class ResourceLeakDetector<T> {
private static final String PROP_LEVEL = "io.netty.leakDetection.level";
private static final Level DEFAULT_LEVEL = Level.SIMPLE;
private static final String PROP_MAX_RECORDS = "io.netty.leakDetection.maxRecords";
private static final int DEFAULT_MAX_RECORDS = 4;
private static final String PROP_MAX_SAMPLED_RECORDS = "io.netty.leakDetection.maxSampledRecords";
private static final String PROP_TARGET_RECORDS = "io.netty.leakDetection.targetRecords";
private static final int DEFAULT_TARGET_RECORDS = 4;
private static final int MAX_RECORDS;
private static final int MAX_SAMPLED_RECORDS;
private static final int TARGET_RECORDS;
/**
* Represents the level of resource leak detection.
@ -110,18 +114,16 @@ public class ResourceLeakDetector<T> {
levelStr = SystemPropertyUtil.get(PROP_LEVEL, levelStr);
Level level = Level.parseLevel(levelStr);
MAX_RECORDS = SystemPropertyUtil.getInt(PROP_MAX_RECORDS, DEFAULT_MAX_RECORDS);
long maxRecordsSampled = SystemPropertyUtil.getLong(PROP_MAX_SAMPLED_RECORDS, MAX_RECORDS * 10L);
MAX_SAMPLED_RECORDS = Math.max((int) Math.min(Integer.MAX_VALUE, maxRecordsSampled), MAX_RECORDS);
TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS);
ResourceLeakDetector.level = level;
if (logger.isDebugEnabled()) {
logger.debug("-D{}: {}", PROP_LEVEL, level.name().toLowerCase());
logger.debug("-D{}: {}", PROP_MAX_RECORDS, MAX_RECORDS);
logger.debug("-D{}: {}", PROP_MAX_SAMPLED_RECORDS, MAX_SAMPLED_RECORDS);
logger.debug("-D{}: {}", PROP_TARGET_RECORDS, TARGET_RECORDS);
}
}
// There is a minor performance benefit in TLR if this is a power of 2.
static final int DEFAULT_SAMPLING_INTERVAL = 128;
/**
@ -157,7 +159,7 @@ public class ResourceLeakDetector<T> {
}
/** the collection of active resources */
private final ConcurrentMap<DefaultResourceLeak, LeakEntry> allLeaks = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks = PlatformDependent.newConcurrentHashMap();
private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
private final ConcurrentMap<String, Boolean> reportedLeaks = PlatformDependent.newConcurrentHashMap();
@ -251,12 +253,12 @@ public class ResourceLeakDetector<T> {
if (level.ordinal() < Level.PARANOID.ordinal()) {
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
reportLeak();
return new DefaultResourceLeak(obj);
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
return null;
}
reportLeak();
return new DefaultResourceLeak(obj);
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
private void clearRefQueue() {
@ -331,18 +333,31 @@ public class ResourceLeakDetector<T> {
}
@SuppressWarnings("deprecation")
private final class DefaultResourceLeak extends PhantomReference<Object> implements ResourceLeakTracker<T>,
ResourceLeak {
private final Record head;
private static final class DefaultResourceLeak<T>
extends PhantomReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
// This will be updated once a new Record will be created and added
private Record tail;
@SuppressWarnings("unchecked") // generics and updaters do not mix.
private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<?>, Record> headUpdater =
(AtomicReferenceFieldUpdater)
AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class, "head");
@SuppressWarnings("unchecked") // generics and updaters do not mix.
private static final AtomicIntegerFieldUpdater<DefaultResourceLeak<?>> droppedRecordsUpdater =
(AtomicIntegerFieldUpdater)
AtomicIntegerFieldUpdater.newUpdater(DefaultResourceLeak.class, "droppedRecords");
@SuppressWarnings("unused")
private volatile Record head;
@SuppressWarnings("unused")
private volatile int droppedRecords;
private final ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks;
private final int trackedHash;
private int numRecords;
private int droppedRecords;
DefaultResourceLeak(Object referent) {
DefaultResourceLeak(
Object referent,
ReferenceQueue<Object> refQueue,
ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks) {
super(referent, refQueue);
assert referent != null;
@ -351,8 +366,9 @@ public class ResourceLeakDetector<T> {
// It's important that we not store a reference to the referent as this would disallow it from
// be collected via the PhantomReference.
trackedHash = System.identityHashCode(referent);
head = tail = getLevel().ordinal() >= Level.ADVANCED.ordinal() ? new Record() : null;
allLeaks.put(this, LeakEntry.INSTANCE);
headUpdater.set(this, Record.BOTTOM);
this.allLeaks = allLeaks;
}
@Override
@ -365,29 +381,46 @@ public class ResourceLeakDetector<T> {
record0(hint);
}
/**
* This method works by exponentially backing off as more records are present in the stack. Each record has a
* 1 / 2^n chance of dropping the top most record and replacing it with itself. This has a number of convenient
* properties. First, the first record is always recorded. Second, the very last access will always be
* recorded. Third, an arbitrary number of accesses can be accepted, rather than just the last few. Fourth,
* it is easy to keep a precise record of the number of elements in the stack, since each element has to know
* how tall the stack is.
*
* In this particular implementation, there are also some advantages. A thread local random is used to decide
* if something should be recorded. This means that if there is a deterministic access pattern, it is now
* possible to see what other accesses occur, rather than always dropping them. Second, there is roughly a
* linear ramp up to {@link #TARGET_RECORDS}, after which backoff occurs. This matches typical access patterns,
* where there are either a high number of accesses (i.e. a cached buffer), or low (an ephemeral buffer), but
* not many in between.
*
* The use of atomics avoids serializing a high number of accesses, when most of the records will be thrown
* away. High contention only happens when there are very few existing records, which is only likely when the
* object isn't shared! If this is a problem, the loop can be aborted and the record dropped, because another
* thread won the race.
*/
private void record0(Object hint) {
// Check MAX_RECORDS > 0 here to avoid similar check before remove from and add to lastRecords
if (head != null && MAX_RECORDS > 0) {
synchronized (head) {
if (tail == null) {
// already closed
// Check TARGET_RECORDS > 0 here to avoid similar check before remove from and add to lastRecords
if (TARGET_RECORDS > 0) {
Record oldHead;
Record prevHead;
Record newHead;
boolean dropped;
do {
if ((prevHead = oldHead = headUpdater.get(this)) == null) {
// already closed.
return;
}
Record record = hint == null ? new Record() : new Record(hint);
tail.next = record;
tail = record;
// Enforce a limit so our linked-list not grows too large and cause a GC storm later on when we
// unlink it. The reason why we choose a different limit to MAX_RECORDS is that we will not handle
// duplications here as its very expensive. We will filter these out when we actually
// detected a leak.
if (numRecords == MAX_SAMPLED_RECORDS) {
head.next = head.next.next;
droppedRecords++;
} else {
numRecords++;
int numElements = oldHead.pos + 1;
if (dropped = PlatformDependent.threadLocalRandom().nextInt(1 << numElements) >= TARGET_RECORDS) {
prevHead = oldHead.next;
}
newHead = hint != null ? new Record(prevHead, hint) : new Record(prevHead);
} while (!headUpdater.compareAndSet(this, oldHead, newHead));
if (dropped) {
droppedRecordsUpdater.incrementAndGet(this);
}
}
}
@ -403,15 +436,7 @@ public class ResourceLeakDetector<T> {
if (allLeaks.remove(this, LeakEntry.INSTANCE)) {
// Call clear so the reference is not even enqueued.
clear();
if (head != null) {
synchronized (head) {
// Allow to GC all the records.
head.next = null;
numRecords = 0;
tail = null;
}
}
headUpdater.set(this, null);
return true;
}
return false;
@ -431,107 +456,119 @@ public class ResourceLeakDetector<T> {
@Override
public String toString() {
if (head == null) {
Record oldHead;
do {
oldHead = headUpdater.get(this);
} while (!headUpdater.compareAndSet(this, oldHead, null));
if (oldHead == null) {
// Already closed
return EMPTY_STRING;
}
final String creationRecord;
final String[] array;
int idx = 0;
String last = null;
final int dropped;
final int dropped = droppedRecordsUpdater.get(this);
int duped = 0;
synchronized (head) {
if (tail == null) {
// Already closed
return EMPTY_STRING;
}
dropped = droppedRecords;
creationRecord = head.toString();
array = new String[numRecords];
Record record = head.next;
while (record != null) {
String recordStr = record.toString();
if (last == null || !last.equals(recordStr)) {
array[idx ++] = recordStr;
last = recordStr;
int present = oldHead.pos + 1;
// Guess about 2 kilobytes per stack trace
StringBuilder buf = new StringBuilder(present * 2048).append(NEWLINE);
buf.append("Recent access records: ").append(NEWLINE);
int i = 1;
Set<String> seen = new HashSet<String>(present);
for (; oldHead != Record.BOTTOM; oldHead = oldHead.next) {
String s = oldHead.toString();
if (seen.add(s)) {
if (oldHead.next == Record.BOTTOM) {
buf.append("Created at:").append(NEWLINE).append(s);
} else {
buf.append('#').append(i++).append(':').append(NEWLINE).append(s);
}
record = record.next;
} else {
duped++;
}
}
int removed = idx > MAX_RECORDS ? idx - MAX_RECORDS : 0;
StringBuilder buf = new StringBuilder(16384).append(NEWLINE);
if (removed > 0) {
buf.append("WARNING: ")
.append(removed)
.append(" leak records were discarded because the leak record count is limited to ")
.append(MAX_RECORDS)
.append(". Use system property ")
.append(PROP_MAX_RECORDS)
.append(" to increase the limit.")
.append(NEWLINE);
if (duped > 0) {
buf.append(": ")
.append(dropped)
.append(" leak records were discarded because they were duplicates")
.append(NEWLINE);
}
if (dropped > 0) {
buf.append(dropped)
.append(" leak records were not sampled because the leak record sample count is limited to ")
.append(MAX_SAMPLED_RECORDS)
buf.append(": ")
.append(dropped)
.append(" leak records were discarded because the leak record count is targeted to ")
.append(TARGET_RECORDS)
.append(". Use system property ")
.append(PROP_MAX_SAMPLED_RECORDS)
.append(PROP_TARGET_RECORDS)
.append(" to increase the limit.")
.append(NEWLINE);
}
int records = idx - removed;
buf.append("Recent access records: ").append(records).append(NEWLINE);
if (records > 0) {
// The array may not be completely filled so we need to take this into account.
for (int i = records - 1; i >= 0; i --) {
buf.append('#')
.append(i + 1)
.append(':')
.append(NEWLINE)
.append(array[i]);
}
}
buf.append("Created at:")
.append(NEWLINE)
.append(creationRecord);
buf.setLength(buf.length() - NEWLINE.length());
return buf.toString();
}
}
private static final AtomicReference<String[]> excludedMethods =
new AtomicReference<String[]>(EmptyArrays.EMPTY_STRINGS);
public static void addExclusions(Class clz, String ... methodNames) {
Set<String> nameSet = new HashSet<String>(Arrays.asList(methodNames));
// Use loop rather than lookup. This avoids knowing the parameters, and doesn't have to handle
// NoSuchMethodException.
for (Method method : clz.getDeclaredMethods()) {
if (nameSet.remove(method.getName()) && nameSet.isEmpty()) {
break;
}
}
if (!nameSet.isEmpty()) {
throw new IllegalArgumentException("Can't find '" + nameSet + "' in " + clz.getName());
}
String[] oldMethods;
String[] newMethods;
do {
oldMethods = excludedMethods.get();
newMethods = Arrays.copyOf(oldMethods, oldMethods.length + 2 * methodNames.length);
for (int i = 0; i < methodNames.length; i++) {
newMethods[oldMethods.length + i * 2] = clz.getName();
newMethods[oldMethods.length + i * 2 + 1] = methodNames[i];
}
} while (!excludedMethods.compareAndSet(oldMethods, newMethods));
}
private static final class Record extends Throwable {
private static final Set<String> STACK_TRACE_ELEMENT_EXCLUSIONS = new HashSet<String>();
static {
STACK_TRACE_ELEMENT_EXCLUSIONS.add("io.netty.util.ReferenceCountUtil.touch");
STACK_TRACE_ELEMENT_EXCLUSIONS.add("io.netty.buffer.AdvancedLeakAwareByteBuf.touch");
STACK_TRACE_ELEMENT_EXCLUSIONS.add("io.netty.buffer.AbstractByteBufAllocator.toLeakAwareBuffer");
STACK_TRACE_ELEMENT_EXCLUSIONS.add(
"io.netty.buffer.AdvancedLeakAwareByteBuf.recordLeakNonRefCountingOperation");
}
private static final Record BOTTOM = new Record();
private final String hintString;
Record next;
private final Record next;
private final int pos;
Record(Object hint) {
Record(Record next, Object hint) {
// This needs to be generated even if toString() is never called as it may change later on.
hintString = hint instanceof ResourceLeakHint ? ((ResourceLeakHint) hint).toHintString() : hint.toString();
this.next = next;
this.pos = next.pos + 1;
}
Record() {
Record(Record next) {
hintString = null;
this.next = next;
this.pos = next.pos + 1;
}
// Used to terminate the stack
private Record() {
hintString = null;
next = null;
pos = -1;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(4096);
StringBuilder buf = new StringBuilder(2048);
if (hintString != null) {
buf.append("\tHint: ").append(hintString).append(NEWLINE);
}
@ -539,13 +576,17 @@ public class ResourceLeakDetector<T> {
// Append the stack trace.
StackTraceElement[] array = getStackTrace();
// Skip the first three elements.
for (int i = 3; i < array.length; i++) {
out: for (int i = 3; i < array.length; i++) {
StackTraceElement element = array[i];
// Strip the noisy stack trace elements.
if (STACK_TRACE_ELEMENT_EXCLUSIONS.contains(element.getClassName() + '.' + element.getMethodName())) {
continue;
String[] exclusions = excludedMethods.get();
for (int k = 0; k < exclusions.length; k += 2) {
if (exclusions[k].equals(element.getClassName())
&& exclusions[k + 1].equals(element.getMethodName())) {
continue out;
}
}
buf.append('\t');
buf.append(element.toString());
buf.append(NEWLINE);