diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index 8b646698e3..d872e75f33 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -28,6 +28,9 @@ import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicInteger; +import static java.lang.Math.max; +import static java.lang.Math.min; + /** * Light-weight object pool based on a thread-local stack. * @@ -48,8 +51,10 @@ public abstract class Recycler { private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement(); // TODO: Some arbitrary large number - should adjust as we get more production experience. private static final int DEFAULT_INITIAL_MAX_CAPACITY = 262144; + private static final int DEFAULT_MAX_CAPACITY; private static final int INITIAL_CAPACITY; + private static final int MAX_SHARED_CAPACITY_FACTOR; private static final int LINK_CAPACITY; static { @@ -60,30 +65,37 @@ public abstract class Recycler { if (maxCapacity < 0) { maxCapacity = DEFAULT_INITIAL_MAX_CAPACITY; } - DEFAULT_MAX_CAPACITY = maxCapacity; + MAX_SHARED_CAPACITY_FACTOR = max(2, + SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", + 2)); + LINK_CAPACITY = MathUtil.findNextPositivePowerOfTwo( - Math.max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); + max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY == 0) { logger.debug("-Dio.netty.recycler.maxCapacity: disabled"); + logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled"); logger.debug("-Dio.netty.recycler.linkCapacity: disabled"); } else { logger.debug("-Dio.netty.recycler.maxCapacity: {}", DEFAULT_MAX_CAPACITY); + logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR); logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY); } } - INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY, 256); + INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY, 256); } private final int maxCapacity; + private final int maxSharedCapacityFactor; + private final FastThreadLocal> threadLocal = new FastThreadLocal>() { @Override protected Stack initialValue() { - return new Stack(Recycler.this, Thread.currentThread(), maxCapacity); + return new Stack(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor); } }; @@ -92,7 +104,17 @@ public abstract class Recycler { } protected Recycler(int maxCapacity) { - this.maxCapacity = Math.max(0, maxCapacity); + this(maxCapacity, MAX_SHARED_CAPACITY_FACTOR); + } + + protected Recycler(int maxCapacity, int maxSharedCapacityFactor) { + if (maxCapacity <= 0) { + this.maxCapacity = 0; + this.maxSharedCapacityFactor = 1; + } else { + this.maxCapacity = maxCapacity; + this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor); + } } @SuppressWarnings("unchecked") @@ -201,6 +223,7 @@ public abstract class Recycler { private WeakOrderQueue next; private final WeakReference owner; private final int id = ID_GENERATOR.getAndIncrement(); + private final Stack stack; WeakOrderQueue(Stack stack, Thread thread) { head = tail = new Link(); @@ -209,6 +232,10 @@ public abstract class Recycler { next = stack.head; stack.head = this; } + this.stack = stack; + // We allocated a Link so reserve the space + boolean reserved = stack.reserveSpace(LINK_CAPACITY); + assert reserved; } void add(DefaultHandle handle) { @@ -217,7 +244,13 @@ public abstract class Recycler { Link tail = this.tail; int writeIndex; if ((writeIndex = tail.get()) == LINK_CAPACITY) { + if (!stack.reserveSpace(LINK_CAPACITY)) { + // Drop it. + return; + } + // We allocate a Link so reserve the space this.tail = tail = tail.next = new Link(); + writeIndex = tail.get(); } tail.elements[writeIndex] = handle; @@ -259,7 +292,7 @@ public abstract class Recycler { if (expectedCapacity > dst.elements.length) { final int actualCapacity = dst.increaseCapacity(expectedCapacity); - srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd); + srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd); } if (srcStart != srcEnd) { @@ -280,6 +313,9 @@ public abstract class Recycler { dst.size = newDstSize; if (srcEnd == LINK_CAPACITY && head.next != null) { + // Add capacity back as the Link is GCed. + stack.reclaimSpace(LINK_CAPACITY); + this.head = head.next; } @@ -303,15 +339,35 @@ public abstract class Recycler { private DefaultHandle[] elements; private final int maxCapacity; private int size; + private final AtomicInteger availableSharedCapacity; private volatile WeakOrderQueue head; private WeakOrderQueue cursor, prev; - Stack(Recycler parent, Thread thread, int maxCapacity) { + Stack(Recycler parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor) { this.parent = parent; this.thread = thread; this.maxCapacity = maxCapacity; - elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)]; + availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); + elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; + } + + boolean reserveSpace(int space) { + assert space >= 0; + for (;;) { + int available = availableSharedCapacity.get(); + if (available < space) { + return false; + } + if (availableSharedCapacity.compareAndSet(available, available - space)) { + return true; + } + } + } + + void reclaimSpace(int space) { + assert space >= 0; + availableSharedCapacity.addAndGet(space); } int increaseCapacity(int expectedCapacity) { @@ -321,7 +377,7 @@ public abstract class Recycler { newCapacity <<= 1; } while (newCapacity < expectedCapacity && newCapacity < maxCapacity); - newCapacity = Math.min(newCapacity, maxCapacity); + newCapacity = min(newCapacity, maxCapacity); if (newCapacity != elements.length) { elements = Arrays.copyOf(elements, newCapacity); } @@ -421,7 +477,7 @@ public abstract class Recycler { return; } if (size == elements.length) { - elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity)); + elements = Arrays.copyOf(elements, min(size << 1, maxCapacity)); } elements[size] = item; diff --git a/common/src/test/java/io/netty/util/RecyclerTest.java b/common/src/test/java/io/netty/util/RecyclerTest.java index 4a893ce3fa..83704a0920 100644 --- a/common/src/test/java/io/netty/util/RecyclerTest.java +++ b/common/src/test/java/io/netty/util/RecyclerTest.java @@ -18,6 +18,7 @@ package io.netty.util; import org.junit.Test; import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; @@ -199,6 +200,53 @@ public class RecyclerTest { assertThat(recycler.threadLocalSize(), is(0)); } + @Test + public void testDiscardingExceedingElementsWithRecycleAtDifferentThread() throws Exception { + final int maxCapacity = 32; + final AtomicInteger instancesCount = new AtomicInteger(0); + + final Recycler recycler = new Recycler(maxCapacity, 2) { + @Override + protected HandledObject newObject(Recycler.Handle handle) { + instancesCount.incrementAndGet(); + return new HandledObject(handle); + } + }; + + // Borrow 2 * maxCapacity objects. + final HandledObject[] array = new HandledObject[maxCapacity * 2]; + for (int i = 0; i < array.length; i++) { + array[i] = recycler.get(); + } + + assertEquals(array.length, instancesCount.get()); + // Reset counter. + instancesCount.set(0); + + // Recycle from other thread. + final Thread thread = new Thread() { + @Override + public void run() { + for (HandledObject object: array) { + object.recycle(); + } + } + }; + thread.start(); + thread.join(); + + assertEquals(0, instancesCount.get()); + + // Borrow 2 * maxCapacity objects. Half of them should come from + // the recycler queue, the other half should be freshly allocated. + for (int i = 0; i < array.length; i++) { + recycler.get(); + } + + // The implementation uses maxCapacity / 2 as limit per WeakOrderQueue + assertEquals(array.length - maxCapacity / 2, instancesCount.get()); + } + static final class HandledObject { Recycler.Handle handle;