Provide a way to cache the internal nioBuffer of the PooledByteBuf to reduce GC (#8603)

Motivation:

Pooled buffers often need a temporary ByteBuffer for internal operations; caching it instead of re-creating it reduces GC pressure.

Modifications:

Cache the ByteBuffer in the PoolChunk, and carry it through the PoolThreadCache entries as well.

Result:

Less GC.
This commit is contained in:
Norman Maurer 2018-12-04 15:26:05 +01:00
parent 361ace7671
commit 85c1590a90
7 changed files with 97 additions and 60 deletions

View File

@ -205,7 +205,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
@ -242,9 +242,8 @@ abstract class PoolArena<T> implements PoolArenaMetric {
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(normCapacity);
assert handle > 0;
c.initBuf(buf, handle, reqCapacity);
boolean success = c.allocate(buf, reqCapacity, normCapacity);
assert success;
qInit.add(c);
}
@ -263,7 +262,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
allocationsHuge.increment();
}
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
@ -271,12 +270,12 @@ abstract class PoolArena<T> implements PoolArenaMetric {
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached, so do not free it.
return;
}
freeChunk(chunk, handle, sizeClass);
freeChunk(chunk, handle, sizeClass, nioBuffer);
}
}
@ -287,7 +286,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer) {
final boolean destroyChunk;
synchronized (this) {
switch (sizeClass) {
@ -303,7 +302,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
default:
throw new Error();
}
destroyChunk = !chunk.parent.free(chunk, handle);
destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
}
if (destroyChunk) {
// destroyChunk does not need to be called while holding the synchronized lock.
@ -387,6 +386,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
}
PoolChunk<T> oldChunk = buf.chunk;
ByteBuffer oldNioBuffer = buf.tmpNioBuf;
long oldHandle = buf.handle;
T oldMemory = buf.memory;
int oldOffset = buf.offset;
@ -415,7 +415,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
buf.setIndex(readerIndex, writerIndex);
if (freeOldMemory) {
free(oldChunk, oldHandle, oldMaxLength, buf.cache);
free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
}
}

View File

@ -16,6 +16,10 @@
package io.netty.buffer;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
@ -107,7 +111,6 @@ final class PoolChunk<T> implements PoolChunkMetric {
final T memory;
final boolean unpooled;
final int offset;
private final byte[] memoryMap;
private final byte[] depthMap;
private final PoolSubpage<T>[] subpages;
@ -122,6 +125,13 @@ final class PoolChunk<T> implements PoolChunkMetric {
/** Used to mark memory as unusable */
private final byte unusable;
// Used as a cache for ByteBuffers created from the memory. These are just duplicates and so are only a container
// around the memory itself. These are often needed for operations within the Pooled*ByteBuf and so
// may produce extra GC, which can be greatly reduced by caching the duplicates.
//
// This may be null if the PoolChunk is unpooled as pooling the ByteBuffer instances does not make any sense here.
private final Deque<ByteBuffer> cachedNioBuffers;
private int freeBytes;
PoolChunkList<T> parent;
@ -163,6 +173,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
}
subpages = newSubpageArray(maxSubpageAllocs);
cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}
/** Creates a special chunk that is not pooled. */
@ -182,6 +193,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
chunkSize = size;
log2ChunkSize = log2(chunkSize);
maxSubpageAllocs = 0;
cachedNioBuffers = null;
}
@SuppressWarnings("unchecked")
@ -210,12 +222,20 @@ final class PoolChunk<T> implements PoolChunkMetric {
return 100 - freePercentage;
}
long allocate(int normCapacity) {
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity);
handle = allocateRun(normCapacity);
} else {
return allocateSubpage(normCapacity);
handle = allocateSubpage(normCapacity);
}
if (handle < 0) {
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}
/**
@ -310,8 +330,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
}
/**
* Create/ initialize a new PoolSubpage of normCapacity
* Any PoolSubpage created/ initialized here is added to subpage pool in the PoolArena that owns this PoolChunk
* Create / initialize a new PoolSubpage of normCapacity
* Any PoolSubpage created / initialized here is added to subpage pool in the PoolArena that owns this PoolChunk
*
* @param normCapacity normalized capacity
* @return index in memoryMap
@ -320,8 +340,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is needed as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
synchronized (head) {
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
int id = allocateNode(d);
if (id < 0) {
return id;
@ -352,7 +372,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
*
* @param handle handle to free
*/
void free(long handle) {
void free(long handle, ByteBuffer nioBuffer) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
@ -372,26 +392,32 @@ final class PoolChunk<T> implements PoolChunkMetric {
freeBytes += runLength(memoryMapIdx);
setValue(memoryMapIdx, depth(memoryMapIdx));
updateParentsFree(memoryMapIdx);
if (nioBuffer != null && cachedNioBuffers != null &&
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.offer(nioBuffer);
}
}
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity);
}
}
void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);
void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity);
}
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle);
@ -401,7 +427,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
assert reqCapacity <= subpage.elemSize;
buf.init(
this, handle,
this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}

