Introduce FastThreadLocal which uses an EnumMap and a predefined fixed set of possible thread locals

Motivation:
Provide a faster ThreadLocal implementation

Modification:
Add a "FastThreadLocal" which uses an EnumMap and a predefined fixed set of possible thread locals (all of the static instances created by netty) that is around 10-20% faster than standard ThreadLocal in my benchmarks (and can be seen having an effect in the direct PooledByteBufAllocator benchmark that uses the DEFAULT ByteBufAllocator which uses this FastThreadLocal, as opposed to normal instantiations that do not, and in the new RecyclableArrayList benchmark);

Result:
Improved performance
This commit is contained in:
belliottsmith 2014-06-09 01:18:46 +01:00 committed by Norman Maurer
parent 3d81afb8a5
commit 2a2a21ec59
19 changed files with 302 additions and 24 deletions

View File

@ -17,6 +17,7 @@
package io.netty.buffer;
import io.netty.util.ThreadDeathWatcher;
import io.netty.util.internal.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -28,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
private static final int DEFAULT_NUM_HEAP_ARENA;
private static final int DEFAULT_NUM_DIRECT_ARENA;
@ -273,7 +273,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
threadCache.free();
}
final class PoolThreadLocalCache extends ThreadLocal<PoolThreadCache> {
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final AtomicInteger index = new AtomicInteger();
private boolean initialized;

View File

@ -16,9 +16,11 @@
package io.netty.handler.codec.http;
import io.netty.util.internal.FastThreadLocal;
final class CookieEncoderUtil {
static final ThreadLocal<StringBuilder> buffer = new ThreadLocal<StringBuilder>() {
static final ThreadLocal<StringBuilder> buffer = new FastThreadLocal<StringBuilder>() {
@Override
public StringBuilder get() {
StringBuilder buf = super.get();

View File

@ -15,6 +15,8 @@
*/
package io.netty.handler.codec.http;
import io.netty.util.internal.FastThreadLocal;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;
@ -38,7 +40,7 @@ final class HttpHeaderDateFormat extends SimpleDateFormat {
private final SimpleDateFormat format2 = new HttpHeaderDateFormatObsolete2();
private static final ThreadLocal<HttpHeaderDateFormat> dateFormatThreadLocal =
new ThreadLocal<HttpHeaderDateFormat>() {
new FastThreadLocal<HttpHeaderDateFormat>() {
@Override
protected HttpHeaderDateFormat initialValue() {
return new HttpHeaderDateFormat();

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.marshalling;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.FastThreadLocal;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.MarshallingConfiguration;
@ -27,7 +28,7 @@ import org.jboss.marshalling.MarshallingConfiguration;
* many small {@link Object}'s and your actual Thread count is not to big
*/
public class ThreadLocalMarshallerProvider implements MarshallerProvider {
private final ThreadLocal<Marshaller> marshallers = new ThreadLocal<Marshaller>();
private final ThreadLocal<Marshaller> marshallers = new FastThreadLocal<Marshaller>();
private final MarshallerFactory factory;
private final MarshallingConfiguration config;

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.marshalling;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.FastThreadLocal;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.Unmarshaller;
@ -27,7 +28,7 @@ import org.jboss.marshalling.Unmarshaller;
* many small {@link Object}'s.
*/
public class ThreadLocalUnmarshallerProvider implements UnmarshallerProvider {
private final ThreadLocal<Unmarshaller> unmarshallers = new ThreadLocal<Unmarshaller>();
private final ThreadLocal<Unmarshaller> unmarshallers = new FastThreadLocal<Unmarshaller>();
private final MarshallerFactory factory;
private final MarshallingConfiguration config;

View File

@ -15,6 +15,8 @@
*/
package io.netty.util;
import io.netty.util.internal.FastThreadLocal;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
@ -61,7 +63,7 @@ public final class CharsetUtil {
public static final Charset US_ASCII = Charset.forName("US-ASCII");
private static final ThreadLocal<Map<Charset, CharsetEncoder>> encoders =
new ThreadLocal<Map<Charset, CharsetEncoder>>() {
new FastThreadLocal<Map<Charset, CharsetEncoder>>() {
@Override
protected Map<Charset, CharsetEncoder> initialValue() {
return new IdentityHashMap<Charset, CharsetEncoder>();
@ -69,7 +71,7 @@ public final class CharsetUtil {
};
private static final ThreadLocal<Map<Charset, CharsetDecoder>> decoders =
new ThreadLocal<Map<Charset, CharsetDecoder>>() {
new FastThreadLocal<Map<Charset, CharsetDecoder>>() {
@Override
protected Map<Charset, CharsetDecoder> initialValue() {
return new IdentityHashMap<Charset, CharsetDecoder>();

View File

@ -23,6 +23,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.IdentityHashMap;
import java.util.Map;
import io.netty.util.internal.FastThreadLocal;
/**
* Light-weight object pool based on a thread-local stack.
*
@ -54,8 +56,7 @@ public abstract class Recycler<T> {
}
private final int maxCapacity;
private final ThreadLocal<Stack<T>> threadLocal = new ThreadLocal<Stack<T>>() {
private final ThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
@ -66,11 +67,8 @@ public abstract class Recycler<T> {
this(DEFAULT_MAX_CAPACITY);
}
protected Recycler(int maxCapacity) {
if (maxCapacity <= 0) {
maxCapacity = 0;
}
this.maxCapacity = maxCapacity;
public Recycler(int maxCapacity) {
this.maxCapacity = Math.max(0, maxCapacity);
}
public final T get() {

View File

@ -17,6 +17,7 @@ package io.netty.util.concurrent;
import io.netty.util.Signal;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -35,7 +36,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
private static final int MAX_LISTENER_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> LISTENER_STACK_DEPTH = new ThreadLocal<Integer>() {
private static final ThreadLocal<Integer> LISTENER_STACK_DEPTH = new FastThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;

View File

@ -16,6 +16,7 @@
package io.netty.util.concurrent;
import io.netty.util.internal.FastThreadLocalThread;
import io.netty.util.internal.StringUtil;
import java.util.Locale;
@ -98,7 +99,7 @@ public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + nextId.incrementAndGet());
Thread t = newThread(r, prefix + nextId.incrementAndGet());
try {
if (t.isDaemon()) {
if (!daemon) {
@ -118,4 +119,8 @@ public class DefaultThreadFactory implements ThreadFactory {
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(r, name);
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A special {@link ThreadLocal} which is operating over a predefined array, so it always operate in O(1) when called
* from a {@link FastThreadLocalThread}. This permits less indirection and offers a slight performance improvement,
* so is useful when invoked frequently.
*
* The fast path is only possible on threads that extend FastThreadLocalThread, as this class
* stores the necessary state. Access by any other kind of thread falls back to a regular ThreadLocal
*
* @param <V>
*/
public class FastThreadLocal<V> extends ThreadLocal<V> {
static final Object EMPTY = new Object();
private static final AtomicInteger NEXT_INDEX = new AtomicInteger(0);
private final ThreadLocal<V> fallback = new ThreadLocal<V>() {
@Override
protected V initialValue() {
return FastThreadLocal.this.initialValue();
}
};
private final int index;
public FastThreadLocal() {
index = NEXT_INDEX.getAndIncrement();
if (index < 0) {
NEXT_INDEX.decrementAndGet();
throw new IllegalStateException("Maximal number (" + Integer.MAX_VALUE + ") of FastThreadLocal exceeded");
}
}
/**
* Set the value for the current thread
*/
@Override
public void set(V value) {
Thread thread = Thread.currentThread();
if (!(thread instanceof FastThreadLocalThread)) {
fallback.set(value);
return;
}
FastThreadLocalThread fastThread = (FastThreadLocalThread) thread;
Object[] lookup = fastThread.lookup;
if (index >= lookup.length) {
lookup = fastThread.expandArray(index);
}
lookup[index] = value;
}
/**
* Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue()
*/
@Override
public void remove() {
Thread thread = Thread.currentThread();
if (!(thread instanceof FastThreadLocalThread)) {
fallback.remove();
return;
}
Object[] lookup = ((FastThreadLocalThread) thread).lookup;
if (index >= lookup.length) {
return;
}
lookup[index] = EMPTY;
}
/**
* @return the current value for the current thread
*/
@Override
@SuppressWarnings("unchecked")
public V get() {
Thread thread = Thread.currentThread();
if (!(thread instanceof FastThreadLocalThread)) {
return fallback.get();
}
FastThreadLocalThread fastThread = (FastThreadLocalThread) thread;
Object[] lookup = fastThread.lookup;
Object v;
if (index >= lookup.length) {
v = initialValue();
lookup = fastThread.expandArray(index);
lookup[index] = v;
} else {
v = lookup[index];
if (v == EMPTY) {
v = initialValue();
lookup[index] = v;
}
}
return (V) v;
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.util.Arrays;
/**
* To utilise the {@link FastThreadLocal} fast-path, all threads accessing a {@link FastThreadLocal} must extend this
* class.
*/
public class FastThreadLocalThread extends Thread {
Object[] lookup = newArray();
public FastThreadLocalThread() { }
public FastThreadLocalThread(Runnable target) {
super(target);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target) {
super(group, target);
}
public FastThreadLocalThread(String name) {
super(name);
}
public FastThreadLocalThread(ThreadGroup group, String name) {
super(group, name);
}
public FastThreadLocalThread(Runnable target, String name) {
super(target, name);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
super(group, target, name);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
super(group, target, name, stackSize);
}
private static Object[] newArray() {
Object[] array = new Object[32];
Arrays.fill(array, FastThreadLocal.EMPTY);
return array;
}
Object[] expandArray(int length) {
int newCapacity = lookup.length;
do {
// double capacity until it is big enough
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (length > newCapacity);
Object[] array = new Object[newCapacity];
System.arraycopy(lookup, 0, array, 0, lookup.length);
Arrays.fill(array, lookup.length, array.length, FastThreadLocal.EMPTY);
lookup = array;
return lookup;
}
}

View File

@ -31,7 +31,7 @@ public abstract class TypeParameterMatcher {
private static final Object TEST_OBJECT = new Object();
private static final ThreadLocal<Map<Class<?>, TypeParameterMatcher>> getCache =
new ThreadLocal<Map<Class<?>, TypeParameterMatcher>>() {
new FastThreadLocal<Map<Class<?>, TypeParameterMatcher>>() {
@Override
protected Map<Class<?>, TypeParameterMatcher> initialValue() {
return new IdentityHashMap<Class<?>, TypeParameterMatcher>();
@ -69,7 +69,7 @@ public abstract class TypeParameterMatcher {
}
private static final ThreadLocal<Map<Class<?>, Map<String, TypeParameterMatcher>>> findCache =
new ThreadLocal<Map<Class<?>, Map<String, TypeParameterMatcher>>>() {
new FastThreadLocal<Map<Class<?>, Map<String, TypeParameterMatcher>>>() {
@Override
protected Map<Class<?>, Map<String, TypeParameterMatcher>> initialValue() {
return new IdentityHashMap<Class<?>, Map<String, TypeParameterMatcher>>();

View File

@ -19,6 +19,7 @@ package io.netty.handler.ssl.util;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.FastThreadLocal;
import javax.net.ssl.ManagerFactoryParameters;
import javax.net.ssl.TrustManager;
@ -67,7 +68,7 @@ public final class FingerprintTrustManagerFactory extends SimpleTrustManagerFact
private static final int SHA1_BYTE_LEN = 20;
private static final int SHA1_HEX_LEN = SHA1_BYTE_LEN * 2;
private static final ThreadLocal<MessageDigest> tlmd = new ThreadLocal<MessageDigest>() {
private static final ThreadLocal<MessageDigest> tlmd = new FastThreadLocal<MessageDigest>() {
@Override
protected MessageDigest initialValue() {
try {

View File

@ -16,6 +16,8 @@
package io.netty.handler.ssl.util;
import io.netty.util.internal.FastThreadLocal;
import javax.net.ssl.ManagerFactoryParameters;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
@ -43,7 +45,7 @@ public abstract class SimpleTrustManagerFactory extends TrustManagerFactory {
* To work around this issue, we use an ugly hack which uses a {@link ThreadLocal}.
*/
private static final ThreadLocal<SimpleTrustManagerFactorySpi> CURRENT_SPI =
new ThreadLocal<SimpleTrustManagerFactorySpi>() {
new FastThreadLocal<SimpleTrustManagerFactorySpi>() {
@Override
protected SimpleTrustManagerFactorySpi initialValue() {
return new SimpleTrustManagerFactorySpi();

View File

@ -83,4 +83,16 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark {
}
pooledDirectBuffers[idx] = pooledAllocator.directBuffer(size);
}
@GenerateMicroBenchmark
public void defaultPooledHeapAllocAndFree() {
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(size);
buffer.release();
}
@GenerateMicroBenchmark
public void defaultPooledDirectAllocAndFree() {
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
buffer.release();
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.internal;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.internal.RecyclableArrayList;
import org.openjdk.jmh.annotations.GenerateMicroBenchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Threads;
/**
* This class benchmarks different allocators with different allocation sizes.
*/
@Threads(4)
@Measurement(iterations = 10, batchSize = 100)
public class RecyclableArrayListBenchmark extends AbstractMicrobenchmark {
@Param({ "00000", "00256", "01024", "04096", "16384", "65536" })
public int size;
@GenerateMicroBenchmark
public void recycleSameThread() {
RecyclableArrayList list = RecyclableArrayList.newInstance(size);
list.recycle();
}
}

View File

@ -16,6 +16,7 @@
package io.netty.microbench.util;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.internal.SystemPropertyUtil;
import org.junit.Test;
import org.openjdk.jmh.annotations.Fork;
@ -29,6 +30,9 @@ import org.openjdk.jmh.runner.options.ChainedOptionsBuilder;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.io.File;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Base class for all JMH benchmarks.
@ -43,11 +47,21 @@ public class AbstractMicrobenchmark {
protected static final int DEFAULT_MEASURE_ITERATIONS = 10;
protected static final int DEFAULT_FORKS = 2;
public static final class HarnessExecutor extends ThreadPoolExecutor {
public HarnessExecutor(int maxThreads, String prefix) {
super(0, maxThreads, 1L, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
new DefaultThreadFactory(prefix));
System.out.println("Using harness executor");
}
}
protected static final String[] JVM_ARGS = {
"-server", "-dsa", "-da", "-ea:io.netty...", "-Xms768m", "-Xmx768m",
"-XX:MaxDirectMemorySize=768m", "-XX:+AggressiveOpts", "-XX:+UseBiasedLocking",
"-XX:+UseFastAccessorMethods", "-XX:+UseStringCache", "-XX:+OptimizeStringConcat",
"-XX:+HeapDumpOnOutOfMemoryError", "-Dio.netty.noResourceLeakDetection"
"-XX:+HeapDumpOnOutOfMemoryError", "-Dio.netty.noResourceLeakDetection",
"-Dharness.executor=CUSTOM",
"-Dharness.executor.class=io.netty.microbench.util.AbstractMicrobenchmark$HarnessExecutor"
};
static {

View File

@ -16,6 +16,8 @@
package io.netty.channel;
import io.netty.util.internal.FastThreadLocal;
import java.util.Map;
import java.util.WeakHashMap;
@ -33,7 +35,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler {
* See <a href="See https://github.com/netty/netty/issues/2289">#2289</a>.
*/
private static final ThreadLocal<Map<Class<?>, Boolean>> SHARABLE_CACHE =
new ThreadLocal<Map<Class<?>, Boolean>>() {
new FastThreadLocal<Map<Class<?>, Boolean>>() {
@Override
protected Map<Class<?>, Boolean> initialValue() {
// Start with small capacity to keep memory overhead as low as possible.

View File

@ -28,6 +28,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.FastThreadLocal;
import java.net.SocketAddress;
import java.nio.channels.AlreadyConnectedException;
@ -48,7 +49,7 @@ public class LocalChannel extends AbstractChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final int MAX_READER_STACK_DEPTH = 8;
private static final ThreadLocal<Integer> READER_STACK_DEPTH = new ThreadLocal<Integer>() {
private static final ThreadLocal<Integer> READER_STACK_DEPTH = new FastThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return 0;