From d3dc9c9e74fd0b82e43443683a5440d39b3a8906 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 25 Jul 2016 11:15:56 +0200 Subject: [PATCH] Allow to limit the maximum number of WeakOrderQueue instances per Thread. Motivation: To better restrict resource usage we should limit the number of WeakOrderQueue instances per Thread. Once this limit is reached object that are recycled from a different Thread then the allocation Thread are dropped on the floor. Modifications: Add new system property io.netty.recycler.maxDelayedQueuesPerThread and constructor that allows to limit the max number of WeakOrderQueue instances per Thread for Recycler instance. The default is 2 * cores (the same as the default number of EventLoop instances per EventLoopGroup). Result: Better way to restrict resource / memory usage per Recycler instance. --- .../src/main/java/io/netty/util/Recycler.java | 95 +++++++++++++------ .../test/java/io/netty/util/RecyclerTest.java | 2 +- 2 files changed, 68 insertions(+), 29 deletions(-) diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index 593974f5ec..d79eed3077 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -54,6 +54,7 @@ public abstract class Recycler { private static final int DEFAULT_MAX_CAPACITY; private static final int INITIAL_CAPACITY; private static final int MAX_SHARED_CAPACITY_FACTOR; + private static final int MAX_DELAYED_QUEUES_PER_THREAD; private static final int LINK_CAPACITY; private static final int RATIO; @@ -71,6 +72,11 @@ public abstract class Recycler { SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2)); + MAX_DELAYED_QUEUES_PER_THREAD = max(0, + SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread", + // We use the same value as default EventLoop number + Runtime.getRuntime().availableProcessors() * 2)); + LINK_CAPACITY = safeFindNextPositivePowerOfTwo( max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); @@ -99,11 +105,13 @@ public abstract class Recycler { private final int maxCapacity; private final int maxSharedCapacityFactor; private final int ratioMask; + private final int maxDelayedQueuesPerThread; private final FastThreadLocal> threadLocal = new FastThreadLocal>() { @Override protected Stack initialValue() { - return new Stack(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor, ratioMask); + return new Stack(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor, + ratioMask, maxDelayedQueuesPerThread); } }; @@ -116,17 +124,19 @@ public abstract class Recycler { } protected Recycler(int maxCapacity, int maxSharedCapacityFactor) { - this(maxCapacity, maxSharedCapacityFactor, RATIO); + this(maxCapacity, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD); } - protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio) { + protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) { ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1; if (maxCapacity <= 0) { this.maxCapacity = 0; this.maxSharedCapacityFactor = 1; + this.maxDelayedQueuesPerThread = 0; } else { this.maxCapacity = maxCapacity; this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor); + this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread); } } @@ -194,26 +204,7 @@ public abstract class Recycler { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } - - Thread thread = Thread.currentThread(); - if (thread == stack.thread) { - stack.push(this); - return; - } - // we don't want to have a ref to the queue as the value in our weak map - // so we null it out; to ensure there are no races with restoring it later - // we impose a memory ordering here (no-op on x86) - Map, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); - WeakOrderQueue queue = delayedRecycled.get(stack); - if (queue == null) { - queue = WeakOrderQueue.allocate(stack, thread); - if (queue == null) { - // drop object - return; - } - delayedRecycled.put(stack, queue); - } - queue.add(this); + stack.push(this); } } @@ -229,6 +220,8 @@ public abstract class Recycler { // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain private static final class WeakOrderQueue { + static final WeakOrderQueue DUMMY = new WeakOrderQueue(); + // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex. @SuppressWarnings("serial") private static final class Link extends AtomicInteger { @@ -246,6 +239,11 @@ public abstract class Recycler { private final int id = ID_GENERATOR.getAndIncrement(); private final AtomicInteger availableSharedCapacity; + private WeakOrderQueue() { + owner = null; + availableSharedCapacity = null; + } + private WeakOrderQueue(Stack stack, Thread thread) { head = tail = new Link(); owner = new WeakReference(thread); @@ -408,23 +406,26 @@ public abstract class Recycler { // still recycling all items. final Recycler parent; final Thread thread; - private DefaultHandle[] elements; + final AtomicInteger availableSharedCapacity; + final int maxDelayedQueues; + private final int maxCapacity; private final int ratioMask; + private DefaultHandle[] elements; private int size; private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled. - final AtomicInteger availableSharedCapacity; - - private volatile WeakOrderQueue head; private WeakOrderQueue cursor, prev; + private volatile WeakOrderQueue head; - Stack(Recycler parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask) { + Stack(Recycler parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, + int ratioMask, int maxDelayedQueues) { this.parent = parent; this.thread = thread; this.maxCapacity = maxCapacity; availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; this.ratioMask = ratioMask; + this.maxDelayedQueues = maxDelayedQueues; } int increaseCapacity(int expectedCapacity) { @@ -523,6 +524,18 @@ public abstract class Recycler { } void push(DefaultHandle item) { + Thread currentThread = Thread.currentThread(); + if (thread == currentThread) { + // The current Thread is the thread that belongs to the Stack, we can try to push the object now. + pushNow(item); + } else { + // The current Thread is not the one that belongs to the Stack, we need to signal that the push + // happens later. + pushLater(item, currentThread); + } + } + + private void pushNow(DefaultHandle item) { if ((item.recycleId | item.lastRecycledId) != 0) { throw new IllegalStateException("recycled already"); } @@ -541,6 +554,32 @@ public abstract class Recycler { this.size = size + 1; } + private void pushLater(DefaultHandle item, Thread thread) { + // we don't want to have a ref to the queue as the value in our weak map + // so we null it out; to ensure there are no races with restoring it later + // we impose a memory ordering here (no-op on x86) + Map, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); + WeakOrderQueue queue = delayedRecycled.get(this); + if (queue == null) { + if (delayedRecycled.size() >= maxDelayedQueues) { + // Add a dummy queue so we know we should drop the object + delayedRecycled.put(this, WeakOrderQueue.DUMMY); + return; + } + // Check if we already reached the maximum number of delayed queues and if we can allocate at all. + if ((queue = WeakOrderQueue.allocate(this, thread)) == null) { + // drop object + return; + } + delayedRecycled.put(this, queue); + } else if (queue == WeakOrderQueue.DUMMY) { + // drop object + return; + } + + queue.add(item); + } + boolean dropHandle(DefaultHandle handle) { if (!handle.hasBeenRecycled) { if ((++handleRecycleCount & ratioMask) != 0) { diff --git a/common/src/test/java/io/netty/util/RecyclerTest.java b/common/src/test/java/io/netty/util/RecyclerTest.java index fe8a63b7bd..feef9114bb 100644 --- a/common/src/test/java/io/netty/util/RecyclerTest.java +++ b/common/src/test/java/io/netty/util/RecyclerTest.java @@ -94,7 +94,7 @@ public class RecyclerTest { @Test public void testRecycleAtDifferentThread() throws Exception { - final Recycler recycler = new Recycler(256, 10, 2) { + final Recycler recycler = new Recycler(256, 10, 2, 10) { @Override protected HandledObject newObject(Recycler.Handle handle) { return new HandledObject(handle);