Allow to limit the maximum number of WeakOrderQueue instances per Thread.

Motivation:

To better restrict resource usage we should limit the number of WeakOrderQueue instances per Thread. Once this limit is reached object that are recycled from a different Thread then the allocation Thread are dropped on the floor.

Modifications:

Add new system property io.netty.recycler.maxDelayedQueuesPerThread and constructor that allows to limit the max number of WeakOrderQueue instances per Thread for Recycler instance. The default is 2 * cores (the same as the default number of EventLoop instances per EventLoopGroup).

Result:

Better way to restrict resource / memory usage per Recycler instance.
This commit is contained in:
Norman Maurer 2016-07-25 11:15:56 +02:00
parent c13908adb5
commit d3dc9c9e74
2 changed files with 68 additions and 29 deletions

View File

@ -54,6 +54,7 @@ public abstract class Recycler<T> {
private static final int DEFAULT_MAX_CAPACITY; private static final int DEFAULT_MAX_CAPACITY;
private static final int INITIAL_CAPACITY; private static final int INITIAL_CAPACITY;
private static final int MAX_SHARED_CAPACITY_FACTOR; private static final int MAX_SHARED_CAPACITY_FACTOR;
private static final int MAX_DELAYED_QUEUES_PER_THREAD;
private static final int LINK_CAPACITY; private static final int LINK_CAPACITY;
private static final int RATIO; private static final int RATIO;
@ -71,6 +72,11 @@ public abstract class Recycler<T> {
SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor", SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
2)); 2));
MAX_DELAYED_QUEUES_PER_THREAD = max(0,
SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
// We use the same value as default EventLoop number
Runtime.getRuntime().availableProcessors() * 2));
LINK_CAPACITY = safeFindNextPositivePowerOfTwo( LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16)); max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
@ -99,11 +105,13 @@ public abstract class Recycler<T> {
private final int maxCapacity; private final int maxCapacity;
private final int maxSharedCapacityFactor; private final int maxSharedCapacityFactor;
private final int ratioMask; private final int ratioMask;
private final int maxDelayedQueuesPerThread;
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override @Override
protected Stack<T> initialValue() { protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor, ratioMask); return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
} }
}; };
@ -116,17 +124,19 @@ public abstract class Recycler<T> {
} }
protected Recycler(int maxCapacity, int maxSharedCapacityFactor) { protected Recycler(int maxCapacity, int maxSharedCapacityFactor) {
this(maxCapacity, maxSharedCapacityFactor, RATIO); this(maxCapacity, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
} }
protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio) { protected Recycler(int maxCapacity, int maxSharedCapacityFactor, int ratio, int maxDelayedQueuesPerThread) {
ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1; ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
if (maxCapacity <= 0) { if (maxCapacity <= 0) {
this.maxCapacity = 0; this.maxCapacity = 0;
this.maxSharedCapacityFactor = 1; this.maxSharedCapacityFactor = 1;
this.maxDelayedQueuesPerThread = 0;
} else { } else {
this.maxCapacity = maxCapacity; this.maxCapacity = maxCapacity;
this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor); this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
} }
} }
@ -194,26 +204,7 @@ public abstract class Recycler<T> {
if (object != value) { if (object != value) {
throw new IllegalArgumentException("object does not belong to handle"); throw new IllegalArgumentException("object does not belong to handle");
} }
stack.push(this);
Thread thread = Thread.currentThread();
if (thread == stack.thread) {
stack.push(this);
return;
}
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(stack);
if (queue == null) {
queue = WeakOrderQueue.allocate(stack, thread);
if (queue == null) {
// drop object
return;
}
delayedRecycled.put(stack, queue);
}
queue.add(this);
} }
} }
@ -229,6 +220,8 @@ public abstract class Recycler<T> {
// but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain // but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
private static final class WeakOrderQueue { private static final class WeakOrderQueue {
static final WeakOrderQueue DUMMY = new WeakOrderQueue();
// Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex. // Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static final class Link extends AtomicInteger { private static final class Link extends AtomicInteger {
@ -246,6 +239,11 @@ public abstract class Recycler<T> {
private final int id = ID_GENERATOR.getAndIncrement(); private final int id = ID_GENERATOR.getAndIncrement();
private final AtomicInteger availableSharedCapacity; private final AtomicInteger availableSharedCapacity;
private WeakOrderQueue() {
owner = null;
availableSharedCapacity = null;
}
private WeakOrderQueue(Stack<?> stack, Thread thread) { private WeakOrderQueue(Stack<?> stack, Thread thread) {
head = tail = new Link(); head = tail = new Link();
owner = new WeakReference<Thread>(thread); owner = new WeakReference<Thread>(thread);
@ -408,23 +406,26 @@ public abstract class Recycler<T> {
// still recycling all items. // still recycling all items.
final Recycler<T> parent; final Recycler<T> parent;
final Thread thread; final Thread thread;
private DefaultHandle<?>[] elements; final AtomicInteger availableSharedCapacity;
final int maxDelayedQueues;
private final int maxCapacity; private final int maxCapacity;
private final int ratioMask; private final int ratioMask;
private DefaultHandle<?>[] elements;
private int size; private int size;
private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled. private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
final AtomicInteger availableSharedCapacity;
private volatile WeakOrderQueue head;
private WeakOrderQueue cursor, prev; private WeakOrderQueue cursor, prev;
private volatile WeakOrderQueue head;
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, int ratioMask) { Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int ratioMask, int maxDelayedQueues) {
this.parent = parent; this.parent = parent;
this.thread = thread; this.thread = thread;
this.maxCapacity = maxCapacity; this.maxCapacity = maxCapacity;
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY)); availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)]; elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
this.ratioMask = ratioMask; this.ratioMask = ratioMask;
this.maxDelayedQueues = maxDelayedQueues;
} }
int increaseCapacity(int expectedCapacity) { int increaseCapacity(int expectedCapacity) {
@ -523,6 +524,18 @@ public abstract class Recycler<T> {
} }
void push(DefaultHandle<?> item) { void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (thread == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack, we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) { if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already"); throw new IllegalStateException("recycled already");
} }
@ -541,6 +554,32 @@ public abstract class Recycler<T> {
this.size = size + 1; this.size = size + 1;
} }
private void pushLater(DefaultHandle<?> item, Thread thread) {
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
queue.add(item);
}
boolean dropHandle(DefaultHandle<?> handle) { boolean dropHandle(DefaultHandle<?> handle) {
if (!handle.hasBeenRecycled) { if (!handle.hasBeenRecycled) {
if ((++handleRecycleCount & ratioMask) != 0) { if ((++handleRecycleCount & ratioMask) != 0) {

View File

@ -94,7 +94,7 @@ public class RecyclerTest {
@Test @Test
public void testRecycleAtDifferentThread() throws Exception { public void testRecycleAtDifferentThread() throws Exception {
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(256, 10, 2) { final Recycler<HandledObject> recycler = new Recycler<HandledObject>(256, 10, 2, 10) {
@Override @Override
protected HandledObject newObject(Recycler.Handle<HandledObject> handle) { protected HandledObject newObject(Recycler.Handle<HandledObject> handle) {
return new HandledObject(handle); return new HandledObject(handle);