View File

@ -25,6 +25,8 @@ import java.util.List;
import static java.lang.Math.*;
import java.nio.ByteBuffer;
final class PoolChunkList<T> implements PoolChunkListMetric {
private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.<PoolChunkMetric>emptyList().iterator();
private final PoolArena<T> arena;
@ -75,21 +77,14 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
}
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (head == null || normCapacity > maxCapacity) {
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger than the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return false;
}
for (PoolChunk<T> cur = head;;) {
long handle = cur.allocate(normCapacity);
if (handle < 0) {
cur = cur.next;
if (cur == null) {
return false;
}
} else {
cur.initBuf(buf, handle, reqCapacity);
for (PoolChunk<T> cur = head; cur != null; cur = cur.next) {
if (cur.allocate(buf, reqCapacity, normCapacity)) {
if (cur.usage() >= maxUsage) {
remove(cur);
nextList.add(cur);
@ -97,10 +92,11 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
return true;
}
}
return false;
}
boolean free(PoolChunk<T> chunk, long handle) {
chunk.free(handle);
boolean free(PoolChunk<T> chunk, long handle, ByteBuffer nioBuffer) {
chunk.free(handle, nioBuffer);
if (chunk.usage() < minUsage) {
remove(chunk);
// Move the PoolChunk down the PoolChunkList linked-list.

View File

@ -200,12 +200,13 @@ final class PoolThreadCache {
* Returns {@code true} if it fit into the cache {@code false} otherwise.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
long handle, int normCapacity, SizeClass sizeClass) {
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, handle);
return cache.add(chunk, nioBuffer, handle);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
@ -346,8 +347,8 @@ final class PoolThreadCache {
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, handle, reqCapacity);
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
}
@ -361,8 +362,8 @@ final class PoolThreadCache {
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBuf(buf, handle, reqCapacity);
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBuf(buf, nioBuffer, handle, reqCapacity);
}
}
@ -381,15 +382,15 @@ final class PoolThreadCache {
/**
* Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
*/
protected abstract void initBuf(PoolChunk<T> chunk, long handle,
protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
PooledByteBuf<T> buf, int reqCapacity);
/**
* Add to cache if not already full.
*/
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = newEntry(chunk, handle);
public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
Entry<T> entry = newEntry(chunk, nioBuffer, handle);
boolean queued = queue.offer(entry);
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
@ -407,7 +408,7 @@ final class PoolThreadCache {
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
entry.recycle();
// allocations is not thread-safe, which is fine as this is only called from the same thread all the time.
@ -453,16 +454,18 @@ final class PoolThreadCache {
private void freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
ByteBuffer nioBuffer = entry.nioBuffer;
// recycle now so PoolChunk can be GC'ed.
entry.recycle();
chunk.arena.freeChunk(chunk, handle, sizeClass);
chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer);
}
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
@ -471,15 +474,17 @@ final class PoolThreadCache {
void recycle() {
chunk = null;
nioBuffer = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
@SuppressWarnings("rawtypes")
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.nioBuffer = nioBuffer;
entry.handle = handle;
return entry;
}

View File

@ -33,7 +33,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
protected int length;
int maxLength;
PoolThreadCache cache;
private ByteBuffer tmpNioBuf;
ByteBuffer tmpNioBuf;
private ByteBufAllocator allocator;
@SuppressWarnings("unchecked")
@ -42,27 +42,29 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
}
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, handle, offset, length, maxLength, cache);
void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}
void initUnpooled(PoolChunk<T> chunk, int length) {
init0(chunk, 0, chunk.offset, length, length, null);
init0(chunk, null, 0, chunk.offset, length, length, null);
}
private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk;
memory = chunk.memory;
tmpNioBuf = nioBuffer;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
tmpNioBuf = null;
}
/**
@ -166,8 +168,8 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
final long handle = this.handle;
this.handle = -1;
memory = null;
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
recycle();
}

View File

@ -45,6 +45,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
@ -116,6 +117,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
"io.netty.allocator.directMemoryCacheAlignment", 0);
// Use 1023 by default as we use an ArrayDeque as backing storage which will then allocate an internal array
// of 1024 elements. Otherwise we would allocate 2048 and only use 1024 which is wasteful.
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedByteBuffersPerChunk", 1023);
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
@ -136,6 +142,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
}
}

View File

@ -49,9 +49,9 @@ final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
}
@Override
void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
PoolThreadCache cache) {
super.init(chunk, handle, offset, length, maxLength, cache);
void init(PoolChunk<ByteBuffer> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
super.init(chunk, nioBuffer, handle, offset, length, maxLength, cache);
initMemoryAddress();
}