[#5505] Enforce Recycler limit when recycling from different threads

Motivation:

Currently, the recycler max capacity it's only enforced on the
thread-local stack which is used when the recycling happens on the
same thread that requested the object.

When the recycling happens in a different thread, then the objects
will be queued into a linked list (where each node holds N objects,
default=16). These objects are then transfered into the stack when
new objects are requested and the stack is empty.

The problem is that the queue doesn't have a max capacity and that
can lead to bad scenarios. Eg:

- Allocate 1M object from recycler
- Recycle all of them from different thread
- Recycler WeakOrderQueue will contain 1M objects
- Reference graph will be very long to traverse and GC timeseems to be negatively impacted
- Size of the queue will never shrink after this

Modifications:

Add some shared counter which is used to manage capacity limits when recycle from different thread then the allocation thread. We modify the counter whenever we allocate a new Link to reduce the overhead of increment / decrement it.

Result:

More predictable number of objects mantained in the recycler pool.
This commit is contained in:
Norman Maurer 2016-07-11 11:55:09 +02:00
parent 771cfaec22
commit afafadd3d7
2 changed files with 114 additions and 10 deletions

View File

@ -28,6 +28,9 @@ import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.Math.max;
import static java.lang.Math.min;
/**
* Light-weight object pool based on a thread-local stack.
*
@ -48,8 +51,10 @@ public abstract class Recycler<T> {
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
// TODO: Some arbitrary large number - should adjust as we get more production experience.
private static final int DEFAULT_INITIAL_MAX_CAPACITY = 262144;
private static final int DEFAULT_MAX_CAPACITY;
private static final int INITIAL_CAPACITY;
private static final int MAX_SHARED_CAPACITY_FACTOR;
private static final int LINK_CAPACITY;
static {
@ -60,30 +65,37 @@ public abstract class Recycler<T> {
if (maxCapacity < 0) {
maxCapacity = DEFAULT_INITIAL_MAX_CAPACITY;
}
DEFAULT_MAX_CAPACITY = maxCapacity;
MAX_SHARED_CAPACITY_FACTOR = max(2,
SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
2));
LINK_CAPACITY = MathUtil.findNextPositivePowerOfTwo(
Math.max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
if (logger.isDebugEnabled()) {
if (DEFAULT_MAX_CAPACITY == 0) {
logger.debug("-Dio.netty.recycler.maxCapacity: disabled");
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
} else {
logger.debug("-Dio.netty.recycler.maxCapacity: {}", DEFAULT_MAX_CAPACITY);
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
}
}
INITIAL_CAPACITY = Math.min(DEFAULT_MAX_CAPACITY, 256);
INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY, 256);
}
private final int maxCapacity;
private final int maxSharedCapacityFactor;
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor);
}
};
@ -92,7 +104,17 @@ public abstract class Recycler<T> {
}
protected Recycler(int maxCapacity) {
this.maxCapacity = Math.max(0, maxCapacity);
this(maxCapacity, MAX_SHARED_CAPACITY_FACTOR);
}
protected Recycler(int maxCapacity, int maxSharedCapacityFactor) {
if (maxCapacity <= 0) {
this.maxCapacity = 0;
this.maxSharedCapacityFactor = 1;
} else {
this.maxCapacity = maxCapacity;
this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
}
}
@SuppressWarnings("unchecked")
@ -201,6 +223,7 @@ public abstract class Recycler<T> {
private WeakOrderQueue next;
private final WeakReference<Thread> owner;
private final int id = ID_GENERATOR.getAndIncrement();
private final Stack<?> stack;
WeakOrderQueue(Stack<?> stack, Thread thread) {
head = tail = new Link();
@ -209,6 +232,10 @@ public abstract class Recycler<T> {
next = stack.head;
stack.head = this;
}
this.stack = stack;
// We allocated a Link so reserve the space
boolean reserved = stack.reserveSpace(LINK_CAPACITY);
assert reserved;
}
void add(DefaultHandle<?> handle) {
@ -217,7 +244,13 @@ public abstract class Recycler<T> {
Link tail = this.tail;
int writeIndex;
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
if (!stack.reserveSpace(LINK_CAPACITY)) {
// Drop it.
return;
}
// We allocate a Link so reserve the space
this.tail = tail = tail.next = new Link();
writeIndex = tail.get();
}
tail.elements[writeIndex] = handle;
@ -259,7 +292,7 @@ public abstract class Recycler<T> {
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
@ -280,6 +313,9 @@ public abstract class Recycler<T> {
dst.size = newDstSize;
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
stack.reclaimSpace(LINK_CAPACITY);
this.head = head.next;
}
@ -303,15 +339,35 @@ public abstract class Recycler<T> {
private DefaultHandle<?>[] elements;
private final int maxCapacity;
private int size;
private final AtomicInteger availableSharedCapacity;
private volatile WeakOrderQueue head;
private WeakOrderQueue cursor, prev;
Stack(Recycler<T> parent, Thread thread, int maxCapacity) {
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor) {
this.parent = parent;
this.thread = thread;
this.maxCapacity = maxCapacity;
elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)];
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
}
boolean reserveSpace(int space) {
assert space >= 0;
for (;;) {
int available = availableSharedCapacity.get();
if (available < space) {
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - space)) {
return true;
}
}
}
void reclaimSpace(int space) {
assert space >= 0;
availableSharedCapacity.addAndGet(space);
}
int increaseCapacity(int expectedCapacity) {
@ -321,7 +377,7 @@ public abstract class Recycler<T> {
newCapacity <<= 1;
} while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
newCapacity = Math.min(newCapacity, maxCapacity);
newCapacity = min(newCapacity, maxCapacity);
if (newCapacity != elements.length) {
elements = Arrays.copyOf(elements, newCapacity);
}
@ -421,7 +477,7 @@ public abstract class Recycler<T> {
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;

View File

@ -18,6 +18,7 @@ package io.netty.util;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
@ -199,6 +200,53 @@ public class RecyclerTest {
assertThat(recycler.threadLocalSize(), is(0));
}
@Test
public void testDiscardingExceedingElementsWithRecycleAtDifferentThread() throws Exception {
final int maxCapacity = 32;
final AtomicInteger instancesCount = new AtomicInteger(0);
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(maxCapacity, 2) {
@Override
protected HandledObject newObject(Recycler.Handle<HandledObject> handle) {
instancesCount.incrementAndGet();
return new HandledObject(handle);
}
};
// Borrow 2 * maxCapacity objects.
final HandledObject[] array = new HandledObject[maxCapacity * 2];
for (int i = 0; i < array.length; i++) {
array[i] = recycler.get();
}
assertEquals(array.length, instancesCount.get());
// Reset counter.
instancesCount.set(0);
// Recycle from other thread.
final Thread thread = new Thread() {
@Override
public void run() {
for (HandledObject object: array) {
object.recycle();
}
}
};
thread.start();
thread.join();
assertEquals(0, instancesCount.get());
// Borrow 2 * maxCapacity objects. Half of them should come from
// the recycler queue, the other half should be freshly allocated.
for (int i = 0; i < array.length; i++) {
recycler.get();
}
// The implementation uses maxCapacity / 2 as limit per WeakOrderQueue
assertEquals(array.length - maxCapacity / 2, instancesCount.get());
}
static final class HandledObject {
Recycler.Handle<HandledObject> handle;