Introduce FastThreadLocal which uses an EnumMap and a predefined fixed set of possible thread locals

Motivation:
Provide a faster ThreadLocal implementation

Modification:
Add a "FastThreadLocal" which uses an EnumMap and a predefined fixed set of possible thread locals (all of the static instances created by netty) that is around 10-20% faster than standard ThreadLocal in my benchmarks (and can be seen having an effect in the direct PooledByteBufAllocator benchmark that uses the DEFAULT ByteBufAllocator which uses this FastThreadLocal, as opposed to normal instantiations that do not, and in the new RecyclableArrayList benchmark);

Result:
Improved performance
This commit is contained in:
belliottsmith 2014-06-09 01:18:46 +01:00 committed by Norman Maurer
parent cf1d9823a0
commit 1ac2ff8d7b
16 changed files with 315 additions and 31 deletions

View File

@ -17,6 +17,7 @@
package io.netty.buffer;
import io.netty.util.ThreadDeathWatcher;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -118,7 +119,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
}
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
new PooledByteBufAllocator(FastThreadLocal.Type.PooledByteBufAllocator_DefaultAllocator);
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
@ -147,8 +148,21 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
normalCacheSize, null);
}
private PooledByteBufAllocator(FastThreadLocal.Type fastThreadLocalType) {
this(PlatformDependent.directBufferPreferred(),
DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER,
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE, fastThreadLocalType);
}
private PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
FastThreadLocal.Type fastThreadLocalType) {
super(preferDirect);
threadCache = new PoolThreadLocalCache();
threadCache = new PoolThreadLocalCache(fastThreadLocalType);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
@ -188,7 +202,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
long cacheThreadAliveCheckInterval) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
tinyCacheSize, smallCacheSize, normalCacheSize);
tinyCacheSize, smallCacheSize, normalCacheSize, null);
}
@SuppressWarnings("unchecked")
@ -282,10 +296,14 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
threadCache.free();
}
final class PoolThreadLocalCache extends ThreadLocal<PoolThreadCache> {
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final AtomicInteger index = new AtomicInteger();
private boolean initialized;
PoolThreadLocalCache(FastThreadLocal.Type fastThreadLocalType) {
super(fastThreadLocalType);
}
@Override
protected PoolThreadCache initialValue() {
final int idx = index.getAndIncrement();

View File

@ -17,6 +17,7 @@
package io.netty.buffer;
import io.netty.util.Recycler;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.io.InputStream;
@ -28,7 +29,8 @@ import java.nio.channels.ScatteringByteChannel;
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
private static final Recycler<PooledDirectByteBuf> RECYCLER
= new Recycler<PooledDirectByteBuf>(FastThreadLocal.Type.PooledDirectByteBuf_Recycler) {
@Override
protected PooledDirectByteBuf newObject(Handle handle) {
return new PooledDirectByteBuf(handle, 0);

View File

@ -15,6 +15,7 @@
package io.netty.buffer;
import io.netty.util.Recycler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
@ -27,7 +28,8 @@ import java.nio.channels.ScatteringByteChannel;
final class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
private static final Recycler<PooledHeapByteBuf> RECYCLER
= new Recycler<PooledHeapByteBuf>(FastThreadLocal.Type.PooledHeapByteBuf_Recycler) {
@Override
protected PooledHeapByteBuf newObject(Handle handle) {
return new PooledHeapByteBuf(handle, 0);

View File

@ -17,6 +17,7 @@
package io.netty.buffer;
import io.netty.util.Recycler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import java.io.IOException;
@ -32,7 +33,8 @@ final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER
= new Recycler<PooledUnsafeDirectByteBuf>(FastThreadLocal.Type.PooledUnsafeDirectByteBuf_Recycler) {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle handle) {
return new PooledUnsafeDirectByteBuf(handle, 0);

View File

@ -23,6 +23,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.IdentityHashMap;
import java.util.Map;
import io.netty.util.concurrent.FastThreadLocal;
/**
* Light-weight object pool based on a thread-local stack.
*
@ -54,23 +56,21 @@ public abstract class Recycler<T> {
}
private final int maxCapacity;
private final FastThreadLocal<Stack<T>> threadLocal;
private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() {
protected Recycler(FastThreadLocal.Type type) {
this(DEFAULT_MAX_CAPACITY, type);
}
public Recycler(int maxCapacity, FastThreadLocal.Type type) {
super();
this.maxCapacity = Math.max(0, maxCapacity);
threadLocal = new FastThreadLocal<Stack<T>>(type) {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
return new Stack<T>(Recycler.this, Thread.currentThread(), Recycler.this.maxCapacity);
}
};
protected Recycler() {
this(DEFAULT_MAX_CAPACITY);
}
protected Recycler(int maxCapacity) {
if (maxCapacity <= 0) {
maxCapacity = 0;
}
this.maxCapacity = maxCapacity;
}
public final T get() {

View File

@ -35,7 +35,8 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
private static final int MAX_LISTENER_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> LISTENER_STACK_DEPTH = new ThreadLocal<Integer>() {
private static final FastThreadLocal<Integer> LISTENER_STACK_DEPTH
= new FastThreadLocal<Integer>(FastThreadLocal.Type.DefaultPromise_ListenerStackDepth) {
@Override
protected Integer initialValue() {
return 0;

View File

@ -98,7 +98,7 @@ public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + nextId.incrementAndGet());
Thread t = new FastThreadLocal.FastThreadLocalThread(r, prefix + nextId.incrementAndGet());
try {
if (t.isDaemon()) {
if (!daemon) {

View File

@ -0,0 +1,176 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.EnumMap;
/**
* A simple class providing equivalent functionality to java.lang,ThreadLocal, but operating
* over a predefined hash range, so that we can hash perfectly. This permits less indirection
* and offers a slight performance improvement, so is useful when invoked frequently.
*
* The fast path is only possible on threads that extend FastThreadLocalThread, as this class
* stores the necessary state. Access by any other kind of thread falls back to a regular ThreadLocal
*
* @param <V>
*/
public class FastThreadLocal<V> {
public static enum Type {
LocalChannel_ReaderStackDepth,
PooledDirectByteBuf_Recycler,
PooledHeapByteBuf_Recycler,
PooledUnsafeDirectByteBuf_Recycler,
ChannelOutboundBuffer_Recycler,
ChannelOutboundBuffer_PooledByteBuf_Recycler,
DefaultChannelHandlerContext_WriteAndFlushTask_Recycler,
DefaultChannelHandlerContext_WriteTask_Recycler,
PendingWrite_Recycler,
RecyclableArrayList_Recycler,
DefaultPromise_ListenerStackDepth,
PooledByteBufAllocator_DefaultAllocator
}
// the set of already defined FastThreadLocals
private static final EnumMap<Type, Boolean> SET = new EnumMap<Type, Boolean>(Type.class);
// the type values, for cheap iteration
private static final Type[] TYPES = Type.values();
// a marker to indicate a value has not yet been initialised
private static final Object EMPTY = new Object();
/**
* To utilise the FastThreadLocal fast-path, all threads accessing a FastThreadLocal must extend this class
*/
public static class FastThreadLocalThread extends Thread {
private final EnumMap<Type, Object> lookup = initialMap();
static EnumMap<Type, Object> initialMap() {
EnumMap<Type, Object> r = new EnumMap<Type, Object>(Type.class);
for (Type type : TYPES) {
r.put(type, EMPTY);
}
return r;
}
public FastThreadLocalThread() {
super();
}
public FastThreadLocalThread(Runnable target) {
super(target);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target) {
super(group, target);
}
public FastThreadLocalThread(String name) {
super(name);
}
public FastThreadLocalThread(ThreadGroup group, String name) {
super(group, name);
}
public FastThreadLocalThread(Runnable target, String name) {
super(target, name);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
super(group, target, name);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
super(group, target, name, stackSize);
}
}
final Type type;
final ThreadLocal<V> fallback = new ThreadLocal<V>() {
protected V initialValue() {
return FastThreadLocal.this.initialValue();
}
};
/**
* @param type the predefined type this FastThreadLocal represents; each type may be used only once
* globally in a single VM
*/
public FastThreadLocal(Type type) {
if (type != null) {
synchronized (SET) {
if (SET.put(type, Boolean.TRUE) != null) {
throw new IllegalStateException(type + " has been assigned multiple times");
}
}
}
this.type = type;
}
/**
* Override this method to define the default value to assign
* when a thread first calls get() without a preceding set()
*
* @return the initial value
*/
protected V initialValue() {
return null;
}
/**
* Set the value for the current thread
* @param value
*/
public void set(V value) {
Thread thread = Thread.currentThread();
if (type == null || !(thread instanceof FastThreadLocalThread)) {
fallback.set(value);
return;
}
EnumMap<Type, Object> lookup = ((FastThreadLocalThread) thread).lookup;
lookup.put(type, value);
}
/**
* Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue()
*/
public void remove() {
Thread thread = Thread.currentThread();
if (type == null || !(thread instanceof FastThreadLocalThread)) {
fallback.remove();
return;
}
EnumMap<Type, Object> lookup = ((FastThreadLocalThread) thread).lookup;
lookup.put(type, EMPTY);
}
/**
* @return the current value for the current thread
*/
public final V get() {
Thread thread = Thread.currentThread();
if (type == null || !(thread instanceof FastThreadLocalThread)) {
return fallback.get();
}
EnumMap<Type, Object> lookup = ((FastThreadLocalThread) thread).lookup;
Object v = lookup.get(type);
if (v == EMPTY) {
lookup.put(type, v = initialValue());
}
return (V) v;
}
}

View File

@ -17,13 +17,17 @@ package io.netty.util.internal;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise;
import static io.netty.util.concurrent.FastThreadLocal.Type.PendingWrite_Recycler;
/**
* Some pending write which should be picked up later.
*/
public final class PendingWrite {
private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
private static final Recycler<PendingWrite> RECYCLER
= new Recycler<PendingWrite>(PendingWrite_Recycler) {
@Override
protected PendingWrite newObject(Handle handle) {
return new PendingWrite(handle);

View File

@ -18,6 +18,7 @@ package io.netty.util.internal;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collection;
@ -33,7 +34,8 @@ public final class RecyclableArrayList extends ArrayList<Object> {
private static final int DEFAULT_INITIAL_CAPACITY = 8;
private static final Recycler<RecyclableArrayList> RECYCLER = new Recycler<RecyclableArrayList>() {
private static final Recycler<RecyclableArrayList> RECYCLER
= new Recycler<RecyclableArrayList>(FastThreadLocal.Type.RecyclableArrayList_Recycler) {
@Override
protected RecyclableArrayList newObject(Handle handle) {
return new RecyclableArrayList(handle);

View File

@ -83,4 +83,16 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark {
}
pooledDirectBuffers[idx] = pooledAllocator.directBuffer(size);
}
@GenerateMicroBenchmark
public void defaultPooledHeapAllocAndFree() {
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(size);
buffer.release();
}
@GenerateMicroBenchmark
public void defaultPooledDirectAllocAndFree() {
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
buffer.release();
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.internal;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.internal.RecyclableArrayList;
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Threads;
/**
* This class benchmarks different allocators with different allocation sizes.
*/
@Threads(4)
@Measurement(iterations = 10, batchSize = 100)
public class RecyclableArrayListBenchmark extends AbstractMicrobenchmark {
@Param({ "00000", "00256", "01024", "04096", "16384", "65536" })
public int size;
@GenerateMicroBenchmark
public void recycleSameThread() {
RecyclableArrayList list = RecyclableArrayList.newInstance(size);
list.recycle();
}
}

View File

@ -16,6 +16,7 @@
package io.netty.microbench.util;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.SystemPropertyUtil;
import org.junit.Test;
import org.openjdk.jmh.annotations.Fork;
@ -29,6 +30,9 @@ import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.File;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Base class for all JMH benchmarks.
@ -43,11 +47,21 @@ public class AbstractMicrobenchmark {
protected static final int DEFAULT_MEASURE_ITERATIONS = 10;
protected static final int DEFAULT_FORKS = 2;
public static final class HarnessExecutor extends ThreadPoolExecutor {
public HarnessExecutor(int maxThreads, String prefix) {
super(0, maxThreads, 1L, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
new DefaultThreadFactory(prefix));
System.out.println("Using harness executor");
}
}
protected static final String[] JVM_ARGS = {
"-server", "-dsa", "-da", "-ea:io.netty...", "-Xms768m", "-Xmx768m",
"-XX:MaxDirectMemorySize=768m", "-XX:+AggressiveOpts", "-XX:+UseBiasedLocking",
"-XX:+UseFastAccessorMethods", "-XX:+UseStringCache", "-XX:+OptimizeStringConcat",
"-XX:+HeapDumpOnOutOfMemoryError", "-Dio.netty.noResourceLeakDetection"
"-XX:+HeapDumpOnOutOfMemoryError", "-Dio.netty.noResourceLeakDetection",
"-Dharness.executor=CUSTOM",
"-Dharness.executor.class=io.netty.microbench.util.AbstractMicrobenchmark$HarnessExecutor"
};
static {

View File

@ -16,12 +16,16 @@
package io.netty.channel;
import static io.netty.channel.DefaultChannelPipeline.logger;
import static io.netty.util.concurrent.FastThreadLocal.Type.DefaultChannelHandlerContext_WriteAndFlushTask_Recycler;
import static io.netty.util.concurrent.FastThreadLocal.Type.DefaultChannelHandlerContext_WriteTask_Recycler;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.StringUtil;
@ -940,7 +944,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
private static final Recycler<WriteTask> RECYCLER =
new Recycler<WriteTask>(DefaultChannelHandlerContext_WriteTask_Recycler) {
@Override
protected WriteTask newObject(Handle handle) {
return new WriteTask(handle);
@ -966,7 +971,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
static final class WriteAndFlushTask extends AbstractWriteTask {
private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
private static final Recycler<WriteAndFlushTask> RECYCLER =
new Recycler<WriteAndFlushTask>(DefaultChannelHandlerContext_WriteAndFlushTask_Recycler) {
@Override
protected WriteAndFlushTask newObject(Handle handle) {
return new WriteAndFlushTask(handle);

View File

@ -29,6 +29,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -57,7 +58,8 @@ public final class ChannelOutboundBuffer {
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
}
private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
private static final Recycler<ChannelOutboundBuffer> RECYCLER
= new Recycler<ChannelOutboundBuffer>(FastThreadLocal.Type.ChannelOutboundBuffer_Recycler) {
@Override
protected ChannelOutboundBuffer newObject(Handle handle) {
return new ChannelOutboundBuffer(handle);
@ -693,7 +695,8 @@ public final class ChannelOutboundBuffer {
static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf {
private final Recycler.Handle handle;
private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER = new Recycler<ThreadLocalPooledByteBuf>() {
private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER
= new Recycler<ThreadLocalPooledByteBuf>(FastThreadLocal.Type.ChannelOutboundBuffer_PooledByteBuf_Recycler) {
@Override
protected ThreadLocalPooledByteBuf newObject(Handle handle) {
return new ThreadLocalPooledByteBuf(handle);

View File

@ -27,6 +27,7 @@ import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.net.SocketAddress;
@ -46,7 +47,8 @@ public class LocalChannel extends AbstractChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final int MAX_READER_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> READER_STACK_DEPTH = new ThreadLocal<Integer>() {
private static final FastThreadLocal<Integer> READER_STACK_DEPTH
= new FastThreadLocal<Integer>(FastThreadLocal.Type.LocalChannel_ReaderStackDepth) {
@Override
protected Integer initialValue() {
return 0;