Allow to automatically trim the PoolThreadCache in a timely interval (#8941)

Motivation:

PooledByteBufAllocator uses a PoolThreadCache per Thread that allocates / deallocates to minimize the performance overhead. This PoolThreadCache is trimmed after X allocations to free up buffers that are not allocated for a long time. This works out quite well when the app continues to allocate but fails if the app stops to allocate frequently (for whatever reason) and so a lot of memory is wasted and not given back to the arena / freed.

Modifications:

- Add a ThreadExecutorMap that offers multiple methods that wrap Runnable / ThreadFactory / Executor and allow to call ThreadExecutorMap.currentEventExecutor() to get the current executing EventExecutor for the calling Thread.
- Use these methods in the constructors of our EventExecutor implementations (which also covers the EventLoop implementations)
- Add io.netty.allocator.cacheTrimIntervalMillis system property which can be used to specify a fixed rate / interval on which we should try to trim the PoolThreadCache for a EventExecutor that allocates.
- Add PooledByteBufAllocator.trimCurrentThreadCache() to allow the user to trim the cache of the calling thread manually.
- Add testcases
- Introduce FastThreadLocal.getIfExists()

Result:

Allow to better / more frequently trim PoolThreadCache and so give back memory to the area / system.
This commit is contained in:
Norman Maurer 2019-03-22 11:08:37 +01:00
parent a8aa791768
commit 9681842d54
8 changed files with 250 additions and 5 deletions

View File

@ -19,11 +19,13 @@ package io.netty.buffer;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero; import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import io.netty.util.NettyRuntime; import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread; import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -31,6 +33,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider { public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
@ -45,6 +48,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private static final int DEFAULT_NORMAL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE;
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL; private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS; private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT; private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK; static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
@ -52,6 +56,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private static final int MIN_PAGE_SIZE = 4096; private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
private final Runnable trimTask = this::trimCurrentThreadCache;
static { static {
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192); int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
Throwable pageSizeFallbackCause = null; Throwable pageSizeFallbackCause = null;
@ -113,6 +119,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt( DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192); "io.netty.allocator.cacheTrimInterval", 8192);
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS = SystemPropertyUtil.getLong(
"io.netty.allocation.cacheTrimIntervalMillis", 0);
DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean( DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
"io.netty.allocator.useCacheForAllThreads", true); "io.netty.allocator.useCacheForAllThreads", true);
@ -143,6 +152,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE); logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY); logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL); logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
logger.debug("-Dio.netty.allocator.cacheTrimIntervalMillis: {}", DEFAULT_CACHE_TRIM_INTERVAL_MILLIS);
logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS); logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}", logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK); DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
@ -438,11 +448,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas); final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas); final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread(); final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) { if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
return new PoolThreadCache( final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
final EventExecutor executor = ThreadExecutorMap.currentExecutor();
if (executor != null) {
executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
}
return cache;
} }
// No caching so just use 0 as sizes. // No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0); return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
@ -603,6 +622,21 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
return cache; return cache;
} }
/**
* Trim thread local cache for the current {@link Thread}, which will give back any cached memory that was not
* allocated frequently since the last trim operation.
*
* Returns {@code true} if a cache for the current {@link Thread} exists and so was trimmed, false otherwise.
*/
public boolean trimCurrentThreadCache() {
PoolThreadCache cache = threadCache.getIfExists();
if (cache != null) {
cache.trim();
return true;
}
return false;
}
/** /**
* Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive * Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive
* and so should not called too frequently. * and so should not called too frequently.

View File

@ -63,6 +63,21 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
return allocator.metric().chunkSize(); return allocator.metric().chunkSize();
} }
@Test
public void testTrim() {
PooledByteBufAllocator allocator = newAllocator(true);
// Should return false as we never allocated from this thread yet.
assertFalse(allocator.trimCurrentThreadCache());
ByteBuf directBuffer = allocator.directBuffer();
assertTrue(directBuffer.release());
// Should return true now a cache exists for the calling thread.
assertTrue(allocator.trimCurrentThreadCache());
}
@Test @Test
public void testPooledUnsafeHeapBufferAndUnsafeDirectBuffer() { public void testPooledUnsafeHeapBufferAndUnsafeDirectBuffer() {
PooledByteBufAllocator allocator = newAllocator(true); PooledByteBufAllocator allocator = newAllocator(true);

View File

@ -142,6 +142,21 @@ public class FastThreadLocal<V> {
return initialize(threadLocalMap); return initialize(threadLocalMap);
} }
/**
* Returns the current value for the current thread if it exists, {@code null} otherwise.
*/
@SuppressWarnings("unchecked")
public final V getIfExists() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap != null) {
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
}
return null;
}
/** /**
* Returns the current value for the specified thread local map. * Returns the current value for the specified thread local map.
* The specified thread local map must be for the current thread. * The specified thread local map must be for the current thread.

View File

@ -17,6 +17,7 @@ package io.netty.util.concurrent;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -61,8 +62,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
// can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
// be sticky about its thread group // be sticky about its thread group
// visible for testing // visible for testing
final ThreadFactory threadFactory = final ThreadFactory threadFactory;
new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
private final TaskRunner taskRunner = new TaskRunner(); private final TaskRunner taskRunner = new TaskRunner();
private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean();
volatile Thread thread; volatile Thread thread;
@ -70,6 +70,8 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
private final Future<?> terminationFuture = new FailedFuture<>(this, new UnsupportedOperationException()); private final Future<?> terminationFuture = new FailedFuture<>(this, new UnsupportedOperationException());
private GlobalEventExecutor() { private GlobalEventExecutor() {
threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
} }
/** /**

View File

@ -19,6 +19,7 @@ import static java.util.Objects.requireNonNull;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -142,7 +143,7 @@ public class SingleThreadEventExecutor extends AbstractScheduledEventExecutor im
* @param rejectedHandler the {@link RejectedExecutionHandler} to use. * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
*/ */
public SingleThreadEventExecutor(Executor executor, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { public SingleThreadEventExecutor(Executor executor, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
this.executor = requireNonNull(executor, "executor"); this.executor = ThreadExecutorMap.apply(executor, this);
taskQueue = newTaskQueue(Math.max(16, maxPendingTasks)); taskQueue = newTaskQueue(Math.max(16, maxPendingTasks));
this.addTaskWakesUp = taskQueue instanceof BlockingQueue; this.addTaskWakesUp = taskQueue instanceof BlockingQueue;
rejectedExecutionHandler = requireNonNull(rejectedHandler, "rejectedHandler"); rejectedExecutionHandler = requireNonNull(rejectedHandler, "rejectedHandler");

View File

@ -0,0 +1,96 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* Allow to retrieve the {@link EventExecutor} for the calling {@link Thread}.
*/
public final class ThreadExecutorMap {
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
private ThreadExecutorMap() { }
/**
* Returns the current {@link EventExecutor} that uses the {@link Thread}, or {@code null} if none / unknown.
*/
public static EventExecutor currentExecutor() {
return mappings.get();
}
/**
* Set the current {@link EventExecutor} that is used by the {@link Thread}.
*/
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
/**
* Decorate the given {@link Executor} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
/**
* Decorate the given {@link Runnable} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
/**
* Decorate the given {@link ThreadFactory} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static ThreadFactory apply(final ThreadFactory threadFactory, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(threadFactory, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return threadFactory.newThread(apply(r, eventExecutor));
}
};
}
}

View File

@ -27,7 +27,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class FastThreadLocalTest { public class FastThreadLocalTest {
@Before @Before
@ -36,6 +38,23 @@ public class FastThreadLocalTest {
assertThat(FastThreadLocal.size(), is(0)); assertThat(FastThreadLocal.size(), is(0));
} }
@Test
public void testGetIfExists() {
FastThreadLocal<Boolean> threadLocal = new FastThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.TRUE;
}
};
assertNull(threadLocal.getIfExists());
assertTrue(threadLocal.get());
assertTrue(threadLocal.getIfExists());
FastThreadLocal.removeAll();
assertNull(threadLocal.getIfExists());
}
@Test(timeout = 10000) @Test(timeout = 10000)
public void testRemoveAll() throws Exception { public void testRemoveAll() throws Exception {
final AtomicBoolean removed = new AtomicBoolean(); final AtomicBoolean removed = new AtomicBoolean();

View File

@ -0,0 +1,63 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.ImmediateExecutor;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class ThreadExecutorMapTest {
@Test
public void testDecorateExecutor() {
Executor executor = ThreadExecutorMap.apply(ImmediateExecutor.INSTANCE, ImmediateEventExecutor.INSTANCE);
executor.execute(new Runnable() {
@Override
public void run() {
Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor());
}
});
}
@Test
public void testDecorateRunnable() {
ThreadExecutorMap.apply(new Runnable() {
@Override
public void run() {
Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor());
}
}, ImmediateEventExecutor.INSTANCE).run();
}
@Test
public void testDecorateThreadFactory() throws InterruptedException {
ThreadFactory threadFactory =
ThreadExecutorMap.apply(Executors.defaultThreadFactory(), ImmediateEventExecutor.INSTANCE);
Thread thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor());
}
});
thread.start();
thread.join();
}
}