Disable caching of PooledByteBuf for different threads.
Motivation: We introduced a PoolThreadCache which is used in our PooledByteBufAllocator to reduce the synchronization overhead on PoolArenas when allocate / deallocate PooledByteBuf instances. This cache is used for both the allocation path and deallocation path by: - Look for cached memory in the PoolThreadCache for the Thread that tries to allocate a new PooledByteBuf and if one is found return it. - Add the memory that is used by a PooledByteBuf to the PoolThreadCache of the Thread that release the PooledByteBuf This works out very well when all allocation / deallocation is done in the EventLoop as the EventLoop will be used for read and write. On the otherside this can lead to surprising side-effects if the user allocate from outside the EventLoop and and pass the ByteBuf over for writing. The problem here is that the memory will be added to the PoolThreadCache that did the actual write on the underlying transport and not on the Thread that previously allocated the buffer. Modifications: Don't cache if different Threads are used for allocating/deallocating Result: Less confusing behavior for users that allocate PooledByteBufs from outside the EventLoop.
This commit is contained in:
parent
687d3d3b5c
commit
cb85ed9d66
@ -187,15 +187,18 @@ abstract class PoolArena<T> {
|
|||||||
buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
|
buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
void free(PoolChunk<T> chunk, long handle, int normCapacity) {
|
void free(PoolChunk<T> chunk, long handle, int normCapacity, boolean sameThreads) {
|
||||||
if (chunk.unpooled) {
|
if (chunk.unpooled) {
|
||||||
destroyChunk(chunk);
|
destroyChunk(chunk);
|
||||||
} else {
|
} else {
|
||||||
|
if (sameThreads) {
|
||||||
PoolThreadCache cache = parent.threadCache.get();
|
PoolThreadCache cache = parent.threadCache.get();
|
||||||
if (cache.add(this, chunk, handle, normCapacity)) {
|
if (cache.add(this, chunk, handle, normCapacity)) {
|
||||||
// cached so not free it.
|
// cached so not free it.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
chunk.parent.free(chunk, handle);
|
chunk.parent.free(chunk, handle);
|
||||||
}
|
}
|
||||||
@ -295,7 +298,7 @@ abstract class PoolArena<T> {
|
|||||||
buf.setIndex(readerIndex, writerIndex);
|
buf.setIndex(readerIndex, writerIndex);
|
||||||
|
|
||||||
if (freeOldMemory) {
|
if (freeOldMemory) {
|
||||||
free(oldChunk, oldHandle, oldMaxLength);
|
free(oldChunk, oldHandle, oldMaxLength, buf.initThread == Thread.currentThread());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
|
|||||||
protected int offset;
|
protected int offset;
|
||||||
protected int length;
|
protected int length;
|
||||||
int maxLength;
|
int maxLength;
|
||||||
|
Thread initThread;
|
||||||
private ByteBuffer tmpNioBuf;
|
private ByteBuffer tmpNioBuf;
|
||||||
|
|
||||||
protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) {
|
protected PooledByteBuf(Recycler.Handle recyclerHandle, int maxCapacity) {
|
||||||
@ -51,6 +51,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
|
|||||||
this.maxLength = maxLength;
|
this.maxLength = maxLength;
|
||||||
setIndex(0, 0);
|
setIndex(0, 0);
|
||||||
tmpNioBuf = null;
|
tmpNioBuf = null;
|
||||||
|
initThread = Thread.currentThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
void initUnpooled(PoolChunk<T> chunk, int length) {
|
void initUnpooled(PoolChunk<T> chunk, int length) {
|
||||||
@ -63,6 +64,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
|
|||||||
this.length = maxLength = length;
|
this.length = maxLength = length;
|
||||||
setIndex(0, 0);
|
setIndex(0, 0);
|
||||||
tmpNioBuf = null;
|
tmpNioBuf = null;
|
||||||
|
initThread = Thread.currentThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -140,7 +142,9 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
|
|||||||
final long handle = this.handle;
|
final long handle = this.handle;
|
||||||
this.handle = -1;
|
this.handle = -1;
|
||||||
memory = null;
|
memory = null;
|
||||||
chunk.arena.free(chunk, handle, maxLength);
|
boolean sameThread = initThread == Thread.currentThread();
|
||||||
|
initThread = null;
|
||||||
|
chunk.arena.free(chunk, handle, maxLength, sameThread);
|
||||||
recycle();
|
recycle();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user