diff --git a/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java b/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java index a1a6fadb6a..9c76e3d18e 100644 --- a/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/AbstractPooledByteBufTest.java @@ -15,8 +15,6 @@ */ package io.netty.buffer; -import org.junit.Test; - import static org.junit.Assert.*; public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest { @@ -26,49 +24,14 @@ public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest { @Override protected ByteBuf newBuffer(int length) { ByteBuf buffer = alloc(length); + + // Testing if the writerIndex and readerIndex are correct when allocate and also after we reset the mark. + assertEquals(0, buffer.writerIndex()); + assertEquals(0, buffer.readerIndex()); + buffer.resetReaderIndex(); + buffer.resetWriterIndex(); assertEquals(0, buffer.writerIndex()); assertEquals(0, buffer.readerIndex()); return buffer; } - - @Test - public void testDiscardMarks() { - testDiscardMarks(4); - } - - @Test - public void testDiscardMarksUnpooled() { - testDiscardMarks(32 * 1024 * 1024); - } - - private void testDiscardMarks(int capacity) { - ByteBuf buf = newBuffer(capacity); - buf.writeShort(1); - - buf.skipBytes(1); - - buf.markReaderIndex(); - buf.markWriterIndex(); - assertTrue(buf.release()); - - ByteBuf buf2 = newBuffer(capacity); - - assertEquals(unwrapIfNeeded(buf), unwrapIfNeeded(buf2)); - - buf2.writeShort(1); - - buf2.resetReaderIndex(); - buf2.resetWriterIndex(); - - assertEquals(0, buf2.readerIndex()); - assertEquals(0, buf2.writerIndex()); - assertTrue(buf2.release()); - } - - private static ByteBuf unwrapIfNeeded(ByteBuf buf) { - if (buf instanceof AdvancedLeakAwareByteBuf || buf instanceof SimpleLeakAwareByteBuf) { - return buf.unwrap(); - } - return buf; - } } diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index 210f07b2fa..71fd442e34 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -17,7 +17,6 @@ package io.netty.util; import io.netty.util.concurrent.FastThreadLocal; -import io.netty.util.internal.MathUtil; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -28,6 +27,7 @@ import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicInteger; +import static io.netty.util.internal.MathUtil.findNextPositivePowerOfTwo; import static java.lang.Math.max; import static java.lang.Math.min; @@ -55,6 +55,7 @@ public abstract class Recycler { private static final int INITIAL_CAPACITY; private static final int MAX_SHARED_CAPACITY_FACTOR; private static final int LINK_CAPACITY; + private static final int RATIO; static { // In the future, we might have different maxCapacity for different object types. @@ -70,18 +71,26 @@ public abstract class Recycler { SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", 2)); - LINK_CAPACITY = MathUtil.findNextPositivePowerOfTwo( + LINK_CAPACITY = findNextPositivePowerOfTwo( max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); + // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before. + // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation + // bursts. + RATIO = min(findNextPositivePowerOfTwo( + max(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8), 2)), 0x40000000); + if (logger.isDebugEnabled()) { if (DEFAULT_MAX_CAPACITY == 0) { logger.debug("-Dio.netty.recycler.maxCapacity: disabled"); logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled"); logger.debug("-Dio.netty.recycler.linkCapacity: disabled"); + logger.debug("-Dio.netty.recycler.ratio: disabled"); } else { logger.debug("-Dio.netty.recycler.maxCapacity: {}", DEFAULT_MAX_CAPACITY); logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR); logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY); + logger.debug("-Dio.netty.recycler.ratio: {}", RATIO); } } @@ -90,11 +99,12 @@ public abstract class Recycler { private final int maxCapacity; private final int maxSharedCapacityFactor; + private final int ratioMask; private final FastThreadLocal> threadLocal = new FastThreadLocal>() { @Override protected Stack initialValue() { - return new Stack(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor); + return new Stack(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor, ratioMask); } }; @@ -107,6 +117,14 @@ public abstract class Recycler { } protected Recycler(int maxCapacity, int maxSharedCapacityFactor) { + this(maxCapacity, maxSharedCapacityFactor, RATIO); + } + + protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio) { + if (ratio > 0x40000000) { + throw new IllegalArgumentException(ratio + ": " + ratio + " (expected: < 0x40000000)"); + } + ratioMask = findNextPositivePowerOfTwo(ratio) - 1; if (maxCapacity <= 0) { this.maxCapacity = 0; this.maxSharedCapacityFactor = 1; @@ -166,6 +184,8 @@ public abstract class Recycler { private int lastRecycledId; private int recycleId; + boolean hasBeenRecycled; + private Stack stack; private Object value; @@ -178,6 +198,7 @@ public abstract class Recycler { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } + Thread thread = Thread.currentThread(); if (thread == stack.thread) { stack.push(this); @@ -337,11 +358,15 @@ public abstract class Recycler { } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already"); } + srcElems[i] = null; + + if (dst.dropHandle(element)) { + // Drop the object. + continue; + } element.stack = dst; dstElems[newDstSize ++] = element; - srcElems[i] = null; } - dst.size = newDstSize; if (srcEnd == LINK_CAPACITY && head.next != null) { // Add capacity back as the Link is GCed. @@ -351,6 +376,10 @@ public abstract class Recycler { } head.readIndex = srcEnd; + if (dst.size == newDstSize) { + return false; + } + dst.size = newDstSize; return true; } else { // The destination stack is full already. @@ -385,18 +414,21 @@ public abstract class Recycler { final Thread thread; private DefaultHandle[] elements; private final int maxCapacity; + private final int ratioMask; private int size; + private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled. final AtomicInteger availableSharedCapacity; private volatile WeakOrderQueue head; private WeakOrderQueue cursor, prev; - Stack(Recycler parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor) { + Stack(Recycler parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask) { this.parent = parent; this.thread = thread; this.maxCapacity = maxCapacity; availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; + this.ratioMask = ratioMask; } int increaseCapacity(int expectedCapacity) { @@ -501,8 +533,8 @@ public abstract class Recycler { item.recycleId = item.lastRecycledId = OWN_THREAD_ID; int size = this.size; - if (size >= maxCapacity) { - // Hit the maximum capacity - drop the possibly youngest object. + if (size >= maxCapacity || dropHandle(item)) { + // Hit the maximum capacity or should drop - drop the possibly youngest object. return; } if (size == elements.length) { @@ -513,6 +545,17 @@ public abstract class Recycler { this.size = size + 1; } + boolean dropHandle(DefaultHandle handle) { + if (!handle.hasBeenRecycled) { + if ((++handleRecycleCount & ratioMask) != 0) { + // Drop the object. + return true; + } + handle.hasBeenRecycled = true; + } + return false; + } + DefaultHandle newHandle() { return new DefaultHandle(this); } diff --git a/common/src/test/java/io/netty/util/RecyclerTest.java b/common/src/test/java/io/netty/util/RecyclerTest.java index 83704a0920..fe8a63b7bd 100644 --- a/common/src/test/java/io/netty/util/RecyclerTest.java +++ b/common/src/test/java/io/netty/util/RecyclerTest.java @@ -20,84 +20,48 @@ import org.junit.Test; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; public class RecyclerTest { + private static Recycler newRecycler(int max) { + return new Recycler(max) { + @Override + protected HandledObject newObject( + Recycler.Handle handle) { + return new HandledObject(handle); + } + }; + } + @Test(expected = IllegalStateException.class) public void testMultipleRecycle() { - RecyclableObject object = RecyclableObject.newInstance(); + Recycler recycler = newRecycler(1024); + HandledObject object = recycler.get(); object.recycle(); object.recycle(); } @Test public void testRecycle() { - RecyclableObject object = RecyclableObject.newInstance(); + Recycler recycler = newRecycler(1024); + HandledObject object = recycler.get(); object.recycle(); - RecyclableObject object2 = RecyclableObject.newInstance(); + HandledObject object2 = recycler.get(); assertSame(object, object2); object2.recycle(); } @Test public void testRecycleDisable() { - DisabledRecyclableObject object = DisabledRecyclableObject.newInstance(); + Recycler recycler = newRecycler(-1); + HandledObject object = recycler.get(); object.recycle(); - DisabledRecyclableObject object2 = DisabledRecyclableObject.newInstance(); + HandledObject object2 = recycler.get(); assertNotSame(object, object2); object2.recycle(); } - static final class RecyclableObject { - - private static final Recycler RECYCLER = new Recycler() { - @Override - protected RecyclableObject newObject(Handle handle) { - return new RecyclableObject(handle); - } - }; - - private final Recycler.Handle handle; - - private RecyclableObject(Recycler.Handle handle) { - this.handle = handle; - } - - public static RecyclableObject newInstance() { - return RECYCLER.get(); - } - - public void recycle() { - handle.recycle(this); - } - } - - static final class DisabledRecyclableObject { - - private static final Recycler RECYCLER = new Recycler(-1) { - @Override - protected DisabledRecyclableObject newObject(Handle handle) { - return new DisabledRecyclableObject(handle); - } - }; - - private final Recycler.Handle handle; - - private DisabledRecyclableObject(Recycler.Handle handle) { - this.handle = handle; - } - - public static DisabledRecyclableObject newInstance() { - return RECYCLER.get(); - } - - public void recycle() { - handle.recycle(this); - } - } - /** * Test to make sure bug #2848 never happens again * https://github.com/netty/netty/issues/2848 @@ -111,15 +75,8 @@ public class RecyclerTest { } } - void testMaxCapacity(int maxCapacity) { - Recycler recycler = new Recycler(maxCapacity) { - @Override - protected HandledObject newObject( - Recycler.Handle handle) { - return new HandledObject(handle); - } - }; - + private static void testMaxCapacity(int maxCapacity) { + Recycler recycler = newRecycler(maxCapacity); HandledObject[] objects = new HandledObject[maxCapacity * 3]; for (int i = 0; i < objects.length; i++) { objects[i] = recycler.get(); @@ -130,12 +87,14 @@ public class RecyclerTest { objects[i] = null; } - assertEquals(maxCapacity, recycler.threadLocalCapacity()); + assertTrue("The threadLocalCapacity (" + recycler.threadLocalCapacity() + ") must be <= maxCapacity (" + + maxCapacity + ") as we not pool all new handles internally", + maxCapacity >= recycler.threadLocalCapacity()); } @Test public void testRecycleAtDifferentThread() throws Exception { - final Recycler recycler = new Recycler(256) { + final Recycler recycler = new Recycler(256, 10, 2) { @Override protected HandledObject newObject(Recycler.Handle handle) { return new HandledObject(handle); @@ -143,27 +102,25 @@ public class RecyclerTest { }; final HandledObject o = recycler.get(); + final HandledObject o2 = recycler.get(); final Thread thread = new Thread() { @Override public void run() { o.recycle(); + o2.recycle(); } }; thread.start(); thread.join(); - assertThat(recycler.get(), is(sameInstance(o))); + assertSame(recycler.get(), o); + assertNotSame(recycler.get(), o2); } @Test public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception { final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY - final Recycler recycler = new Recycler(maxCapacity) { - @Override - protected HandledObject newObject(Recycler.Handle handle) { - return new HandledObject(handle); - } - }; + final Recycler recycler = newRecycler(maxCapacity); // Borrow 2 * maxCapacity objects. // Return the half from the same thread. @@ -189,15 +146,15 @@ public class RecyclerTest { thread.start(); thread.join(); - assertThat(recycler.threadLocalCapacity(), is(maxCapacity)); - assertThat(recycler.threadLocalSize(), is(maxCapacity)); + assertEquals(maxCapacity, recycler.threadLocalCapacity()); + assertEquals(1, recycler.threadLocalSize()); for (int i = 0; i < array.length; i ++) { recycler.get(); } - assertThat(recycler.threadLocalCapacity(), is(maxCapacity)); - assertThat(recycler.threadLocalSize(), is(0)); + assertEquals(maxCapacity, recycler.threadLocalCapacity()); + assertEquals(0, recycler.threadLocalSize()); } @Test @@ -244,7 +201,9 @@ public class RecyclerTest { } // The implementation uses maxCapacity / 2 as limit per WeakOrderQueue - assertEquals(array.length - maxCapacity / 2, instancesCount.get()); + assertTrue("The instances count (" + instancesCount.get() + ") must be <= array.length (" + array.length + + ") - maxCapacity (" + maxCapacity + ") / 2 as we not pool all new handles" + + " internally", array.length - maxCapacity / 2 <= instancesCount.get()); } static final class HandledObject {