diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index 33445241b7..240a7cbcb5 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; abstract class PoolArena implements PoolArenaMetric { static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe(); @@ -69,6 +70,9 @@ abstract class PoolArena implements PoolArenaMetric { // We need to use the LongCounter here as this is not guarded via synchronized block. private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter(); + // Number of thread caches backed by this arena. + final AtomicInteger numThreadCaches = new AtomicInteger(); + // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; @@ -381,6 +385,13 @@ abstract class PoolArena implements PoolArenaMetric { } } + /** + * Returns the number of thread caches backed by this arena. + */ + public int numThreadCaches() { + return numThreadCaches.get(); + } + @Override public int numTinySubpages() { return tinySubpagePools.length; diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index def11d5800..ec0b4beb54 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -91,6 +91,8 @@ final class PoolThreadCache { numShiftsNormalDirect = log2(directArena.pageSize); normalDirectCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, directArena); + + directArena.numThreadCaches.getAndIncrement(); } else { // No directArea is configured so just null out all caches tinySubPageDirectCaches = null; @@ -108,6 +110,8 @@ final class PoolThreadCache { numShiftsNormalHeap = log2(heapArena.pageSize); normalHeapCaches = createNormalCaches( normalCacheSize, maxCachedBufferCapacity, heapArena); + + heapArena.numThreadCaches.getAndIncrement(); } else { // No heapArea is configured so just null out all caches tinySubPageHeapCaches = null; @@ -242,6 +246,14 @@ final class PoolThreadCache { if (numFreed > 0 && logger.isDebugEnabled()) { logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed, thread.getName()); } + + if (directArena != null) { + directArena.numThreadCaches.getAndDecrement(); + } + + if (heapArena != null) { + heapArena.numThreadCaches.getAndDecrement(); + } } private static int free(MemoryRegionCache[] caches) { diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index 21b80fa36d..c7e4555cf3 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -26,7 +26,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; public class PooledByteBufAllocator extends AbstractByteBufAllocator { @@ -352,36 +351,36 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { } final class PoolThreadLocalCache extends FastThreadLocal { - private final AtomicInteger index = new AtomicInteger(); - final AtomicInteger caches = new AtomicInteger(); @Override - protected PoolThreadCache initialValue() { - caches.incrementAndGet(); - final int idx = index.getAndIncrement(); - final PoolArena heapArena; - final PoolArena directArena; + protected synchronized PoolThreadCache initialValue() { + final PoolArena heapArena = leastUsedArena(heapArenas); + final PoolArena directArena = leastUsedArena(directArenas); - if (heapArenas != null) { - heapArena = heapArenas[Math.abs(idx % heapArenas.length)]; - } else { - heapArena = null; - } - - if (directArenas != null) { - directArena = directArenas[Math.abs(idx % directArenas.length)]; - } else { - directArena = null; - } return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); } @Override - protected void onRemoval(PoolThreadCache value) { - value.free(); - caches.decrementAndGet(); + protected void onRemoval(PoolThreadCache threadCache) { + threadCache.free(); + } + + private PoolArena leastUsedArena(PoolArena[] arenas) { + if (arenas == null || arenas.length == 0) { + return null; + } + + PoolArena minArena = arenas[0]; + for (int i = 1; i < arenas.length; i++) { + PoolArena arena = arenas[i]; + if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) { + minArena = arena; + } + } + + return minArena; } } @@ -417,7 +416,17 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { * Return the number of thread local caches used by this {@link PooledByteBufAllocator}. */ public int numThreadLocalCaches() { - return threadCache.caches.get(); + PoolArena[] arenas = heapArenas != null ? heapArenas : directArenas; + if (arenas == null) { + return 0; + } + + int total = 0; + for (int i = 0; i < arenas.length; i++) { + total += arenas[i].numThreadCaches.get(); + } + + return total; } /** diff --git a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java index 907a7a2651..55b74fa68e 100644 --- a/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java +++ b/buffer/src/test/java/io/netty/buffer/PooledByteBufAllocatorTest.java @@ -16,6 +16,8 @@ package io.netty.buffer; +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocalThread; import io.netty.util.internal.SystemPropertyUtil; import org.junit.Test; @@ -24,20 +26,123 @@ import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class PooledByteBufAllocatorTest { - private static final int[] ALLOCATION_SIZES = new int[16 * 1024]; - static { - for (int i = 0; i < ALLOCATION_SIZES.length; i++) { - ALLOCATION_SIZES[i] = i; + // The ThreadDeathWatcher sleeps 1s, give it double that time. + @Test (timeout = 2000) + public void testThreadCacheDestroyedByThreadDeathWatcher() { + int numArenas = 11; + final PooledByteBufAllocator allocator = + new PooledByteBufAllocator(numArenas, numArenas, 8192, 1); + + for (int i = 0; i < numArenas; i++) { + new FastThreadLocalThread(new Runnable() { + @Override + public void run() { + ByteBuf buf = allocator.newHeapBuffer(1024, 1024); + for (int i = 0; i < buf.capacity(); i++) { + buf.writeByte(0); + } + + assertTrue(allocator.numThreadLocalCaches() > 0); + + buf.release(); + } + }).start(); + } + + // Wait for the ThreadDeathWatcher to have destroyed all thread caches + while (allocator.numThreadLocalCaches() > 0) { + LockSupport.parkNanos(MILLISECONDS.toNanos(100)); } } + @Test + public void testNumThreadCachesWithNoDirectArenas() { + int numHeapArenas = 1; + final PooledByteBufAllocator allocator = + new PooledByteBufAllocator(numHeapArenas, 0, 8192, 1); + + CountDownLatch tcache0 = createNewThreadCache(allocator); + assertEquals(1, allocator.numThreadLocalCaches()); + + CountDownLatch tcache1 = createNewThreadCache(allocator); + assertEquals(2, allocator.numThreadLocalCaches()); + + destroyThreadCache(tcache0); + assertEquals(1, allocator.numThreadLocalCaches()); + + destroyThreadCache(tcache1); + assertEquals(0, allocator.numThreadLocalCaches()); + } + + @Test + public void testThreadCacheToArenaMappings() throws InterruptedException { + int numArenas = 2; + final PooledByteBufAllocator allocator = + new PooledByteBufAllocator(numArenas, numArenas, 8192, 1); + + CountDownLatch tcache0 = createNewThreadCache(allocator); + CountDownLatch tcache1 = createNewThreadCache(allocator); + assertEquals(2, allocator.numThreadLocalCaches()); + destroyThreadCache(tcache1); + assertEquals(1, allocator.numThreadLocalCaches()); + + CountDownLatch tcache2 = createNewThreadCache(allocator); + assertEquals(2, allocator.numThreadLocalCaches()); + + destroyThreadCache(tcache0); + assertEquals(1, allocator.numThreadLocalCaches()); + + destroyThreadCache(tcache2); + assertEquals(0, allocator.numThreadLocalCaches()); + } + + private static void destroyThreadCache(CountDownLatch tcache) { + tcache.countDown(); + LockSupport.parkNanos(MILLISECONDS.toNanos(100)); + } + + private static CountDownLatch createNewThreadCache(final PooledByteBufAllocator allocator) { + final CountDownLatch latch = new CountDownLatch(1); + + Thread t = new FastThreadLocalThread(new Runnable() { + + @Override + public void run() { + ByteBuf buf = allocator.newHeapBuffer(1024, 1024); + for (int i = 0; i < buf.capacity(); i++) { + buf.writeByte(0); + } + + try { + latch.await(); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + + buf.release(); + + FastThreadLocal.removeAll(); + } + }); + t.start(); + + // Wait a bit for the thread & thread cache to be created. + LockSupport.parkNanos(MILLISECONDS.toNanos(100)); + + return latch; + } + @Test public void testConcurrentUsage() throws Throwable { - long runningTime = TimeUnit.MILLISECONDS.toNanos(SystemPropertyUtil.getLong( + long runningTime = MILLISECONDS.toNanos(SystemPropertyUtil.getLong( "io.netty.buffer.PooledByteBufAllocatorTest.testConcurrentUsageTime", 15000)); // We use no caches and only one arena to maximize the chance of hitting the race-condition we @@ -76,6 +181,14 @@ public class PooledByteBufAllocatorTest { } private static final class AllocationThread extends Thread { + + private static final int[] ALLOCATION_SIZES = new int[16 * 1024]; + static { + for (int i = 0; i < ALLOCATION_SIZES.length; i++) { + ALLOCATION_SIZES[i] = i; + } + } + private final CountDownLatch latch = new CountDownLatch(1); private final Queue buffers = new ArrayDeque(10); private final ByteBufAllocator allocator;