Automatic diagnosis of resource leaks

Now that we are going to use buffer pooling by default, it is obvious
that a user will forget to call .free() and report memory leak. In this
case, we should have a tool to determine if it is a bug in our allocator
implementation or in the user's code.

This pull request adds a system property flag called
'io.netty.resourceLeakDetection'. If set, when a user forgets to call
.free(), the ResourceLeakDetector will detect it and log a message with
detailed stack trace to tell where the leaked buffer has been allocated.

Because obtaining stack trace is an expensive operation, I used sampling
technique. Allocation is recorded only for every 113th allocation. I
chose 113 because it's a prime number.

In production, a user might not want to enable this option due to
potential performance impact. If a user does not specify the
'-Dio.netty.resourceLeakDetection' option leak detection is disabled.

Even if the leak detection is enabled, the overhead should be less than
5% because only ~1% of allocations are monitored.

I also replaced SharedResourceMisuseDetector with ResourceLeakDetector.
This commit is contained in:
Trustin Lee 2013-01-11 22:47:54 +09:00
parent 201df99ee0
commit 337f5bbb8e
9 changed files with 306 additions and 80 deletions

View File

@ -15,6 +15,8 @@
*/ */
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.ResourceLeakDetector;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -30,6 +32,8 @@ import java.nio.charset.Charset;
*/ */
public abstract class AbstractByteBuf implements ByteBuf { public abstract class AbstractByteBuf implements ByteBuf {
static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);
private int readerIndex; private int readerIndex;
private int writerIndex; private int writerIndex;
private int markedReaderIndex; private int markedReaderIndex;

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.ResourceLeak;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.io.IOException; import java.io.IOException;
@ -43,6 +44,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
private static final ByteBuffer[] EMPTY_NIOBUFFERS = new ByteBuffer[0]; private static final ByteBuffer[] EMPTY_NIOBUFFERS = new ByteBuffer[0];
private final ResourceLeak leak = leakDetector.open(this);
private final ByteBufAllocator alloc; private final ByteBufAllocator alloc;
private final boolean direct; private final boolean direct;
private final List<Component> components = new ArrayList<Component>(); private final List<Component> components = new ArrayList<Component>();
@ -1544,6 +1546,8 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
for (Component c: components) { for (Component c: components) {
c.freeIfNecessary(); c.freeIfNecessary();
} }
leak.close();
} }
@Override @Override

View File

@ -16,6 +16,8 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.ResourceLeak;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -23,6 +25,8 @@ import java.util.Queue;
abstract class PooledByteBuf<T> extends AbstractByteBuf { abstract class PooledByteBuf<T> extends AbstractByteBuf {
private final ResourceLeak leak = leakDetector.open(this);
protected PoolChunk<T> chunk; protected PoolChunk<T> chunk;
protected long handle; protected long handle;
protected T memory; protected T memory;
@ -180,6 +184,7 @@ abstract class PooledByteBuf<T> extends AbstractByteBuf {
this.handle = -1; this.handle = -1;
memory = null; memory = null;
chunk.arena.free(chunk, handle); chunk.arena.free(chunk, handle);
leak.close();
} }
} }

View File

