Change arena to thread cache mapping algorithm to be closer to ideal.

Motivation:
Circular assignment of arenas to thread caches can lead to less than optimal
mappings in cases where threads are (frequently) shutdown and started.

Example Scenario:
There are a total of 2 arenas. The first two threads performing an allocation
would lead to the following mapping:

Thread 0 -> Arena 0
Thread 1 -> Arena 1

Now, assume Thread 1 is shut down and another Thread 2 is started. The current
circular assignment algorithm would lead to the following mapping:

Thread 0 -> Arena 0
Thread 2 -> Arena 0

Ideally, we want Thread 2 to use Arena 1 though.

Presumably, this is not much of an issue for most Netty applications that do all
the allocations inside the eventloop, as eventloop threads are seldomly shut down
and restarted. However, applications that only use the netty-buffer package
or implement their own threading model outside the eventloop might suffer from
increased contention. For example, gRPC Java when using the blocking stub
performs some allocations outside the eventloop and within its own thread pool
that is dynamically sized depending on system load.

Modifications:

Implement a linear scan algorithm that assigns a new thread cache to the arena
that currently backs the fewest thread caches.

Result:

Closer to ideal mappings between thread caches and arenas. In order to always
get an ideal mapping, we would have to re-balance the mapping whenever a thread
dies. However, that's difficult because of deallocation.
This commit is contained in:
buchgr 2016-03-14 17:25:43 +01:00 committed by Norman Maurer
parent b9bf11d71a
commit dcf553f1aa
4 changed files with 174 additions and 29 deletions

View File

