DirectBufferPool should be static because it uses thread local now
This commit is contained in:
parent
4bf4d5f814
commit
84cf41890c
@ -29,7 +29,7 @@ final class DirectBufferPool {
|
|||||||
|
|
||||||
private static final int POOL_SIZE = 4;
|
private static final int POOL_SIZE = 4;
|
||||||
|
|
||||||
private final ThreadLocal<SoftReference<ByteBuffer>[]> pool =
|
private static final ThreadLocal<SoftReference<ByteBuffer>[]> pool =
|
||||||
new ThreadLocal<SoftReference<ByteBuffer>[]>() {
|
new ThreadLocal<SoftReference<ByteBuffer>[]>() {
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@ -40,19 +40,15 @@ final class DirectBufferPool {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
DirectBufferPool() {
|
static final ByteBuffer acquire(ChannelBuffer src) {
|
||||||
super();
|
|
||||||
}
|
|
||||||
|
|
||||||
final ByteBuffer acquire(ChannelBuffer src) {
|
|
||||||
ByteBuffer dst = acquire(src.readableBytes());
|
ByteBuffer dst = acquire(src.readableBytes());
|
||||||
src.getBytes(src.readerIndex(), dst);
|
src.getBytes(src.readerIndex(), dst);
|
||||||
dst.rewind();
|
dst.rewind();
|
||||||
return dst;
|
return dst;
|
||||||
}
|
}
|
||||||
|
|
||||||
final ByteBuffer acquire(int size) {
|
static final ByteBuffer acquire(int size) {
|
||||||
final SoftReference<ByteBuffer>[] pool = this.pool.get();
|
final SoftReference<ByteBuffer>[] pool = DirectBufferPool.pool.get();
|
||||||
for (int i = 0; i < POOL_SIZE; i ++) {
|
for (int i = 0; i < POOL_SIZE; i ++) {
|
||||||
SoftReference<ByteBuffer> ref = pool[i];
|
SoftReference<ByteBuffer> ref = pool[i];
|
||||||
if (ref == null) {
|
if (ref == null) {
|
||||||
@ -81,8 +77,8 @@ final class DirectBufferPool {
|
|||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
final void release(ByteBuffer buffer) {
|
static final void release(ByteBuffer buffer) {
|
||||||
final SoftReference<ByteBuffer>[] pool = this.pool.get();
|
final SoftReference<ByteBuffer>[] pool = DirectBufferPool.pool.get();
|
||||||
for (int i = 0; i < POOL_SIZE; i ++) {
|
for (int i = 0; i < POOL_SIZE; i ++) {
|
||||||
SoftReference<ByteBuffer> ref = pool[i];
|
SoftReference<ByteBuffer> ref = pool[i];
|
||||||
if (ref == null || ref.get() == null) {
|
if (ref == null || ref.get() == null) {
|
||||||
@ -114,4 +110,8 @@ final class DirectBufferPool {
|
|||||||
// but it becomes 8192 to keep the calculation simplistic.
|
// but it becomes 8192 to keep the calculation simplistic.
|
||||||
return (capacity & 0xfffff000) + 0x1000;
|
return (capacity & 0xfffff000) + 0x1000;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private DirectBufferPool() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -127,8 +127,6 @@ class NioDatagramWorker implements Runnable {
|
|||||||
|
|
||||||
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
|
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
|
||||||
|
|
||||||
private final DirectBufferPool directBufferPool = new DirectBufferPool();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sole constructor.
|
* Sole constructor.
|
||||||
*
|
*
|
||||||
@ -476,7 +474,7 @@ class NioDatagramWorker implements Runnable {
|
|||||||
channel.currentWriteBuffer = buf = origBuf.toByteBuffer();
|
channel.currentWriteBuffer = buf = origBuf.toByteBuffer();
|
||||||
channel.currentWriteBufferIsPooled = false;
|
channel.currentWriteBufferIsPooled = false;
|
||||||
} else {
|
} else {
|
||||||
channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf);
|
channel.currentWriteBuffer = buf = DirectBufferPool.acquire(origBuf);
|
||||||
channel.currentWriteBufferIsPooled = true;
|
channel.currentWriteBufferIsPooled = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -508,7 +506,7 @@ class NioDatagramWorker implements Runnable {
|
|||||||
if (localWrittenBytes > 0) {
|
if (localWrittenBytes > 0) {
|
||||||
// Successful write - proceed to the next message.
|
// Successful write - proceed to the next message.
|
||||||
if (channel.currentWriteBufferIsPooled) {
|
if (channel.currentWriteBufferIsPooled) {
|
||||||
directBufferPool.release(buf);
|
DirectBufferPool.release(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelFuture future = evt.getFuture();
|
ChannelFuture future = evt.getFuture();
|
||||||
@ -526,7 +524,7 @@ class NioDatagramWorker implements Runnable {
|
|||||||
// Doesn't need a user attention - ignore.
|
// Doesn't need a user attention - ignore.
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
if (channel.currentWriteBufferIsPooled) {
|
if (channel.currentWriteBufferIsPooled) {
|
||||||
directBufferPool.release(buf);
|
DirectBufferPool.release(buf);
|
||||||
}
|
}
|
||||||
ChannelFuture future = evt.getFuture();
|
ChannelFuture future = evt.getFuture();
|
||||||
channel.currentWriteEvent = null;
|
channel.currentWriteEvent = null;
|
||||||
@ -636,7 +634,7 @@ class NioDatagramWorker implements Runnable {
|
|||||||
cause = new ClosedChannelException();
|
cause = new ClosedChannelException();
|
||||||
}
|
}
|
||||||
if (channel.currentWriteBufferIsPooled) {
|
if (channel.currentWriteBufferIsPooled) {
|
||||||
directBufferPool.release(buf);
|
DirectBufferPool.release(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelFuture future = evt.getFuture();
|
ChannelFuture future = evt.getFuture();
|
||||||
|
@ -78,7 +78,6 @@ class NioWorker implements Runnable {
|
|||||||
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
|
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||||
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
|
private final Queue<Runnable> writeTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||||
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
|
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
|
||||||
private final DirectBufferPool directBufferPool = new DirectBufferPool();
|
|
||||||
|
|
||||||
NioWorker(int bossId, int id, Executor executor) {
|
NioWorker(int bossId, int id, Executor executor) {
|
||||||
this.bossId = bossId;
|
this.bossId = bossId;
|
||||||
@ -317,7 +316,7 @@ class NioWorker implements Runnable {
|
|||||||
final ByteBuffer directBuffer;
|
final ByteBuffer directBuffer;
|
||||||
final boolean fromPool = !buffer.isDirect();
|
final boolean fromPool = !buffer.isDirect();
|
||||||
if (fromPool) {
|
if (fromPool) {
|
||||||
directBuffer = directBufferPool.acquire(buffer.writableBytes());
|
directBuffer = DirectBufferPool.acquire(buffer.writableBytes());
|
||||||
} else {
|
} else {
|
||||||
directBuffer = buffer.toByteBuffer();
|
directBuffer = buffer.toByteBuffer();
|
||||||
}
|
}
|
||||||
@ -341,7 +340,7 @@ class NioWorker implements Runnable {
|
|||||||
if (fromPool) {
|
if (fromPool) {
|
||||||
directBuffer.flip();
|
directBuffer.flip();
|
||||||
buffer.writeBytes(directBuffer);
|
buffer.writeBytes(directBuffer);
|
||||||
directBufferPool.release(directBuffer);
|
DirectBufferPool.release(directBuffer);
|
||||||
} else {
|
} else {
|
||||||
// no need to copy: directBuffer is just a view to buffer.
|
// no need to copy: directBuffer is just a view to buffer.
|
||||||
buffer.writerIndex(buffer.writerIndex() + readBytes);
|
buffer.writerIndex(buffer.writerIndex() + readBytes);
|
||||||
@ -408,7 +407,7 @@ class NioWorker implements Runnable {
|
|||||||
channel.currentWriteBuffer = buf = origBuf.toByteBuffer();
|
channel.currentWriteBuffer = buf = origBuf.toByteBuffer();
|
||||||
channel.currentWriteBufferIsPooled = false;
|
channel.currentWriteBufferIsPooled = false;
|
||||||
} else {
|
} else {
|
||||||
channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf);
|
channel.currentWriteBuffer = buf = DirectBufferPool.acquire(origBuf);
|
||||||
channel.currentWriteBufferIsPooled = true;
|
channel.currentWriteBufferIsPooled = true;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -427,7 +426,7 @@ class NioWorker implements Runnable {
|
|||||||
if (!buf.hasRemaining()) {
|
if (!buf.hasRemaining()) {
|
||||||
// Successful write - proceed to the next message.
|
// Successful write - proceed to the next message.
|
||||||
if (channel.currentWriteBufferIsPooled) {
|
if (channel.currentWriteBufferIsPooled) {
|
||||||
directBufferPool.release(buf);
|
DirectBufferPool.release(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelFuture future = evt.getFuture();
|
ChannelFuture future = evt.getFuture();
|
||||||
@ -445,7 +444,7 @@ class NioWorker implements Runnable {
|
|||||||
// Doesn't need a user attention - ignore.
|
// Doesn't need a user attention - ignore.
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
if (channel.currentWriteBufferIsPooled) {
|
if (channel.currentWriteBufferIsPooled) {
|
||||||
directBufferPool.release(buf);
|
DirectBufferPool.release(buf);
|
||||||
}
|
}
|
||||||
ChannelFuture future = evt.getFuture();
|
ChannelFuture future = evt.getFuture();
|
||||||
channel.currentWriteEvent = null;
|
channel.currentWriteEvent = null;
|
||||||
@ -563,7 +562,7 @@ class NioWorker implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (channel.currentWriteBufferIsPooled) {
|
if (channel.currentWriteBufferIsPooled) {
|
||||||
directBufferPool.release(buf);
|
DirectBufferPool.release(buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
ChannelFuture future = evt.getFuture();
|
ChannelFuture future = evt.getFuture();
|
||||||
|
Loading…
Reference in New Issue
Block a user