From 42fba015ce82ab4ab30e547c888db82fe74094e9 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 21 Oct 2016 16:58:32 -0700 Subject: [PATCH] reduce lock contention in resource leak Motivation: ResourceLeakDetector shows two main problems, racy access and heavy lock contention. Modifications: This PR fixes this by doing two things: 1. Replace the sampling counter with a ThreadLocalRandom. This has two benefits. First, it makes the sampling ration no longer have to be a power of two. Second, it de-noises the continuous races that fight over this single value. Instead, this change uses slightly more CPU to decide if it should sample by using TLR. 2. DefaultResourceLeaks need to be kept alive in order to catch leaks. The means by which this happens is by a singular, doubly-linked list. This creates a large amount of contention when allocating quickly. This is noticeable when running on a multi core machine. Instead, this uses a concurrent hash map to keep track of active resources which has much better contention characteristics. Results: Better concurrent hygiene. Running the gRPC QPS benchmark showed RLD taking about 3 CPU seconds for every 1 wall second when runnign with 12 threads. There are some minor perks to this as well. DefaultResourceLeak accounting is moved to a central place which probably has better caching behavior. --- .../io/netty/util/ResourceLeakDetector.java | 50 ++++--------------- 1 file changed, 9 insertions(+), 41 deletions(-) diff --git a/common/src/main/java/io/netty/util/ResourceLeakDetector.java b/common/src/main/java/io/netty/util/ResourceLeakDetector.java index a13b59a75c..6e37561796 100644 --- a/common/src/main/java/io/netty/util/ResourceLeakDetector.java +++ b/common/src/main/java/io/netty/util/ResourceLeakDetector.java @@ -16,9 +16,9 @@ package io.netty.util; -import io.netty.util.internal.MathUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; +import io.netty.util.internal.ThreadLocalRandom; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -143,22 +143,17 @@ public class ResourceLeakDetector { return level; } - /** the linked list of active resources */ - private final DefaultResourceLeak head = new DefaultResourceLeak(null); - private final DefaultResourceLeak tail = new DefaultResourceLeak(null); + /** the collection of active resources */ + private final ConcurrentMap allLeaks = PlatformDependent.newConcurrentHashMap(); private final ReferenceQueue refQueue = new ReferenceQueue(); private final ConcurrentMap reportedLeaks = PlatformDependent.newConcurrentHashMap(); private final String resourceType; private final int samplingInterval; - private final int mask; private final long maxActive; - private long active; private final AtomicBoolean loggedTooManyActive = new AtomicBoolean(); - private long leakCheckCnt; - /** * @deprecated use {@link ResourceLeakDetectorFactory#newResourceLeakDetector(Class, int, long)}. */ @@ -198,14 +193,8 @@ public class ResourceLeakDetector { } this.resourceType = resourceType; - this.samplingInterval = MathUtil.safeFindNextPositivePowerOfTwo(samplingInterval); - // samplingInterval is a power of two so we calculate a mask that we can use to - // check if we need to do any leak detection or not. - mask = this.samplingInterval - 1; + this.samplingInterval = samplingInterval; this.maxActive = maxActive; - - head.next = tail; - tail.prev = head; } /** @@ -221,7 +210,7 @@ public class ResourceLeakDetector { } if (level.ordinal() < Level.PARANOID.ordinal()) { - if ((++ leakCheckCnt & mask) == 0) { + if ((ThreadLocalRandom.current().nextInt(0, samplingInterval)) == 0) { reportLeak(level); return new DefaultResourceLeak(obj); } else { @@ -248,7 +237,7 @@ public class ResourceLeakDetector { // Report too many instances. int samplingInterval = level == Level.PARANOID? 1 : this.samplingInterval; - if (active * samplingInterval > maxActive && loggedTooManyActive.compareAndSet(false, true)) { + if (allLeaks.size() * samplingInterval > maxActive && loggedTooManyActive.compareAndSet(false, true)) { reportInstancesLeak(resourceType); } @@ -314,9 +303,6 @@ public class ResourceLeakDetector { private final class DefaultResourceLeak extends PhantomReference implements ResourceLeak { private final String creationRecord; private final Deque lastRecords = new ArrayDeque(); - private final AtomicBoolean freed; - private DefaultResourceLeak prev; - private DefaultResourceLeak next; private int removedRecords; DefaultResourceLeak(Object referent) { @@ -330,18 +316,9 @@ public class ResourceLeakDetector { creationRecord = null; } - // TODO: Use CAS to update the list. - synchronized (head) { - prev = head; - next = head.next; - head.next.prev = this; - head.next = this; - active ++; - } - freed = new AtomicBoolean(); + allLeaks.put(this, Boolean.TRUE); } else { creationRecord = null; - freed = new AtomicBoolean(true); } } @@ -374,17 +351,8 @@ public class ResourceLeakDetector { @Override public boolean close() { - if (freed.compareAndSet(false, true)) { - synchronized (head) { - active --; - prev.next = next; - next.prev = prev; - prev = null; - next = null; - } - return true; - } - return false; + // Use the ConcurrentMap remove method, which avoids allocating an iterator. + return allLeaks.remove(this, Boolean.TRUE); } @Override