@ -15,6 +15,7 @@
*/ */
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.ResourceLeak;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.io.IOException; import java.io.IOException;
@ -33,10 +34,11 @@ import java.util.Queue;
* and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the * and {@link Unpooled#wrappedBuffer(ByteBuffer)} instead of calling the
* constructor explicitly. * constructor explicitly.
*/ */
@SuppressWarnings("restriction")
final class UnpooledDirectByteBuf extends AbstractByteBuf { final class UnpooledDirectByteBuf extends AbstractByteBuf {
private final ResourceLeak leak = leakDetector.open(this);
private final ByteBufAllocator alloc; private final ByteBufAllocator alloc;
private ByteBuffer buffer; private ByteBuffer buffer;
private ByteBuffer tmpNioBuf; private ByteBuffer tmpNioBuf;
private int capacity; private int capacity;
@ -465,12 +467,11 @@ final class UnpooledDirectByteBuf extends AbstractByteBuf {
this.buffer = null; this.buffer = null;
if (doNotFree) {
return;
}
resumeIntermediaryDeallocations(); resumeIntermediaryDeallocations();
PlatformDependent.freeDirectBuffer(buffer); if (!doNotFree) {
PlatformDependent.freeDirectBuffer(buffer);
}
leak.close();
} }
@Override @Override

View File

@ -22,7 +22,6 @@ import io.netty.monitor.MonitorName;
import io.netty.monitor.MonitorRegistry; import io.netty.monitor.MonitorRegistry;
import io.netty.monitor.ValueDistributionMonitor; import io.netty.monitor.ValueDistributionMonitor;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SharedResourceMisuseDetector;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -84,14 +83,16 @@ public class HashedWheelTimer implements Timer {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(HashedWheelTimer.class); InternalLoggerFactory.getInstance(HashedWheelTimer.class);
private static final SharedResourceMisuseDetector misuseDetector = private static final ResourceLeakDetector<HashedWheelTimer> leakDetector =
new SharedResourceMisuseDetector(HashedWheelTimer.class); new ResourceLeakDetector<HashedWheelTimer>(
HashedWheelTimer.class, 1, Runtime.getRuntime().availableProcessors() * 4);
private static final MonitorName TIMEOUT_EXPIRATION_TIME_DEVIATION_MN = new MonitorName(HashedWheelTimer.class, private static final MonitorName TIMEOUT_EXPIRATION_TIME_DEVIATION_MN = new MonitorName(HashedWheelTimer.class,
"timeout-expiration-time-deviation"); "timeout-expiration-time-deviation");
private static final MonitorName TIMEOUTS_PER_SECOND_MN = new MonitorName(HashedWheelTimer.class, private static final MonitorName TIMEOUTS_PER_SECOND_MN = new MonitorName(HashedWheelTimer.class,
"timeouts-per-second"); "timeouts-per-second");
private final ResourceLeak leak = leakDetector.open(this);
private final Worker worker = new Worker(); private final Worker worker = new Worker();
final Thread workerThread; final Thread workerThread;
final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down
@ -306,9 +307,6 @@ public class HashedWheelTimer implements Timer {
monitorRegistry.newValueDistributionMonitor(timeoutExpirationTimeDeviationMonitorName); monitorRegistry.newValueDistributionMonitor(timeoutExpirationTimeDeviationMonitorName);
timeoutsPerSecond = timeoutsPerSecond =
monitorRegistry.newEventRateMonitor(timeoutsPerSecondMonitorName, TimeUnit.SECONDS); monitorRegistry.newEventRateMonitor(timeoutsPerSecondMonitorName, TimeUnit.SECONDS);
// Misuse check
misuseDetector.increase();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -391,7 +389,7 @@ public class HashedWheelTimer implements Timer {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
misuseDetector.decrease(); leak.close();
Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
for (Set<HashedWheelTimeout> bucket: wheel) { for (Set<HashedWheelTimeout> bucket: wheel) {

View File

@ -0,0 +1,26 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
public interface ResourceLeak {
/**
* Close the leak so that {@link ResourceLeakDetector} does not warn about leaked resources.
*
* @return {@code true} if called first time, {@code false} if called already
*/
boolean close();
}

View File

@ -0,0 +1,188 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import io.netty.util.internal.SystemPropertyUtil;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
public final class ResourceLeakDetector<T> {
private static final boolean ENABLED = SystemPropertyUtil.getBoolean("io.netty.resourceLeakDetection", false);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ResourceLeakDetector.class);
static {
if (ENABLED) {
logger.debug("Resource leak detection enabled.");
}
}
private static final int DEFAULT_SAMPLING_INTERVAL = 113;
private static final ResourceLeak NOOP = new ResourceLeak() {
@Override
public boolean close() {
return false;
}
};
/** the linked list of active resources */
private final DefaultResourceLeak head = new DefaultResourceLeak(null);
private final DefaultResourceLeak tail = new DefaultResourceLeak(null);
private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
private final ConcurrentMap<Exception, Boolean> reportedLeaks = new ConcurrentHashMap<Exception, Boolean>();
private final String resourceType;
private final int samplingInterval;
private final long maxActive;
private long active;
private final AtomicBoolean loggedTooManyActive = new AtomicBoolean();
private static volatile long leakCheckCnt;
public ResourceLeakDetector(Class<?> resourceType) {
this(resourceType.getSimpleName());
}
public ResourceLeakDetector(String resourceType) {
this(resourceType, DEFAULT_SAMPLING_INTERVAL, Long.MAX_VALUE);
}
public ResourceLeakDetector(Class<?> resourceType, int samplingInterval, long maxActive) {
this(resourceType.getSimpleName(), samplingInterval, maxActive);
}
public ResourceLeakDetector(String resourceType, int samplingInterval, long maxActive) {
if (resourceType == null) {
throw new NullPointerException("resourceType");
}
if (samplingInterval <= 0) {
throw new IllegalArgumentException("samplingInterval: " + samplingInterval + " (expected: 1+)");
}
if (maxActive <= 0) {
throw new IllegalArgumentException("maxActive: " + maxActive + " (expected: 1+)");
}
this.resourceType = resourceType;
this.samplingInterval = samplingInterval;
this.maxActive = maxActive;
head.next = tail;
tail.prev = head;
}
public ResourceLeak open(T obj) {
if (!ENABLED || leakCheckCnt ++ % samplingInterval != 0) {
return NOOP;
}
reportLeak();
return new DefaultResourceLeak(obj);
}
private void reportLeak() {
if (!logger.isWarnEnabled()) {
for (;;) {
@SuppressWarnings("unchecked")
DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
if (ref == null) {
break;
}
ref.close();
}
return;
}
// Report too many instances.
if (active * samplingInterval > maxActive && loggedTooManyActive.compareAndSet(false, true)) {
logger.warn(
"LEAK: You are creating too many " + resourceType + " instances. " +
resourceType + " is a shared resource that must be reused across the JVM," +
"so that only a few instances are created.");
}
// Detect and report previous leaks.
for (;;) {
@SuppressWarnings("unchecked")
DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
if (ref == null) {
break;
}
if (!ref.close()) {
continue;
}
if (reportedLeaks.putIfAbsent(ref.exception, Boolean.TRUE) == null) {
logger.warn("LEAK: " + resourceType + " was GC'd before being released correctly.", ref.exception);
}
}
}
private final class DefaultResourceLeak extends WeakReference<Object> implements ResourceLeak {
private final ResourceLeakException exception;
private final AtomicBoolean freed;
private DefaultResourceLeak prev;
private DefaultResourceLeak next;
public DefaultResourceLeak(Object referent) {
super(referent, referent != null? refQueue : null);
if (referent != null) {
exception = new ResourceLeakException(
referent.getClass().getName() + '@' + Integer.toHexString(System.identityHashCode(referent)));
// TODO: Use CAS to update the list.
synchronized (head) {
prev = head;
next = head.next;
head.next.prev = this;
head.next = this;
active ++;
}
freed = new AtomicBoolean();
} else {
exception = null;
freed = new AtomicBoolean(true);
}
}
@Override
public boolean close() {
if (freed.compareAndSet(false, true)) {
synchronized (head) {
active --;
prev.next = next;
next.prev = prev;
prev = null;
next = null;
}
return true;
}
return false;
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
import java.util.Arrays;
public class ResourceLeakException extends RuntimeException {
private static final long serialVersionUID = 7186453858343358280L;
private final StackTraceElement[] cachedStackTrace;
public ResourceLeakException() {
cachedStackTrace = getStackTrace();
}
public ResourceLeakException(String message) {
super(message);
cachedStackTrace = getStackTrace();
}
public ResourceLeakException(String message, Throwable cause) {
super(message, cause);
cachedStackTrace = getStackTrace();
}
public ResourceLeakException(Throwable cause) {
super(cause);
cachedStackTrace = getStackTrace();
}
@Override
public int hashCode() {
StackTraceElement[] trace = cachedStackTrace;
int hashCode = 0;
for (StackTraceElement e: trace) {
hashCode = hashCode * 31 + e.hashCode();
}
return hashCode;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof ResourceLeakException)) {
return false;
}
if (o == this) {
return true;
}
return Arrays.equals(cachedStackTrace, ((ResourceLeakException) o).cachedStackTrace);
}
}

View File

@ -1,67 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.logging.InternalLogger;
import io.netty.logging.InternalLoggerFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Warn when user creates too many instances to avoid {@link OutOfMemoryError}.
*/
public final class SharedResourceMisuseDetector {
private static final int MAX_ACTIVE_INSTANCES = 256;
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(SharedResourceMisuseDetector.class);
private final Class<?> type;
private final AtomicLong activeInstances = new AtomicLong();
private final AtomicBoolean logged = new AtomicBoolean();
public SharedResourceMisuseDetector(Class<?> type) {
if (type == null) {
throw new NullPointerException("type");
}
this.type = type;
}
/**
* Increase the use-count of the instance
*/
public void increase() {
if (activeInstances.incrementAndGet() > MAX_ACTIVE_INSTANCES) {
if (logger.isWarnEnabled()) {
if (logged.compareAndSet(false, true)) {
logger.warn(
"You are creating too many " + type.getSimpleName() +
" instances. " + type.getSimpleName() +
" is a shared resource that must be reused across the" +
" application, so that only a few instances are created.");
}
}
}
}
/**
* Decrease the use-count of the instance
*/
public void decrease() {
activeInstances.decrementAndGet();
}
}