Fix caching for normal allocations (#10825)

Motivation:

https://github.com/netty/netty/pull/10267 introduced a change that reduced the fragmentation. Unfortunately it also introduced a regression when it comes to caching of normal allocations. This can have a negative performance impact depending on the allocation sizes.

Modifications:

- Fix algorithm to calculate the array size for normal allocation caches
- Correctly calculate index for normal caches
- Add unit test

Result:

Fixes https://github.com/netty/netty/issues/10805
This commit is contained in:
Norman Maurer 2020-11-25 15:05:30 +01:00 committed by GitHub
parent 0c2b761cfb
commit 221c1a1ed7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 86 additions and 19 deletions

View File

@ -29,6 +29,8 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -53,9 +55,6 @@ final class PoolThreadCache {
private final MemoryRegionCache<byte[]>[] normalHeapCaches; private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches; private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// Used for bitshifting when calculate the index of normal caches later
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
private final int freeSweepAllocationThreshold; private final int freeSweepAllocationThreshold;
private final AtomicBoolean freed = new AtomicBoolean(); private final AtomicBoolean freed = new AtomicBoolean();
@ -75,7 +74,6 @@ final class PoolThreadCache {
smallSubPageDirectCaches = createSubPageCaches( smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools); smallCacheSize, directArena.numSmallSubpagePools);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches( normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena); normalCacheSize, maxCachedBufferCapacity, directArena);
@ -84,14 +82,12 @@ final class PoolThreadCache {
// No directArea is configured so just null out all caches // No directArea is configured so just null out all caches
smallSubPageDirectCaches = null; smallSubPageDirectCaches = null;
normalDirectCaches = null; normalDirectCaches = null;
numShiftsNormalDirect = -1;
} }
if (heapArena != null) { if (heapArena != null) {
// Create the caches for the heap allocations // Create the caches for the heap allocations
smallSubPageHeapCaches = createSubPageCaches( smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools); smallCacheSize, heapArena.numSmallSubpagePools);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches( normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena); normalCacheSize, maxCachedBufferCapacity, heapArena);
@ -100,7 +96,6 @@ final class PoolThreadCache {
// No heapArea is configured so just null out all caches // No heapArea is configured so just null out all caches
smallSubPageHeapCaches = null; smallSubPageHeapCaches = null;
normalHeapCaches = null; normalHeapCaches = null;
numShiftsNormalHeap = -1;
} }
// Only check if there are caches in use. // Only check if there are caches in use.
@ -127,18 +122,18 @@ final class PoolThreadCache {
} }
} }
@SuppressWarnings("unchecked")
private static <T> MemoryRegionCache<T>[] createNormalCaches( private static <T> MemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0 && maxCachedBufferCapacity > 0) { if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity); int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
int arraySize = Math.max(1, log2(max / area.pageSize) + 1); // Create as many normal caches as we support based on how many sizeIdx we have and what the upper
// bound is that we want to cache in general.
@SuppressWarnings("unchecked") List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;
MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize]; for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max ; idx++) {
for (int i = 0; i < cache.length; i++) { cache.add(new NormalMemoryRegionCache<T>(cacheSize));
cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
} }
return cache; return cache.toArray(new MemoryRegionCache[0]);
} else { } else {
return null; return null;
} }
@ -159,8 +154,8 @@ final class PoolThreadCache {
/** /**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/ */
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) { boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity); return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
} }
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
@ -290,10 +285,12 @@ final class PoolThreadCache {
} }
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) { private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
// We need to substract area.numSmallSubpagePools as sizeIdx is the overall index for all sizes.
int idx = sizeIdx - area.numSmallSubpagePools;
if (area.isDirect()) { if (area.isDirect()) {
return cache(normalDirectCaches, sizeIdx); return cache(normalDirectCaches, idx);
} }
return cache(normalHeapCaches, sizeIdx); return cache(normalHeapCaches, idx);
} }
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) { private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {

View File

@ -45,7 +45,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
private static final int DEFAULT_SMALL_CACHE_SIZE; private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE; private static final int DEFAULT_NORMAL_CACHE_SIZE;
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY; static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL; private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS; private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS; private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;

View File

@ -635,4 +635,30 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
private static PooledByteBuf<ByteBuffer> unwrapIfNeeded(ByteBuf buf) { private static PooledByteBuf<ByteBuffer> unwrapIfNeeded(ByteBuf buf) {
return (PooledByteBuf<ByteBuffer>) (buf instanceof PooledByteBuf ? buf : buf.unwrap()); return (PooledByteBuf<ByteBuffer>) (buf instanceof PooledByteBuf ? buf : buf.unwrap());
} }
@Test
public void testCacheWorksForNormalAllocations() {
int maxCachedBufferCapacity = PooledByteBufAllocator.DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
final PooledByteBufAllocator allocator =
new PooledByteBufAllocator(true, 1, 1,
PooledByteBufAllocator.defaultPageSize(), PooledByteBufAllocator.defaultMaxOrder(),
128, 128, true);
ByteBuf buffer = allocator.directBuffer(maxCachedBufferCapacity);
assertEquals(1, allocator.metric().directArenas().get(0).numNormalAllocations());
buffer.release();
buffer = allocator.directBuffer(maxCachedBufferCapacity);
// Should come out of the cache so the count should not be incremented
assertEquals(1, allocator.metric().directArenas().get(0).numNormalAllocations());
buffer.release();
// Should be allocated without cache and also not put back in a cache.
buffer = allocator.directBuffer(maxCachedBufferCapacity + 1);
assertEquals(2, allocator.metric().directArenas().get(0).numNormalAllocations());
buffer.release();
buffer = allocator.directBuffer(maxCachedBufferCapacity + 1);
assertEquals(3, allocator.metric().directArenas().get(0).numNormalAllocations());
buffer.release();
}
} }

View File

@ -0,0 +1,44 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.buffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.infra.Blackhole;
public class PooledByteBufAllocatorBenchmark extends
AbstractMicrobenchmark {
private ByteBufAllocator allocator;
@Setup
public void setup() {
allocator = new PooledByteBufAllocator(true);
}
@Benchmark
@BenchmarkMode(Mode.Throughput)
public void allocateAndFree(Blackhole blackhole) {
ByteBuf buf = allocator.directBuffer(32768);
buf.release();
blackhole.consume(buf);
}
}