@ -24,6 +24,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
abstract class PoolArena<T> implements PoolArenaMetric {
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
@ -69,6 +70,9 @@ abstract class PoolArena<T> implements PoolArenaMetric {
// We need to use the LongCounter here as this is not guarded via synchronized block.
private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();
// Number of thread caches backed by this arena.
final AtomicInteger numThreadCaches = new AtomicInteger();
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
@ -381,6 +385,13 @@ abstract class PoolArena<T> implements PoolArenaMetric {
}
}
/**
* Returns the number of thread caches backed by this arena.
*/
public int numThreadCaches() {
return numThreadCaches.get();
}
@Override
public int numTinySubpages() {
return tinySubpagePools.length;

View File

@ -91,6 +91,8 @@ final class PoolThreadCache {
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
@ -108,6 +110,8 @@ final class PoolThreadCache {
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
@ -242,6 +246,14 @@ final class PoolThreadCache {
if (numFreed > 0 && logger.isDebugEnabled()) {
logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName());
}
if (directArena != null) {
directArena.numThreadCaches.getAndDecrement();
}
if (heapArena != null) {
heapArena.numThreadCaches.getAndDecrement();
}
}
private static int free(MemoryRegionCache<?>[] caches) {

View File

@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
@ -352,36 +351,36 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
}
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final AtomicInteger index = new AtomicInteger();
final AtomicInteger caches = new AtomicInteger();
@Override
protected PoolThreadCache initialValue() {
caches.incrementAndGet();
final int idx = index.getAndIncrement();
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
if (heapArenas != null) {
heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
} else {
heapArena = null;
}
if (directArenas != null) {
directArena = directArenas[Math.abs(idx % directArenas.length)];
} else {
directArena = null;
}
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
@Override
protected void onRemoval(PoolThreadCache value) {
value.free();
caches.decrementAndGet();
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}
@ -417,7 +416,17 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
* Return the number of thread local caches used by this {@link PooledByteBufAllocator}.
*/
public int numThreadLocalCaches() {
return threadCache.caches.get();
PoolArena<?>[] arenas = heapArenas != null ? heapArenas : directArenas;
if (arenas == null) {
return 0;
}
int total = 0;
for (int i = 0; i < arenas.length; i++) {
total += arenas[i].numThreadCaches.get();
}
return total;
}
/**

View File

@ -16,6 +16,8 @@
package io.netty.buffer;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.netty.util.internal.SystemPropertyUtil;
import org.junit.Test;
@ -24,20 +26,123 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class PooledByteBufAllocatorTest {
private static final int[] ALLOCATION_SIZES = new int[16 * 1024];
static {
for (int i = 0; i < ALLOCATION_SIZES.length; i++) {
ALLOCATION_SIZES[i] = i;
// The ThreadDeathWatcher sleeps 1s, give it double that time.
@Test (timeout = 2000)
public void testThreadCacheDestroyedByThreadDeathWatcher() {
int numArenas = 11;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);
for (int i = 0; i < numArenas; i++) {
new FastThreadLocalThread(new Runnable() {
@Override
public void run() {
ByteBuf buf = allocator.newHeapBuffer(1024, 1024);
for (int i = 0; i < buf.capacity(); i++) {
buf.writeByte(0);
}
assertTrue(allocator.numThreadLocalCaches() > 0);
buf.release();
}
}).start();
}
// Wait for the ThreadDeathWatcher to have destroyed all thread caches
while (allocator.numThreadLocalCaches() > 0) {
LockSupport.parkNanos(MILLISECONDS.toNanos(100));
}
}
@Test
public void testNumThreadCachesWithNoDirectArenas() {
int numHeapArenas = 1;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numHeapArenas, 0, 8192, 1);
CountDownLatch tcache0 = createNewThreadCache(allocator);
assertEquals(1, allocator.numThreadLocalCaches());
CountDownLatch tcache1 = createNewThreadCache(allocator);
assertEquals(2, allocator.numThreadLocalCaches());
destroyThreadCache(tcache0);
assertEquals(1, allocator.numThreadLocalCaches());
destroyThreadCache(tcache1);
assertEquals(0, allocator.numThreadLocalCaches());
}
@Test
public void testThreadCacheToArenaMappings() throws InterruptedException {
int numArenas = 2;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(numArenas, numArenas, 8192, 1);
CountDownLatch tcache0 = createNewThreadCache(allocator);
CountDownLatch tcache1 = createNewThreadCache(allocator);
assertEquals(2, allocator.numThreadLocalCaches());
destroyThreadCache(tcache1);
assertEquals(1, allocator.numThreadLocalCaches());
CountDownLatch tcache2 = createNewThreadCache(allocator);
assertEquals(2, allocator.numThreadLocalCaches());
destroyThreadCache(tcache0);
assertEquals(1, allocator.numThreadLocalCaches());
destroyThreadCache(tcache2);
assertEquals(0, allocator.numThreadLocalCaches());
}
private static void destroyThreadCache(CountDownLatch tcache) {
tcache.countDown();
LockSupport.parkNanos(MILLISECONDS.toNanos(100));
}
private static CountDownLatch createNewThreadCache(final PooledByteBufAllocator allocator) {
final CountDownLatch latch = new CountDownLatch(1);
Thread t = new FastThreadLocalThread(new Runnable() {
@Override
public void run() {
ByteBuf buf = allocator.newHeapBuffer(1024, 1024);
for (int i = 0; i < buf.capacity(); i++) {
buf.writeByte(0);
}
try {
latch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
buf.release();
FastThreadLocal.removeAll();
}
});
t.start();
// Wait a bit for the thread & thread cache to be created.
LockSupport.parkNanos(MILLISECONDS.toNanos(100));
return latch;
}
@Test
public void testConcurrentUsage() throws Throwable {
long runningTime = TimeUnit.MILLISECONDS.toNanos(SystemPropertyUtil.getLong(
long runningTime = MILLISECONDS.toNanos(SystemPropertyUtil.getLong(
"io.netty.buffer.PooledByteBufAllocatorTest.testConcurrentUsageTime", 15000));
// We use no caches and only one arena to maximize the chance of hitting the race-condition we
@ -76,6 +181,14 @@ public class PooledByteBufAllocatorTest {
}
private static final class AllocationThread extends Thread {
private static final int[] ALLOCATION_SIZES = new int[16 * 1024];
static {
for (int i = 0; i < ALLOCATION_SIZES.length; i++) {
ALLOCATION_SIZES[i] = i;
}
}
private final CountDownLatch latch = new CountDownLatch(1);
private final Queue<ByteBuf> buffers = new ArrayDeque<ByteBuf>(10);
private final ByteBufAllocator allocator;