Improve performance of Recycler

Motivation:

Recycler is used in many places to reduce GC-pressure but is still not as fast as possible because of the internal datastructures used.

Modification:

 - Rewrite Recycler to use a WeakOrderQueue which makes minimal guaranteer about order and visibility for max performance.
 - Recycling of the same object multiple times without acquire it will fail.
 - Introduce a RecyclableMpscLinkedQueueNode which can be used for MpscLinkedQueueNodes that use Recycler

These changes are based on @belliottsmith 's work that was part of #2504.

Result:

Huge increase in performance.

4.0 branch without this commit:

Benchmark                                                (size)   Mode   Samples        Score  Score error    Units
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    00000  thrpt        20 116026994.130  2763381.305    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    00256  thrpt        20 110823170.627  3007221.464    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    01024  thrpt        20 118290272.413  7143962.304    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    04096  thrpt        20 120560396.523  6483323.228    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    16384  thrpt        20 114726607.428  2960013.108    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    65536  thrpt        20 119385917.899  3172913.684    ops/s
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 297.617 sec - in io.netty.microbench.internal.RecyclableArrayListBenchmark

4.0 branch with this commit:

Benchmark                                                (size)   Mode   Samples        Score  Score error    Units
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    00000  thrpt        20 204158855.315  5031432.145    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    00256  thrpt        20 205179685.861  1934137.841    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    01024  thrpt        20 209906801.437  8007811.254    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    04096  thrpt        20 214288320.053  6413126.689    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    16384  thrpt        20 215940902.649  7837706.133    ops/s
i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread    65536  thrpt        20 211141994.206  5017868.542    ops/s
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 297.648 sec - in io.netty.microbench.internal.RecyclableArrayListBenchmark
This commit is contained in:
Norman Maurer 2014-06-13 11:56:35 +02:00
parent bf85af5743
commit 030bcaae81
6 changed files with 355 additions and 58 deletions

View File

@ -21,8 +21,11 @@ import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.IdentityHashMap;
import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Light-weight object pool based on a thread-local stack.
@ -33,6 +36,8 @@ public abstract class Recycler<T> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
private static final int DEFAULT_MAX_CAPACITY;
private static final int INITIAL_CAPACITY;
@ -70,27 +75,24 @@ public abstract class Recycler<T> {
this.maxCapacity = Math.max(0, maxCapacity);
}
@SuppressWarnings("unchecked")
public final T get() {
Stack<T> stack = threadLocal.get();
T o = stack.pop();
if (o == null) {
o = newObject(stack);
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return o;
return (T) handle.value;
}
public final boolean recycle(T o, Handle<T> handle) {
@SuppressWarnings("unchecked")
Stack<T> stack = (Stack<T>) handle;
if (stack.parent != this) {
DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;
}
if (Thread.currentThread() != stack.thread) {
return false;
}
stack.push(o);
h.recycle(o);
return true;
}
@ -100,56 +102,241 @@ public abstract class Recycler<T> {
void recycle(T object);
}
static final class Stack<T> implements Handle<T> {
static final class DefaultHandle<T> implements Handle<T> {
private int lastRecycledId;
private int recycleId;
private T[] elements;
private int size;
private final int maxCapacity;
private Stack<?> stack;
private Object value;
private final Map<T, Boolean> map;
DefaultHandle(Stack<?> stack) {
this.stack = stack;
}
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Thread thread = Thread.currentThread();
if (thread == stack.thread) {
stack.push(this);
return;
}
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(stack);
if (queue == null) {
delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread));
}
queue.add(this);
}
}
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
return new WeakHashMap<Stack<?>, WeakOrderQueue>();
}
};
// a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
// but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
private static final class WeakOrderQueue {
private static final int LINK_CAPACITY = 16;
// Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
@SuppressWarnings("serial")
private static final class Link extends AtomicInteger {
private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
private int readIndex;
private Link next;
}
// chain of data items
private Link head, tail;
// pointer to another queue of delayed items for the same stack
private WeakOrderQueue next;
private final WeakReference<Thread> owner;
private final int id = ID_GENERATOR.getAndIncrement();
public WeakOrderQueue(Stack<?> stack, Thread thread) {
head = tail = new Link();
owner = new WeakReference<Thread>(thread);
synchronized (stack) {
next = stack.head;
stack.head = this;
}
}
void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id;
Link tail = this.tail;
int writeIndex;
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
this.tail = tail = tail.next = new Link();
writeIndex = tail.get();
}
tail.elements[writeIndex] = handle;
handle.stack = null;
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
// this also means we guarantee visibility of an element in the queue if we see the index updated
tail.lazySet(writeIndex + 1);
}
boolean hasFinalData() {
return tail.readIndex != tail.get();
}
// transfer as many items as we can from this queue to the stack, returning true if any were transferred
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean transfer(Stack<?> to) {
Link head = this.head;
if (head == null) {
return false;
}
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
this.head = head = head.next;
}
int start = head.readIndex;
int end = head.get();
if (start == end) {
return false;
}
int count = end - start;
if (to.size + count > to.elements.length) {
to.elements = Arrays.copyOf(to.elements, (to.size + count) * 2);
}
DefaultHandle[] src = head.elements;
DefaultHandle[] trg = to.elements;
int size = to.size;
while (start < end) {
DefaultHandle element = src[start];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
element.stack = to;
trg[size++] = element;
src[start++] = null;
}
to.size = size;
if (end == LINK_CAPACITY & head.next != null) {
this.head = head.next;
}
head.readIndex = end;
return true;
}
}
static final class Stack<T> {
// we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
// than the stack owner recycles: when we run out of items in our stack we iterate this collection
// to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
// still recycling all items.
final Recycler<T> parent;
final Thread thread;
private DefaultHandle<?>[] elements;
private final int maxCapacity;
private int size;
private volatile WeakOrderQueue head;
private WeakOrderQueue cursor, prev;
@SuppressWarnings("AssertWithSideEffects")
Stack(Recycler<T> parent, Thread thread, int maxCapacity) {
this.parent = parent;
this.thread = thread;
this.maxCapacity = maxCapacity;
elements = newArray(INITIAL_CAPACITY);
// *assigns* true if assertions are on.
@SuppressWarnings("UnusedAssignment")
boolean assertionEnabled = false;
assert assertionEnabled = true;
if (assertionEnabled) {
map = new IdentityHashMap<T, Boolean>(INITIAL_CAPACITY);
} else {
map = null;
}
elements = new DefaultHandle[INITIAL_CAPACITY];
}
@Override
public void recycle(T object) {
parent.recycle(object, this);
}
T pop() {
@SuppressWarnings({ "unchecked", "rawtypes" })
DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) {
if (!scavenge()) {
return null;
}
size = this.size;
}
size --;
T ret = elements[size];
elements[size] = null;
assert map == null || map.remove(ret) != null;
DefaultHandle ret = elements[size];
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
void push(T o) {
assert map == null || map.put(o, Boolean.TRUE) == null: "recycled already";
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}
boolean scavengeSome() {
boolean success = false;
WeakOrderQueue cursor = this.cursor, prev = this.prev;
while (cursor != null) {
if (cursor.transfer(this)) {
success = true;
break;
}
WeakOrderQueue next = cursor.next;
if (cursor.owner.get() == null) {
// if the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect
// we never unlink the first queue, as we don't want to synchronize on updating the head
if (cursor.hasFinalData()) {
for (;;) {
if (!cursor.transfer(this)) {
break;
}
}
}
if (prev != null) {
prev.next = next;
}
} else {
prev = cursor;
}
cursor = next;
}
this.prev = prev;
this.cursor = cursor;
return success;
}
void push(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size == elements.length) {
@ -157,19 +344,15 @@ public abstract class Recycler<T> {
// Hit the maximum capacity - drop the possibly youngest object.
return;
}
T[] newElements = newArray(Math.min(maxCapacity, size << 1));
System.arraycopy(elements, 0, newElements, 0, size);
elements = newElements;
elements = Arrays.copyOf(elements, size << 1);
}
elements[size] = o;
elements[size] = item;
this.size = size + 1;
}
@SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
private static <T> T[] newArray(int length) {
return (T[]) new Object[length];
DefaultHandle<T> newHandle() {
return new DefaultHandle<T>(this);
}
}
}

