Fix caching for normal allocations (#10825)
Motivation: https://github.com/netty/netty/pull/10267 introduced a change that reduced the fragmentation. Unfortunately it also introduced a regression when it comes to caching of normal allocations. This can have a negative performance impact depending on the allocation sizes. Modifications: - Fix algorithm to calculate the array size for normal allocation caches - Correctly calculate index for normal caches - Add unit test Result: Fixes https://github.com/netty/netty/issues/10805
This commit is contained in:
parent
6c446d14fd
commit
2dae6665f4
@ -28,6 +28,8 @@ import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
@ -52,9 +54,6 @@ final class PoolThreadCache {
|
||||
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
|
||||
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
|
||||
|
||||
// Used for bitshifting when calculate the index of normal caches later
|
||||
private final int numShiftsNormalDirect;
|
||||
private final int numShiftsNormalHeap;
|
||||
private final int freeSweepAllocationThreshold;
|
||||
private final AtomicBoolean freed = new AtomicBoolean();
|
||||
|
||||
@ -74,7 +73,6 @@ final class PoolThreadCache {
|
||||
smallSubPageDirectCaches = createSubPageCaches(
|
||||
smallCacheSize, directArena.numSmallSubpagePools);
|
||||
|
||||
numShiftsNormalDirect = log2(directArena.pageSize);
|
||||
normalDirectCaches = createNormalCaches(
|
||||
normalCacheSize, maxCachedBufferCapacity, directArena);
|
||||
|
||||
@ -83,14 +81,12 @@ final class PoolThreadCache {
|
||||
// No directArea is configured so just null out all caches
|
||||
smallSubPageDirectCaches = null;
|
||||
normalDirectCaches = null;
|
||||
numShiftsNormalDirect = -1;
|
||||
}
|
||||
if (heapArena != null) {
|
||||
// Create the caches for the heap allocations
|
||||
smallSubPageHeapCaches = createSubPageCaches(
|
||||
smallCacheSize, heapArena.numSmallSubpagePools);
|
||||
|
||||
numShiftsNormalHeap = log2(heapArena.pageSize);
|
||||
normalHeapCaches = createNormalCaches(
|
||||
normalCacheSize, maxCachedBufferCapacity, heapArena);
|
||||
|
||||
@ -99,7 +95,6 @@ final class PoolThreadCache {
|
||||
// No heapArea is configured so just null out all caches
|
||||
smallSubPageHeapCaches = null;
|
||||
normalHeapCaches = null;
|
||||
numShiftsNormalHeap = -1;
|
||||
}
|
||||
|
||||
// Only check if there are caches in use.
|
||||
@ -126,18 +121,19 @@ final class PoolThreadCache {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static <T> MemoryRegionCache<T>[] createNormalCaches(
|
||||
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
|
||||
if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
|
||||
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
|
||||
int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
|
||||
for (int i = 0; i < cache.length; i++) {
|
||||
cache[i] = new NormalMemoryRegionCache<>(cacheSize);
|
||||
// Create as many normal caches as we support based on how many sizeIdx we have and what the upper
|
||||
// bound is that we want to cache in general.
|
||||
List<MemoryRegionCache<T>> cache = new ArrayList<>() ;
|
||||
for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max ; idx++) {
|
||||
cache.add(new NormalMemoryRegionCache<T>(cacheSize));
|
||||
}
|
||||
return cache;
|
||||
return cache.toArray(new MemoryRegionCache[0]);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
@ -158,8 +154,8 @@ final class PoolThreadCache {
|
||||
/**
|
||||
* Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
|
||||
*/
|
||||
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
|
||||
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
|
||||
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx) {
|
||||
return allocate(cacheForNormal(area, sizeIdx), buf, reqCapacity);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@ -289,10 +285,12 @@ final class PoolThreadCache {
|
||||
}
|
||||
|
||||
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int sizeIdx) {
|
||||
// We need to subtract area.numSmallSubpagePools as sizeIdx is the overall index for all sizes.
|
||||
int idx = sizeIdx - area.numSmallSubpagePools;
|
||||
if (area.isDirect()) {
|
||||
return cache(normalDirectCaches, sizeIdx);
|
||||
return cache(normalDirectCaches, idx);
|
||||
}
|
||||
return cache(normalHeapCaches, sizeIdx);
|
||||
return cache(normalHeapCaches, idx);
|
||||
}
|
||||
|
||||
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int sizeIdx) {
|
||||
|
@ -45,7 +45,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
|
||||
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
|
||||
private static final int DEFAULT_SMALL_CACHE_SIZE;
|
||||
private static final int DEFAULT_NORMAL_CACHE_SIZE;
|
||||
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
|
||||
static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
|
||||
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
|
||||
private static final long DEFAULT_CACHE_TRIM_INTERVAL_MILLIS;
|
||||
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
|
||||
|
@ -625,4 +625,30 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
|
||||
private static PooledByteBuf<ByteBuffer> unwrapIfNeeded(ByteBuf buf) {
|
||||
return (PooledByteBuf<ByteBuffer>) (buf instanceof PooledByteBuf ? buf : buf.unwrap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCacheWorksForNormalAllocations() {
|
||||
int maxCachedBufferCapacity = PooledByteBufAllocator.DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
|
||||
final PooledByteBufAllocator allocator =
|
||||
new PooledByteBufAllocator(true, 1, 1,
|
||||
PooledByteBufAllocator.defaultPageSize(), PooledByteBufAllocator.defaultMaxOrder(),
|
||||
128, 128, true);
|
||||
ByteBuf buffer = allocator.directBuffer(maxCachedBufferCapacity);
|
||||
assertEquals(1, allocator.metric().directArenas().get(0).numNormalAllocations());
|
||||
buffer.release();
|
||||
|
||||
buffer = allocator.directBuffer(maxCachedBufferCapacity);
|
||||
// Should come out of the cache so the count should not be incremented
|
||||
assertEquals(1, allocator.metric().directArenas().get(0).numNormalAllocations());
|
||||
buffer.release();
|
||||
|
||||
// Should be allocated without cache and also not put back in a cache.
|
||||
buffer = allocator.directBuffer(maxCachedBufferCapacity + 1);
|
||||
assertEquals(2, allocator.metric().directArenas().get(0).numNormalAllocations());
|
||||
buffer.release();
|
||||
|
||||
buffer = allocator.directBuffer(maxCachedBufferCapacity + 1);
|
||||
assertEquals(3, allocator.metric().directArenas().get(0).numNormalAllocations());
|
||||
buffer.release();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.microbench.buffer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.microbench.util.AbstractMicrobenchmark;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.infra.Blackhole;
|
||||
|
||||
public class PooledByteBufAllocatorBenchmark extends
|
||||
AbstractMicrobenchmark {
|
||||
private ByteBufAllocator allocator;
|
||||
|
||||
@Setup
|
||||
public void setup() {
|
||||
allocator = new PooledByteBufAllocator(true);
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@BenchmarkMode(Mode.Throughput)
|
||||
public void allocateAndFree(Blackhole blackhole) {
|
||||
ByteBuf buf = allocator.directBuffer(32768);
|
||||
buf.release();
|
||||
blackhole.consume(buf);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user