Implement Thread caches for pooled buffers to minimize conditions. This fixes [#2264] and [#808].

Motivation:
Remove the synchronization bottleneck in PoolArena and so speed up things

Modifications:

This implementation uses kind of the same technics as outlined in the jemalloc paper and jemalloc
blogpost https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/480222803919.

At the moment we only cache for "known" Threads (that powers EventExecutors) and not for others to keep the overhead
minimal when need to free up unused buffers in the cache and free up cached buffers once the Thread completes. Here
we use multi-level caches for tiny, small and normal allocations. Huge allocations are not cached at all to keep the
memory usage at a sane level. All the different cache configurations can be adjusted via system properties or the constructor
directly where it makes sense.

Result:
Less conditions as most allocations can be served by the cache itself
This commit is contained in:
Norman Maurer 2014-03-01 15:47:03 +01:00
parent 2afe58f5f1
commit 13fd69e871
4 changed files with 602 additions and 52 deletions

View File

@ -23,14 +23,16 @@ import java.nio.ByteBuffer;
abstract class PoolArena<T> { abstract class PoolArena<T> {
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent; final PooledByteBufAllocator parent;
private final int pageSize;
private final int maxOrder; private final int maxOrder;
private final int pageShifts; final int pageSize;
private final int chunkSize; final int pageShifts;
private final int subpageOverflowMask; final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
private final PoolSubpage<T>[] tinySubpagePools; private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools; private final PoolSubpage<T>[] smallSubpagePools;
@ -51,13 +53,13 @@ abstract class PoolArena<T> {
this.pageShifts = pageShifts; this.pageShifts = pageShifts;
this.chunkSize = chunkSize; this.chunkSize = chunkSize;
subpageOverflowMask = ~(pageSize - 1); subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
tinySubpagePools = newSubpagePoolArray(512 >>> 4);
for (int i = 0; i < tinySubpagePools.length; i ++) { for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize); tinySubpagePools[i] = newSubpagePoolHead(pageSize);
} }
smallSubpagePools = newSubpagePoolArray(pageShifts - 9); numSmallSubpagePools = pageShifts - 9;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) { for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize); smallSubpagePools[i] = newSubpagePoolHead(pageSize);
} }
@ -89,27 +91,56 @@ abstract class PoolArena<T> {
return new PoolSubpage[size]; return new PoolSubpage[size];
} }
abstract boolean isDirect();
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity); PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity); allocate(cache, buf, reqCapacity);
return buf; return buf;
} }
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}
static int smallIdx(int normCapacity) {
int tableIdx = 0;
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
return tableIdx;
}
// capacity < pageSize
boolean isTinyOrSmall(int normCapacity) {
return (normCapacity & subpageOverflowMask) == 0;
}
// normCapacity < 512
static boolean isTiny(int normCapacity) {
return (normCapacity & 0xFFFFFE00) == 0;
}
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity); final int normCapacity = normalizeCapacity(reqCapacity);
if ((normCapacity & subpageOverflowMask) == 0) { // capacity < pageSize if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx; int tableIdx;
PoolSubpage<T>[] table; PoolSubpage<T>[] table;
if ((normCapacity & 0xFFFFFE00) == 0) { // < 512 if (isTiny(normCapacity)) { // < 512
tableIdx = normCapacity >>> 4; if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools; table = tinySubpagePools;
} else { } else {
tableIdx = 0; if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
int i = normCapacity >>> 10; // was able to allocate out of the cache so move on
while (i != 0) { return;
i >>>= 1;
tableIdx ++;
} }
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools; table = smallSubpagePools;
} }
@ -124,11 +155,16 @@ abstract class PoolArena<T> {
return; return;
} }
} }
} else if (normCapacity > chunkSize) { } else if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity); allocateHuge(buf, reqCapacity);
return; return;
} }
allocateNormal(buf, reqCapacity, normCapacity); allocateNormal(buf, reqCapacity, normCapacity);
} }
@ -151,10 +187,15 @@ abstract class PoolArena<T> {
buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity); buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
} }
void free(PoolChunk<T> chunk, long handle) { void free(PoolChunk<T> chunk, long handle, int normCapacity) {
if (chunk.unpooled) { if (chunk.unpooled) {
destroyChunk(chunk); destroyChunk(chunk);
} else { } else {
PoolThreadCache cache = parent.threadCache.get();
if (cache.add(this, chunk, handle, normCapacity)) {
// cached so not free it.
return;
}
synchronized (this) { synchronized (this) {
chunk.parent.free(chunk, handle); chunk.parent.free(chunk, handle);
} }
@ -164,7 +205,7 @@ abstract class PoolArena<T> {
PoolSubpage<T> findSubpagePoolHead(int elemSize) { PoolSubpage<T> findSubpagePoolHead(int elemSize) {
int tableIdx; int tableIdx;
PoolSubpage<T>[] table; PoolSubpage<T>[] table;
if ((elemSize & 0xFFFFFE00) == 0) { // < 512 if (isTiny(elemSize)) { // < 512
tableIdx = elemSize >>> 4; tableIdx = elemSize >>> 4;
table = tinySubpagePools; table = tinySubpagePools;
} else { } else {
@ -180,7 +221,7 @@ abstract class PoolArena<T> {
return table[tableIdx]; return table[tableIdx];
} }
private int normalizeCapacity(int reqCapacity) { int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) { if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)"); throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
} }
@ -188,7 +229,7 @@ abstract class PoolArena<T> {
return reqCapacity; return reqCapacity;
} }
if ((reqCapacity & 0xFFFFFE00) != 0) { // >= 512 if (!isTiny(reqCapacity)) { // >= 512
// Doubled // Doubled
int normalizedCapacity = reqCapacity; int normalizedCapacity = reqCapacity;
@ -228,7 +269,7 @@ abstract class PoolArena<T> {
long oldHandle = buf.handle; long oldHandle = buf.handle;
T oldMemory = buf.memory; T oldMemory = buf.memory;
int oldOffset = buf.offset; int oldOffset = buf.offset;
int oldMaxLength = buf.maxLength;
int readerIndex = buf.readerIndex(); int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex(); int writerIndex = buf.writerIndex();
@ -253,7 +294,7 @@ abstract class PoolArena<T> {
buf.setIndex(readerIndex, writerIndex); buf.setIndex(readerIndex, writerIndex);
if (freeOldMemory) { if (freeOldMemory) {
free(oldChunk, oldHandle); free(oldChunk, oldHandle, oldMaxLength);
} }
} }
@ -339,6 +380,11 @@ abstract class PoolArena<T> {
super(parent, pageSize, maxOrder, pageShifts, chunkSize); super(parent, pageSize, maxOrder, pageShifts, chunkSize);
} }
@Override
boolean isDirect() {
return false;
}
@Override @Override
protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize); return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);
@ -377,6 +423,11 @@ abstract class PoolArena<T> {
super(parent, pageSize, maxOrder, pageShifts, chunkSize); super(parent, pageSize, maxOrder, pageShifts, chunkSize);
} }
@Override
boolean isDirect() {
return true;
}
@Override @Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<ByteBuffer>( return new PoolChunk<ByteBuffer>(

View File

@ -16,18 +16,436 @@
package io.netty.buffer; package io.netty.buffer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
/**
* Acts a Thread cache for allocations. This implementation is moduled after
* <a href="http://people.freebsd.org/~jasone/jemalloc/bsdcan2006/jemalloc.pdf">jemalloc</a> and the descripted
* technics of <a href="https://www.facebook.com/notes/facebook-engineering/scalable-memory-allocation-using-jemalloc/
* 480222803919">Scalable memory allocation using jemalloc</a>.
*/
final class PoolThreadCache { final class PoolThreadCache {
final PoolArena<byte[]> heapArena; final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena; final PoolArena<ByteBuffer> directArena;
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// Used for bitshifting when calculate the index of normal caches later
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
private final int freeSweepAllocationThreshold;
private int allocations;
// TODO: Test if adding padding helps under contention // TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena) { PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
if (maxCachedBufferCapacity < 0) {
throw new IllegalArgumentException("maxCachedBufferCapacity: "
+ maxCachedBufferCapacity + " (expected: >= 0)");
}
if (freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ maxCachedBufferCapacity + " (expected: > 0)");
}
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena; this.heapArena = heapArena;
this.directArena = directArena; this.directArena = directArena;
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
}
private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {
if (cacheSize > 0) {
@SuppressWarnings("unchecked")
SubPageMemoryRegionCache<T>[] cache = new SubPageMemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
}
return cache;
} else {
return null;
}
}
private static <T> NormalMemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
int arraySize = Math.max(1, max / area.pageSize);
@SuppressWarnings("unchecked")
NormalMemoryRegionCache<T>[] cache = new NormalMemoryRegionCache[arraySize];
for (int i = 0; i < cache.length; i++) {
cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
}
return cache;
} else {
return null;
}
}
private static int log2(int val) {
int res = 0;
while (val > 1) {
val >>= 1;
res++;
}
return res;
}
/**
* Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForSmall(area, normCapacity), buf, reqCapacity);
}
/**
* Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForNormal(area, normCapacity), buf, reqCapacity);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
/**
* Add {@link PoolChunk} and {@code handle} to the cache if there is enough room.
* Returns {@code true} if it fit into the cache {@code false} otherwise.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity) {
MemoryRegionCache<?> cache;
if (area.isTinyOrSmall(normCapacity)) {
if (PoolArena.isTiny(normCapacity)) {
cache = cacheForTiny(area, normCapacity);
} else {
cache = cacheForSmall(area, normCapacity);
}
} else {
cache = cacheForNormal(area, normCapacity);
}
if (cache == null) {
return false;
}
return cache.add(chunk, handle);
}
/**
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
*/
void free() {
free(tinySubPageDirectCaches);
free(smallSubPageDirectCaches);
free(normalDirectCaches);
free(tinySubPageHeapCaches);
free(smallSubPageHeapCaches);
free(normalHeapCaches);
}
private static void free(MemoryRegionCache<?>[] caches) {
if (caches == null) {
return;
}
for (int i = 0; i < caches.length; i++) {
free(caches[i]);
}
}
private static void free(MemoryRegionCache<?> cache) {
if (cache == null) {
return;
}
cache.free();
}
void trim() {
trim(tinySubPageDirectCaches);
trim(smallSubPageDirectCaches);
trim(normalDirectCaches);
trim(tinySubPageHeapCaches);
trim(smallSubPageHeapCaches);
trim(normalHeapCaches);
}
private static void trim(MemoryRegionCache<?>[] caches) {
if (caches == null) {
return;
}
for (int i = 0; i < caches.length; i++) {
trim(caches[i]);
}
}
private static void trim(MemoryRegionCache<?> cache) {
if (cache == null) {
return;
}
cache.trim();
}
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.smallIdx(normCapacity);
if (area.isDirect()) {
return cache(smallSubPageDirectCaches, idx);
}
return cache(smallSubPageHeapCaches, idx);
}
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {
if (area.isDirect()) {
int idx = log2(normCapacity >> numShiftsNormalDirect);
return cache(normalDirectCaches, idx);
}
int idx = log2(normCapacity >> numShiftsNormalHeap);
return cache(normalHeapCaches, idx);
}
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}
/**
* Cache used for buffers which are backed by TINY or SMALL size.
*/
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size) {
super(size);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, handle, reqCapacity);
}
}
/**
* Cache used for buffers which are backed by NORMAL size.
*/
private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
NormalMemoryRegionCache(int size) {
super(size);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBuf(buf, handle, reqCapacity);
}
}
/**
* Cache of {@link PoolChunk} and handles which can be used to allocate a buffer without locking at all.
*/
private abstract static class MemoryRegionCache<T> {
private final Entry<T>[] entries;
private final int maxUnusedCached;
private int head;
private int tail;
private int maxEntriesInUse;
private int entriesInUse;
@SuppressWarnings("unchecked")
MemoryRegionCache(int size) {
entries = new Entry[powerOfTwo(size)];
for (int i = 0; i < entries.length; i++) {
entries[i] = new Entry<T>();
}
maxUnusedCached = size / 2;
}
private static int powerOfTwo(int res) {
if (res <= 2) {
return 2;
}
res--;
res |= res >> 1;
res |= res >> 2;
res |= res >> 4;
res |= res >> 8;
res |= res >> 16;
res++;
return res;
}
/**
* Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
*/
protected abstract void initBuf(PoolChunk<T> chunk, long handle,
PooledByteBuf<T> buf, int reqCapacity);
/**
* Add to cache if not already full.
*/
public boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = entries[tail];
if (entry.chunk != null) {
// cache is full
return false;
}
entriesInUse --;
entry.chunk = chunk;
entry.handle = handle;
tail = nextIdx(tail);
return true;
}
/**
* Allocate something out of the cache if possible and remove the entry from the cache.
*/
public boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = entries[head];
if (entry.chunk == null) {
return false;
}
entriesInUse ++;
if (maxEntriesInUse < entriesInUse) {
maxEntriesInUse = entriesInUse;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
// only null out the chunk as we only use the chunk to check if the buffer is full or not.
entry.chunk = null;
head = nextIdx(head);
return true;
}
/**
* Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
*/
public void free() {
entriesInUse = 0;
maxEntriesInUse = 0;
for (int i = head;; i = nextIdx(i)) {
if (!freeEntry(entries[i])) {
// all cleared
return;
}
}
}
/**
* Free up cached {@link PoolChunk}s if not allocated frequently enough.
*/
private void trim() {
int free = size() - maxEntriesInUse;
entriesInUse = 0;
maxEntriesInUse = 0;
if (free <= maxUnusedCached) {
return;
}
int i = head;
for (; free > 0; free--) {
if (!freeEntry(entries[i])) {
// all freed
return;
}
i = nextIdx(i);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static boolean freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
if (chunk == null) {
return false;
}
// need to synchronize on the area from which it was allocated before.
synchronized (chunk.arena) {
chunk.parent.free(chunk, entry.handle);
}
entry.chunk = null;
return true;
}
/**
* Return the number of cached entries.
*/
private int size() {
return tail - head & entries.length - 1;
}
private int nextIdx(int index) {
// use bitwise operation as this is faster as using modulo.
return (index + 1) & entries.length - 1;
}
private static final class Entry<T> {
PoolChunk<T> chunk;
long handle;
}
} }
} }

