Introduce allocation / pooling ratio in Recycler

Motivation:

At the moment the Recyler is very sensitive to allocation bursts which means that if there is a need for X objects for only one time these will most likely end up in the Recycler and sit there forever as the normal workload only need a subset of this number.

Modifications:

Add a ratio which sets how many objects should be pooled for each new allocation. This allows to slowly increase the number of objects in the Recycler while not be to sensitive for bursts.

Result:

Less unused objects in the Recycler if allocation rate sometimes bursts.
This commit is contained in:
Norman Maurer 2016-07-26 20:15:21 +02:00
parent 7f8b5f8efd
commit d92c5f5f5b
3 changed files with 93 additions and 128 deletions

View File

@ -15,8 +15,6 @@
*/
package io.netty.buffer;
import org.junit.Test;
import static org.junit.Assert.*;
public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest {
@ -26,49 +24,14 @@ public abstract class AbstractPooledByteBufTest extends AbstractByteBufTest {
@Override
protected ByteBuf newBuffer(int length) {
ByteBuf buffer = alloc(length);
// Testing if the writerIndex and readerIndex are correct when allocate and also after we reset the mark.
assertEquals(0, buffer.writerIndex());
assertEquals(0, buffer.readerIndex());
buffer.resetReaderIndex();
buffer.resetWriterIndex();
assertEquals(0, buffer.writerIndex());
assertEquals(0, buffer.readerIndex());
return buffer;
}
@Test
public void testDiscardMarks() {
testDiscardMarks(4);
}
@Test
public void testDiscardMarksUnpooled() {
testDiscardMarks(32 * 1024 * 1024);
}
private void testDiscardMarks(int capacity) {
ByteBuf buf = newBuffer(capacity);
buf.writeShort(1);
buf.skipBytes(1);
buf.markReaderIndex();
buf.markWriterIndex();
assertTrue(buf.release());
ByteBuf buf2 = newBuffer(capacity);
assertEquals(unwrapIfNeeded(buf), unwrapIfNeeded(buf2));
buf2.writeShort(1);
buf2.resetReaderIndex();
buf2.resetWriterIndex();
assertEquals(0, buf2.readerIndex());
assertEquals(0, buf2.writerIndex());
assertTrue(buf2.release());
}
private static ByteBuf unwrapIfNeeded(ByteBuf buf) {
if (buf instanceof AdvancedLeakAwareByteBuf || buf instanceof SimpleLeakAwareByteBuf) {
return buf.unwrap();
}
return buf;
}
}

View File

@ -17,7 +17,6 @@
package io.netty.util;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.MathUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -28,6 +27,7 @@ import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.util.internal.MathUtil.findNextPositivePowerOfTwo;
import static java.lang.Math.max;
import static java.lang.Math.min;
@ -55,6 +55,7 @@ public abstract class Recycler<T> {
private static final int INITIAL_CAPACITY;
private static final int MAX_SHARED_CAPACITY_FACTOR;
private static final int LINK_CAPACITY;
private static final int RATIO;
static {
// In the future, we might have different maxCapacity for different object types.
@ -70,18 +71,26 @@ public abstract class Recycler<T> {
SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
2));
LINK_CAPACITY = MathUtil.findNextPositivePowerOfTwo(
LINK_CAPACITY = findNextPositivePowerOfTwo(
max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
// By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
// This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
// bursts.
RATIO = min(findNextPositivePowerOfTwo(
max(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8), 2)), 0x40000000);
if (logger.isDebugEnabled()) {
if (DEFAULT_MAX_CAPACITY == 0) {
logger.debug("-Dio.netty.recycler.maxCapacity: disabled");
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
logger.debug("-Dio.netty.recycler.ratio: disabled");
} else {
logger.debug("-Dio.netty.recycler.maxCapacity: {}", DEFAULT_MAX_CAPACITY);
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
}
}
@ -90,11 +99,12 @@ public abstract class Recycler<T> {
private final int maxCapacity;
private final int maxSharedCapacityFactor;
private final int ratioMask;
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor);
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor, ratioMask);
}
};
@ -107,6 +117,14 @@ public abstract class Recycler<T> {
}
protected Recycler(int maxCapacity, int maxSharedCapacityFactor) {
this(maxCapacity, maxSharedCapacityFactor, RATIO);
}
protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio) {
if (ratio > 0x40000000) {
throw new IllegalArgumentException(ratio + ": " + ratio + " (expected: < 0x40000000)");
}
ratioMask = findNextPositivePowerOfTwo(ratio) - 1;
if (maxCapacity <= 0) {
this.maxCapacity = 0;
this.maxSharedCapacityFactor = 1;
@ -166,6 +184,8 @@ public abstract class Recycler<T> {
private int lastRecycledId;
private int recycleId;
boolean hasBeenRecycled;
private Stack<?> stack;
private Object value;
@ -178,6 +198,7 @@ public abstract class Recycler<T> {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Thread thread = Thread.currentThread();
if (thread == stack.thread) {
stack.push(this);
@ -337,11 +358,15 @@ public abstract class Recycler<T> {
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
srcElems[i] = null;
}
dst.size = newDstSize;
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
@ -351,6 +376,10 @@ public abstract class Recycler<T> {
}
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
@ -385,18 +414,21 @@ public abstract class Recycler<T> {
final Thread thread;
private DefaultHandle<?>[] elements;
private final int maxCapacity;
private final int ratioMask;
private int size;
private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
final AtomicInteger availableSharedCapacity;
private volatile WeakOrderQueue head;
private WeakOrderQueue cursor, prev;
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor) {
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask) {
this.parent = parent;
this.thread = thread;
this.maxCapacity = maxCapacity;
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
this.ratioMask = ratioMask;
}
int increaseCapacity(int expectedCapacity) {
@ -501,8 +533,8 @@ public abstract class Recycler<T> {
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity) {
// Hit the maximum capacity - drop the possibly youngest object.
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) {
@ -513,6 +545,17 @@ public abstract class Recycler<T> {
this.size = size + 1;
}
boolean dropHandle(DefaultHandle<?> handle) {
if (!handle.hasBeenRecycled) {
if ((++handleRecycleCount & ratioMask) != 0) {
// Drop the object.
return true;
}
handle.hasBeenRecycled = true;
}
return false;
}
DefaultHandle<T> newHandle() {
return new DefaultHandle<T>(this);
}

View File

@ -20,84 +20,48 @@ import org.junit.Test;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
public class RecyclerTest {
private static Recycler<HandledObject> newRecycler(int max) {
return new Recycler<HandledObject>(max) {
@Override
protected HandledObject newObject(
Recycler.Handle<HandledObject> handle) {
return new HandledObject(handle);
}
};
}
@Test(expected = IllegalStateException.class)
public void testMultipleRecycle() {
RecyclableObject object = RecyclableObject.newInstance();
Recycler<HandledObject> recycler = newRecycler(1024);
HandledObject object = recycler.get();
object.recycle();
object.recycle();
}
@Test
public void testRecycle() {
RecyclableObject object = RecyclableObject.newInstance();
Recycler<HandledObject> recycler = newRecycler(1024);
HandledObject object = recycler.get();
object.recycle();
RecyclableObject object2 = RecyclableObject.newInstance();
HandledObject object2 = recycler.get();
assertSame(object, object2);
object2.recycle();
}
@Test
public void testRecycleDisable() {
DisabledRecyclableObject object = DisabledRecyclableObject.newInstance();
Recycler<HandledObject> recycler = newRecycler(-1);
HandledObject object = recycler.get();
object.recycle();
DisabledRecyclableObject object2 = DisabledRecyclableObject.newInstance();
HandledObject object2 = recycler.get();
assertNotSame(object, object2);
object2.recycle();
}
static final class RecyclableObject {
private static final Recycler<RecyclableObject> RECYCLER = new Recycler<RecyclableObject>() {
@Override
protected RecyclableObject newObject(Handle<RecyclableObject> handle) {
return new RecyclableObject(handle);
}
};
private final Recycler.Handle<RecyclableObject> handle;
private RecyclableObject(Recycler.Handle<RecyclableObject> handle) {
this.handle = handle;
}
public static RecyclableObject newInstance() {
return RECYCLER.get();
}
public void recycle() {
handle.recycle(this);
}
}
static final class DisabledRecyclableObject {
private static final Recycler<DisabledRecyclableObject> RECYCLER = new Recycler<DisabledRecyclableObject>(-1) {
@Override
protected DisabledRecyclableObject newObject(Handle<DisabledRecyclableObject> handle) {
return new DisabledRecyclableObject(handle);
}
};
private final Recycler.Handle<DisabledRecyclableObject> handle;
private DisabledRecyclableObject(Recycler.Handle<DisabledRecyclableObject> handle) {
this.handle = handle;
}
public static DisabledRecyclableObject newInstance() {
return RECYCLER.get();
}
public void recycle() {
handle.recycle(this);
}
}
/**
* Test to make sure bug #2848 never happens again
* https://github.com/netty/netty/issues/2848
@ -111,15 +75,8 @@ public class RecyclerTest {
}
}
void testMaxCapacity(int maxCapacity) {
Recycler<HandledObject> recycler = new Recycler<HandledObject>(maxCapacity) {
@Override
protected HandledObject newObject(
Recycler.Handle<HandledObject> handle) {
return new HandledObject(handle);
}
};
private static void testMaxCapacity(int maxCapacity) {
Recycler<HandledObject> recycler = newRecycler(maxCapacity);
HandledObject[] objects = new HandledObject[maxCapacity * 3];
for (int i = 0; i < objects.length; i++) {
objects[i] = recycler.get();
@ -130,12 +87,14 @@ public class RecyclerTest {
objects[i] = null;
}
assertEquals(maxCapacity, recycler.threadLocalCapacity());
assertTrue("The threadLocalCapacity (" + recycler.threadLocalCapacity() + ") must be <= maxCapacity ("
+ maxCapacity + ") as we not pool all new handles internally",
maxCapacity >= recycler.threadLocalCapacity());
}
@Test
public void testRecycleAtDifferentThread() throws Exception {
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(256) {
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(256, 10, 2) {
@Override
protected HandledObject newObject(Recycler.Handle<HandledObject> handle) {
return new HandledObject(handle);
@ -143,27 +102,25 @@ public class RecyclerTest {
};
final HandledObject o = recycler.get();
final HandledObject o2 = recycler.get();
final Thread thread = new Thread() {
@Override
public void run() {
o.recycle();
o2.recycle();
}
};
thread.start();
thread.join();
assertThat(recycler.get(), is(sameInstance(o)));
assertSame(recycler.get(), o);
assertNotSame(recycler.get(), o2);
}
@Test
public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception {
final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(maxCapacity) {
@Override
protected HandledObject newObject(Recycler.Handle handle) {
return new HandledObject(handle);
}
};
final Recycler<HandledObject> recycler = newRecycler(maxCapacity);
// Borrow 2 * maxCapacity objects.
// Return the half from the same thread.
@ -189,15 +146,15 @@ public class RecyclerTest {
thread.start();
thread.join();
assertThat(recycler.threadLocalCapacity(), is(maxCapacity));
assertThat(recycler.threadLocalSize(), is(maxCapacity));
assertEquals(maxCapacity, recycler.threadLocalCapacity());
assertEquals(1, recycler.threadLocalSize());
for (int i = 0; i < array.length; i ++) {
recycler.get();
}
assertThat(recycler.threadLocalCapacity(), is(maxCapacity));
assertThat(recycler.threadLocalSize(), is(0));
assertEquals(maxCapacity, recycler.threadLocalCapacity());
assertEquals(0, recycler.threadLocalSize());
}
@Test
@ -244,7 +201,9 @@ public class RecyclerTest {
}
// The implementation uses maxCapacity / 2 as limit per WeakOrderQueue
assertEquals(array.length - maxCapacity / 2, instancesCount.get());
assertTrue("The instances count (" + instancesCount.get() + ") must be <= array.length (" + array.length
+ ") - maxCapacity (" + maxCapacity + ") / 2 as we not pool all new handles" +
" internally", array.length - maxCapacity / 2 <= instancesCount.get());
}
static final class HandledObject {