Disable caching of PooledByteBuf for different threads.
Motivation: We introduced a PoolThreadCache which is used in our PooledByteBufAllocator to reduce the synchronization overhead on PoolArenas when allocate / deallocate PooledByteBuf instances. This cache is used for both the allocation path and deallocation path by: - Look for cached memory in the PoolThreadCache for the Thread that tries to allocate a new PooledByteBuf and if one is found return it. - Add the memory that is used by a PooledByteBuf to the PoolThreadCache of the Thread that release the PooledByteBuf This works out very well when all allocation / deallocation is done in the EventLoop as the EventLoop will be used for read and write. On the otherside this can lead to surprising side-effects if the user allocate from outside the EventLoop and and pass the ByteBuf over for writing. The problem here is that the memory will be added to the PoolThreadCache that did the actual write on the underlying transport and not on the Thread that previously allocated the buffer. Modifications: Don't cache if different Threads are used for allocating/deallocating Result: Less confusing behavior for users that allocate PooledByteBufs from outside the EventLoop.
This commit is contained in:
parent
4131e1dfd3
commit
6bfe7d0f1e
@ -187,15 +187,18 @@ abstract class PoolArena<T> {
|
||||
buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
|
||||
}
|
||||
|
||||
void free(PoolChunk<T> chunk, long handle, int normCapacity) {
|
||||
void free(PoolChunk<T> chunk, long handle, int normCapacity, boolean sameThreads) {
|
||||
if (chunk.unpooled) {
|
||||
destroyChunk(chunk);
|
||||
} else {
|
||||
PoolThreadCache cache = parent.threadCache.get();
|
||||
if (cache.add(this, chunk, handle, normCapacity)) {
|
||||
// cached so not free it.
|
||||
return;
|
||||
if (sameThreads) {
|
||||
PoolThreadCache cache = parent.threadCache.get();
|
||||
if (cache.add(this, chunk, handle, normCapacity)) {
|
||||
// cached so not free it.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
chunk.parent.free(chunk, handle);
|
||||
}
|
||||
@ -295,7 +298,7 @@ abstract class PoolArena<T> {
|
||||
buf.setIndex(readerIndex, writerIndex);
|
||||
|
||||
if (freeOldMemory) {
|
||||
free(oldChunk, oldHandle, oldMaxLength);
|
||||
free(oldChunk, oldHandle, oldMaxLength, buf.initThread == Thread.currentThread());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
|
||||
protected int offset;
|
||||
protected int length;
|
||||
int maxLength;
|
||||
|
||||
Thread initThread;
|
||||
private ByteBuffer tmpNioBuf;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -53,6 +53,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
|
||||
this.maxLength = maxLength;
|
||||
setIndex(0, 0);
|
||||
tmpNioBuf = null;
|
||||
initThread = Thread.currentThread();
|
||||
}
|
||||
|
||||
void initUnpooled(PoolChunk<T> chunk, int length) {
|
||||
@ -65,6 +66,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
|
||||
this.length = maxLength = length;
|
||||
setIndex(0, 0);
|
||||
tmpNioBuf = null;
|
||||
initThread = Thread.currentThread();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -142,7 +144,9 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
|
||||
final long handle = this.handle;
|
||||
this.handle = -1;
|
||||
memory = null;
|
||||
chunk.arena.free(chunk, handle, maxLength);
|
||||
boolean sameThread = initThread == Thread.currentThread();
|
||||
initThread = null;
|
||||
chunk.arena.free(chunk, handle, maxLength, sameThread);
|
||||
recycle();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user