Improve performance of Recycler
Motivation: Recycler is used in many places to reduce GC-pressure but is still not as fast as possible because of the internal datastructures used. Modification: - Rewrite Recycler to use a WeakOrderQueue which makes minimal guaranteer about order and visibility for max performance. - Recycling of the same object multiple times without acquire it will fail. - Introduce a RecyclableMpscLinkedQueueNode which can be used for MpscLinkedQueueNodes that use Recycler These changes are based on @belliottsmith 's work that was part of #2504. Result: Huge increase in performance. 4.0 branch without this commit: Benchmark (size) Mode Samples Score Score error Units i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 00000 thrpt 20 116026994.130 2763381.305 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 00256 thrpt 20 110823170.627 3007221.464 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 01024 thrpt 20 118290272.413 7143962.304 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 04096 thrpt 20 120560396.523 6483323.228 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 16384 thrpt 20 114726607.428 2960013.108 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 65536 thrpt 20 119385917.899 3172913.684 ops/s Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 297.617 sec - in io.netty.microbench.internal.RecyclableArrayListBenchmark 4.0 branch with this commit: Benchmark (size) Mode Samples Score Score error Units i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 00000 thrpt 20 204158855.315 5031432.145 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 00256 thrpt 20 205179685.861 1934137.841 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 01024 thrpt 20 209906801.437 8007811.254 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 04096 thrpt 20 214288320.053 6413126.689 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 16384 thrpt 20 215940902.649 7837706.133 ops/s i.n.m.i.RecyclableArrayListBenchmark.recycleSameThread 65536 thrpt 20 211141994.206 5017868.542 ops/s Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 297.648 sec - in io.netty.microbench.internal.RecyclableArrayListBenchmark
This commit is contained in:
parent
22f16e52bf
commit
790c63e8d2
@ -21,8 +21,11 @@ import io.netty.util.internal.SystemPropertyUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.util.IdentityHashMap;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Light-weight object pool based on a thread-local stack.
|
||||
@ -33,6 +36,8 @@ public abstract class Recycler<T> {
|
||||
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);
|
||||
|
||||
private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
|
||||
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
|
||||
private static final int DEFAULT_MAX_CAPACITY;
|
||||
private static final int INITIAL_CAPACITY;
|
||||
|
||||
@ -70,27 +75,26 @@ public abstract class Recycler<T> {
|
||||
this.maxCapacity = Math.max(0, maxCapacity);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public final T get() {
|
||||
Stack<T> stack = threadLocal.get();
|
||||
T o = stack.pop();
|
||||
if (o == null) {
|
||||
o = newObject(stack);
|
||||
DefaultHandle handle = stack.pop();
|
||||
if (handle == null) {
|
||||
handle = stack.newHandle();
|
||||
handle.value = newObject(handle);
|
||||
}
|
||||
return o;
|
||||
return (T) handle.value;
|
||||
}
|
||||
|
||||
public final boolean recycle(T o, Handle handle) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Stack<T> stack = (Stack<T>) handle;
|
||||
if (stack.parent != this) {
|
||||
DefaultHandle h = (DefaultHandle) handle;
|
||||
if (h.stack.parent != this) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (Thread.currentThread() != stack.thread) {
|
||||
return false;
|
||||
if (o != h.value) {
|
||||
throw new IllegalArgumentException("o does not belong to handle");
|
||||
}
|
||||
|
||||
stack.push(o);
|
||||
h.recycle();
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -98,51 +102,235 @@ public abstract class Recycler<T> {
|
||||
|
||||
public interface Handle { }
|
||||
|
||||
static final class Stack<T> implements Handle {
|
||||
static final class DefaultHandle implements Handle {
|
||||
private int lastRecycledId;
|
||||
private int recycleId;
|
||||
|
||||
private T[] elements;
|
||||
private int size;
|
||||
private final int maxCapacity;
|
||||
private Stack<?> stack;
|
||||
private Object value;
|
||||
|
||||
private final Map<T, Boolean> map;
|
||||
DefaultHandle(Stack<?> stack) {
|
||||
this.stack = stack;
|
||||
}
|
||||
|
||||
public void recycle() {
|
||||
Thread thread = Thread.currentThread();
|
||||
if (thread == stack.thread) {
|
||||
stack.push(this);
|
||||
return;
|
||||
}
|
||||
// we don't want to have a ref to the queue as the value in our weak map
|
||||
// so we null it out; to ensure there are no races with restoring it later
|
||||
// we impose a memory ordering here (no-op on x86)
|
||||
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
|
||||
WeakOrderQueue queue = delayedRecycled.get(stack);
|
||||
if (queue == null) {
|
||||
delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread));
|
||||
}
|
||||
queue.add(this);
|
||||
}
|
||||
}
|
||||
|
||||
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
|
||||
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
|
||||
@Override
|
||||
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
|
||||
return new WeakHashMap<Stack<?>, WeakOrderQueue>();
|
||||
}
|
||||
};
|
||||
|
||||
// a queue that makes only moderate guarantees about visibility: items are seen in the correct order,
|
||||
// but we aren't absolutely guaranteed to ever see anything at all, thereby keeping the queue cheap to maintain
|
||||
private static final class WeakOrderQueue {
|
||||
private static final int LINK_CAPACITY = 16;
|
||||
|
||||
// Let Link extend AtomicInteger for intrinsics. The Link itself will be used as writerIndex.
|
||||
@SuppressWarnings("serial")
|
||||
private static final class Link extends AtomicInteger {
|
||||
private final DefaultHandle[] elements = new DefaultHandle[LINK_CAPACITY];
|
||||
|
||||
private int readIndex;
|
||||
private Link next;
|
||||
}
|
||||
|
||||
// chain of data items
|
||||
private Link head, tail;
|
||||
// pointer to another queue of delayed items for the same stack
|
||||
private WeakOrderQueue next;
|
||||
private final WeakReference<Thread> owner;
|
||||
private final int id = ID_GENERATOR.getAndIncrement();
|
||||
|
||||
public WeakOrderQueue(Stack<?> stack, Thread thread) {
|
||||
head = tail = new Link();
|
||||
owner = new WeakReference<Thread>(thread);
|
||||
synchronized (stack) {
|
||||
next = stack.head;
|
||||
stack.head = this;
|
||||
}
|
||||
}
|
||||
|
||||
void add(DefaultHandle handle) {
|
||||
handle.lastRecycledId = id;
|
||||
|
||||
Link tail = this.tail;
|
||||
int writeIndex;
|
||||
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
|
||||
this.tail = tail = tail.next = new Link();
|
||||
writeIndex = tail.get();
|
||||
}
|
||||
tail.elements[writeIndex] = handle;
|
||||
handle.stack = null;
|
||||
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
|
||||
// this also means we guarantee visibility of an element in the queue if we see the index updated
|
||||
tail.lazySet(writeIndex + 1);
|
||||
}
|
||||
|
||||
boolean hasFinalData() {
|
||||
return tail.readIndex != tail.get();
|
||||
}
|
||||
|
||||
// transfer as many items as we can from this queue to the stack, returning true if any were transferred
|
||||
boolean transfer(Stack<?> to) {
|
||||
|
||||
Link head = this.head;
|
||||
if (head == null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (head.readIndex == LINK_CAPACITY) {
|
||||
if (head.next == null) {
|
||||
return false;
|
||||
}
|
||||
this.head = head = head.next;
|
||||
}
|
||||
|
||||
int start = head.readIndex;
|
||||
int end = head.get();
|
||||
if (start == end) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int count = end - start;
|
||||
if (to.size + count > to.elements.length) {
|
||||
to.elements = Arrays.copyOf(to.elements, (to.size + count) * 2);
|
||||
}
|
||||
|
||||
DefaultHandle[] src = head.elements;
|
||||
DefaultHandle[] trg = to.elements;
|
||||
int size = to.size;
|
||||
while (start < end) {
|
||||
DefaultHandle element = src[start];
|
||||
if (element.recycleId == 0) {
|
||||
element.recycleId = element.lastRecycledId;
|
||||
} else if (element.recycleId != element.lastRecycledId) {
|
||||
throw new IllegalStateException("recycled already");
|
||||
}
|
||||
element.stack = to;
|
||||
trg[size++] = element;
|
||||
src[start++] = null;
|
||||
}
|
||||
to.size = size;
|
||||
|
||||
if (end == LINK_CAPACITY & head.next != null) {
|
||||
this.head = head.next;
|
||||
}
|
||||
|
||||
head.readIndex = end;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
static final class Stack<T> {
|
||||
|
||||
// we keep a queue of per-thread queues, which is appended to once only, each time a new thread other
|
||||
// than the stack owner recycles: when we run out of items in our stack we iterate this collection
|
||||
// to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
|
||||
// still recycling all items.
|
||||
final Recycler<T> parent;
|
||||
final Thread thread;
|
||||
private DefaultHandle[] elements;
|
||||
private final int maxCapacity;
|
||||
private int size;
|
||||
|
||||
private volatile WeakOrderQueue head;
|
||||
private WeakOrderQueue cursor, prev;
|
||||
|
||||
@SuppressWarnings("AssertWithSideEffects")
|
||||
Stack(Recycler<T> parent, Thread thread, int maxCapacity) {
|
||||
this.parent = parent;
|
||||
this.thread = thread;
|
||||
this.maxCapacity = maxCapacity;
|
||||
elements = newArray(INITIAL_CAPACITY);
|
||||
|
||||
// *assigns* true if assertions are on.
|
||||
@SuppressWarnings("UnusedAssignment")
|
||||
boolean assertionEnabled = false;
|
||||
assert assertionEnabled = true;
|
||||
|
||||
if (assertionEnabled) {
|
||||
map = new IdentityHashMap<T, Boolean>(INITIAL_CAPACITY);
|
||||
} else {
|
||||
map = null;
|
||||
}
|
||||
elements = new DefaultHandle[INITIAL_CAPACITY];
|
||||
}
|
||||
|
||||
T pop() {
|
||||
DefaultHandle pop() {
|
||||
int size = this.size;
|
||||
if (size == 0) {
|
||||
return null;
|
||||
if (!scavenge()) {
|
||||
return null;
|
||||
}
|
||||
size = this.size;
|
||||
}
|
||||
size --;
|
||||
T ret = elements[size];
|
||||
elements[size] = null;
|
||||
assert map == null || map.remove(ret) != null;
|
||||
DefaultHandle ret = elements[size];
|
||||
if (ret.lastRecycledId != ret.recycleId) {
|
||||
throw new IllegalStateException("recycled multiple times");
|
||||
}
|
||||
ret.recycleId = 0;
|
||||
ret.lastRecycledId = 0;
|
||||
this.size = size;
|
||||
return ret;
|
||||
}
|
||||
|
||||
void push(T o) {
|
||||
assert map == null || map.put(o, Boolean.TRUE) == null: "recycled already";
|
||||
boolean scavenge() {
|
||||
// continue an existing scavenge, if any
|
||||
if (scavengeSome()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// reset our scavenge cursor
|
||||
prev = null;
|
||||
cursor = head;
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean scavengeSome() {
|
||||
boolean success = false;
|
||||
WeakOrderQueue cursor = this.cursor, prev = this.prev;
|
||||
while (cursor != null) {
|
||||
if (cursor.transfer(this)) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
WeakOrderQueue next = cursor.next;
|
||||
if (cursor.owner.get() == null) {
|
||||
// if the thread associated with the queue is gone, unlink it, after
|
||||
// performing a volatile read to confirm there is no data left to collect
|
||||
// we never unlink the first queue, as we don't want to synchronize on updating the head
|
||||
if (cursor.hasFinalData()) {
|
||||
for (;;) {
|
||||
if (!cursor.transfer(this)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (prev != null) {
|
||||
prev.next = next;
|
||||
}
|
||||
} else {
|
||||
prev = cursor;
|
||||
}
|
||||
cursor = next;
|
||||
}
|
||||
this.prev = prev;
|
||||
this.cursor = cursor;
|
||||
return success;
|
||||
}
|
||||
|
||||
void push(DefaultHandle item) {
|
||||
if ((item.recycleId | item.lastRecycledId) != 0) {
|
||||
throw new IllegalStateException("recycled already");
|
||||
}
|
||||
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
|
||||
|
||||
int size = this.size;
|
||||
if (size == elements.length) {
|
||||
@ -150,19 +338,15 @@ public abstract class Recycler<T> {
|
||||
// Hit the maximum capacity - drop the possibly youngest object.
|
||||
return;
|
||||
}
|
||||
|
||||
T[] newElements = newArray(Math.min(maxCapacity, size << 1));
|
||||
System.arraycopy(elements, 0, newElements, 0, size);
|
||||
elements = newElements;
|
||||
elements = Arrays.copyOf(elements, size << 1);
|
||||
}
|
||||
|
||||
elements[size] = o;
|
||||
elements[size] = item;
|
||||
this.size = size + 1;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
|
||||
private static <T> T[] newArray(int length) {
|
||||
return (T[]) new Object[length];
|
||||
DefaultHandle newHandle() {
|
||||
return new DefaultHandle(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ final class MpscLinkedQueue<E> extends MpscLinkedQueueTailRef<E> implements Queu
|
||||
lazySetHeadRef(next);
|
||||
|
||||
// Break the linkage between the old head and the new head.
|
||||
oldHead.setNext(null);
|
||||
oldHead.unlink();
|
||||
|
||||
return next.clearMaybe();
|
||||
}
|
||||
|
@ -55,4 +55,11 @@ public abstract class MpscLinkedQueueNode<T> {
|
||||
protected T clearMaybe() {
|
||||
return value();
|
||||
}
|
||||
|
||||
/**
|
||||
* Unlink to allow GC'ed
|
||||
*/
|
||||
void unlink() {
|
||||
setNext(null);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,45 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.util.internal;
|
||||
|
||||
import io.netty.util.Recycler;
|
||||
|
||||
/**
|
||||
* {@link MpscLinkedQueueNode} that will automatically call {@link #recycle(Recycler.Handle)} when the node was
|
||||
* unlinked.
|
||||
*/
|
||||
public abstract class RecyclableMpscLinkedQueueNode<T> extends MpscLinkedQueueNode<T> {
|
||||
private final Recycler.Handle handle;
|
||||
|
||||
protected RecyclableMpscLinkedQueueNode(Recycler.Handle handle) {
|
||||
if (handle == null) {
|
||||
throw new NullPointerException("handle");
|
||||
}
|
||||
this.handle = handle;
|
||||
}
|
||||
|
||||
@Override
|
||||
final void unlink() {
|
||||
super.unlink();
|
||||
recycle(handle);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called once unliked and so ready to recycled.
|
||||
*/
|
||||
protected abstract void recycle(Recycler.Handle handle);
|
||||
}
|
62
common/src/test/java/io/netty/util/RecyclerTest.java
Normal file
62
common/src/test/java/io/netty/util/RecyclerTest.java
Normal file
@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright 2014 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class RecyclerTest {
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testMultipleRecycle() {
|
||||
RecyclableObject object = RecyclableObject.newInstance();
|
||||
object.recycle();
|
||||
object.recycle();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecycle() {
|
||||
RecyclableObject object = RecyclableObject.newInstance();
|
||||
object.recycle();
|
||||
RecyclableObject object2 = RecyclableObject.newInstance();
|
||||
Assert.assertSame(object, object2);
|
||||
object2.recycle();
|
||||
}
|
||||
|
||||
static final class RecyclableObject {
|
||||
|
||||
private static final Recycler<RecyclableObject> RECYCLER = new Recycler<RecyclableObject>() {
|
||||
@Override
|
||||
protected RecyclableObject newObject(Handle handle) {
|
||||
return new RecyclableObject(handle);
|
||||
}
|
||||
};
|
||||
|
||||
private final Recycler.Handle handle;
|
||||
|
||||
private RecyclableObject(Recycler.Handle handle) {
|
||||
this.handle = handle;
|
||||
}
|
||||
|
||||
public static RecyclableObject newInstance() {
|
||||
return RECYCLER.get();
|
||||
}
|
||||
|
||||
public void recycle() {
|
||||
RECYCLER.recycle(this, handle);
|
||||
}
|
||||
}
|
||||
}
|
@ -24,6 +24,7 @@ import io.netty.util.ReferenceCountUtil;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.EventExecutorGroup;
|
||||
import io.netty.util.internal.OneTimeTask;
|
||||
import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
|
||||
import io.netty.util.internal.StringUtil;
|
||||
|
||||
import java.net.SocketAddress;
|
||||
@ -892,16 +893,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
}
|
||||
}
|
||||
|
||||
abstract static class AbstractWriteTask extends OneTimeTask {
|
||||
private final Recycler.Handle handle;
|
||||
|
||||
abstract static class AbstractWriteTask extends RecyclableMpscLinkedQueueNode<Runnable> implements Runnable {
|
||||
private AbstractChannelHandlerContext ctx;
|
||||
private Object msg;
|
||||
private ChannelPromise promise;
|
||||
private int size;
|
||||
|
||||
private AbstractWriteTask(Recycler.Handle handle) {
|
||||
this.handle = handle;
|
||||
super(handle);
|
||||
}
|
||||
|
||||
protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
|
||||
@ -928,15 +927,17 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
|
||||
ctx = null;
|
||||
msg = null;
|
||||
promise = null;
|
||||
recycle(handle);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Runnable value() {
|
||||
return this;
|
||||
}
|
||||
|
||||
protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||
ctx.invokeWrite(msg, promise);
|
||||
}
|
||||
|
||||
protected abstract void recycle(Recycler.Handle handle);
|
||||
}
|
||||
|
||||
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
|
||||
|
Loading…
x
Reference in New Issue
Block a user