From 1ac2ff8d7bee9928ade1092f6943d85d96012052 Mon Sep 17 00:00:00 2001 From: belliottsmith Date: Mon, 9 Jun 2014 01:18:46 +0100 Subject: [PATCH] Introduce FastThreadLocal which uses an EnumMap and a predefined fixed set of possible thread locals Motivation: Provide a faster ThreadLocal implementation Modification: Add a "FastThreadLocal" which uses an EnumMap and a predefined fixed set of possible thread locals (all of the static instances created by netty) that is around 10-20% faster than standard ThreadLocal in my benchmarks (and can be seen having an effect in the direct PooledByteBufAllocator benchmark that uses the DEFAULT ByteBufAllocator which uses this FastThreadLocal, as opposed to normal instantiations that do not, and in the new RecyclableArrayList benchmark); Result: Improved performance --- .../netty/buffer/PooledByteBufAllocator.java | 26 ++- .../io/netty/buffer/PooledDirectByteBuf.java | 4 +- .../io/netty/buffer/PooledHeapByteBuf.java | 4 +- .../buffer/PooledUnsafeDirectByteBuf.java | 4 +- .../src/main/java/io/netty/util/Recycler.java | 28 +-- .../netty/util/concurrent/DefaultPromise.java | 3 +- .../util/concurrent/DefaultThreadFactory.java | 2 +- .../util/concurrent/FastThreadLocal.java | 176 ++++++++++++++++++ .../io/netty/util/internal/PendingWrite.java | 6 +- .../util/internal/RecyclableArrayList.java | 4 +- .../buffer/ByteBufAllocatorBenchmark.java | 12 ++ .../RecyclableArrayListBenchmark.java | 40 ++++ .../util/AbstractMicrobenchmark.java | 16 +- .../AbstractChannelHandlerContext.java | 10 +- .../netty/channel/ChannelOutboundBuffer.java | 7 +- .../io/netty/channel/local/LocalChannel.java | 4 +- 16 files changed, 315 insertions(+), 31 deletions(-) create mode 100644 common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java create mode 100644 microbench/src/test/java/io/netty/microbench/internal/RecyclableArrayListBenchmark.java diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index e0184acca4..597b149fe8 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -17,6 +17,7 @@ package io.netty.buffer; import io.netty.util.ThreadDeathWatcher; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -118,7 +119,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { } public static final PooledByteBufAllocator DEFAULT = - new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + new PooledByteBufAllocator(FastThreadLocal.Type.PooledByteBufAllocator_DefaultAllocator); private final PoolArena[] heapArenas; private final PoolArena[] directArenas; @@ -147,8 +148,21 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize, int smallCacheSize, int normalCacheSize) { + this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, tinyCacheSize, smallCacheSize, + normalCacheSize, null); + } + + private PooledByteBufAllocator(FastThreadLocal.Type fastThreadLocalType) { + this(PlatformDependent.directBufferPreferred(), + DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER, + DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE, fastThreadLocalType); + } + + private PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder, + int tinyCacheSize, int smallCacheSize, int normalCacheSize, + FastThreadLocal.Type fastThreadLocalType) { super(preferDirect); - threadCache = new PoolThreadLocalCache(); + threadCache = new PoolThreadLocalCache(fastThreadLocalType); this.tinyCacheSize = tinyCacheSize; this.smallCacheSize = smallCacheSize; this.normalCacheSize = normalCacheSize; @@ -188,7 +202,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { int tinyCacheSize, int smallCacheSize, int normalCacheSize, long cacheThreadAliveCheckInterval) { this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder, - tinyCacheSize, smallCacheSize, normalCacheSize); + tinyCacheSize, smallCacheSize, normalCacheSize, null); } @SuppressWarnings("unchecked") @@ -282,10 +296,14 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { threadCache.free(); } - final class PoolThreadLocalCache extends ThreadLocal { + final class PoolThreadLocalCache extends FastThreadLocal { private final AtomicInteger index = new AtomicInteger(); private boolean initialized; + PoolThreadLocalCache(FastThreadLocal.Type fastThreadLocalType) { + super(fastThreadLocalType); + } + @Override protected PoolThreadCache initialValue() { final int idx = index.getAndIncrement(); diff --git a/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java index f8ec15ddac..fbc225084b 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledDirectByteBuf.java @@ -17,6 +17,7 @@ package io.netty.buffer; import io.netty.util.Recycler; +import io.netty.util.concurrent.FastThreadLocal; import java.io.IOException; import java.io.InputStream; @@ -28,7 +29,8 @@ import java.nio.channels.ScatteringByteChannel; final class PooledDirectByteBuf extends PooledByteBuf { - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER + = new Recycler(FastThreadLocal.Type.PooledDirectByteBuf_Recycler) { @Override protected PooledDirectByteBuf newObject(Handle handle) { return new PooledDirectByteBuf(handle, 0); diff --git a/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java index 1eb445b04e..7453558342 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledHeapByteBuf.java @@ -15,6 +15,7 @@ package io.netty.buffer; import io.netty.util.Recycler; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; import java.io.IOException; @@ -27,7 +28,8 @@ import java.nio.channels.ScatteringByteChannel; final class PooledHeapByteBuf extends PooledByteBuf { - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER + = new Recycler(FastThreadLocal.Type.PooledHeapByteBuf_Recycler) { @Override protected PooledHeapByteBuf newObject(Handle handle) { return new PooledHeapByteBuf(handle, 0); diff --git a/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java index d79b637ee6..67373bb35c 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java @@ -17,6 +17,7 @@ package io.netty.buffer; import io.netty.util.Recycler; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; import java.io.IOException; @@ -32,7 +33,8 @@ final class PooledUnsafeDirectByteBuf extends PooledByteBuf { private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER + = new Recycler(FastThreadLocal.Type.PooledUnsafeDirectByteBuf_Recycler) { @Override protected PooledUnsafeDirectByteBuf newObject(Handle handle) { return new PooledUnsafeDirectByteBuf(handle, 0); diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index d257e94f3b..81d66b0f29 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -23,6 +23,8 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.IdentityHashMap; import java.util.Map; +import io.netty.util.concurrent.FastThreadLocal; + /** * Light-weight object pool based on a thread-local stack. * @@ -54,23 +56,21 @@ public abstract class Recycler { } private final int maxCapacity; + private final FastThreadLocal> threadLocal; - private final ThreadLocal> threadLocal = new ThreadLocal>() { - @Override - protected Stack initialValue() { - return new Stack(Recycler.this, Thread.currentThread(), maxCapacity); - } - }; - - protected Recycler() { - this(DEFAULT_MAX_CAPACITY); + protected Recycler(FastThreadLocal.Type type) { + this(DEFAULT_MAX_CAPACITY, type); } - protected Recycler(int maxCapacity) { - if (maxCapacity <= 0) { - maxCapacity = 0; - } - this.maxCapacity = maxCapacity; + public Recycler(int maxCapacity, FastThreadLocal.Type type) { + super(); + this.maxCapacity = Math.max(0, maxCapacity); + threadLocal = new FastThreadLocal>(type) { + @Override + protected Stack initialValue() { + return new Stack(Recycler.this, Thread.currentThread(), Recycler.this.maxCapacity); + } + }; } public final T get() { diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index bd6e328a20..f0dd4e9361 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -35,7 +35,8 @@ public class DefaultPromise extends AbstractFuture implements Promise { InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); private static final int MAX_LISTENER_STACK_DEPTH = 8; - private static final ThreadLocal LISTENER_STACK_DEPTH = new ThreadLocal() { + private static final FastThreadLocal LISTENER_STACK_DEPTH + = new FastThreadLocal(FastThreadLocal.Type.DefaultPromise_ListenerStackDepth) { @Override protected Integer initialValue() { return 0; diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java b/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java index 3142a1b029..27f578e912 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultThreadFactory.java @@ -98,7 +98,7 @@ public class DefaultThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r, prefix + nextId.incrementAndGet()); + Thread t = new FastThreadLocal.FastThreadLocalThread(r, prefix + nextId.incrementAndGet()); try { if (t.isDaemon()) { if (!daemon) { diff --git a/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java b/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java new file mode 100644 index 0000000000..32ce65fb9e --- /dev/null +++ b/common/src/main/java/io/netty/util/concurrent/FastThreadLocal.java @@ -0,0 +1,176 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import java.util.EnumMap; + +/** + * A simple class providing equivalent functionality to java.lang,ThreadLocal, but operating + * over a predefined hash range, so that we can hash perfectly. This permits less indirection + * and offers a slight performance improvement, so is useful when invoked frequently. + * + * The fast path is only possible on threads that extend FastThreadLocalThread, as this class + * stores the necessary state. Access by any other kind of thread falls back to a regular ThreadLocal + * + * @param + */ +public class FastThreadLocal { + + public static enum Type { + LocalChannel_ReaderStackDepth, + PooledDirectByteBuf_Recycler, + PooledHeapByteBuf_Recycler, + PooledUnsafeDirectByteBuf_Recycler, + ChannelOutboundBuffer_Recycler, + ChannelOutboundBuffer_PooledByteBuf_Recycler, + DefaultChannelHandlerContext_WriteAndFlushTask_Recycler, + DefaultChannelHandlerContext_WriteTask_Recycler, + PendingWrite_Recycler, + RecyclableArrayList_Recycler, + DefaultPromise_ListenerStackDepth, + PooledByteBufAllocator_DefaultAllocator + } + + // the set of already defined FastThreadLocals + private static final EnumMap SET = new EnumMap(Type.class); + // the type values, for cheap iteration + private static final Type[] TYPES = Type.values(); + // a marker to indicate a value has not yet been initialised + private static final Object EMPTY = new Object(); + + /** + * To utilise the FastThreadLocal fast-path, all threads accessing a FastThreadLocal must extend this class + */ + public static class FastThreadLocalThread extends Thread { + + private final EnumMap lookup = initialMap(); + + static EnumMap initialMap() { + EnumMap r = new EnumMap(Type.class); + for (Type type : TYPES) { + r.put(type, EMPTY); + } + return r; + } + + public FastThreadLocalThread() { + super(); + } + + public FastThreadLocalThread(Runnable target) { + super(target); + } + + public FastThreadLocalThread(ThreadGroup group, Runnable target) { + super(group, target); + } + + public FastThreadLocalThread(String name) { + super(name); + } + + public FastThreadLocalThread(ThreadGroup group, String name) { + super(group, name); + } + + public FastThreadLocalThread(Runnable target, String name) { + super(target, name); + } + + public FastThreadLocalThread(ThreadGroup group, Runnable target, String name) { + super(group, target, name); + } + + public FastThreadLocalThread(ThreadGroup group, Runnable target, String name, long stackSize) { + super(group, target, name, stackSize); + } + } + + final Type type; + final ThreadLocal fallback = new ThreadLocal() { + protected V initialValue() { + return FastThreadLocal.this.initialValue(); + } + }; + + /** + * @param type the predefined type this FastThreadLocal represents; each type may be used only once + * globally in a single VM + */ + public FastThreadLocal(Type type) { + if (type != null) { + synchronized (SET) { + if (SET.put(type, Boolean.TRUE) != null) { + throw new IllegalStateException(type + " has been assigned multiple times"); + } + } + } + this.type = type; + } + + /** + * Override this method to define the default value to assign + * when a thread first calls get() without a preceding set() + * + * @return the initial value + */ + protected V initialValue() { + return null; + } + + /** + * Set the value for the current thread + * @param value + */ + public void set(V value) { + Thread thread = Thread.currentThread(); + if (type == null || !(thread instanceof FastThreadLocalThread)) { + fallback.set(value); + return; + } + EnumMap lookup = ((FastThreadLocalThread) thread).lookup; + lookup.put(type, value); + } + + /** + * Sets the value to uninitialized; a proceeding call to get() will trigger a call to initialValue() + */ + public void remove() { + Thread thread = Thread.currentThread(); + if (type == null || !(thread instanceof FastThreadLocalThread)) { + fallback.remove(); + return; + } + EnumMap lookup = ((FastThreadLocalThread) thread).lookup; + lookup.put(type, EMPTY); + } + + /** + * @return the current value for the current thread + */ + public final V get() { + Thread thread = Thread.currentThread(); + if (type == null || !(thread instanceof FastThreadLocalThread)) { + return fallback.get(); + } + EnumMap lookup = ((FastThreadLocalThread) thread).lookup; + Object v = lookup.get(type); + if (v == EMPTY) { + lookup.put(type, v = initialValue()); + } + return (V) v; + } +} diff --git a/common/src/main/java/io/netty/util/internal/PendingWrite.java b/common/src/main/java/io/netty/util/internal/PendingWrite.java index c731dab60f..269ed7370b 100644 --- a/common/src/main/java/io/netty/util/internal/PendingWrite.java +++ b/common/src/main/java/io/netty/util/internal/PendingWrite.java @@ -17,13 +17,17 @@ package io.netty.util.internal; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Promise; +import static io.netty.util.concurrent.FastThreadLocal.Type.PendingWrite_Recycler; + /** * Some pending write which should be picked up later. */ public final class PendingWrite { - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER + = new Recycler(PendingWrite_Recycler) { @Override protected PendingWrite newObject(Handle handle) { return new PendingWrite(handle); diff --git a/common/src/main/java/io/netty/util/internal/RecyclableArrayList.java b/common/src/main/java/io/netty/util/internal/RecyclableArrayList.java index 2f9668d497..0e1ea1b097 100644 --- a/common/src/main/java/io/netty/util/internal/RecyclableArrayList.java +++ b/common/src/main/java/io/netty/util/internal/RecyclableArrayList.java @@ -18,6 +18,7 @@ package io.netty.util.internal; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; +import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collection; @@ -33,7 +34,8 @@ public final class RecyclableArrayList extends ArrayList { private static final int DEFAULT_INITIAL_CAPACITY = 8; - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER + = new Recycler(FastThreadLocal.Type.RecyclableArrayList_Recycler) { @Override protected RecyclableArrayList newObject(Handle handle) { return new RecyclableArrayList(handle); diff --git a/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java b/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java index aa361aff13..9158855f85 100644 --- a/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java +++ b/microbench/src/test/java/io/netty/microbench/buffer/ByteBufAllocatorBenchmark.java @@ -83,4 +83,16 @@ public class ByteBufAllocatorBenchmark extends AbstractMicrobenchmark { } pooledDirectBuffers[idx] = pooledAllocator.directBuffer(size); } + + @GenerateMicroBenchmark + public void defaultPooledHeapAllocAndFree() { + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(size); + buffer.release(); + } + + @GenerateMicroBenchmark + public void defaultPooledDirectAllocAndFree() { + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size); + buffer.release(); + } } diff --git a/microbench/src/test/java/io/netty/microbench/internal/RecyclableArrayListBenchmark.java b/microbench/src/test/java/io/netty/microbench/internal/RecyclableArrayListBenchmark.java new file mode 100644 index 0000000000..e10599fed5 --- /dev/null +++ b/microbench/src/test/java/io/netty/microbench/internal/RecyclableArrayListBenchmark.java @@ -0,0 +1,40 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.internal; + +import io.netty.microbench.util.AbstractMicrobenchmark; +import io.netty.util.internal.RecyclableArrayList; +import org.openjdk.jmh.annotations.GenerateMicroBenchmark; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Threads; + +/** + * This class benchmarks different allocators with different allocation sizes. + */ +@Threads(4) +@Measurement(iterations = 10, batchSize = 100) +public class RecyclableArrayListBenchmark extends AbstractMicrobenchmark { + + @Param({ "00000", "00256", "01024", "04096", "16384", "65536" }) + public int size; + + @GenerateMicroBenchmark + public void recycleSameThread() { + RecyclableArrayList list = RecyclableArrayList.newInstance(size); + list.recycle(); + } +} diff --git a/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java b/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java index 82adf78133..d9ab838e7c 100644 --- a/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java +++ b/microbench/src/test/java/io/netty/microbench/util/AbstractMicrobenchmark.java @@ -16,6 +16,7 @@ package io.netty.microbench.util; import io.netty.util.ResourceLeakDetector; +import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.internal.SystemPropertyUtil; import org.junit.Test; import org.openjdk.jmh.annotations.Fork; @@ -29,6 +30,9 @@ import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; import org.openjdk.jmh.runner.options.OptionsBuilder; import java.io.File; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Base class for all JMH benchmarks. @@ -43,11 +47,21 @@ public class AbstractMicrobenchmark { protected static final int DEFAULT_MEASURE_ITERATIONS = 10; protected static final int DEFAULT_FORKS = 2; + public static final class HarnessExecutor extends ThreadPoolExecutor { + public HarnessExecutor(int maxThreads, String prefix) { + super(0, maxThreads, 1L, TimeUnit.DAYS, new SynchronousQueue(), + new DefaultThreadFactory(prefix)); + System.out.println("Using harness executor"); + } + } + protected static final String[] JVM_ARGS = { "-server", "-dsa", "-da", "-ea:io.netty...", "-Xms768m", "-Xmx768m", "-XX:MaxDirectMemorySize=768m", "-XX:+AggressiveOpts", "-XX:+UseBiasedLocking", "-XX:+UseFastAccessorMethods", "-XX:+UseStringCache", "-XX:+OptimizeStringConcat", - "-XX:+HeapDumpOnOutOfMemoryError", "-Dio.netty.noResourceLeakDetection" + "-XX:+HeapDumpOnOutOfMemoryError", "-Dio.netty.noResourceLeakDetection", + "-Dharness.executor=CUSTOM", + "-Dharness.executor.class=io.netty.microbench.util.AbstractMicrobenchmark$HarnessExecutor" }; static { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 6a604c9f0b..b3d164e3ee 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -16,12 +16,16 @@ package io.netty.channel; import static io.netty.channel.DefaultChannelPipeline.logger; +import static io.netty.util.concurrent.FastThreadLocal.Type.DefaultChannelHandlerContext_WriteAndFlushTask_Recycler; +import static io.netty.util.concurrent.FastThreadLocal.Type.DefaultChannelHandlerContext_WriteTask_Recycler; + import io.netty.buffer.ByteBufAllocator; import io.netty.util.DefaultAttributeMap; import io.netty.util.Recycler; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; + import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.StringUtil; @@ -940,7 +944,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER = + new Recycler(DefaultChannelHandlerContext_WriteTask_Recycler) { @Override protected WriteTask newObject(Handle handle) { return new WriteTask(handle); @@ -966,7 +971,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme static final class WriteAndFlushTask extends AbstractWriteTask { - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER = + new Recycler(DefaultChannelHandlerContext_WriteAndFlushTask_Recycler) { @Override protected WriteAndFlushTask newObject(Handle handle) { return new WriteAndFlushTask(handle); diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 3d1b7a59ff..53b74b2601 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -29,6 +29,7 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -57,7 +58,8 @@ public final class ChannelOutboundBuffer { logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", threadLocalDirectBufferSize); } - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER + = new Recycler(FastThreadLocal.Type.ChannelOutboundBuffer_Recycler) { @Override protected ChannelOutboundBuffer newObject(Handle handle) { return new ChannelOutboundBuffer(handle); @@ -693,7 +695,8 @@ public final class ChannelOutboundBuffer { static final class ThreadLocalPooledByteBuf extends UnpooledDirectByteBuf { private final Recycler.Handle handle; - private static final Recycler RECYCLER = new Recycler() { + private static final Recycler RECYCLER + = new Recycler(FastThreadLocal.Type.ChannelOutboundBuffer_PooledByteBuf_Recycler) { @Override protected ThreadLocalPooledByteBuf newObject(Handle handle) { return new ThreadLocalPooledByteBuf(handle); diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 3aba344be4..4448b885f8 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -27,6 +27,7 @@ import io.netty.channel.DefaultChannelConfig; import io.netty.channel.EventLoop; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.SingleThreadEventExecutor; import java.net.SocketAddress; @@ -46,7 +47,8 @@ public class LocalChannel extends AbstractChannel { private static final ChannelMetadata METADATA = new ChannelMetadata(false); private static final int MAX_READER_STACK_DEPTH = 8; - private static final ThreadLocal READER_STACK_DEPTH = new ThreadLocal() { + private static final FastThreadLocal READER_STACK_DEPTH + = new FastThreadLocal(FastThreadLocal.Type.LocalChannel_ReaderStackDepth) { @Override protected Integer initialValue() { return 0;