View File

@ -30,7 +30,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
protected T memory; protected T memory;
protected int offset; protected int offset;
protected int length; protected int length;
private int maxLength; int maxLength;
private ByteBuffer tmpNioBuf; private ByteBuffer tmpNioBuf;
@ -140,7 +140,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
final long handle = this.handle; final long handle = this.handle;
this.handle = -1; this.handle = -1;
memory = null; memory = null;
chunk.arena.free(chunk, handle); chunk.arena.free(chunk, handle, maxLength);
recycle(); recycle();
} }
} }

View File

@ -33,6 +33,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private static final int DEFAULT_PAGE_SIZE; private static final int DEFAULT_PAGE_SIZE;
private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk private static final int DEFAULT_MAX_ORDER; // 8192 << 11 = 16 MiB per chunk
private static final int DEFAULT_TINY_CACHE_SIZE;
private static final int DEFAULT_SMALL_CACHE_SIZE;
private static final int DEFAULT_NORMAL_CACHE_SIZE;
private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY;
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final int MIN_PAGE_SIZE = 4096; private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2); private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
@ -75,6 +80,20 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
runtime.availableProcessors(), runtime.availableProcessors(),
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3))); PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
// cache sizes
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);
// 32 kb is the default maximum capacity of the cached buffer. Similar to what is explained in
// 'Scalable memory allocation using jemalloc'
DEFAULT_MAX_CACHED_BUFFER_CAPACITY = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedBufferCapacity", 32 * 1024);
// the number of threshold of allocations when cached entries will be freed up if not frequently used
DEFAULT_CACHE_TRIM_INTERVAL = SystemPropertyUtil.getInt(
"io.netty.allocator.cacheTrimInterval", 8192);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA); logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA); logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
@ -89,6 +108,12 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause); logger.debug("-Dio.netty.allocator.maxOrder: {}", DEFAULT_MAX_ORDER, maxOrderFallbackCause);
} }
logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER); logger.debug("-Dio.netty.allocator.chunkSize: {}", DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER);
logger.debug("-Dio.netty.allocator.tinyCacheSize: {}", DEFAULT_TINY_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.smallCacheSize: {}", DEFAULT_SMALL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.normalCacheSize: {}", DEFAULT_NORMAL_CACHE_SIZE);
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}",
DEFAULT_CACHE_TRIM_INTERVAL);
} }
} }
@ -97,30 +122,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private final PoolArena<byte[]>[] heapArenas; private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas; private final PoolArena<ByteBuffer>[] directArenas;
private final int tinyCacheSize;
private final int smallCacheSize;
private final int normalCacheSize;
final ThreadLocal<PoolThreadCache> threadCache = new ThreadLocal<PoolThreadCache>() { final PoolThreadLocalCache threadCache = new PoolThreadLocalCache();
private final AtomicInteger index = new AtomicInteger();
@Override
protected PoolThreadCache initialValue() {
final int idx = index.getAndIncrement();
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
if (heapArenas != null) {
heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
} else {
heapArena = null;
}
if (directArenas != null) {
directArena = directArenas[Math.abs(idx % directArenas.length)];
} else {
directArena = null;
}
return new PoolThreadCache(heapArena, directArena);
}
};
public PooledByteBufAllocator() { public PooledByteBufAllocator() {
this(false); this(false);
@ -135,8 +141,16 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
} }
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) { public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder) {
super(preferDirect); this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
DEFAULT_TINY_CACHE_SIZE, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE);
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize) {
super(preferDirect);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder); final int chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
if (nHeapArena < 0) { if (nHeapArena < 0) {
@ -241,6 +255,73 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
return directArenas != null; return directArenas != null;
} }
/**
* Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
* buffers.
*/
public boolean hasThreadLocalCache() {
return threadCache.exists();
}
/**
* Free all cached buffers for the calling {@link Thread}.
*/
public void freeThreadLocalCache() {
threadCache.free();
}
final class PoolThreadLocalCache extends ThreadLocal<PoolThreadCache> {
private final AtomicInteger index = new AtomicInteger();
@Override
public PoolThreadCache get() {
PoolThreadCache cache = super.get();
if (cache == null) {
final int idx = index.getAndIncrement();
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
if (heapArenas != null) {
heapArena = heapArenas[Math.abs(idx % heapArenas.length)];
} else {
heapArena = null;
}
if (directArenas != null) {
directArena = directArenas[Math.abs(idx % directArenas.length)];
} else {
directArena = null;
}
// If the current Thread is assigned to an EventExecutor we can
// easily free the cached stuff again once the EventExecutor completes later.
cache = new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
set(cache);
}
return cache;
}
/**
* Returns {@code true} if the calling {@link Thread} has a {@link ThreadLocal} cache for the allocated
* buffers.
*/
public boolean exists() {
return super.get() != null;
}
/**
* Free all cached buffers for the calling {@link Thread}.
*/
public void free() {
PoolThreadCache cache = super.get();
if (cache != null) {
cache.free();
}
}
}
// Too noisy at the moment. // Too noisy at the moment.
// //
// public String toString() { // public String toString() {