diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java b/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java index cfd6e83949..69d3f1d7c1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/DirectBufferPool.java @@ -15,7 +15,6 @@ */ package org.jboss.netty.channel.socket.nio; -import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import org.jboss.netty.buffer.ChannelBuffer; @@ -27,10 +26,9 @@ import org.jboss.netty.buffer.ChannelBuffer; */ final class DirectBufferPool { - private static final int POOL_SIZE = 4; + private static final int preallocatedBufferCapacity = 128 * 1024; - @SuppressWarnings("unchecked") - private final SoftReference[] pool = new SoftReference[POOL_SIZE]; + private ByteBuffer preallocatedBuffer; DirectBufferPool() { super(); @@ -38,70 +36,43 @@ final class DirectBufferPool { final ByteBuffer acquire(ChannelBuffer src) { ByteBuffer dst = acquire(src.readableBytes()); + dst.mark(); src.getBytes(src.readerIndex(), dst); - dst.rewind(); + dst.reset(); return dst; } final ByteBuffer acquire(int size) { - for (int i = 0; i < POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - if (ref == null) { - continue; - } - - ByteBuffer buf = ref.get(); - if (buf == null) { - pool[i] = null; - continue; - } - - if (buf.capacity() < size) { - continue; - } - - pool[i] = null; - - buf.rewind(); - buf.limit(size); - return buf; - } - - ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size)); - buf.limit(size); - return buf; - } - - final void release(ByteBuffer buffer) { - for (int i = 0; i < POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - if (ref == null || ref.get() == null) { - pool[i] = new SoftReference(buffer); - return; + ByteBuffer preallocatedBuffer = this.preallocatedBuffer; + if (preallocatedBuffer == null) { + if (size < preallocatedBufferCapacity) { + return preallocateAndAcquire(size); + } else { + return ByteBuffer.allocateDirect(size); } } - // pool is full - replace one - final int capacity = buffer.capacity(); - for (int i = 0; i< POOL_SIZE; i ++) { - SoftReference ref = pool[i]; - ByteBuffer pooled = ref.get(); - if (pooled == null) { - pool[i] = null; - continue; - } - - if (pooled.capacity() < capacity) { - pool[i] = new SoftReference(buffer); - return; + if (preallocatedBuffer.remaining() < size) { + if (size > preallocatedBufferCapacity) { + return ByteBuffer.allocateDirect(size); + } else { + return preallocateAndAcquire(size); } + } else { + int nextPos = preallocatedBuffer.position() + size; + ByteBuffer x = preallocatedBuffer.duplicate(); + preallocatedBuffer.position(nextPos); + x.limit(nextPos); + return x; } } - private static final int normalizeCapacity(int capacity) { - // Normalize to multiple of 4096. - // Strictly speaking, 4096 should be normalized to 4096, - // but it becomes 8192 to keep the calculation simplistic. - return (capacity & 0xfffff000) + 0x1000; + private final ByteBuffer preallocateAndAcquire(int size) { + ByteBuffer preallocatedBuffer = this.preallocatedBuffer = + ByteBuffer.allocateDirect(preallocatedBufferCapacity); + ByteBuffer x = preallocatedBuffer.duplicate(); + x.limit(size); + preallocatedBuffer.position(size); + return x; } } diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java index d51722e51f..144961fad2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramChannel.java @@ -109,7 +109,6 @@ class NioDatagramChannel extends AbstractChannel */ MessageEvent currentWriteEvent; ByteBuffer currentWriteBuffer; - boolean currentWriteBufferIsPooled; /** * Boolean that indicates that write operation is in progress. diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java index 7345a90006..1e4ee433ad 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioDatagramWorker.java @@ -521,10 +521,8 @@ class NioDatagramWorker implements Runnable { ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage(); if (origBuf.isDirect()) { channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); - channel.currentWriteBufferIsPooled = false; } else { channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf); - channel.currentWriteBufferIsPooled = true; } } else { buf = channel.currentWriteBuffer; @@ -554,10 +552,6 @@ class NioDatagramWorker implements Runnable { if (localWrittenBytes > 0) { // Successful write - proceed to the next message. - if (channel.currentWriteBufferIsPooled) { - directBufferPool.release(buf); - } - ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; @@ -573,9 +567,6 @@ class NioDatagramWorker implements Runnable { } catch (final AsynchronousCloseException e) { // Doesn't need a user attention - ignore. } catch (final Throwable t) { - if (channel.currentWriteBufferIsPooled) { - directBufferPool.release(buf); - } ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; @@ -704,7 +695,6 @@ class NioDatagramWorker implements Runnable { // Clean up the stale messages in the write buffer. synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; - ByteBuffer buf = channel.currentWriteBuffer; if (evt != null) { // Create the exception only once to avoid the excessive overhead // caused by fillStackTrace. @@ -713,14 +703,10 @@ class NioDatagramWorker implements Runnable { } else { cause = new ClosedChannelException(); } - if (channel.currentWriteBufferIsPooled) { - directBufferPool.release(buf); - } ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; - buf = null; evt = null; future.setFailure(cause); fireExceptionCaught = true; diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 9c063495cc..577e53cb64 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -72,7 +72,6 @@ class NioSocketChannel extends AbstractChannel MessageEvent currentWriteEvent; ByteBuffer currentWriteBuffer; - boolean currentWriteBufferIsPooled; public NioSocketChannel( Channel parent, ChannelFactory factory, diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 9e52e8d0ab..6893a9bdc6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -318,6 +318,7 @@ class NioWorker implements Runnable { final boolean fromPool = !buffer.isDirect(); if (fromPool) { directBuffer = directBufferPool.acquire(buffer.writableBytes()); + directBuffer.mark(); } else { directBuffer = buffer.toByteBuffer(); } @@ -339,9 +340,9 @@ class NioWorker implements Runnable { fireExceptionCaught(channel, t); } finally { if (fromPool) { - directBuffer.flip(); + directBuffer.limit(directBuffer.position()); + directBuffer.reset(); buffer.writeBytes(directBuffer); - directBufferPool.release(directBuffer); } else { // no need to copy: directBuffer is just a view to buffer. buffer.writerIndex(buffer.writerIndex() + readBytes); @@ -463,10 +464,8 @@ class NioWorker implements Runnable { ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage(); if (origBuf.isDirect()) { channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); - channel.currentWriteBufferIsPooled = false; } else { channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf); - channel.currentWriteBufferIsPooled = true; } } else { buf = channel.currentWriteBuffer; @@ -483,10 +482,6 @@ class NioWorker implements Runnable { if (!buf.hasRemaining()) { // Successful write - proceed to the next message. - if (channel.currentWriteBufferIsPooled) { - directBufferPool.release(buf); - } - ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; @@ -502,9 +497,6 @@ class NioWorker implements Runnable { } catch (AsynchronousCloseException e) { // Doesn't need a user attention - ignore. } catch (Throwable t) { - if (channel.currentWriteBufferIsPooled) { - directBufferPool.release(buf); - } ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; @@ -624,7 +616,6 @@ class NioWorker implements Runnable { // Clean up the stale messages in the write buffer. synchronized (channel.writeLock) { MessageEvent evt = channel.currentWriteEvent; - ByteBuffer buf = channel.currentWriteBuffer; if (evt != null) { // Create the exception only once to avoid the excessive overhead // caused by fillStackTrace. @@ -634,14 +625,9 @@ class NioWorker implements Runnable { cause = new ClosedChannelException(); } - if (channel.currentWriteBufferIsPooled) { - directBufferPool.release(buf); - } - ChannelFuture future = evt.getFuture(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; - buf = null; evt = null; future.setFailure(cause); fireExceptionCaught = true;