From d92c5f5f5b93570c4f80b7ec9e6510e261f71b25 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 26 Jul 2016 20:15:21 +0200 Subject: [PATCH] Introduce allocation / pooling ratio in Recycler Motivation: At the moment the Recyler is very sensitive to allocation bursts which means that if there is a need for X objects for only one time these will most likely end up in the Recycler and sit there forever as the normal workload only need a subset of this number. Modifications: Add a ratio which sets how many objects should be pooled for each new allocation. This allows to slowly increase the number of objects in the Recycler while not be to sensitive for bursts. Result: Less unused objects in the Recycler if allocation rate sometimes bursts. --- .../buffer/AbstractPooledByteBufTest.java | 49 +------- .../src/main/java/io/netty/util/Recycler.java | 59 +++++++-- .../test/java/io/netty/util/RecyclerTest.java | 113 ++++++------------ 3 files changed, 93 insertions(+), 128 deletions(-) diff --git a/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java index a1a6fadb6a..9c76e3d18e 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java @@ -15,8 +15,6 @@ */ package io.netty.buffer; -import org.junit.Test; - import static org.junit.Assert.*; public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest { @@ -26,49 +24,14 @@ public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest { @Override protected ByteBuf newBuffer(int length) { ByteBuf buffer = alloc(length); + + // Testing if the writerIndex and readerIndex are correct when allocate and also after we reset the mark. + assertEquals(0, buffer.writerIndex()); + assertEquals(0, buffer.readerIndex()); + buffer.resetReaderIndex(); + buffer.resetWriterIndex(); assertEquals(0, buffer.writerIndex()); assertEquals(0, buffer.readerIndex()); return buffer; } - - @Test - public void testDiscardMarks() { - testDiscardMarks(4); - } - - @Test - public void testDiscardMarksUnpooled() { - testDiscardMarks(32 * 1024 * 1024); - } - - private void testDiscardMarks(int capacity) { - ByteBuf buf = newBuffer(capacity); - buf.writeShort(1); - - buf.skipBytes(1); - - buf.markReaderIndex(); - buf.markWriterIndex(); - assertTrue(buf.release()); - - ByteBuf buf2 = newBuffer(capacity); - - assertEquals(unwrapIfNeeded(buf), unwrapIfNeeded(buf2)); - - buf2.writeShort(1); - - buf2.resetReaderIndex(); - buf2.resetWriterIndex(); - - assertEquals(0, buf2.readerIndex()); - assertEquals(0, buf2.writerIndex()); - assertTrue(buf2.release()); - } - - private static ByteBuf unwrapIfNeeded(ByteBuf buf) { - if (buf instanceof AdvancedLeakAwareByteBuf || buf instanceof SimpleLeakAwareByteBuf) { - return buf.unwrap(); - } - return buf; - } } diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index 210f07b2fa..71fd442e34 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -17,7 +17,6 @@ package io.netty.util; import io.netty.util.concurrent.FastThreadLocal; -import io.netty.util.internal.MathUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -28,6 +27,7 @@ import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicInteger; +import static io.netty.util.internal.MathUtil.findNextPositivePowerOfTwo; import static java.lang.Math.max; import static java.lang.Math.min; @@ -55,6 +55,7 @@ public abstract class Recycler { private static final int INITIAL_CAPACITY; private static final int MAX_SHARED_CAPACITY_FACTOR; private static final int LINK_CAPACITY; + private static final int RATIO; static { // In the future, we might have different maxCapacity for different object types. @@ -70,18 +71,26 @@ public abstract class Recycler { SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2)); - LINK_CAPACITY = MathUtil.findNextPositivePowerOfTwo( + LINK_CAPACITY = findNextPositivePowerOfTwo( max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); + // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before. + // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation + // bursts. + RATIO = min(findNextPositivePowerOfTwo( + max(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8), 2)), 0x40000000); + if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY == 0) { logger.debug("-Dio.netty.recycler.maxCapacity: disabled"); logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled"); logger.debug("-Dio.netty.recycler.linkCapacity: disabled"); + logger.debug("-Dio.netty.recycler.ratio: disabled"); } else { logger.debug("-Dio.netty.recycler.maxCapacity: {}", DEFAULT_MAX_CAPACITY); logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR); logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY); + logger.debug("-Dio.netty.recycler.ratio: {}", RATIO); } } @@ -90,11 +99,12 @@ public abstract class Recycler { private final int maxCapacity; private final int maxSharedCapacityFactor; + private final int ratioMask; private final FastThreadLocal> threadLocal = new FastThreadLocal>() { @Override protected Stack initialValue() { - return new Stack(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor); + return new Stack(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor, ratioMask); } }; @@ -107,6 +117,14 @@ public abstract class Recycler { } protected Recycler(int maxCapacity, int maxSharedCapacityFactor) { + this(maxCapacity, maxSharedCapacityFactor, RATIO); + } + + protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio) { + if (ratio > 0x40000000) { + throw new IllegalArgumentException(ratio + ": " + ratio + " (expected: < 0x40000000)"); + } + ratioMask = findNextPositivePowerOfTwo(ratio) - 1; if (maxCapacity <= 0) { this.maxCapacity = 0; this.maxSharedCapacityFactor = 1; @@ -166,6 +184,8 @@ public abstract class Recycler { private int lastRecycledId; private int recycleId; + boolean hasBeenRecycled; + private Stack stack; private Object value; @@ -178,6 +198,7 @@ public abstract class Recycler { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } + Thread thread = Thread.currentThread(); if (thread == stack.thread) { stack.push(this); @@ -337,11 +358,15 @@ public abstract class Recycler { } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); } + srcElems[i] = null; + + if (dst.dropHandle(element)) { + // Drop the object. + continue; + } element.stack = dst; dstElems[newDstSize ++] = element; - srcElems[i] = null; } - dst.size = newDstSize; if (srcEnd == LINK_CAPACITY && head.next != null) { // Add capacity back as the Link is GCed. @@ -351,6 +376,10 @@ public abstract class Recycler { } head.readIndex = srcEnd; + if (dst.size == newDstSize) { + return false; + } + dst.size = newDstSize; return true; } else { // The destination stack is full already. @@ -385,18 +414,21 @@ public abstract class Recycler { final Thread thread; private DefaultHandle[] elements; private final int maxCapacity; + private final int ratioMask; private int size; + private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled. final AtomicInteger availableSharedCapacity; private volatile WeakOrderQueue head; private WeakOrderQueue cursor, prev; - Stack(Recycler parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor) { + Stack(Recycler parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask) { this.parent = parent; this.thread = thread; this.maxCapacity = maxCapacity; availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; + this.ratioMask = ratioMask; } int increaseCapacity(int expectedCapacity) { @@ -501,8 +533,8 @@ public abstract class Recycler { item.recycleId = item.lastRecycledId = OWN_THREAD_ID; int size = this.size; - if (size >= maxCapacity) { - // Hit the maximum capacity - drop the possibly youngest object. + if (size >= maxCapacity || dropHandle(item)) { + // Hit the maximum capacity or should drop - drop the possibly youngest object. return; } if (size == elements.length) { @@ -513,6 +545,17 @@ public abstract class Recycler { this.size = size + 1; } + boolean dropHandle(DefaultHandle handle) { + if (!handle.hasBeenRecycled) { + if ((++handleRecycleCount & ratioMask) != 0) { + // Drop the object. + return true; + } + handle.hasBeenRecycled = true; + } + return false; + } + DefaultHandle newHandle() { return new DefaultHandle(this); } diff --git a/common/src/test/java/io/netty/util/RecyclerTest.java b/common/src/test/java/io/netty/util/RecyclerTest.java index 83704a0920..fe8a63b7bd 100644 --- a/common/src/test/java/io/netty/util/RecyclerTest.java +++ b/common/src/test/java/io/netty/util/RecyclerTest.java @@ -20,84 +20,48 @@ import org.junit.Test; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; public class RecyclerTest { + private static Recycler newRecycler(int max) { + return new Recycler(max) { + @Override + protected HandledObject newObject( + Recycler.Handle handle) { + return new HandledObject(handle); + } + }; + } + @Test(expected = IllegalStateException.class) public void testMultipleRecycle() { - RecyclableObject object = RecyclableObject.newInstance(); + Recycler recycler = newRecycler(1024); + HandledObject object = recycler.get(); object.recycle(); object.recycle(); } @Test public void testRecycle() { - RecyclableObject object = RecyclableObject.newInstance(); + Recycler recycler = newRecycler(1024); + HandledObject object = recycler.get(); object.recycle(); - RecyclableObject object2 = RecyclableObject.newInstance(); + HandledObject object2 = recycler.get(); assertSame(object, object2); object2.recycle(); } @Test public void testRecycleDisable() { - DisabledRecyclableObject object = DisabledRecyclableObject.newInstance(); + Recycler recycler = newRecycler(-1); + HandledObject object = recycler.get(); object.recycle(); - DisabledRecyclableObject object2 = DisabledRecyclableObject.newInstance(); + HandledObject object2 = recycler.get(); assertNotSame(object, object2); object2.recycle(); } - static final class RecyclableObject { - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected RecyclableObject newObject(Handle handle) { - return new RecyclableObject(handle); - } - }; - - private final Recycler.Handle handle; - - private RecyclableObject(Recycler.Handle handle) { - this.handle = handle; - } - - public static RecyclableObject newInstance() { - return RECYCLER.get(); - } - - public void recycle() { - handle.recycle(this); - } - } - - static final class DisabledRecyclableObject { - - private static final Recycler RECYCLER = new Recycler(-1) { - @Override - protected DisabledRecyclableObject newObject(Handle handle) { - return new DisabledRecyclableObject(handle); - } - }; - - private final Recycler.Handle handle; - - private DisabledRecyclableObject(Recycler.Handle handle) { - this.handle = handle; - } - - public static DisabledRecyclableObject newInstance() { - return RECYCLER.get(); - } - - public void recycle() { - handle.recycle(this); - } - } - /** * Test to make sure bug #2848 never happens again * https://github.com/netty/netty/issues/2848 @@ -111,15 +75,8 @@ public class RecyclerTest { } } - void testMaxCapacity(int maxCapacity) { - Recycler recycler = new Recycler(maxCapacity) { - @Override - protected HandledObject newObject( - Recycler.Handle handle) { - return new HandledObject(handle); - } - }; - + private static void testMaxCapacity(int maxCapacity) { + Recycler recycler = newRecycler(maxCapacity); HandledObject[] objects = new HandledObject[maxCapacity * 3]; for (int i = 0; i < objects.length; i++) { objects[i] = recycler.get(); @@ -130,12 +87,14 @@ public class RecyclerTest { objects[i] = null; } - assertEquals(maxCapacity, recycler.threadLocalCapacity()); + assertTrue("The threadLocalCapacity (" + recycler.threadLocalCapacity() + ") must be <= maxCapacity (" + + maxCapacity + ") as we not pool all new handles internally", + maxCapacity >= recycler.threadLocalCapacity()); } @Test public void testRecycleAtDifferentThread() throws Exception { - final Recycler recycler = new Recycler(256) { + final Recycler recycler = new Recycler(256, 10, 2) { @Override protected HandledObject newObject(Recycler.Handle handle) { return new HandledObject(handle); @@ -143,27 +102,25 @@ public class RecyclerTest { }; final HandledObject o = recycler.get(); + final HandledObject o2 = recycler.get(); final Thread thread = new Thread() { @Override public void run() { o.recycle(); + o2.recycle(); } }; thread.start(); thread.join(); - assertThat(recycler.get(), is(sameInstance(o))); + assertSame(recycler.get(), o); + assertNotSame(recycler.get(), o2); } @Test public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception { final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY - final Recycler recycler = new Recycler(maxCapacity) { - @Override - protected HandledObject newObject(Recycler.Handle handle) { - return new HandledObject(handle); - } - }; + final Recycler recycler = newRecycler(maxCapacity); // Borrow 2 * maxCapacity objects. // Return the half from the same thread. @@ -189,15 +146,15 @@ public class RecyclerTest { thread.start(); thread.join(); - assertThat(recycler.threadLocalCapacity(), is(maxCapacity)); - assertThat(recycler.threadLocalSize(), is(maxCapacity)); + assertEquals(maxCapacity, recycler.threadLocalCapacity()); + assertEquals(1, recycler.threadLocalSize()); for (int i = 0; i < array.length; i ++) { recycler.get(); } - assertThat(recycler.threadLocalCapacity(), is(maxCapacity)); - assertThat(recycler.threadLocalSize(), is(0)); + assertEquals(maxCapacity, recycler.threadLocalCapacity()); + assertEquals(0, recycler.threadLocalSize()); } @Test @@ -244,7 +201,9 @@ public class RecyclerTest { } // The implementation uses maxCapacity / 2 as limit per WeakOrderQueue - assertEquals(array.length - maxCapacity / 2, instancesCount.get()); + assertTrue("The instances count (" + instancesCount.get() + ") must be <= array.length (" + array.length + + ") - maxCapacity (" + maxCapacity + ") / 2 as we not pool all new handles" + + " internally", array.length - maxCapacity / 2 <= instancesCount.get()); } static final class HandledObject {