From c83904a12a603fe340477447d086674439ca8ac7 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 22 Mar 2019 11:08:37 +0100 Subject: [PATCH] Allow to automatically trim the PoolThreadCache in a timely interval (#8941) Motivation: PooledByteBufAllocator uses a PoolThreadCache per Thread that allocates / deallocates to minimize the performance overhead. This PoolThreadCache is trimmed after X allocations to free up buffers that are not allocated for a long time. This works out quite well when the app continues to allocate but fails if the app stops to allocate frequently (for whatever reason) and so a lot of memory is wasted and not given back to the arena / freed. Modifications: - Add a ThreadExecutorMap that offers multiple methods that wrap Runnable / ThreadFactory / Executor and allow to call ThreadExecutorMap.currentEventExecutor() to get the current executing EventExecutor for the calling Thread. - Use these methods in the constructors of our EventExecutor implementations (which also covers the EventLoop implementations) - Add io.netty.allocator.cacheTrimIntervalMillis system property which can be used to specify a fixed rate / interval on which we should try to trim the PoolThreadCache for a EventExecutor that allocates. - Add PooledByteBufAllocator.trimCurrentThreadCache() to allow the user to trim the cache of the calling thread manually. - Add testcases - Introduce FastThreadLocal.getIfExists() Result: Allow to better / more frequently trim PoolThreadCache and so give back memory to the area / system. --- .../netty/buffer/PooledByteBufAllocator.java | 43 ++++++++- .../buffer/PooledByteBufAllocatorTest.java | 15 +++ .../util/concurrent/FastThreadLocal.java | 15 +++ .../util/concurrent/GlobalEventExecutor.java | 6 +- .../concurrent/SingleThreadEventExecutor.java | 3 +- .../util/internal/ThreadExecutorMap.java | 96 +++++++++++++++++++ .../util/concurrent/FastThreadLocalTest.java | 19 ++++ .../util/internal/ThreadExecutorMapTest.java | 63 ++++++++++++ 8 files changed, 255 insertions(+), 5 deletions(-) create mode 100644 common/src/main/java/io/netty/util/internal/ThreadExecutorMap.java create mode 100644 common/src/test/java/io/netty/util/internal/ThreadExecutorMapTest.java diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index bcfc9ceb1a..562070bc04 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -19,11 +19,13 @@ package io.netty.buffer; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; import io.netty.util.NettyRuntime; +import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocalThread; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.SystemPropertyUtil; +import io.netty.util.internal.ThreadExecutorMap; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -31,6 +33,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { @@ -45,6 +48,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements private static final int DEFAULT_NORMAL_CACHE_SIZE; private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; private static final int DEFAULT_CACHE_TRIM_INTERVAL; + private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS; private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS; private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT; static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK; @@ -52,6 +56,13 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements private static final int MIN_PAGE_SIZE = 4096; private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); + private final Runnable trimTask = new Runnable() { + @Override + public void run() { + PooledByteBufAllocator.this.trimCurrentThreadCache(); + } + }; + static { int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192); Throwable pageSizeFallbackCause = null; @@ -113,6 +124,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( "io.netty.allocator.cacheTrimInterval", 8192); + DEFAULT_CACHE_TRIM_INTERVAL_MILLIS = SystemPropertyUtil.getLong( + "io.netty.allocation.cacheTrimIntervalMillis", 0); + DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean( "io.netty.allocator.useCacheForAllThreads", true); @@ -143,6 +157,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE); logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL); + logger.debug("-Dio.netty.allocator.cacheTrimIntervalMillis: {}", DEFAULT_CACHE_TRIM_INTERVAL_MILLIS); logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS); logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}", DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK); @@ -438,11 +453,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements final PoolArena heapArena = leastUsedArena(heapArenas); final PoolArena directArena = leastUsedArena(directArenas); - Thread current = Thread.currentThread(); + final Thread current = Thread.currentThread(); if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { - return new PoolThreadCache( + final PoolThreadCache cache = new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); + + if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) { + final EventExecutor executor = ThreadExecutorMap.currentExecutor(); + if (executor != null) { + executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, + DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + } + } + return cache; } // No caching so just use 0 as sizes. return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); @@ -603,6 +627,21 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements return cache; } + /** + * Trim thread local cache for the current {@link Thread}, which will give back any cached memory that was not + * allocated frequently since the last trim operation. + * + * Returns {@code true} if a cache for the current {@link Thread} exists and so was trimmed, false otherwise. + */ + public boolean trimCurrentThreadCache() { + PoolThreadCache cache = threadCache.getIfExists(); + if (cache != null) { + cache.trim(); + return true; + } + return false; + } + /** * Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive * and so should not called too frequently. diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java index 39a6132493..b1396373bc 100644 --- a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java +++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java @@ -63,6 +63,21 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest { return initialize(threadLocalMap); } + /** + * Returns the current value for the current thread if it exists, {@code null} otherwise. + */ + @SuppressWarnings("unchecked") + public final V getIfExists() { + InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); + if (threadLocalMap != null) { + Object v = threadLocalMap.indexedVariable(index); + if (v != InternalThreadLocalMap.UNSET) { + return (V) v; + } + } + return null; + } + /** * Returns the current value for the specified thread local map. * The specified thread local map must be for the current thread. diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index c5c3829b12..5a405f3800 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.ThreadExecutorMap; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -55,8 +56,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not // be sticky about its thread group // visible for testing - final ThreadFactory threadFactory = - new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null); + final ThreadFactory threadFactory; private final TaskRunner taskRunner = new TaskRunner(); private final AtomicBoolean started = new AtomicBoolean(); volatile Thread thread; @@ -65,6 +65,8 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im private GlobalEventExecutor() { scheduledTaskQueue().add(quietPeriodTask); + threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory( + DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this); } /** diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 03ae3347ae..e406bcfd3e 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -18,6 +18,7 @@ package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; +import io.netty.util.internal.ThreadExecutorMap; import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -161,7 +162,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx super(parent); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); - this.executor = ObjectUtil.checkNotNull(executor, "executor"); + this.executor = ThreadExecutorMap.apply(executor, this); taskQueue = newTaskQueue(this.maxPendingTasks); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } diff --git a/common/src/main/java/io/netty/util/internal/ThreadExecutorMap.java b/common/src/main/java/io/netty/util/internal/ThreadExecutorMap.java new file mode 100644 index 0000000000..807698a909 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/ThreadExecutorMap.java @@ -0,0 +1,96 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.FastThreadLocal; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; + +/** + * Allow to retrieve the {@link EventExecutor} for the calling {@link Thread}. + */ +public final class ThreadExecutorMap { + + private static final FastThreadLocal mappings = new FastThreadLocal(); + + private ThreadExecutorMap() { } + + /** + * Returns the current {@link EventExecutor} that uses the {@link Thread}, or {@code null} if none / unknown. + */ + public static EventExecutor currentExecutor() { + return mappings.get(); + } + + /** + * Set the current {@link EventExecutor} that is used by the {@link Thread}. + */ + private static void setCurrentEventExecutor(EventExecutor executor) { + mappings.set(executor); + } + + /** + * Decorate the given {@link Executor} and ensure {@link #currentExecutor()} will return {@code eventExecutor} + * when called from within the {@link Runnable} during execution. + */ + public static Executor apply(final Executor executor, final EventExecutor eventExecutor) { + ObjectUtil.checkNotNull(executor, "executor"); + ObjectUtil.checkNotNull(eventExecutor, "eventExecutor"); + return new Executor() { + @Override + public void execute(final Runnable command) { + executor.execute(apply(command, eventExecutor)); + } + }; + } + + /** + * Decorate the given {@link Runnable} and ensure {@link #currentExecutor()} will return {@code eventExecutor} + * when called from within the {@link Runnable} during execution. + */ + public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) { + ObjectUtil.checkNotNull(command, "command"); + ObjectUtil.checkNotNull(eventExecutor, "eventExecutor"); + return new Runnable() { + @Override + public void run() { + setCurrentEventExecutor(eventExecutor); + try { + command.run(); + } finally { + setCurrentEventExecutor(null); + } + } + }; + } + + /** + * Decorate the given {@link ThreadFactory} and ensure {@link #currentExecutor()} will return {@code eventExecutor} + * when called from within the {@link Runnable} during execution. + */ + public static ThreadFactory apply(final ThreadFactory threadFactory, final EventExecutor eventExecutor) { + ObjectUtil.checkNotNull(threadFactory, "command"); + ObjectUtil.checkNotNull(eventExecutor, "eventExecutor"); + return new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return threadFactory.newThread(apply(r, eventExecutor)); + } + }; + } +} diff --git a/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java b/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java index 6457de297a..c097a34a84 100644 --- a/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java +++ b/common/src/test/java/io/netty/util/concurrent/FastThreadLocalTest.java @@ -27,7 +27,9 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; public class FastThreadLocalTest { @Before @@ -36,6 +38,23 @@ public class FastThreadLocalTest { assertThat(FastThreadLocal.size(), is(0)); } + @Test + public void testGetIfExists() { + FastThreadLocal threadLocal = new FastThreadLocal() { + @Override + protected Boolean initialValue() { + return Boolean.TRUE; + } + }; + + assertNull(threadLocal.getIfExists()); + assertTrue(threadLocal.get()); + assertTrue(threadLocal.getIfExists()); + + FastThreadLocal.removeAll(); + assertNull(threadLocal.getIfExists()); + } + @Test(timeout = 10000) public void testRemoveAll() throws Exception { final AtomicBoolean removed = new AtomicBoolean(); diff --git a/common/src/test/java/io/netty/util/internal/ThreadExecutorMapTest.java b/common/src/test/java/io/netty/util/internal/ThreadExecutorMapTest.java new file mode 100644 index 0000000000..22069e4b30 --- /dev/null +++ b/common/src/test/java/io/netty/util/internal/ThreadExecutorMapTest.java @@ -0,0 +1,63 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.ImmediateExecutor; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +public class ThreadExecutorMapTest { + + @Test + public void testDecorateExecutor() { + Executor executor = ThreadExecutorMap.apply(ImmediateExecutor.INSTANCE, ImmediateEventExecutor.INSTANCE); + executor.execute(new Runnable() { + @Override + public void run() { + Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor()); + } + }); + } + + @Test + public void testDecorateRunnable() { + ThreadExecutorMap.apply(new Runnable() { + @Override + public void run() { + Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor()); + } + }, ImmediateEventExecutor.INSTANCE).run(); + } + + @Test + public void testDecorateThreadFactory() throws InterruptedException { + ThreadFactory threadFactory = + ThreadExecutorMap.apply(Executors.defaultThreadFactory(), ImmediateEventExecutor.INSTANCE); + Thread thread = threadFactory.newThread(new Runnable() { + @Override + public void run() { + Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor()); + } + }); + thread.start(); + thread.join(); + } +}