Make use of an array to store FastThreadLocals and so allow to also use it in PooledByteBufAllocator that is instanced by users.

Motivation:
Allow to make use of our new FastThreadLocal whereever possible

Modification:
Make use of an array to store FastThreadLocals and so allow to also use it in PooledByteBufAllocator that is instanced by users.
The maximal size of the array is configurable per system property to allow to tune it if needed. As default we use 64 entries which should be good enough.

Result:
More flexible usage of FastThreadLocal
This commit is contained in:
Norman Maurer 2014-06-10 08:04:20 +02:00
parent 1ac2ff8d7b
commit 76043bc8c8
24 changed files with 251 additions and 256 deletions

View File

@ -17,7 +17,7 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.ThreadDeathWatcher; import io.netty.util.ThreadDeathWatcher;
import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.FastThreadLocal;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
public class PooledByteBufAllocator extends AbstractByteBufAllocator { public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
private static final int DEFAULT_NUM_HEAP_ARENA; private static final int DEFAULT_NUM_HEAP_ARENA;
private static final int DEFAULT_NUM_DIRECT_ARENA; private static final int DEFAULT_NUM_DIRECT_ARENA;
@ -119,7 +118,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
} }
public static final PooledByteBufAllocator DEFAULT = public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(FastThreadLocal.Type.PooledByteBufAllocator_DefaultAllocator); new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
private final PoolArena<byte[]>[] heapArenas; private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas; private final PoolArena<ByteBuffer>[] directArenas;
@ -148,21 +147,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) { int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize,
normalCacheSize, null);
}
private PooledByteBufAllocator(FastThreadLocal.Type fastThreadLocalType) {
this(PlatformDependent.directBufferPreferred(),
DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER,
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE, fastThreadLocalType);
}
private PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
FastThreadLocal.Type fastThreadLocalType) {
super(preferDirect); super(preferDirect);
threadCache = new PoolThreadLocalCache(fastThreadLocalType); threadCache = new PoolThreadLocalCache();
this.tinyCacheSize = tinyCacheSize; this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize; this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize; this.normalCacheSize = normalCacheSize;
@ -202,7 +188,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
int tinyCacheSize, int smallCacheSize, int normalCacheSize, int tinyCacheSize, int smallCacheSize, int normalCacheSize,
long cacheThreadAliveCheckInterval) { long cacheThreadAliveCheckInterval) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
tinyCacheSize, smallCacheSize, normalCacheSize, null); tinyCacheSize, smallCacheSize, normalCacheSize);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -300,10 +286,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private final AtomicInteger index = new AtomicInteger(); private final AtomicInteger index = new AtomicInteger();
private boolean initialized; private boolean initialized;
PoolThreadLocalCache(FastThreadLocal.Type fastThreadLocalType) {
super(fastThreadLocalType);
}
@Override @Override
protected PoolThreadCache initialValue() { protected PoolThreadCache initialValue() {
final int idx = index.getAndIncrement(); final int idx = index.getAndIncrement();

View File

@ -17,7 +17,6 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -29,8 +28,7 @@ import java.nio.channels.ScatteringByteChannel;
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> { final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
= new Recycler<PooledDirectByteBuf>(FastThreadLocal.Type.PooledDirectByteBuf_Recycler) {
@Override @Override
protected PooledDirectByteBuf newObject(Handle handle) { protected PooledDirectByteBuf newObject(Handle handle) {
return new PooledDirectByteBuf(handle, 0); return new PooledDirectByteBuf(handle, 0);

View File

@ -15,7 +15,6 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.io.IOException; import java.io.IOException;
@ -28,8 +27,7 @@ import java.nio.channels.ScatteringByteChannel;
final class PooledHeapByteBuf extends PooledByteBuf<byte[]> { final class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
private static final Recycler<PooledHeapByteBuf> RECYCLER private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
= new Recycler<PooledHeapByteBuf>(FastThreadLocal.Type.PooledHeapByteBuf_Recycler) {
@Override @Override
protected PooledHeapByteBuf newObject(Handle handle) { protected PooledHeapByteBuf newObject(Handle handle) {
return new PooledHeapByteBuf(handle, 0); return new PooledHeapByteBuf(handle, 0);

View File

@ -17,7 +17,6 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import java.io.IOException; import java.io.IOException;
@ -33,8 +32,7 @@ final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
= new Recycler<PooledUnsafeDirectByteBuf>(FastThreadLocal.Type.PooledUnsafeDirectByteBuf_Recycler) {
@Override @Override
protected PooledUnsafeDirectByteBuf newObject(Handle handle) { protected PooledUnsafeDirectByteBuf newObject(Handle handle) {
return new PooledUnsafeDirectByteBuf(handle, 0); return new PooledUnsafeDirectByteBuf(handle, 0);

View File

@ -16,9 +16,11 @@
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.util.internal.FastThreadLocal;
final class CookieEncoderUtil { final class CookieEncoderUtil {
static final ThreadLocal<StringBuilder> buffer = new ThreadLocal<StringBuilder>() { static final ThreadLocal<StringBuilder> buffer = new FastThreadLocal<StringBuilder>() {
@Override @Override
public StringBuilder get() { public StringBuilder get() {
StringBuilder buf = super.get(); StringBuilder buf = super.get();

View File

@ -15,6 +15,8 @@
*/ */
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.util.internal.FastThreadLocal;
import java.text.ParsePosition; import java.text.ParsePosition;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.util.Date;
@ -38,7 +40,7 @@ final class HttpHeaderDateFormat extends SimpleDateFormat {
private final SimpleDateFormat format2 = new HttpHeaderDateFormatObsolete2(); private final SimpleDateFormat format2 = new HttpHeaderDateFormatObsolete2();
private static final ThreadLocal<HttpHeaderDateFormat> dateFormatThreadLocal = private static final ThreadLocal<HttpHeaderDateFormat> dateFormatThreadLocal =
new ThreadLocal<HttpHeaderDateFormat>() { new FastThreadLocal<HttpHeaderDateFormat>() {
@Override @Override
protected HttpHeaderDateFormat initialValue() { protected HttpHeaderDateFormat initialValue() {
return new HttpHeaderDateFormat(); return new HttpHeaderDateFormat();

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.marshalling;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.FastThreadLocal;
import org.jboss.marshalling.Marshaller; import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.MarshallingConfiguration; import org.jboss.marshalling.MarshallingConfiguration;
@ -27,7 +28,7 @@ import org.jboss.marshalling.MarshallingConfiguration;
* many small {@link Object}'s and your actual Thread count is not to big * many small {@link Object}'s and your actual Thread count is not to big
*/ */
public class ThreadLocalMarshallerProvider implements MarshallerProvider { public class ThreadLocalMarshallerProvider implements MarshallerProvider {
private final ThreadLocal<Marshaller> marshallers = new ThreadLocal<Marshaller>(); private final ThreadLocal<Marshaller> marshallers = new FastThreadLocal<Marshaller>();
private final MarshallerFactory factory; private final MarshallerFactory factory;
private final MarshallingConfiguration config; private final MarshallingConfiguration config;

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.marshalling;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.FastThreadLocal;
import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.MarshallingConfiguration; import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.Unmarshaller; import org.jboss.marshalling.Unmarshaller;
@ -27,7 +28,7 @@ import org.jboss.marshalling.Unmarshaller;
* many small {@link Object}'s. * many small {@link Object}'s.
*/ */
public class ThreadLocalUnmarshallerProvider implements UnmarshallerProvider { public class ThreadLocalUnmarshallerProvider implements UnmarshallerProvider {
private final ThreadLocal<Unmarshaller> unmarshallers = new ThreadLocal<Unmarshaller>(); private final ThreadLocal<Unmarshaller> unmarshallers = new FastThreadLocal<Unmarshaller>();
private final MarshallerFactory factory; private final MarshallerFactory factory;
private final MarshallingConfiguration config; private final MarshallingConfiguration config;

View File

@ -15,6 +15,8 @@
*/ */
package io.netty.util; package io.netty.util;
import io.netty.util.internal.FastThreadLocal;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder; import java.nio.charset.CharsetEncoder;
@ -61,7 +63,7 @@ public final class CharsetUtil {
public static final Charset US_ASCII = Charset.forName("US-ASCII"); public static final Charset US_ASCII = Charset.forName("US-ASCII");
private static final ThreadLocal<Map<Charset, CharsetEncoder>> encoders = private static final ThreadLocal<Map<Charset, CharsetEncoder>> encoders =
new ThreadLocal<Map<Charset, CharsetEncoder>>() { new FastThreadLocal<Map<Charset, CharsetEncoder>>() {
@Override @Override
protected Map<Charset, CharsetEncoder> initialValue() { protected Map<Charset, CharsetEncoder> initialValue() {
return new IdentityHashMap<Charset, CharsetEncoder>(); return new IdentityHashMap<Charset, CharsetEncoder>();
@ -69,7 +71,7 @@ public final class CharsetUtil {
}; };
private static final ThreadLocal<Map<Charset, CharsetDecoder>> decoders = private static final ThreadLocal<Map<Charset, CharsetDecoder>> decoders =
new ThreadLocal<Map<Charset, CharsetDecoder>>() { new FastThreadLocal<Map<Charset, CharsetDecoder>>() {
@Override @Override
protected Map<Charset, CharsetDecoder> initialValue() { protected Map<Charset, CharsetDecoder> initialValue() {
return new IdentityHashMap<Charset, CharsetDecoder>(); return new IdentityHashMap<Charset, CharsetDecoder>();

View File

@ -23,7 +23,7 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.IdentityHashMap; import java.util.IdentityHashMap;
import java.util.Map; import java.util.Map;
import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.FastThreadLocal;
/** /**
* Light-weight object pool based on a thread-local stack. * Light-weight object pool based on a thread-local stack.
@ -56,21 +56,19 @@ public abstract class Recycler<T> {
} }
private final int maxCapacity; private final int maxCapacity;
private final FastThreadLocal<Stack<T>> threadLocal; private final ThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacity);
}
};
protected Recycler(FastThreadLocal.Type type) { protected Recycler() {
this(DEFAULT_MAX_CAPACITY, type); this(DEFAULT_MAX_CAPACITY);
} }
public Recycler(int maxCapacity, FastThreadLocal.Type type) { public Recycler(int maxCapacity) {
super();
this.maxCapacity = Math.max(0, maxCapacity); this.maxCapacity = Math.max(0, maxCapacity);
threadLocal = new FastThreadLocal<Stack<T>>(type) {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), Recycler.this.maxCapacity);
}
};
} }
public final T get() { public final T get() {

View File

@ -17,6 +17,7 @@ package io.netty.util.concurrent;
import io.netty.util.Signal; import io.netty.util.Signal;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.FastThreadLocal;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -35,8 +36,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");
private static final int MAX_LISTENER_STACK_DEPTH = 8; private static final int MAX_LISTENER_STACK_DEPTH = 8;
private static final FastThreadLocal<Integer> LISTENER_STACK_DEPTH private static final ThreadLocal<Integer> LISTENER_STACK_DEPTH = new FastThreadLocal<Integer>() {
= new FastThreadLocal<Integer>(FastThreadLocal.Type.DefaultPromise_ListenerStackDepth) {
@Override @Override
protected Integer initialValue() { protected Integer initialValue() {
return 0; return 0;

View File

@ -16,6 +16,7 @@
package io.netty.util.concurrent; package io.netty.util.concurrent;
import io.netty.util.internal.FastThreadLocalThread;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.Locale; import java.util.Locale;
@ -98,7 +99,7 @@ public class DefaultThreadFactory implements ThreadFactory {
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread t = new FastThreadLocal.FastThreadLocalThread(r, prefix + nextId.incrementAndGet()); Thread t = newThread(r, prefix + nextId.incrementAndGet());
try { try {
if (t.isDaemon()) { if (t.isDaemon()) {
if (!daemon) { if (!daemon) {
@ -118,4 +119,8 @@ public class DefaultThreadFactory implements ThreadFactory {
} }
return t; return t;
} }
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(r, name);
}
} }

View File

@ -1,176 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import java.util.EnumMap;
/**
* A simple class providing equivalent functionality to java.lang,ThreadLocal, but operating
* over a predefined hash range, so that we can hash perfectly. This permits less indirection
* and offers a slight performance improvement, so is useful when invoked frequently.
*
* The fast path is only possible on threads that extend FastThreadLocalThread, as this class
* stores the necessary state. Access by any other kind of thread falls back to a regular ThreadLocal
*
* @param <V>
*/
public class FastThreadLocal<V> {
public static enum Type {
LocalChannel_ReaderStackDepth,
PooledDirectByteBuf_Recycler,
PooledHeapByteBuf_Recycler,
PooledUnsafeDirectByteBuf_Recycler,
ChannelOutboundBuffer_Recycler,
ChannelOutboundBuffer_PooledByteBuf_Recycler,
DefaultChannelHandlerContext_WriteAndFlushTask_Recycler,
DefaultChannelHandlerContext_WriteTask_Recycler,
PendingWrite_Recycler,
RecyclableArrayList_Recycler,
DefaultPromise_ListenerStackDepth,
PooledByteBufAllocator_DefaultAllocator
}
// the set of already defined FastThreadLocals
private static final EnumMap<Type, Boolean> SET = new EnumMap<Type, Boolean>(Type.class);
// the type values, for cheap iteration
private static final Type[] TYPES = Type.values();
// a marker to indicate a value has not yet been initialised
private static final Object EMPTY = new Object();
/**
* To utilise the FastThreadLocal fast-path, all threads accessing a FastThreadLocal must extend this class
*/
public static class FastThreadLocalThread extends Thread {
private final EnumMap<Type, Object> lookup = initialMap();
static EnumMap<Type, Object> initialMap() {
EnumMap<Type, Object> r = new EnumMap<Type, Object>(Type.class);
for (Type type : TYPES) {
r.put(type, EMPTY);
}
return r;
}
public FastThreadLocalThread() {
super();
}
public FastThreadLocalThread(Runnable target) {
super(target);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target) {
super(group, target);
}
public FastThreadLocalThread(String name) {
super(name);
}
public FastThreadLocalThread(ThreadGroup group, String name) {
super(group, name);
}
public FastThreadLocalThread(Runnable target, String name) {
super(target, name);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
super(group, target, name);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
super(group, target, name, stackSize);
}
}
final Type type;
final ThreadLocal<V> fallback = new ThreadLocal<V>() {
protected V initialValue() {
return FastThreadLocal.this.initialValue();
}
};
/**
* @param type the predefined type this FastThreadLocal represents; each type may be used only once
* globally in a single VM
*/
public FastThreadLocal(Type type) {
if (type != null) {
synchronized (SET) {
if (SET.put(type, Boolean.TRUE) != null) {
throw new IllegalStateException(type + " has been assigned multiple times");
}
}
}
this.type = type;
}
/**
* Override this method to define the default value to assign
* when a thread first calls get() without a preceding set()
*
* @return the initial value
*/
protected V initialValue() {
return null;
}
/**
* Set the value for the current thread
* @param value
*/
public void set(V value) {
Thread thread = Thread.currentThread();
if (type == null || !(thread instanceof FastThreadLocalThread)) {
fallback.set(value);
return;
}
EnumMap<Type, Object> lookup = ((FastThreadLocalThread) thread).lookup;
lookup.put(type, value);
}
/**
* Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue()
*/
public void remove() {
Thread thread = Thread.currentThread();
if (type == null || !(thread instanceof FastThreadLocalThread)) {
fallback.remove();
return;
}
EnumMap<Type, Object> lookup = ((FastThreadLocalThread) thread).lookup;
lookup.put(type, EMPTY);
}
/**
* @return the current value for the current thread
*/
public final V get() {
Thread thread = Thread.currentThread();
if (type == null || !(thread instanceof FastThreadLocalThread)) {
return fallback.get();
}
EnumMap<Type, Object> lookup = ((FastThreadLocalThread) thread).lookup;
Object v = lookup.get(type);
if (v == EMPTY) {
lookup.put(type, v = initialValue());
}
return (V) v;
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A special {@link ThreadLocal} which is operating over a predefined array, so it always operate in O(1) when called
* from a {@link FastThreadLocalThread}. This permits less indirection and offers a slight performance improvement,
* so is useful when invoked frequently.
*
* The fast path is only possible on threads that extend FastThreadLocalThread, as this class
* stores the necessary state. Access by any other kind of thread falls back to a regular ThreadLocal
*
* @param <V>
*/
public class FastThreadLocal<V> extends ThreadLocal<V> {
static final Object EMPTY = new Object();
private static final AtomicInteger NEXT_INDEX = new AtomicInteger(0);
private final ThreadLocal<V> fallback = new ThreadLocal<V>() {
@Override
protected V initialValue() {
return FastThreadLocal.this.initialValue();
}
};
private final int index;
public FastThreadLocal() {
index = NEXT_INDEX.getAndIncrement();
if (index < 0) {
NEXT_INDEX.decrementAndGet();
throw new IllegalStateException("Maximal number (" + Integer.MAX_VALUE + ") of FastThreadLocal exceeded");
}
}
/**
* Set the value for the current thread
*/
@Override
public void set(V value) {
Thread thread = Thread.currentThread();
if (!(thread instanceof FastThreadLocalThread)) {
fallback.set(value);
return;
}
FastThreadLocalThread fastThread = (FastThreadLocalThread) thread;
Object[] lookup = fastThread.lookup;
if (index >= lookup.length) {
lookup = fastThread.expandArray(index);
}
lookup[index] = value;
}
/**
* Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue()
*/
@Override
public void remove() {
Thread thread = Thread.currentThread();
if (!(thread instanceof FastThreadLocalThread)) {
fallback.remove();
return;
}
Object[] lookup = ((FastThreadLocalThread) thread).lookup;
if (index >= lookup.length) {
return;
}
lookup[index] = EMPTY;
}
/**
* @return the current value for the current thread
*/
@Override
@SuppressWarnings("unchecked")
public V get() {
Thread thread = Thread.currentThread();
if (!(thread instanceof FastThreadLocalThread)) {
return fallback.get();
}
FastThreadLocalThread fastThread = (FastThreadLocalThread) thread;
Object[] lookup = fastThread.lookup;
Object v;
if (index >= lookup.length) {
v = initialValue();
lookup = fastThread.expandArray(index);
lookup[index] = v;
} else {
v = lookup[index];
if (v == EMPTY) {
v = initialValue();
lookup[index] = v;
}
}
return (V) v;
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.util.Arrays;
/**
* To utilise the {@link FastThreadLocal} fast-path, all threads accessing a {@link FastThreadLocal} must extend this
* class.
*/
public class FastThreadLocalThread extends Thread {
Object[] lookup = newArray();
public FastThreadLocalThread() { }
public FastThreadLocalThread(Runnable target) {
super(target);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target) {
super(group, target);
}
public FastThreadLocalThread(String name) {
super(name);
}
public FastThreadLocalThread(ThreadGroup group, String name) {
super(group, name);
}
public FastThreadLocalThread(Runnable target, String name) {
super(target, name);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) {
super(group, target, name);
}
public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) {
super(group, target, name, stackSize);
}
private static Object[] newArray() {
Object[] array = new Object[32];
Arrays.fill(array, FastThreadLocal.EMPTY);
return array;
}
Object[] expandArray(int length) {
int newCapacity = lookup.length;
do {
// double capacity until it is big enough
newCapacity <<= 1;
if (newCapacity < 0) {
throw new IllegalStateException();
}
} while (length > newCapacity);
Object[] array = new Object[newCapacity];
System.arraycopy(lookup, 0, array, 0, lookup.length);
Arrays.fill(array, lookup.length, array.length, FastThreadLocal.EMPTY);
lookup = array;
return lookup;
}
}

View File

@ -17,17 +17,13 @@ package io.netty.util.internal;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;
import static io.netty.util.concurrent.FastThreadLocal.Type.PendingWrite_Recycler;
/** /**
* Some pending write which should be picked up later. * Some pending write which should be picked up later.
*/ */
public final class PendingWrite { public final class PendingWrite {
private static final Recycler<PendingWrite> RECYCLER private static final Recycler<PendingWrite> RECYCLER = new Recycler<PendingWrite>() {
= new Recycler<PendingWrite>(PendingWrite_Recycler) {
@Override @Override
protected PendingWrite newObject(Handle handle) { protected PendingWrite newObject(Handle handle) {
return new PendingWrite(handle); return new PendingWrite(handle);

View File

@ -18,7 +18,6 @@ package io.netty.util.internal;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle; import io.netty.util.Recycler.Handle;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -34,8 +33,7 @@ public final class RecyclableArrayList extends ArrayList<Object> {
private static final int DEFAULT_INITIAL_CAPACITY = 8; private static final int DEFAULT_INITIAL_CAPACITY = 8;
private static final Recycler<RecyclableArrayList> RECYCLER private static final Recycler<RecyclableArrayList> RECYCLER = new Recycler<RecyclableArrayList>() {
= new Recycler<RecyclableArrayList>(FastThreadLocal.Type.RecyclableArrayList_Recycler) {
@Override @Override
protected RecyclableArrayList newObject(Handle handle) { protected RecyclableArrayList newObject(Handle handle) {
return new RecyclableArrayList(handle); return new RecyclableArrayList(handle);

View File

@ -31,7 +31,7 @@ public abstract class TypeParameterMatcher {
private static final Object TEST_OBJECT = new Object(); private static final Object TEST_OBJECT = new Object();
private static final ThreadLocal<Map<Class<?>, TypeParameterMatcher>> getCache = private static final ThreadLocal<Map<Class<?>, TypeParameterMatcher>> getCache =
new ThreadLocal<Map<Class<?>, TypeParameterMatcher>>() { new FastThreadLocal<Map<Class<?>, TypeParameterMatcher>>() {
@Override @Override
protected Map<Class<?>, TypeParameterMatcher> initialValue() { protected Map<Class<?>, TypeParameterMatcher> initialValue() {
return new IdentityHashMap<Class<?>, TypeParameterMatcher>(); return new IdentityHashMap<Class<?>, TypeParameterMatcher>();
@ -69,7 +69,7 @@ public abstract class TypeParameterMatcher {
} }
private static final ThreadLocal<Map<Class<?>, Map<String, TypeParameterMatcher>>> findCache = private static final ThreadLocal<Map<Class<?>, Map<String, TypeParameterMatcher>>> findCache =
new ThreadLocal<Map<Class<?>, Map<String, TypeParameterMatcher>>>() { new FastThreadLocal<Map<Class<?>, Map<String, TypeParameterMatcher>>>() {
@Override @Override
protected Map<Class<?>, Map<String, TypeParameterMatcher>> initialValue() { protected Map<Class<?>, Map<String, TypeParameterMatcher>> initialValue() {
return new IdentityHashMap<Class<?>, Map<String, TypeParameterMatcher>>(); return new IdentityHashMap<Class<?>, Map<String, TypeParameterMatcher>>();

View File

@ -19,6 +19,7 @@ package io.netty.handler.ssl.util;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.FastThreadLocal;
import javax.net.ssl.ManagerFactoryParameters; import javax.net.ssl.ManagerFactoryParameters;
import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManager;
@ -67,7 +68,7 @@ public final class FingerprintTrustManagerFactory extends SimpleTrustManagerFact
private static final int SHA1_BYTE_LEN = 20; private static final int SHA1_BYTE_LEN = 20;
private static final int SHA1_HEX_LEN = SHA1_BYTE_LEN * 2; private static final int SHA1_HEX_LEN = SHA1_BYTE_LEN * 2;
private static final ThreadLocal<MessageDigest> tlmd = new ThreadLocal<MessageDigest>() { private static final ThreadLocal<MessageDigest> tlmd = new FastThreadLocal<MessageDigest>() {
@Override @Override
protected MessageDigest initialValue() { protected MessageDigest initialValue() {
try { try {

View File

@ -16,6 +16,8 @@
package io.netty.handler.ssl.util; package io.netty.handler.ssl.util;
import io.netty.util.internal.FastThreadLocal;
import javax.net.ssl.ManagerFactoryParameters; import javax.net.ssl.ManagerFactoryParameters;
import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
@ -43,7 +45,7 @@ public abstract class SimpleTrustManagerFactory extends TrustManagerFactory {
* To work around this issue, we use an ugly hack which uses a {@link ThreadLocal}. * To work around this issue, we use an ugly hack which uses a {@link ThreadLocal}.
*/ */
private static final ThreadLocal<SimpleTrustManagerFactorySpi> CURRENT_SPI = private static final ThreadLocal<SimpleTrustManagerFactorySpi> CURRENT_SPI =
new ThreadLocal<SimpleTrustManagerFactorySpi>() { new FastThreadLocal<SimpleTrustManagerFactorySpi>() {
@Override @Override
protected SimpleTrustManagerFactorySpi initialValue() { protected SimpleTrustManagerFactorySpi initialValue() {
return new SimpleTrustManagerFactorySpi(); return new SimpleTrustManagerFactorySpi();

View File

@ -16,8 +16,6 @@
package io.netty.channel; package io.netty.channel;
import static io.netty.channel.DefaultChannelPipeline.logger; import static io.netty.channel.DefaultChannelPipeline.logger;
import static io.netty.util.concurrent.FastThreadLocal.Type.DefaultChannelHandlerContext_WriteAndFlushTask_Recycler;
import static io.netty.util.concurrent.FastThreadLocal.Type.DefaultChannelHandlerContext_WriteTask_Recycler;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.util.DefaultAttributeMap; import io.netty.util.DefaultAttributeMap;
@ -25,7 +23,6 @@ import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
@ -944,8 +941,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
private static final Recycler<WriteTask> RECYCLER = private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
new Recycler<WriteTask>(DefaultChannelHandlerContext_WriteTask_Recycler) {
@Override @Override
protected WriteTask newObject(Handle handle) { protected WriteTask newObject(Handle handle) {
return new WriteTask(handle); return new WriteTask(handle);
@ -971,8 +967,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme
static final class WriteAndFlushTask extends AbstractWriteTask { static final class WriteAndFlushTask extends AbstractWriteTask {
private static final Recycler<WriteAndFlushTask> RECYCLER = private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
new Recycler<WriteAndFlushTask>(DefaultChannelHandlerContext_WriteAndFlushTask_Recycler) {
@Override @Override
protected WriteAndFlushTask newObject(Handle handle) { protected WriteAndFlushTask newObject(Handle handle) {
return new WriteAndFlushTask(handle); return new WriteAndFlushTask(handle);

View File

@ -16,6 +16,8 @@
package io.netty.channel; package io.netty.channel;
import io.netty.util.internal.FastThreadLocal;
import java.util.Map; import java.util.Map;
import java.util.WeakHashMap; import java.util.WeakHashMap;
@ -33,7 +35,7 @@ public abstract class ChannelHandlerAdapter implements ChannelHandler {
* See <a href="See https://github.com/netty/netty/issues/2289">#2289</a>. * See <a href="See https://github.com/netty/netty/issues/2289">#2289</a>.
*/ */
private static final ThreadLocal<Map<Class<?>, Boolean>> SHARABLE_CACHE = private static final ThreadLocal<Map<Class<?>, Boolean>> SHARABLE_CACHE =
new ThreadLocal<Map<Class<?>, Boolean>>() { new FastThreadLocal<Map<Class<?>, Boolean>>() {
@Override @Override
protected Map<Class<?>, Boolean> initialValue() { protected Map<Class<?>, Boolean> initialValue() {
// Start with small capacity to keep memory overhead as low as possible. // Start with small capacity to keep memory overhead as low as possible.

View File

@ -29,7 +29,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle; import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
@ -58,8 +57,7 @@ public final class ChannelOutboundBuffer {
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize); logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize);
} }
private static final Recycler<ChannelOutboundBuffer> RECYCLER private static final Recycler<ChannelOutboundBuffer> RECYCLER = new Recycler<ChannelOutboundBuffer>() {
= new Recycler<ChannelOutboundBuffer>(FastThreadLocal.Type.ChannelOutboundBuffer_Recycler) {
@Override @Override
protected ChannelOutboundBuffer newObject(Handle handle) { protected ChannelOutboundBuffer newObject(Handle handle) {
return new ChannelOutboundBuffer(handle); return new ChannelOutboundBuffer(handle);
@ -695,8 +693,7 @@ public final class ChannelOutboundBuffer {
static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf { static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf {
private final Recycler.Handle handle; private final Recycler.Handle handle;
private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER private static final Recycler<ThreadLocalPooledByteBuf> RECYCLER = new Recycler<ThreadLocalPooledByteBuf>() {
= new Recycler<ThreadLocalPooledByteBuf>(FastThreadLocal.Type.ChannelOutboundBuffer_PooledByteBuf_Recycler) {
@Override @Override
protected ThreadLocalPooledByteBuf newObject(Handle handle) { protected ThreadLocalPooledByteBuf newObject(Handle handle) {
return new ThreadLocalPooledByteBuf(handle); return new ThreadLocalPooledByteBuf(handle);

View File

@ -27,8 +27,8 @@ import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.FastThreadLocal;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.AlreadyConnectedException; import java.nio.channels.AlreadyConnectedException;
@ -47,8 +47,7 @@ public class LocalChannel extends AbstractChannel {
private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final ChannelMetadata METADATA = new ChannelMetadata(false);
private static final int MAX_READER_STACK_DEPTH = 8; private static final int MAX_READER_STACK_DEPTH = 8;
private static final FastThreadLocal<Integer> READER_STACK_DEPTH private static final ThreadLocal<Integer> READER_STACK_DEPTH = new FastThreadLocal<Integer>() {
= new FastThreadLocal<Integer>(FastThreadLocal.Type.LocalChannel_ReaderStackDepth) {
@Override @Override
protected Integer initialValue() { protected Integer initialValue() {
return 0; return 0;