Allow to automatically trim the PoolThreadCache in a timely interval (#8941)

Motivation:

PooledByteBufAllocator uses a PoolThreadCache per Thread that allocates / deallocates to minimize the performance overhead. This PoolThreadCache is trimmed after X allocations to free up buffers that are not allocated for a long time. This works out quite well when the app continues to allocate but fails if the app stops to allocate frequently (for whatever reason) and so a lot of memory is wasted and not given back to the arena / freed.

Modifications:

- Add a ThreadExecutorMap that offers multiple methods that wrap Runnable / ThreadFactory / Executor and allow to call ThreadExecutorMap.currentEventExecutor() to get the current executing EventExecutor for the calling Thread.
- Use these methods in the constructors of our EventExecutor implementations (which also covers the EventLoop implementations)
- Add io.netty.allocator.cacheTrimIntervalMillis system property which can be used to specify a fixed rate / interval on which we should try to trim the PoolThreadCache for a EventExecutor that allocates.
- Add PooledByteBufAllocator.trimCurrentThreadCache() to allow the user to trim the cache of the calling thread manually.
- Add testcases
- Introduce FastThreadLocal.getIfExists()

Result:

Allow to better / more frequently trim PoolThreadCache and so give back memory to the area / system.
This commit is contained in:
Norman Maurer 2019-03-22 11:08:37 +01:00 committed by GitHub
parent 35bc73f9b0
commit c83904a12a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 255 additions and 5 deletions

View File

@ -19,11 +19,13 @@ package io.netty.buffer;
import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -31,6 +33,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocatorMetricProvider {
@ -45,6 +48,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private static final int DEFAULT_NORMAL_CACHE_SIZE;
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
@ -52,6 +56,13 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
private final Runnable trimTask = new Runnable() {
@Override
public void run() {
PooledByteBufAllocator.this.trimCurrentThreadCache();
}
};
static {
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
Throwable pageSizeFallbackCause = null;
@ -113,6 +124,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS = SystemPropertyUtil.getLong(
"io.netty.allocation.cacheTrimIntervalMillis", 0);
DEFAULT_USE_CACHE_FOR_ALL_THREADS = SystemPropertyUtil.getBoolean(
"io.netty.allocator.useCacheForAllThreads", true);
@ -143,6 +157,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
logger.debug("-Dio.netty.allocator.cacheTrimIntervalMillis: {}", DEFAULT_CACHE_TRIM_INTERVAL_MILLIS);
logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
@ -438,11 +453,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
final Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
return new PoolThreadCache(
final PoolThreadCache cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {
final EventExecutor executor = ThreadExecutorMap.currentExecutor();
if (executor != null) {
executor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,
DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
}
}
return cache;
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
@ -603,6 +627,21 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
return cache;
}
/**
* Trim thread local cache for the current {@link Thread}, which will give back any cached memory that was not
* allocated frequently since the last trim operation.
*
* Returns {@code true} if a cache for the current {@link Thread} exists and so was trimmed, false otherwise.
*/
public boolean trimCurrentThreadCache() {
PoolThreadCache cache = threadCache.getIfExists();
if (cache != null) {
cache.trim();
return true;
}
return false;
}
/**
* Returns the status of the allocator (which contains all metrics) as string. Be aware this may be expensive
* and so should not called too frequently.

View File

@ -63,6 +63,21 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
return allocator.metric().chunkSize();
}
@Test
public void testTrim() {
PooledByteBufAllocator allocator = newAllocator(true);
// Should return false as we never allocated from this thread yet.
assertFalse(allocator.trimCurrentThreadCache());
ByteBuf directBuffer = allocator.directBuffer();
assertTrue(directBuffer.release());
// Should return true now a cache exists for the calling thread.
assertTrue(allocator.trimCurrentThreadCache());
}
@Test
public void testPooledUnsafeHeapBufferAndUnsafeDirectBuffer() {
PooledByteBufAllocator allocator = newAllocator(true);

View File

@ -142,6 +142,21 @@ public class FastThreadLocal<V> {
return initialize(threadLocalMap);
}
/**
* Returns the current value for the current thread if it exists, {@code null} otherwise.
*/
@SuppressWarnings("unchecked")
public final V getIfExists() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap != null) {
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
}
return null;
}
/**
* Returns the current value for the specified thread local map.
* The specified thread local map must be for the current thread.

View File

@ -15,6 +15,7 @@
*/
package io.netty.util.concurrent;
import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -55,8 +56,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
// can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
// be sticky about its thread group
// visible for testing
final ThreadFactory threadFactory =
new DefaultThreadFactory(DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null);
final ThreadFactory threadFactory;
private final TaskRunner taskRunner = new TaskRunner();
private final AtomicBoolean started = new AtomicBoolean();
volatile Thread thread;
@ -65,6 +65,8 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
private GlobalEventExecutor() {
scheduledTaskQueue().add(quietPeriodTask);
threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
}
/**

View File

@ -18,6 +18,7 @@ package io.netty.util.concurrent;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThreadExecutorMap;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -161,7 +162,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
this.executor = ThreadExecutorMap.apply(executor, this);
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

View File

@ -0,0 +1,96 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* Allow to retrieve the {@link EventExecutor} for the calling {@link Thread}.
*/
public final class ThreadExecutorMap {
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
private ThreadExecutorMap() { }
/**
* Returns the current {@link EventExecutor} that uses the {@link Thread}, or {@code null} if none / unknown.
*/
public static EventExecutor currentExecutor() {
return mappings.get();
}
/**
* Set the current {@link EventExecutor} that is used by the {@link Thread}.
*/
private static void setCurrentEventExecutor(EventExecutor executor) {
mappings.set(executor);
}
/**
* Decorate the given {@link Executor} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
/**
* Decorate the given {@link Runnable} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
/**
* Decorate the given {@link ThreadFactory} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static ThreadFactory apply(final ThreadFactory threadFactory, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(threadFactory, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
return new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return threadFactory.newThread(apply(r, eventExecutor));
}
};
}
}

View File

@ -27,7 +27,9 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class FastThreadLocalTest {
@Before
@ -36,6 +38,23 @@ public class FastThreadLocalTest {
assertThat(FastThreadLocal.size(), is(0));
}
@Test
public void testGetIfExists() {
FastThreadLocal<Boolean> threadLocal = new FastThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return Boolean.TRUE;
}
};
assertNull(threadLocal.getIfExists());
assertTrue(threadLocal.get());
assertTrue(threadLocal.getIfExists());
FastThreadLocal.removeAll();
assertNull(threadLocal.getIfExists());
}
@Test(timeout = 10000)
public void testRemoveAll() throws Exception {
final AtomicBoolean removed = new AtomicBoolean();

View File

@ -0,0 +1,63 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.ImmediateExecutor;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
public class ThreadExecutorMapTest {
@Test
public void testDecorateExecutor() {
Executor executor = ThreadExecutorMap.apply(ImmediateExecutor.INSTANCE, ImmediateEventExecutor.INSTANCE);
executor.execute(new Runnable() {
@Override
public void run() {
Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor());
}
});
}
@Test
public void testDecorateRunnable() {
ThreadExecutorMap.apply(new Runnable() {
@Override
public void run() {
Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor());
}
}, ImmediateEventExecutor.INSTANCE).run();
}
@Test
public void testDecorateThreadFactory() throws InterruptedException {
ThreadFactory threadFactory =
ThreadExecutorMap.apply(Executors.defaultThreadFactory(), ImmediateEventExecutor.INSTANCE);
Thread thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
Assert.assertSame(ImmediateEventExecutor.INSTANCE, ThreadExecutorMap.currentExecutor());
}
});
thread.start();
thread.join();
}
}