diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index 497f12b0ef..37eea127c2 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -17,6 +17,10 @@ package io.netty.buffer; +import io.netty.util.ThreadDeathWatcher; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + import java.nio.ByteBuffer; /** @@ -26,6 +30,9 @@ import java.nio.ByteBuffer; * 480222803919">Scalable memory allocation using jemalloc. */ final class PoolThreadCache { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class); + final PoolArena heapArena; final PoolArena directArena; @@ -44,6 +51,14 @@ final class PoolThreadCache { private int allocations; + private final Thread thread = Thread.currentThread(); + private final Runnable freeTask = new Runnable() { + @Override + public void run() { + free0(); + } + }; + // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; @@ -90,6 +105,10 @@ final class PoolThreadCache { normalHeapCaches = null; numShiftsNormalHeap = -1; } + + // The thread-local cache will keep a list of pooled buffers which must be returned to + // the pool when the thread is not alive anymore. + ThreadDeathWatcher.watch(thread, freeTask); } private static SubPageMemoryRegionCache[] createSubPageCaches(int cacheSize, int numCaches) { @@ -192,13 +211,22 @@ final class PoolThreadCache { /** * Should be called if the Thread that uses this cache is about to exist to release resources out of the cache */ - int free() { - return free(tinySubPageDirectCaches) + + void free() { + ThreadDeathWatcher.unwatch(thread, freeTask); + free0(); + } + + private void free0() { + int numFreed = free(tinySubPageDirectCaches) + free(smallSubPageDirectCaches) + free(normalDirectCaches) + free(tinySubPageHeapCaches) + free(smallSubPageHeapCaches) + free(normalHeapCaches); + + if (numFreed > 0 && logger.isDebugEnabled()) { + logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName()); + } } private static int free(MemoryRegionCache[] caches) { diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index ed220fd34e..c33e92d771 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -16,8 +16,7 @@ package io.netty.buffer; -import io.netty.util.ThreadDeathWatcher; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -277,24 +276,14 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { directArena = null; } - final PoolThreadCache cache = new PoolThreadCache( + return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); + } - // The thread-local cache will keep a list of pooled buffers which must be returned to - // the pool when the thread is not alive anymore. - final Thread thread = Thread.currentThread(); - ThreadDeathWatcher.watch(thread, new Runnable() { - @Override - public void run() { - int numFreed = cache.free(); - if (logger.isDebugEnabled()) { - logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName()); - } - } - }); - - return cache; + @Override + protected void onRemoval(PoolThreadCache value) { + value.free(); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/ClientCookieEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/ClientCookieEncoder.java index 03dc1ad48d..5a5b5bae74 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/ClientCookieEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/ClientCookieEncoder.java @@ -42,7 +42,7 @@ public final class ClientCookieEncoder { throw new NullPointerException("cookie"); } - StringBuilder buf = buffer.get(); + StringBuilder buf = stringBuilder(); encode(buf, cookie); return stripTrailingSeparator(buf); } @@ -52,7 +52,7 @@ public final class ClientCookieEncoder { throw new NullPointerException("cookies"); } - StringBuilder buf = buffer.get(); + StringBuilder buf = stringBuilder(); for (Cookie c: cookies) { if (c == null) { break; @@ -68,7 +68,7 @@ public final class ClientCookieEncoder { throw new NullPointerException("cookies"); } - StringBuilder buf = buffer.get(); + StringBuilder buf = stringBuilder(); for (Cookie c: cookies) { if (c == null) { break; diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/CookieEncoderUtil.java b/codec-http/src/main/java/io/netty/handler/codec/http/CookieEncoderUtil.java index 57e2c261b4..bb45518db2 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/CookieEncoderUtil.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/CookieEncoderUtil.java @@ -16,23 +16,13 @@ package io.netty.handler.codec.http; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.internal.InternalThreadLocalMap; final class CookieEncoderUtil { - static final ThreadLocal buffer = new FastThreadLocal() { - @Override - public StringBuilder get() { - StringBuilder buf = super.get(); - buf.setLength(0); - return buf; - } - - @Override - protected StringBuilder initialValue() { - return new StringBuilder(512); - } - }; + static StringBuilder stringBuilder() { + return InternalThreadLocalMap.get().stringBuilder(); + } static String stripTrailingSeparator(StringBuilder buf) { if (buf.length() > 0) { diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaderDateFormat.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaderDateFormat.java index c70de41da5..5c89e7c5f1 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaderDateFormat.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpHeaderDateFormat.java @@ -15,7 +15,7 @@ */ package io.netty.handler.codec.http; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocal; import java.text.ParsePosition; import java.text.SimpleDateFormat; @@ -39,7 +39,7 @@ final class HttpHeaderDateFormat extends SimpleDateFormat { private final SimpleDateFormat format1 = new HttpHeaderDateFormatObsolete1(); private final SimpleDateFormat format2 = new HttpHeaderDateFormatObsolete2(); - private static final ThreadLocal dateFormatThreadLocal = + private static final FastThreadLocal dateFormatThreadLocal = new FastThreadLocal() { @Override protected HttpHeaderDateFormat initialValue() { diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/ServerCookieEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/ServerCookieEncoder.java index 21c906e7be..b49a033a53 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/ServerCookieEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/ServerCookieEncoder.java @@ -47,7 +47,7 @@ public final class ServerCookieEncoder { throw new NullPointerException("cookie"); } - StringBuilder buf = buffer.get(); + StringBuilder buf = stringBuilder(); add(buf, cookie.getName(), cookie.getValue()); diff --git a/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java b/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java index b11538d46f..735fb7cb7f 100644 --- a/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java +++ b/codec/src/main/java/io/netty/handler/codec/DefaultTextHeaders.java @@ -16,7 +16,7 @@ package io.netty.handler.codec; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; import java.text.DateFormat; @@ -769,7 +769,7 @@ public class DefaultTextHeaders implements TextHeaders { static final class HttpHeaderDateFormat { private static final ParsePosition parsePos = new ParsePosition(0); - private static final ThreadLocal dateFormatThreadLocal = + private static final FastThreadLocal dateFormatThreadLocal = new FastThreadLocal() { @Override protected HttpHeaderDateFormat initialValue() { diff --git a/codec/src/main/java/io/netty/handler/codec/marshalling/ThreadLocalMarshallerProvider.java b/codec/src/main/java/io/netty/handler/codec/marshalling/ThreadLocalMarshallerProvider.java index a64f5b8595..ce13601491 100644 --- a/codec/src/main/java/io/netty/handler/codec/marshalling/ThreadLocalMarshallerProvider.java +++ b/codec/src/main/java/io/netty/handler/codec/marshalling/ThreadLocalMarshallerProvider.java @@ -17,7 +17,7 @@ package io.netty.handler.codec.marshalling; import io.netty.channel.ChannelHandlerContext; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocal; import org.jboss.marshalling.Marshaller; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.MarshallingConfiguration; @@ -28,7 +28,7 @@ import org.jboss.marshalling.MarshallingConfiguration; * many small {@link Object}'s and your actual Thread count is not to big */ public class ThreadLocalMarshallerProvider implements MarshallerProvider { - private final ThreadLocal marshallers = new FastThreadLocal(); + private final FastThreadLocal marshallers = new FastThreadLocal(); private final MarshallerFactory factory; private final MarshallingConfiguration config; diff --git a/codec/src/main/java/io/netty/handler/codec/marshalling/ThreadLocalUnmarshallerProvider.java b/codec/src/main/java/io/netty/handler/codec/marshalling/ThreadLocalUnmarshallerProvider.java index 2c31903ef1..080d48921a 100644 --- a/codec/src/main/java/io/netty/handler/codec/marshalling/ThreadLocalUnmarshallerProvider.java +++ b/codec/src/main/java/io/netty/handler/codec/marshalling/ThreadLocalUnmarshallerProvider.java @@ -17,7 +17,7 @@ package io.netty.handler.codec.marshalling; import io.netty.channel.ChannelHandlerContext; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocal; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.MarshallingConfiguration; import org.jboss.marshalling.Unmarshaller; @@ -28,7 +28,7 @@ import org.jboss.marshalling.Unmarshaller; * many small {@link Object}'s. */ public class ThreadLocalUnmarshallerProvider implements UnmarshallerProvider { - private final ThreadLocal unmarshallers = new FastThreadLocal(); + private final FastThreadLocal unmarshallers = new FastThreadLocal(); private final MarshallerFactory factory; private final MarshallingConfiguration config; diff --git a/common/src/main/java/io/netty/util/CharsetUtil.java b/common/src/main/java/io/netty/util/CharsetUtil.java index d2c20df35d..74c0a3dc91 100644 --- a/common/src/main/java/io/netty/util/CharsetUtil.java +++ b/common/src/main/java/io/netty/util/CharsetUtil.java @@ -15,13 +15,12 @@ */ package io.netty.util; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.internal.InternalThreadLocalMap; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; import java.nio.charset.CodingErrorAction; -import java.util.IdentityHashMap; import java.util.Map; /** @@ -62,22 +61,6 @@ public final class CharsetUtil { */ public static final Charset US_ASCII = Charset.forName("US-ASCII"); - private static final ThreadLocal> encoders = - new FastThreadLocal>() { - @Override - protected Map initialValue() { - return new IdentityHashMap(); - } - }; - - private static final ThreadLocal> decoders = - new FastThreadLocal>() { - @Override - protected Map initialValue() { - return new IdentityHashMap(); - } - }; - /** * Returns a cached thread-local {@link CharsetEncoder} for the specified * charset. @@ -87,7 +70,7 @@ public final class CharsetUtil { throw new NullPointerException("charset"); } - Map map = encoders.get(); + Map map = InternalThreadLocalMap.get().charsetEncoderCache(); CharsetEncoder e = map.get(charset); if (e != null) { e.reset(); @@ -112,7 +95,7 @@ public final class CharsetUtil { throw new NullPointerException("charset"); } - Map map = decoders.get(); + Map map = InternalThreadLocalMap.get().charsetDecoderCache(); CharsetDecoder d = map.get(charset); if (d != null) { d.reset(); diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index 3d96036154..ea895da301 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -16,6 +16,7 @@ package io.netty.util; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -23,8 +24,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.IdentityHashMap; import java.util.Map; -import io.netty.util.internal.FastThreadLocal; - /** * Light-weight object pool based on a thread-local stack. * @@ -56,7 +55,7 @@ public abstract class Recycler { } private final int maxCapacity; - private final ThreadLocal> threadLocal = new FastThreadLocal>() { + private final FastThreadLocal> threadLocal = new FastThreadLocal>() { @Override protected Stack initialValue() { return new Stack(Recycler.this, Thread.currentThread(), maxCapacity); @@ -67,7 +66,7 @@ public abstract class Recycler { this(DEFAULT_MAX_CAPACITY); } - public Recycler(int maxCapacity) { + protected Recycler(int maxCapacity) { this.maxCapacity = Math.max(0, maxCapacity); } diff --git a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java index 84dbd83c88..2e580245f6 100644 --- a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java +++ b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java @@ -67,7 +67,25 @@ public final class ThreadDeathWatcher { throw new IllegalArgumentException("thread must be alive."); } - pendingEntries.add(new Entry(thread, task)); + schedule(thread, task, true); + } + + /** + * Cancels the task scheduled via {@link #watch(Thread, Runnable)}. + */ + public static void unwatch(Thread thread, Runnable task) { + if (thread == null) { + throw new NullPointerException("thread"); + } + if (task == null) { + throw new NullPointerException("task"); + } + + schedule(thread, task, false); + } + + private static void schedule(Thread thread, Runnable task, boolean isWatch) { + pendingEntries.add(new Entry(thread, task, isWatch)); if (started.compareAndSet(false, true)) { Thread watcherThread = threadFactory.newThread(watcher); @@ -93,8 +111,10 @@ public final class ThreadDeathWatcher { Thread watcherThread = ThreadDeathWatcher.watcherThread; if (watcherThread != null) { watcherThread.join(unit.toMillis(timeout)); + return !watcherThread.isAlive(); + } else { + return true; } - return !watcherThread.isAlive(); } private ThreadDeathWatcher() { } @@ -109,6 +129,10 @@ public final class ThreadDeathWatcher { fetchWatchees(); notifyWatchees(); + // Try once again just in case notifyWatchees() triggered watch() or unwatch(). + fetchWatchees(); + notifyWatchees(); + try { Thread.sleep(1000); } catch (InterruptedException ignore) { @@ -153,7 +177,11 @@ public final class ThreadDeathWatcher { break; } - watchees.add(e); + if (e.isWatch) { + watchees.add(e); + } else { + watchees.remove(e); + } } } @@ -178,15 +206,36 @@ public final class ThreadDeathWatcher { private static final class Entry extends MpscLinkedQueueNode { final Thread thread; final Runnable task; + final boolean isWatch; - Entry(Thread thread, Runnable task) { + Entry(Thread thread, Runnable task, boolean isWatch) { this.thread = thread; this.task = task; + this.isWatch = isWatch; } @Override public Entry value() { return this; } + + @Override + public int hashCode() { + return thread.hashCode() ^ task.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof Entry)) { + return false; + } + + Entry that = (Entry) obj; + return thread == that.thread && task == that.task; + } } } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index c975e419d6..9e0eb30ab2 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -17,7 +17,7 @@ package io.netty.util.concurrent; import io.netty.util.Signal; import io.netty.util.internal.EmptyArrays; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.internal.InternalThreadLocalMap; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -36,12 +36,6 @@ public class DefaultPromise extends AbstractFuture implements Promise { InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); private static final int MAX_LISTENER_STACK_DEPTH = 8; - private static final ThreadLocal LISTENER_STACK_DEPTH = new FastThreadLocal() { - @Override - protected Integer initialValue() { - return 0; - } - }; private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS"); private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE"); private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(new CancellationException()); @@ -557,9 +551,10 @@ public class DefaultPromise extends AbstractFuture implements Promise { EventExecutor executor = executor(); if (executor.inEventLoop()) { - final Integer stackDepth = LISTENER_STACK_DEPTH.get(); + final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); + final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { - LISTENER_STACK_DEPTH.set(stackDepth + 1); + threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { if (listeners instanceof DefaultFutureListeners) { notifyListeners0(this, (DefaultFutureListeners) listeners); @@ -571,7 +566,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } } finally { this.listeners = null; - LISTENER_STACK_DEPTH.set(stackDepth); + threadLocals.setFutureListenerStackDepth(stackDepth); } return; } @@ -617,13 +612,14 @@ public class DefaultPromise extends AbstractFuture implements Promise { final EventExecutor executor = executor(); if (executor.inEventLoop()) { if (listeners == null && lateListeners == null) { - final Integer stackDepth = LISTENER_STACK_DEPTH.get(); + final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); + final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { - LISTENER_STACK_DEPTH.set(stackDepth + 1); + threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListener0(this, l); } finally { - LISTENER_STACK_DEPTH.set(stackDepth); + threadLocals.setFutureListenerStackDepth(stackDepth); } return; } @@ -648,13 +644,14 @@ public class DefaultPromise extends AbstractFuture implements Promise { final EventExecutor eventExecutor, final Future future, final GenericFutureListener l) { if (eventExecutor.inEventLoop()) { - final Integer stackDepth = LISTENER_STACK_DEPTH.get(); + final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); + final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) { - LISTENER_STACK_DEPTH.set(stackDepth + 1); + threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { notifyListener0(future, l); } finally { - LISTENER_STACK_DEPTH.set(stackDepth); + threadLocals.setFutureListenerStackDepth(stackDepth); } return; } diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java b/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java index 94a89654b8..84d72cdf24 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java @@ -16,7 +16,6 @@ package io.netty.util.concurrent; -import io.netty.util.internal.FastThreadLocalThread; import io.netty.util.internal.StringUtil; import java.util.Locale; @@ -99,7 +98,7 @@ public class DefaultThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { - Thread t = newThread(r, prefix + nextId.incrementAndGet()); + Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon()) { if (!daemon) { @@ -123,4 +122,22 @@ public class DefaultThreadFactory implements ThreadFactory { protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(r, name); } + + private static final class DefaultRunnableDecorator implements Runnable { + + private final Runnable r; + + DefaultRunnableDecorator(Runnable r) { + this.r = r; + } + + @Override + public void run() { + try { + r.run(); + } finally { + FastThreadLocal.removeAll(); + } + } + } } diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java new file mode 100644 index 0000000000..df943b4997 --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java @@ -0,0 +1,244 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import io.netty.util.internal.InternalThreadLocalMap; +import io.netty.util.internal.PlatformDependent; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; + +/** + * A special variant of {@link ThreadLocal} that yields higher access performan when accessed from a + * {@link FastThreadLocalThread}. + *

+ * Internally, a {@link FastThreadLocal} uses a constant index in an array, instead of using hash code and hash table, + * to look for a variable. Although seemingly very subtle, it yields slight performance advantage over using a hash + * table, and it is useful when accessed frequently. + *

+ * To take advantage of this thread-local variable, your thread must be a {@link FastThreadLocalThread} or its subtype. + * By default, all threads created by {@link DefaultThreadFactory} are {@link FastThreadLocalThread} due to this reason. + *

+ * Note that the fast path is only possible on threads that extend {@link FastThreadLocalThread}, because it requires + * a special field to store the necessary state. An access by any other kind of thread falls back to a regular + * {@link ThreadLocal}. + *

+ * + * @param the type of the thread-local variable + * @see ThreadLocal + */ +public class FastThreadLocal { + + private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); + + /** + * Removes all {@link FastThreadLocal} variables bound to the current thread. This operation is useful when you + * are in a container environment, and you don't want to leave the thread local variables in the threads you do not + * manage. + */ + public static void removeAll() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); + if (threadLocalMap == null) { + return; + } + + try { + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + if (v != null && v != InternalThreadLocalMap.UNSET) { + @SuppressWarnings("unchecked") + Set> variablesToRemove = (Set>) v; + FastThreadLocal[] variablesToRemoveArray = + variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]); + for (FastThreadLocal tlv: variablesToRemoveArray) { + tlv.remove(threadLocalMap); + } + } + } finally { + InternalThreadLocalMap.remove(); + } + } + + /** + * Returns the number of thread local variables bound to the current thread. + */ + public static int size() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); + if (threadLocalMap == null) { + return 0; + } else { + return threadLocalMap.size(); + } + } + + /** + * Destroys the data structure that keeps all {@link FastThreadLocal} variables accessed from + * non-{@link FastThreadLocalThread}s. This operation is useful when you are in a container environment, and you + * do not want to leave the thread local variables in the threads you do not manage. Call this method when your + * application is being unloaded from the container. + */ + public static void destroy() { + InternalThreadLocalMap.destroy(); + } + + @SuppressWarnings("unchecked") + private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal variable) { + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + Set> variablesToRemove; + if (v == InternalThreadLocalMap.UNSET || v == null) { + variablesToRemove = Collections.newSetFromMap(new IdentityHashMap, Boolean>()); + threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); + } else { + variablesToRemove = (Set>) v; + } + + variablesToRemove.add(variable); + } + + private static void removeFromVariablesToRemove( + InternalThreadLocalMap threadLocalMap, FastThreadLocal variable) { + + Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); + + if (v == InternalThreadLocalMap.UNSET || v == null) { + return; + } + + @SuppressWarnings("unchecked") + Set> variablesToRemove = (Set>) v; + variablesToRemove.remove(variable); + } + + private final int index; + + public FastThreadLocal() { + index = InternalThreadLocalMap.nextVariableIndex(); + } + + /** + * Returns the current value for the current thread + */ + public final V get() { + return get(InternalThreadLocalMap.get()); + } + + /** + * Returns the current value for the specified thread local map. + * The specified thread local map must be for the current thread. + */ + @SuppressWarnings("unchecked") + public final V get(InternalThreadLocalMap threadLocalMap) { + Object v = threadLocalMap.indexedVariable(index); + if (v != InternalThreadLocalMap.UNSET) { + return (V) v; + } + + return initialize(threadLocalMap); + } + + private V initialize(InternalThreadLocalMap threadLocalMap) { + V v = null; + try { + v = initialValue(); + } catch (Exception e) { + PlatformDependent.throwException(e); + } + + threadLocalMap.setIndexedVariable(index, v); + addToVariablesToRemove(threadLocalMap, this); + return v; + } + + /** + * Set the value for the current thread. + */ + public final void set(V value) { + if (value != InternalThreadLocalMap.UNSET) { + set(InternalThreadLocalMap.get(), value); + } else { + remove(); + } + } + + /** + * Set the value for the specified thread local map. The specified thread local map must be for the current thread. + */ + public final void set(InternalThreadLocalMap threadLocalMap, V value) { + if (value != InternalThreadLocalMap.UNSET) { + if (threadLocalMap.setIndexedVariable(index, value)) { + addToVariablesToRemove(threadLocalMap, this); + } + } else { + remove(threadLocalMap); + } + } + + /** + * Returns {@code true} if and only if this thread-local variable is set. + */ + public final boolean isSet() { + return isSet(InternalThreadLocalMap.getIfSet()); + } + + /** + * Returns {@code true} if and only if this thread-local variable is set. + * The specified thread local map must be for the current thread. + */ + public final boolean isSet(InternalThreadLocalMap threadLocalMap) { + return threadLocalMap != null && threadLocalMap.isIndexedVariableSet(index); + } + /** + * Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue(). + */ + public final void remove() { + remove(InternalThreadLocalMap.getIfSet()); + } + + /** + * Sets the value to uninitialized for the specified thread local map; + * a proceeding call to get() will trigger a call to initialValue(). + * The specified thread local map must be for the current thread. + */ + @SuppressWarnings("unchecked") + public final void remove(InternalThreadLocalMap threadLocalMap) { + if (threadLocalMap == null) { + return; + } + + Object v = threadLocalMap.removeIndexedVariable(index); + removeFromVariablesToRemove(threadLocalMap, this); + + if (v != InternalThreadLocalMap.UNSET) { + try { + onRemoval((V) v); + } catch (Exception e) { + PlatformDependent.throwException(e); + } + } + } + + /** + * Returns the initial value for this thread-local variable. + */ + protected V initialValue() throws Exception { + return null; + } + + /** + * Invoked when this thread local variable is removed by {@link #remove()}. + */ + protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { } +} diff --git a/common/src/main/java/io/netty/util/internal/FastThreadLocalThread.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java similarity index 61% rename from common/src/main/java/io/netty/util/internal/FastThreadLocalThread.java rename to common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java index dd7d2acd8a..bd3e1c4441 100644 --- a/common/src/main/java/io/netty/util/internal/FastThreadLocalThread.java +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocalThread.java @@ -13,17 +13,16 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.util.internal; +package io.netty.util.concurrent; -import java.util.Arrays; +import io.netty.util.internal.InternalThreadLocalMap; /** - * To utilise the {@link FastThreadLocal} fast-path, all threads accessing a {@link FastThreadLocal} must extend this - * class. + * A special {@link Thread} that provides fast access to {@link FastThreadLocal} variables. */ public class FastThreadLocalThread extends Thread { - Object[] lookup = newArray(); + private InternalThreadLocalMap threadLocalMap; public FastThreadLocalThread() { } @@ -55,28 +54,19 @@ public class FastThreadLocalThread extends Thread { super(group, target, name, stackSize); } - private static Object[] newArray() { - Object[] array = new Object[32]; - Arrays.fill(array, FastThreadLocal.EMPTY); - return array; + /** + * Returns the internal data structure that keeps the thread-local variables bound to this thread. + * Note that this method is for internal use only, and thus is subject to change at any time. + */ + public final InternalThreadLocalMap threadLocalMap() { + return threadLocalMap; } - Object[] expandArray(int length) { - int newCapacity = lookup.length; - do { - // double capacity until it is big enough - newCapacity <<= 1; - - if (newCapacity < 0) { - throw new IllegalStateException(); - } - - } while (length > newCapacity); - - Object[] array = new Object[newCapacity]; - System.arraycopy(lookup, 0, array, 0, lookup.length); - Arrays.fill(array, lookup.length, array.length, FastThreadLocal.EMPTY); - lookup = array; - return lookup; + /** + * Sets the internal data structure that keeps the thread-local variables bound to this thread. + * Note that this method is for internal use only, and thus is subject to change at any time. + */ + public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) { + this.threadLocalMap = threadLocalMap; } } diff --git a/common/src/main/java/io/netty/util/internal/FastThreadLocal.java b/common/src/main/java/io/netty/util/internal/FastThreadLocal.java deleted file mode 100644 index 7e0aec53a4..0000000000 --- a/common/src/main/java/io/netty/util/internal/FastThreadLocal.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.internal; - -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A special {@link ThreadLocal} which is operating over a predefined array, so it always operate in O(1) when called - * from a {@link FastThreadLocalThread}. This permits less indirection and offers a slight performance improvement, - * so is useful when invoked frequently. - * - * The fast path is only possible on threads that extend FastThreadLocalThread, as this class - * stores the necessary state. Access by any other kind of thread falls back to a regular ThreadLocal - * - * @param - */ -public class FastThreadLocal extends ThreadLocal { - static final Object EMPTY = new Object(); - - private static final AtomicInteger NEXT_INDEX = new AtomicInteger(0); - private final ThreadLocal fallback = new ThreadLocal() { - @Override - protected V initialValue() { - return FastThreadLocal.this.initialValue(); - } - }; - private final int index; - - public FastThreadLocal() { - index = NEXT_INDEX.getAndIncrement(); - if (index < 0) { - NEXT_INDEX.decrementAndGet(); - throw new IllegalStateException("Maximal number (" + Integer.MAX_VALUE + ") of FastThreadLocal exceeded"); - } - } - - /** - * Set the value for the current thread - */ - @Override - public void set(V value) { - Thread thread = Thread.currentThread(); - if (!(thread instanceof FastThreadLocalThread)) { - fallback.set(value); - return; - } - FastThreadLocalThread fastThread = (FastThreadLocalThread) thread; - Object[] lookup = fastThread.lookup; - if (index >= lookup.length) { - lookup = fastThread.expandArray(index); - } - lookup[index] = value; - } - - /** - * Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue() - */ - @Override - public void remove() { - Thread thread = Thread.currentThread(); - if (!(thread instanceof FastThreadLocalThread)) { - fallback.remove(); - return; - } - Object[] lookup = ((FastThreadLocalThread) thread).lookup; - if (index >= lookup.length) { - return; - } - lookup[index] = EMPTY; - } - - /** - * @return the current value for the current thread - */ - @Override - @SuppressWarnings("unchecked") - public V get() { - Thread thread = Thread.currentThread(); - if (!(thread instanceof FastThreadLocalThread)) { - return fallback.get(); - } - FastThreadLocalThread fastThread = (FastThreadLocalThread) thread; - - Object[] lookup = fastThread.lookup; - Object v; - if (index >= lookup.length) { - v = initialValue(); - lookup = fastThread.expandArray(index); - lookup[index] = v; - } else { - v = lookup[index]; - if (v == EMPTY) { - v = initialValue(); - lookup[index] = v; - } - } - return (V) v; - } -} diff --git a/common/src/main/java/io/netty/util/internal/IntegerHolder.java b/common/src/main/java/io/netty/util/internal/IntegerHolder.java new file mode 100644 index 0000000000..2a8d069a27 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/IntegerHolder.java @@ -0,0 +1,21 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.internal; + +public final class IntegerHolder { + public int value; +} diff --git a/common/src/main/java/io/netty/util/internal/InternalThreadLocalMap.java b/common/src/main/java/io/netty/util/internal/InternalThreadLocalMap.java new file mode 100644 index 0000000000..64f3c77ff1 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/InternalThreadLocalMap.java @@ -0,0 +1,310 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.internal; + +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocalThread; + +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.WeakHashMap; + +/** + * The internal data structure that stores the thread-local variables for Netty and all {@link FastThreadLocal}s. + * Note that this class is for internal use only and is subject to change at any time. Use {@link FastThreadLocal} + * unless you know what you are doing. + */ +public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { + + public static final Object UNSET = new Object(); + + public static InternalThreadLocalMap getIfSet() { + Thread thread = Thread.currentThread(); + InternalThreadLocalMap threadLocalMap; + if (thread instanceof FastThreadLocalThread) { + threadLocalMap = ((FastThreadLocalThread) thread).threadLocalMap(); + } else { + ThreadLocal slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; + if (slowThreadLocalMap == null) { + threadLocalMap = null; + } else { + threadLocalMap = slowThreadLocalMap.get(); + } + } + return threadLocalMap; + } + + public static InternalThreadLocalMap get() { + Thread thread = Thread.currentThread(); + if (thread instanceof FastThreadLocalThread) { + return fastGet((FastThreadLocalThread) thread); + } else { + return slowGet(); + } + } + + private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { + InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); + if (threadLocalMap == null) { + thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); + } + return threadLocalMap; + } + + private static InternalThreadLocalMap slowGet() { + ThreadLocal slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; + if (slowThreadLocalMap == null) { + UnpaddedInternalThreadLocalMap.slowThreadLocalMap = + slowThreadLocalMap = new ThreadLocal(); + } + + InternalThreadLocalMap ret = slowThreadLocalMap.get(); + if (ret == null) { + ret = new InternalThreadLocalMap(); + slowThreadLocalMap.set(ret); + } + return ret; + } + + public static void remove() { + Thread thread = Thread.currentThread(); + if (thread instanceof FastThreadLocalThread) { + ((FastThreadLocalThread) thread).setThreadLocalMap(null); + } else { + ThreadLocal slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; + if (slowThreadLocalMap != null) { + slowThreadLocalMap.remove(); + } + } + } + + public static void destroy() { + slowThreadLocalMap = null; + } + + public static int nextVariableIndex() { + int index = nextIndex.getAndIncrement(); + if (index < 0) { + nextIndex.decrementAndGet(); + throw new IllegalStateException("too many thread-local indexed variables"); + } + return index; + } + + public static int lastVariableIndex() { + return nextIndex.get() - 1; + } + + // Cache line padding (must be public) + // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes. + public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9; + + private InternalThreadLocalMap() { + super(newIndexedVariableTable()); + } + + private static Object[] newIndexedVariableTable() { + Object[] array = new Object[32]; + Arrays.fill(array, UNSET); + return array; + } + + public int size() { + int count = 0; + + if (futureListenerStackDepth != 0) { + count ++; + } + if (localChannelReaderStackDepth != 0) { + count ++; + } + if (handlerSharableCache != null) { + count ++; + } + if (counterHashCode != null) { + count ++; + } + if (random != null) { + count ++; + } + if (typeParameterMatcherGetCache != null) { + count ++; + } + if (typeParameterMatcherFindCache != null) { + count ++; + } + if (stringBuilder != null) { + count ++; + } + if (charsetEncoderCache != null) { + count ++; + } + if (charsetDecoderCache != null) { + count ++; + } + + for (Object o: indexedVariables) { + if (o != UNSET) { + count ++; + } + } + + // We should subtract 1 from the count because the first element in 'indexedVariables' is reserved + // by 'FastThreadLocal' to keep the list of 'FastThreadLocal's to remove on 'FastThreadLocal.removeAll()'. + return count - 1; + } + + public StringBuilder stringBuilder() { + StringBuilder builder = stringBuilder; + if (builder == null) { + stringBuilder = builder = new StringBuilder(512); + } else { + builder.setLength(0); + } + return builder; + } + + public Map charsetEncoderCache() { + Map cache = charsetEncoderCache; + if (cache == null) { + charsetEncoderCache = cache = new IdentityHashMap(); + } + return cache; + } + + public Map charsetDecoderCache() { + Map cache = charsetDecoderCache; + if (cache == null) { + charsetDecoderCache = cache = new IdentityHashMap(); + } + return cache; + } + + public int futureListenerStackDepth() { + return futureListenerStackDepth; + } + + public void setFutureListenerStackDepth(int futureListenerStackDepth) { + this.futureListenerStackDepth = futureListenerStackDepth; + } + + public ThreadLocalRandom random() { + ThreadLocalRandom r = random; + if (r == null) { + random = r = new ThreadLocalRandom(); + } + return r; + } + + public Map, TypeParameterMatcher> typeParameterMatcherGetCache() { + Map, TypeParameterMatcher> cache = typeParameterMatcherGetCache; + if (cache == null) { + typeParameterMatcherGetCache = cache = new IdentityHashMap, TypeParameterMatcher>(); + } + return cache; + } + + public Map, Map> typeParameterMatcherFindCache() { + Map, Map> cache = typeParameterMatcherFindCache; + if (cache == null) { + typeParameterMatcherFindCache = cache = new IdentityHashMap, Map>(); + } + return cache; + } + + public IntegerHolder counterHashCode() { + return counterHashCode; + } + + public void setCounterHashCode(IntegerHolder counterHashCode) { + this.counterHashCode = counterHashCode; + } + + public Map, Boolean> handlerSharableCache() { + Map, Boolean> cache = handlerSharableCache; + if (cache == null) { + // Start with small capacity to keep memory overhead as low as possible. + handlerSharableCache = cache = new WeakHashMap, Boolean>(4); + } + return cache; + } + + public int localChannelReaderStackDepth() { + return localChannelReaderStackDepth; + } + + public void setLocalChannelReaderStackDepth(int localChannelReaderStackDepth) { + this.localChannelReaderStackDepth = localChannelReaderStackDepth; + } + + public Object indexedVariable(int index) { + Object[] lookup = indexedVariables; + return index < lookup.length? lookup[index] : UNSET; + } + + /** + * @return {@code true} if and only if a new thread-local variable has been created + */ + public boolean setIndexedVariable(int index, Object value) { + Object[] lookup = indexedVariables; + if (index < lookup.length) { + Object oldValue = lookup[index]; + lookup[index] = value; + return oldValue == UNSET; + } else { + expandIndexedVariableTableAndSet(index, value); + return true; + } + } + + private void expandIndexedVariableTableAndSet(int index, Object value) { + Object[] oldArray = indexedVariables; + final int oldCapacity = oldArray.length; + int newCapacity = index; + newCapacity |= newCapacity >>> 1; + newCapacity |= newCapacity >>> 2; + newCapacity |= newCapacity >>> 4; + newCapacity |= newCapacity >>> 8; + newCapacity |= newCapacity >>> 16; + newCapacity ++; + + Object[] newArray = Arrays.copyOf(oldArray, newCapacity); + Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); + newArray[index] = value; + indexedVariables = newArray; + } + + public Object removeIndexedVariable(int index) { + Object[] lookup = indexedVariables; + if (index < lookup.length) { + Object v = lookup[index]; + lookup[index] = UNSET; + return v; + } else { + return UNSET; + } + } + + public boolean isIndexedVariableSet(int index) { + Object[] lookup = indexedVariables; + return index < lookup.length && lookup[index] != UNSET; + } +} diff --git a/common/src/main/java/io/netty/util/internal/ThreadLocalRandom.java b/common/src/main/java/io/netty/util/internal/ThreadLocalRandom.java index a80a181fe9..5ab5db5a11 100644 --- a/common/src/main/java/io/netty/util/internal/ThreadLocalRandom.java +++ b/common/src/main/java/io/netty/util/internal/ThreadLocalRandom.java @@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicLong; * //author Doug Lea */ @SuppressWarnings("all") -public class ThreadLocalRandom extends Random { +public final class ThreadLocalRandom extends Random { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ThreadLocalRandom.class); @@ -203,23 +203,13 @@ public class ThreadLocalRandom extends Random { initialized = true; } - /** - * The actual ThreadLocal - */ - private static final ThreadLocal localRandom = - new FastThreadLocal() { - protected ThreadLocalRandom initialValue() { - return new ThreadLocalRandom(); - } - }; - /** * Returns the current thread's {@code ThreadLocalRandom}. * * @return the current thread's {@code ThreadLocalRandom} */ public static ThreadLocalRandom current() { - return localRandom.get(); + return InternalThreadLocalMap.get().random(); } /** diff --git a/common/src/main/java/io/netty/util/internal/TypeParameterMatcher.java b/common/src/main/java/io/netty/util/internal/TypeParameterMatcher.java index d507f5c5ba..a10b149d78 100644 --- a/common/src/main/java/io/netty/util/internal/TypeParameterMatcher.java +++ b/common/src/main/java/io/netty/util/internal/TypeParameterMatcher.java @@ -22,7 +22,6 @@ import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.lang.reflect.TypeVariable; import java.util.HashMap; -import java.util.IdentityHashMap; import java.util.Map; public abstract class TypeParameterMatcher { @@ -30,16 +29,9 @@ public abstract class TypeParameterMatcher { private static final TypeParameterMatcher NOOP = new NoOpTypeParameterMatcher(); private static final Object TEST_OBJECT = new Object(); - private static final ThreadLocal, TypeParameterMatcher>> getCache = - new FastThreadLocal, TypeParameterMatcher>>() { - @Override - protected Map, TypeParameterMatcher> initialValue() { - return new IdentityHashMap, TypeParameterMatcher>(); - } - }; - public static TypeParameterMatcher get(final Class parameterType) { - final Map, TypeParameterMatcher> getCache = TypeParameterMatcher.getCache.get(); + final Map, TypeParameterMatcher> getCache = + InternalThreadLocalMap.get().typeParameterMatcherGetCache(); TypeParameterMatcher matcher = getCache.get(parameterType); if (matcher == null) { @@ -68,18 +60,11 @@ public abstract class TypeParameterMatcher { return matcher; } - private static final ThreadLocal, Map>> findCache = - new FastThreadLocal, Map>>() { - @Override - protected Map, Map> initialValue() { - return new IdentityHashMap, Map>(); - } - }; - public static TypeParameterMatcher find( final Object object, final Class parameterizedSuperclass, final String typeParamName) { - final Map, Map> findCache = TypeParameterMatcher.findCache.get(); + final Map, Map> findCache = + InternalThreadLocalMap.get().typeParameterMatcherFindCache(); final Class thisClass = object.getClass(); Map map = findCache.get(thisClass); diff --git a/common/src/main/java/io/netty/util/internal/UnpaddedInternalThreadLocalMap.java b/common/src/main/java/io/netty/util/internal/UnpaddedInternalThreadLocalMap.java new file mode 100644 index 0000000000..21caec99ef --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/UnpaddedInternalThreadLocalMap.java @@ -0,0 +1,57 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.internal; + +import io.netty.util.concurrent.FastThreadLocal; + +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; +import java.nio.charset.CharsetEncoder; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * The internal data structure that stores the thread-local variables for Netty and all {@link FastThreadLocal}s. + * Note that this class is for internal use only and is subject to change at any time. Use {@link FastThreadLocal} + * unless you know what you are doing. + */ +class UnpaddedInternalThreadLocalMap { + + static ThreadLocal slowThreadLocalMap; + static final AtomicInteger nextIndex = new AtomicInteger(); + + /** Used by {@link FastThreadLocal} */ + Object[] indexedVariables; + + // Core thread-locals + int futureListenerStackDepth; + int localChannelReaderStackDepth; + Map, Boolean> handlerSharableCache; + IntegerHolder counterHashCode; + ThreadLocalRandom random; + Map, TypeParameterMatcher> typeParameterMatcherGetCache; + Map, Map> typeParameterMatcherFindCache; + + // String-related thread-locals + StringBuilder stringBuilder; + Map charsetEncoderCache; + Map charsetDecoderCache; + + UnpaddedInternalThreadLocalMap(Object[] indexedVariables) { + this.indexedVariables = indexedVariables; + } +} diff --git a/common/src/main/java/io/netty/util/internal/chmv8/ConcurrentHashMapV8.java b/common/src/main/java/io/netty/util/internal/chmv8/ConcurrentHashMapV8.java index 3e27a2f1df..fa69fc55db 100644 --- a/common/src/main/java/io/netty/util/internal/chmv8/ConcurrentHashMapV8.java +++ b/common/src/main/java/io/netty/util/internal/chmv8/ConcurrentHashMapV8.java @@ -22,7 +22,8 @@ package io.netty.util.internal.chmv8; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.internal.IntegerHolder; +import io.netty.util.internal.InternalThreadLocalMap; import java.io.ObjectStreamField; import java.io.Serializable; @@ -2238,14 +2239,15 @@ public class ConcurrentHashMapV8 CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { - CounterHashCode hc; CounterCell a; long v; int m; + IntegerHolder hc; CounterCell a; long v; int m; boolean uncontended = true; - if ((hc = threadCounterHashCode.get()) == null || + InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); + if ((hc = threadLocals.counterHashCode()) == null || as == null || (m = as.length - 1) < 0 || - (a = as[m & hc.code]) == null || + (a = as[m & hc.value]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { - fullAddCount(x, hc, uncontended); + fullAddCount(threadLocals, x, hc, uncontended); return; } if (check <= 1) @@ -6030,13 +6032,6 @@ public class ConcurrentHashMapV8 */ static final int SEED_INCREMENT = 0x61c88647; - /** - * Per-thread counter hash codes. Shared across all instances. - */ - static final ThreadLocal threadCounterHashCode = - new FastThreadLocal(); - - final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; @@ -6050,17 +6045,18 @@ public class ConcurrentHashMapV8 } // See LongAdder version for explanation - private final void fullAddCount(long x, CounterHashCode hc, + private final void fullAddCount(InternalThreadLocalMap threadLocals, + long x, IntegerHolder hc, boolean wasUncontended) { int h; if (hc == null) { - hc = new CounterHashCode(); + hc = new IntegerHolder(); int s = counterHashCodeGenerator.addAndGet(SEED_INCREMENT); - h = hc.code = (s == 0) ? 1 : s; // Avoid zero - threadCounterHashCode.set(hc); + h = hc.value = (s == 0) ? 1 : s; // Avoid zero + threadLocals.setCounterHashCode(hc); } else - h = hc.code; + h = hc.value; boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; @@ -6135,7 +6131,7 @@ public class ConcurrentHashMapV8 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } - hc.code = h; // Record index for next time + hc.value = h; // Record index for next time } // Unsafe mechanics diff --git a/common/src/main/java/io/netty/util/internal/chmv8/LongAdder.java b/common/src/main/java/io/netty/util/internal/chmv8/LongAdder.java deleted file mode 100644 index c9d0712a02..0000000000 --- a/common/src/main/java/io/netty/util/internal/chmv8/LongAdder.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - */ - -package io.netty.util.internal.chmv8; - -import java.io.Serializable; -import java.util.concurrent.atomic.AtomicLong; - -/** - * One or more variables that together maintain an initially zero - * {@code long} sum. When updates (method {@link #add}) are contended - * across threads, the set of variables may grow dynamically to reduce - * contention. Method {@link #sum} (or, equivalently, {@link - * #longValue}) returns the current total combined across the - * variables maintaining the sum. - * - *

This class is usually preferable to {@link AtomicLong} when - * multiple threads update a common sum that is used for purposes such - * as collecting statistics, not for fine-grained synchronization - * control. Under low update contention, the two classes have similar - * characteristics. But under high contention, expected throughput of - * this class is significantly higher, at the expense of higher space - * consumption. - * - *

This class extends {@link Number}, but does not define - * methods such as {@code equals}, {@code hashCode} and {@code - * compareTo} because instances are expected to be mutated, and so are - * not useful as collection keys. - * - *

jsr166e note: This class is targeted to be placed in - * java.util.concurrent.atomic. - * - * @since 1.8 - * @author Doug Lea - */ -@SuppressWarnings("all") -public class LongAdder extends Striped64 implements Serializable { - private static final long serialVersionUID = 7249069246863182397L; - - /** - * Version of plus for use in retryUpdate - */ - final long fn(long v, long x) { return v + x; } - - /** - * Creates a new adder with initial sum of zero. - */ - public LongAdder() { - } - - /** - * Adds the given value. - * - * @param x the value to add - */ - public void add(long x) { - Cell[] as; long b, v; HashCode hc; Cell a; int n; - if ((as = cells) != null || !casBase(b = base, b + x)) { - boolean uncontended = true; - int h = (hc = threadHashCode.get()).code; - if (as == null || (n = as.length) < 1 || - (a = as[(n - 1) & h]) == null || - !(uncontended = a.cas(v = a.value, v + x))) - retryUpdate(x, hc, uncontended); - } - } - - /** - * Equivalent to {@code add(1)}. - */ - public void increment() { - add(1L); - } - - /** - * Equivalent to {@code add(-1)}. - */ - public void decrement() { - add(-1L); - } - - /** - * Returns the current sum. The returned value is NOT an - * atomic snapshot; invocation in the absence of concurrent - * updates returns an accurate result, but concurrent updates that - * occur while the sum is being calculated might not be - * incorporated. - * - * @return the sum - */ - public long sum() { - long sum = base; - Cell[] as = cells; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) - sum += a.value; - } - } - return sum; - } - - /** - * Resets variables maintaining the sum to zero. This method may - * be a useful alternative to creating a new adder, but is only - * effective if there are no concurrent updates. Because this - * method is intrinsically racy, it should only be used when it is - * known that no threads are concurrently updating. - */ - public void reset() { - internalReset(0L); - } - - /** - * Equivalent in effect to {@link #sum} followed by {@link - * #reset}. This method may apply for example during quiescent - * points between multithreaded computations. If there are - * updates concurrent with this method, the returned value is - * not guaranteed to be the final value occurring before - * the reset. - * - * @return the sum - */ - public long sumThenReset() { - long sum = base; - Cell[] as = cells; - base = 0L; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) { - sum += a.value; - a.value = 0L; - } - } - } - return sum; - } - - /** - * Returns the String representation of the {@link #sum}. - * @return the String representation of the {@link #sum} - */ - public String toString() { - return Long.toString(sum()); - } - - /** - * Equivalent to {@link #sum}. - * - * @return the sum - */ - public long longValue() { - return sum(); - } - - /** - * Returns the {@link #sum} as an {@code int} after a narrowing - * primitive conversion. - */ - public int intValue() { - return (int)sum(); - } - - /** - * Returns the {@link #sum} as a {@code float} - * after a widening primitive conversion. - */ - public float floatValue() { - return (float)sum(); - } - - /** - * Returns the {@link #sum} as a {@code double} after a widening - * primitive conversion. - */ - public double doubleValue() { - return (double)sum(); - } - - private void writeObject(java.io.ObjectOutputStream s) - throws java.io.IOException { - s.defaultWriteObject(); - s.writeLong(sum()); - } - - private void readObject(java.io.ObjectInputStream s) - throws java.io.IOException, ClassNotFoundException { - s.defaultReadObject(); - busy = 0; - cells = null; - base = s.readLong(); - } - -} diff --git a/common/src/main/java/io/netty/util/internal/chmv8/Striped64.java b/common/src/main/java/io/netty/util/internal/chmv8/Striped64.java deleted file mode 100644 index 45ae9d6063..0000000000 --- a/common/src/main/java/io/netty/util/internal/chmv8/Striped64.java +++ /dev/null @@ -1,359 +0,0 @@ -/* - * Copyright 2013 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -/* - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/publicdomain/zero/1.0/ - */ - -package io.netty.util.internal.chmv8; - -import java.util.Random; - -/** - * A package-local class holding common representation and mechanics - * for classes supporting dynamic striping on 64bit values. The class - * extends Number so that concrete subclasses must publicly do so. - */ -@SuppressWarnings("all") -abstract class Striped64 extends Number { - /* - * This class maintains a lazily-initialized table of atomically - * updated variables, plus an extra "base" field. The table size - * is a power of two. Indexing uses masked per-thread hash codes. - * Nearly all declarations in this class are package-private, - * accessed directly by subclasses. - * - * Table entries are of class Cell; a variant of AtomicLong padded - * to reduce cache contention on most processors. Padding is - * overkill for most Atomics because they are usually irregularly - * scattered in memory and thus don't interfere much with each - * other. But Atomic objects residing in arrays will tend to be - * placed adjacent to each other, and so will most often share - * cache lines (with a huge negative performance impact) without - * this precaution. - * - * In part because Cells are relatively large, we avoid creating - * them until they are needed. When there is no contention, all - * updates are made to the base field. Upon first contention (a - * failed CAS on base update), the table is initialized to size 2. - * The table size is doubled upon further contention until - * reaching the nearest power of two greater than or equal to the - * number of CPUS. Table slots remain empty (null) until they are - * needed. - * - * A single spinlock ("busy") is used for initializing and - * resizing the table, as well as populating slots with new Cells. - * There is no need for a blocking lock; when the lock is not - * available, threads try other slots (or the base). During these - * retries, there is increased contention and reduced locality, - * which is still better than alternatives. - * - * Per-thread hash codes are initialized to random values. - * Contention and/or table collisions are indicated by failed - * CASes when performing an update operation (see method - * retryUpdate). Upon a collision, if the table size is less than - * the capacity, it is doubled in size unless some other thread - * holds the lock. If a hashed slot is empty, and lock is - * available, a new Cell is created. Otherwise, if the slot - * exists, a CAS is tried. Retries proceed by "double hashing", - * using a secondary hash (Marsaglia XorShift) to try to find a - * free slot. - * - * The table size is capped because, when there are more threads - * than CPUs, supposing that each thread were bound to a CPU, - * there would exist a perfect hash function mapping threads to - * slots that eliminates collisions. When we reach capacity, we - * search for this mapping by randomly varying the hash codes of - * colliding threads. Because search is random, and collisions - * only become known via CAS failures, convergence can be slow, - * and because threads are typically not bound to CPUS forever, - * may not occur at all. However, despite these limitations, - * observed contention rates are typically low in these cases. - * - * It is possible for a Cell to become unused when threads that - * once hashed to it terminate, as well as in the case where - * doubling the table causes no thread to hash to it under - * expanded mask. We do not try to detect or remove such cells, - * under the assumption that for long-running instances, observed - * contention levels will recur, so the cells will eventually be - * needed again; and for short-lived ones, it does not matter. - */ - - /** - * Padded variant of AtomicLong supporting only raw accesses plus CAS. - * The value field is placed between pads, hoping that the JVM doesn't - * reorder them. - * - * JVM intrinsics note: It would be possible to use a release-only - * form of CAS here, if it were provided. - */ - static final class Cell { - volatile long p0, p1, p2, p3, p4, p5, p6; - volatile long value; - volatile long q0, q1, q2, q3, q4, q5, q6; - Cell(long x) { value = x; } - - final boolean cas(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long valueOffset; - static { - try { - UNSAFE = getUnsafe(); - Class ak = Cell.class; - valueOffset = UNSAFE.objectFieldOffset - (ak.getDeclaredField("value")); - } catch (Exception e) { - throw new Error(e); - } - } - - } - - /** - * Holder for the thread-local hash code. The code is initially - * random, but may be set to a different value upon collisions. - */ - static final class HashCode { - static final Random rng = new Random(); - int code; - HashCode() { - int h = rng.nextInt(); // Avoid zero to allow xorShift rehash - code = (h == 0) ? 1 : h; - } - } - - /** - * The corresponding ThreadLocal class - */ - static final class ThreadHashCode extends ThreadLocal { - public HashCode initialValue() { return new HashCode(); } - } - - /** - * Static per-thread hash codes. Shared across all instances to - * reduce ThreadLocal pollution and because adjustments due to - * collisions in one table are likely to be appropriate for - * others. - */ - static final ThreadHashCode threadHashCode = new ThreadHashCode(); - - /** Number of CPUS, to place bound on table size */ - static final int NCPU = Runtime.getRuntime().availableProcessors(); - - /** - * Table of cells. When non-null, size is a power of 2. - */ - transient volatile Cell[] cells; - - /** - * Base value, used mainly when there is no contention, but also as - * a fallback during table initialization races. Updated via CAS. - */ - transient volatile long base; - - /** - * Spinlock (locked via CAS) used when resizing and/or creating Cells. - */ - transient volatile int busy; - - /** - * Package-private default constructor - */ - Striped64() { - } - - /** - * CASes the base field. - */ - final boolean casBase(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val); - } - - /** - * CASes the busy field from 0 to 1 to acquire lock. - */ - final boolean casBusy() { - return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1); - } - - /** - * Computes the function of current and new value. Subclasses - * should open-code this update function for most uses, but the - * virtualized form is needed within retryUpdate. - * - * @param currentValue the current value (of either base or a cell) - * @param newValue the argument from a user update call - * @return result of the update function - */ - abstract long fn(long currentValue, long newValue); - - /** - * Handles cases of updates involving initialization, resizing, - * creating new Cells, and/or contention. See above for - * explanation. This method suffers the usual non-modularity - * problems of optimistic retry code, relying on rechecked sets of - * reads. - * - * @param x the value - * @param hc the hash code holder - * @param wasUncontended false if CAS failed before call - */ - final void retryUpdate(long x, HashCode hc, boolean wasUncontended) { - int h = hc.code; - boolean collide = false; // True if last slot nonempty - for (;;) { - Cell[] as; Cell a; int n; long v; - if ((as = cells) != null && (n = as.length) > 0) { - if ((a = as[(n - 1) & h]) == null) { - if (busy == 0) { // Try to attach new Cell - Cell r = new Cell(x); // Optimistically create - if (busy == 0 && casBusy()) { - boolean created = false; - try { // Recheck under lock - Cell[] rs; int m, j; - if ((rs = cells) != null && - (m = rs.length) > 0 && - rs[j = (m - 1) & h] == null) { - rs[j] = r; - created = true; - } - } finally { - busy = 0; - } - if (created) - break; - continue; // Slot is now non-empty - } - } - collide = false; - } - else if (!wasUncontended) // CAS already known to fail - wasUncontended = true; // Continue after rehash - else if (a.cas(v = a.value, fn(v, x))) - break; - else if (n >= NCPU || cells != as) - collide = false; // At max size or stale - else if (!collide) - collide = true; - else if (busy == 0 && casBusy()) { - try { - if (cells == as) { // Expand table unless stale - Cell[] rs = new Cell[n << 1]; - for (int i = 0; i < n; ++i) - rs[i] = as[i]; - cells = rs; - } - } finally { - busy = 0; - } - collide = false; - continue; // Retry with expanded table - } - h ^= h << 13; // Rehash - h ^= h >>> 17; - h ^= h << 5; - } - else if (busy == 0 && cells == as && casBusy()) { - boolean init = false; - try { // Initialize table - if (cells == as) { - Cell[] rs = new Cell[2]; - rs[h & 1] = new Cell(x); - cells = rs; - init = true; - } - } finally { - busy = 0; - } - if (init) - break; - } - else if (casBase(v = base, fn(v, x))) - break; // Fall back on using base - } - hc.code = h; // Record index for next time - } - - - /** - * Sets base and all cells to the given value. - */ - final void internalReset(long initialValue) { - Cell[] as = cells; - base = initialValue; - if (as != null) { - int n = as.length; - for (int i = 0; i < n; ++i) { - Cell a = as[i]; - if (a != null) - a.value = initialValue; - } - } - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long baseOffset; - private static final long busyOffset; - static { - try { - UNSAFE = getUnsafe(); - Class sk = Striped64.class; - baseOffset = UNSAFE.objectFieldOffset - (sk.getDeclaredField("base")); - busyOffset = UNSAFE.objectFieldOffset - (sk.getDeclaredField("busy")); - } catch (Exception e) { - throw new Error(e); - } - } - - /** - * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. - * Replace with a simple call to Unsafe.getUnsafe when integrating - * into a jdk. - * - * @return a sun.misc.Unsafe - */ - private static sun.misc.Unsafe getUnsafe() { - try { - return sun.misc.Unsafe.getUnsafe(); - } catch (SecurityException tryReflectionInstead) {} - try { - return java.security.AccessController.doPrivileged - (new java.security.PrivilegedExceptionAction() { - public sun.misc.Unsafe run() throws Exception { - Class k = sun.misc.Unsafe.class; - for (java.lang.reflect.Field f : k.getDeclaredFields()) { - f.setAccessible(true); - Object x = f.get(null); - if (k.isInstance(x)) - return k.cast(x); - } - throw new NoSuchFieldError("the Unsafe"); - }}); - } catch (java.security.PrivilegedActionException e) { - throw new RuntimeException("Could not initialize intrinsics", - e.getCause()); - } - } -} diff --git a/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java b/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java index b637f76fa7..9dd088bac7 100644 --- a/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java +++ b/common/src/test/java/io/netty/util/ThreadDeathWatcherTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.*; @@ -27,7 +28,7 @@ import static org.junit.Assert.*; public class ThreadDeathWatcherTest { @Test(timeout = 10000) - public void testSimple() throws Exception { + public void testWatch() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final Thread t = new Thread() { @Override @@ -70,4 +71,46 @@ public class ThreadDeathWatcherTest { // The task must be run on termination. latch.await(); } + + @Test(timeout = 10000) + public void testUnwatch() throws Exception { + final AtomicBoolean run = new AtomicBoolean(); + final Thread t = new Thread() { + @Override + public void run() { + for (;;) { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + break; + } + } + } + }; + + final Runnable task = new Runnable() { + @Override + public void run() { + run.set(true); + } + }; + + t.start(); + + // Watch and then unwatch. + ThreadDeathWatcher.watch(t, task); + ThreadDeathWatcher.unwatch(t, task); + + // Interrupt the thread to terminate it. + t.interrupt(); + + // Wait until the thread dies. + t.join(); + + // Wait until the watcher thread terminates itself. + assertThat(ThreadDeathWatcher.awaitInactivity(Long.MAX_VALUE, TimeUnit.SECONDS), is(true)); + + // And the task should not run. + assertThat(run.get(), is(false)); + } } diff --git a/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java b/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java new file mode 100644 index 0000000000..2a758781f0 --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.util.concurrent; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.*; + +public class FastThreadLocalTest { + @Before + public void setUp() { + FastThreadLocal.removeAll(); + assertThat(FastThreadLocal.size(), is(0)); + } + + @Test(timeout = 10000) + public void testRemoveAll() throws Exception { + final AtomicBoolean removed = new AtomicBoolean(); + final FastThreadLocal var = new FastThreadLocal() { + @Override + protected void onRemoval(Boolean value) { + removed.set(true); + } + }; + + // Initialize a thread-local variable. + assertThat(var.get(), is(nullValue())); + assertThat(FastThreadLocal.size(), is(1)); + + // And then remove it. + FastThreadLocal.removeAll(); + assertThat(removed.get(), is(true)); + assertThat(FastThreadLocal.size(), is(0)); + } + + @Test(timeout = 10000) + public void testRemoveAllFromFTLThread() throws Throwable { + final AtomicReference throwable = new AtomicReference(); + final Thread thread = new FastThreadLocalThread() { + @Override + public void run() { + try { + testRemoveAll(); + } catch (Throwable t) { + throwable.set(t); + } + } + }; + + thread.start(); + thread.join(); + + Throwable t = throwable.get(); + if (t != null) { + throw t; + } + } +} diff --git a/handler/src/main/java/io/netty/handler/ssl/util/FingerprintTrustManagerFactory.java b/handler/src/main/java/io/netty/handler/ssl/util/FingerprintTrustManagerFactory.java index bf1d3cde49..6439528ebe 100644 --- a/handler/src/main/java/io/netty/handler/ssl/util/FingerprintTrustManagerFactory.java +++ b/handler/src/main/java/io/netty/handler/ssl/util/FingerprintTrustManagerFactory.java @@ -19,7 +19,7 @@ package io.netty.handler.ssl.util; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.util.internal.EmptyArrays; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocal; import javax.net.ssl.ManagerFactoryParameters; import javax.net.ssl.TrustManager; @@ -68,7 +68,7 @@ public final class FingerprintTrustManagerFactory extends SimpleTrustManagerFact private static final int SHA1_BYTE_LEN = 20; private static final int SHA1_HEX_LEN = SHA1_BYTE_LEN * 2; - private static final ThreadLocal tlmd = new FastThreadLocal() { + private static final FastThreadLocal tlmd = new FastThreadLocal() { @Override protected MessageDigest initialValue() { try { diff --git a/handler/src/main/java/io/netty/handler/ssl/util/SimpleTrustManagerFactory.java b/handler/src/main/java/io/netty/handler/ssl/util/SimpleTrustManagerFactory.java index 7ac372a12f..05fbc46151 100644 --- a/handler/src/main/java/io/netty/handler/ssl/util/SimpleTrustManagerFactory.java +++ b/handler/src/main/java/io/netty/handler/ssl/util/SimpleTrustManagerFactory.java @@ -16,7 +16,7 @@ package io.netty.handler.ssl.util; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocal; import javax.net.ssl.ManagerFactoryParameters; import javax.net.ssl.TrustManager; @@ -44,7 +44,7 @@ public abstract class SimpleTrustManagerFactory extends TrustManagerFactory { * * To work around this issue, we use an ugly hack which uses a {@link ThreadLocal}. */ - private static final ThreadLocal CURRENT_SPI = + private static final FastThreadLocal CURRENT_SPI = new FastThreadLocal() { @Override protected SimpleTrustManagerFactorySpi initialValue() { diff --git a/microbench/pom.xml b/microbench/pom.xml index be9b2e2782..8cbd30173a 100644 --- a/microbench/pom.xml +++ b/microbench/pom.xml @@ -47,12 +47,12 @@ org.openjdk.jmh jmh-core - 0.8 + 0.9 org.openjdk.jmh jmh-generator-annprocess - 0.8 + 0.9 provided diff --git a/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java b/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java index 9158855f85..a7295118ad 100644 --- a/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java +++ b/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java @@ -20,7 +20,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.microbench.util.AbstractMicrobenchmark; -import org.openjdk.jmh.annotations.GenerateMicroBenchmark; +import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Param; import java.util.Random; @@ -44,7 +44,7 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark { @Param({ "00000", "00256", "01024", "04096", "16384", "65536" }) public int size; - @GenerateMicroBenchmark + @Benchmark public void unpooledHeapAllocAndFree() { int idx = rand.nextInt(unpooledHeapBuffers.length); ByteBuf oldBuf = unpooledHeapBuffers[idx]; @@ -54,7 +54,7 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark { unpooledHeapBuffers[idx] = unpooledAllocator.heapBuffer(size); } - @GenerateMicroBenchmark + @Benchmark public void unpooledDirectAllocAndFree() { int idx = rand.nextInt(unpooledDirectBuffers.length); ByteBuf oldBuf = unpooledDirectBuffers[idx]; @@ -64,7 +64,7 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark { unpooledDirectBuffers[idx] = unpooledAllocator.directBuffer(size); } - @GenerateMicroBenchmark + @Benchmark public void pooledHeapAllocAndFree() { int idx = rand.nextInt(pooledHeapBuffers.length); ByteBuf oldBuf = pooledHeapBuffers[idx]; @@ -74,7 +74,7 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark { pooledHeapBuffers[idx] = pooledAllocator.heapBuffer(size); } - @GenerateMicroBenchmark + @Benchmark public void pooledDirectAllocAndFree() { int idx = rand.nextInt(pooledDirectBuffers.length); ByteBuf oldBuf = pooledDirectBuffers[idx]; @@ -84,13 +84,13 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark { pooledDirectBuffers[idx] = pooledAllocator.directBuffer(size); } - @GenerateMicroBenchmark + @Benchmark public void defaultPooledHeapAllocAndFree() { ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(size); buffer.release(); } - @GenerateMicroBenchmark + @Benchmark public void defaultPooledDirectAllocAndFree() { ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size); buffer.release(); diff --git a/microbench/src/test/java/io/netty/microbench/buffer/SwappedByteBufBenchmark.java b/microbench/src/test/java/io/netty/microbench/buffer/SwappedByteBufBenchmark.java index e359a6c23c..587f8021c5 100644 --- a/microbench/src/test/java/io/netty/microbench/buffer/SwappedByteBufBenchmark.java +++ b/microbench/src/test/java/io/netty/microbench/buffer/SwappedByteBufBenchmark.java @@ -19,7 +19,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.SwappedByteBuf; import io.netty.buffer.Unpooled; import io.netty.microbench.util.AbstractMicrobenchmark; -import org.openjdk.jmh.annotations.GenerateMicroBenchmark; +import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; @@ -48,32 +48,32 @@ public class SwappedByteBufBenchmark extends AbstractMicrobenchmark { @Param("16384") public int size; - @GenerateMicroBenchmark + @Benchmark public void swappedByteBufSetInt() { swappedByteBuf.setLong(0, size); } - @GenerateMicroBenchmark + @Benchmark public void swappedByteBufSetShort() { swappedByteBuf.setShort(0, size); } - @GenerateMicroBenchmark + @Benchmark public void swappedByteBufSetLong() { swappedByteBuf.setLong(0, size); } - @GenerateMicroBenchmark + @Benchmark public void unsafeSwappedByteBufSetInt() { unsafeSwappedByteBuf.setInt(0, size); } - @GenerateMicroBenchmark + @Benchmark public void unsafeSwappedByteBufSetShort() { unsafeSwappedByteBuf.setShort(0, size); } - @GenerateMicroBenchmark + @Benchmark public void unsafeSwappedByteBufSetLong() { unsafeSwappedByteBuf.setLong(0, size); } diff --git a/microbench/src/test/java/io/netty/microbench/concurrent/FastThreadLocalBenchmark.java b/microbench/src/test/java/io/netty/microbench/concurrent/FastThreadLocalBenchmark.java new file mode 100644 index 0000000000..2993a08be5 --- /dev/null +++ b/microbench/src/test/java/io/netty/microbench/concurrent/FastThreadLocalBenchmark.java @@ -0,0 +1,77 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.concurrent; + +import io.netty.microbench.util.AbstractMicrobenchmark; +import io.netty.util.concurrent.FastThreadLocal; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Threads; + +import java.util.Random; + +/** + * This class benchmarks different allocators with different allocation sizes. + */ +@Threads(4) +@Measurement(iterations = 10, batchSize = 100) +public class FastThreadLocalBenchmark extends AbstractMicrobenchmark { + + private static final Random rand = new Random(); + + @SuppressWarnings("unchecked") + private static final ThreadLocal[] jdkThreadLocals = new ThreadLocal[128]; + @SuppressWarnings("unchecked") + private static final FastThreadLocal[] fastThreadLocals = new FastThreadLocal[jdkThreadLocals.length]; + + static { + for (int i = 0; i < jdkThreadLocals.length; i ++) { + jdkThreadLocals[i] = new ThreadLocal() { + @Override + protected Integer initialValue() { + return rand.nextInt(); + } + }; + } + + for (int i = 0; i < fastThreadLocals.length; i ++) { + fastThreadLocals[i] = new FastThreadLocal() { + @Override + protected Integer initialValue() { + return rand.nextInt(); + } + }; + } + } + + @Benchmark + public int jdkThreadLocalGet() { + int result = 0; + for (ThreadLocal i: jdkThreadLocals) { + result += i.get(); + } + return result; + } + + @Benchmark + public int fastThreadLocal() { + int result = 0; + for (FastThreadLocal i: fastThreadLocals) { + result += i.get(); + } + return result; + } +} diff --git a/microbench/src/test/java/io/netty/microbench/internal/RecyclableArrayListBenchmark.java b/microbench/src/test/java/io/netty/microbench/internal/RecyclableArrayListBenchmark.java index e10599fed5..b6be233c5d 100644 --- a/microbench/src/test/java/io/netty/microbench/internal/RecyclableArrayListBenchmark.java +++ b/microbench/src/test/java/io/netty/microbench/internal/RecyclableArrayListBenchmark.java @@ -17,7 +17,7 @@ package io.netty.microbench.internal; import io.netty.microbench.util.AbstractMicrobenchmark; import io.netty.util.internal.RecyclableArrayList; -import org.openjdk.jmh.annotations.GenerateMicroBenchmark; +import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Threads; @@ -32,7 +32,7 @@ public class RecyclableArrayListBenchmark extends AbstractMicrobenchmark { @Param({ "00000", "00256", "01024", "04096", "16384", "65536" }) public int size; - @GenerateMicroBenchmark + @Benchmark public void recycleSameThread() { RecyclableArrayList list = RecyclableArrayList.newInstance(size); list.recycle(); diff --git a/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java b/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java index d9ab838e7c..ec8e989c79 100644 --- a/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java +++ b/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java @@ -24,7 +24,7 @@ import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.output.results.ResultFormatType; +import org.openjdk.jmh.results.format.ResultFormatType; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; import org.openjdk.jmh.runner.options.OptionsBuilder; diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index 04587a2fc0..70a21f1a33 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -16,9 +16,9 @@ package io.netty.channel; -import java.net.SocketAddress; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.internal.InternalThreadLocalMap; +import java.net.SocketAddress; import java.util.Map; import java.util.WeakHashMap; @@ -27,23 +27,6 @@ import java.util.WeakHashMap; */ public class ChannelHandlerAdapter implements ChannelHandler { - /** - * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a - * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different - * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of - * {@link Thread}s are quite limited anyway. - * - * See #2289. - */ - private static final ThreadLocal, Boolean>> SHARABLE_CACHE = - new FastThreadLocal, Boolean>>() { - @Override - protected Map, Boolean> initialValue() { - // Start with small capacity to keep memory overhead as low as possible. - return new WeakHashMap, Boolean>(4); - } - }; - // Not using volatile because it's used only for a sanity check. boolean added; @@ -52,8 +35,16 @@ public class ChannelHandlerAdapter implements ChannelHandler { * to different {@link ChannelPipeline}s. */ public boolean isSharable() { + /** + * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a + * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different + * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of + * {@link Thread}s are quite limited anyway. + * + * See #2289. + */ Class clazz = getClass(); - Map, Boolean> cache = SHARABLE_CACHE.get(); + Map, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache(); Boolean sharable = cache.get(clazz); if (sharable == null) { sharable = clazz.isAnnotationPresent(Sharable.class); diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 12305a8b47..eac78cb7b8 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -28,7 +28,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.SingleThreadEventExecutor; -import io.netty.util.internal.FastThreadLocal; +import io.netty.util.internal.InternalThreadLocalMap; import java.net.SocketAddress; import java.nio.channels.AlreadyConnectedException; @@ -49,12 +49,6 @@ public class LocalChannel extends AbstractChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final int MAX_READER_STACK_DEPTH = 8; - private static final ThreadLocal READER_STACK_DEPTH = new FastThreadLocal() { - @Override - protected Integer initialValue() { - return 0; - } - }; private final ChannelConfig config = new DefaultChannelConfig(this); private final Queue inboundBuffer = new ArrayDeque(); @@ -260,9 +254,10 @@ public class LocalChannel extends AbstractChannel { return; } - final Integer stackDepth = READER_STACK_DEPTH.get(); + final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); + final Integer stackDepth = threadLocals.localChannelReaderStackDepth(); if (stackDepth < MAX_READER_STACK_DEPTH) { - READER_STACK_DEPTH.set(stackDepth + 1); + threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1); try { for (;;) { Object received = inboundBuffer.poll(); @@ -273,7 +268,7 @@ public class LocalChannel extends AbstractChannel { } pipeline.fireChannelReadComplete(); } finally { - READER_STACK_DEPTH.set(stackDepth); + threadLocals.setLocalChannelReaderStackDepth(stackDepth); } } else { eventLoop().execute(readTask);