View File

@ -149,7 +149,7 @@ final class MpscLinkedQueue<E> extends MpscLinkedQueueTailRef<E> implements Queu
lazySetHeadRef(next);
// Break the linkage between the old head and the new head.
oldHead.setNext(null);
oldHead.unlink();
return next.clearMaybe();
}

View File

@ -55,4 +55,11 @@ public abstract class MpscLinkedQueueNode<T> {
protected T clearMaybe() {
return value();
}
/**
* Unlink to allow GC'ed
*/
void unlink() {
setNext(null);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.Recycler;
/**
* {@link MpscLinkedQueueNode} that will automatically call {@link Recycler.Handle#recycle(Object)} when the node was
* unlinked.
*/
public abstract class RecyclableMpscLinkedQueueNode<T> extends MpscLinkedQueueNode<T> {
@SuppressWarnings("rawtypes")
private final Recycler.Handle handle;
protected RecyclableMpscLinkedQueueNode(Recycler.Handle<? extends RecyclableMpscLinkedQueueNode<T>> handle) {
if (handle == null) {
throw new NullPointerException("handle");
}
this.handle = handle;
}
@SuppressWarnings("unchecked")
@Override
final void unlink() {
super.unlink();
handle.recycle(this);
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
import org.junit.Assert;
import org.junit.Test;
public class RecyclerTest {
@Test(expected = IllegalStateException.class)
public void testMultipleRecycle() {
RecyclableObject object = RecyclableObject.newInstance();
object.recycle();
object.recycle();
}
@Test
public void testRecycle() {
RecyclableObject object = RecyclableObject.newInstance();
object.recycle();
RecyclableObject object2 = RecyclableObject.newInstance();
Assert.assertSame(object, object2);
object2.recycle();
}
static final class RecyclableObject {
private static final Recycler<RecyclableObject> RECYCLER = new Recycler<RecyclableObject>() {
@Override
protected RecyclableObject newObject(Handle handle) {
return new RecyclableObject(handle);
}
};
private final Recycler.Handle handle;
private RecyclableObject(Recycler.Handle handle) {
this.handle = handle;
}
public static RecyclableObject newInstance() {
return RECYCLER.get();
}
public void recycle() {
RECYCLER.recycle(this, handle);
}
}
}

View File

@ -20,6 +20,7 @@ import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
import java.net.SocketAddress;
@ -398,7 +399,8 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
}
}
static final class WriteTask extends OneTimeTask implements SingleThreadEventLoop.NonWakeupRunnable {
static final class WriteTask extends RecyclableMpscLinkedQueueNode<SingleThreadEventLoop.NonWakeupRunnable>
implements SingleThreadEventLoop.NonWakeupRunnable {
private ChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
@ -421,10 +423,8 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
return task;
}
private final Recycler.Handle<WriteTask> handle;
private WriteTask(Recycler.Handle<WriteTask> handle) {
this.handle = handle;
super(handle);
}
@Override
@ -443,9 +443,12 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
ctx = null;
msg = null;
promise = null;
}
}
RECYCLER.recycle(this, handle);
}
@Override
public SingleThreadEventLoop.NonWakeupRunnable value() {
return this;
}
}
}