Rewrote DirectBufferPool using preallocation strategy (like DirectChannelBufferFactory does)

This commit is contained in:
Trustin Lee 2010-02-18 01:52:48 +00:00
parent e50192c312
commit f650a8bcee
5 changed files with 31 additions and 90 deletions

View File

@ -15,7 +15,6 @@
*/ */
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffer;
@ -27,10 +26,9 @@ import org.jboss.netty.buffer.ChannelBuffer;
*/ */
final class DirectBufferPool { final class DirectBufferPool {
private static final int POOL_SIZE = 4; private static final int preallocatedBufferCapacity = 128 * 1024;
@SuppressWarnings("unchecked") private ByteBuffer preallocatedBuffer;
private final SoftReference<ByteBuffer>[] pool = new SoftReference[POOL_SIZE];
DirectBufferPool() { DirectBufferPool() {
super(); super();
@ -38,70 +36,43 @@ final class DirectBufferPool {
final ByteBuffer acquire(ChannelBuffer src) { final ByteBuffer acquire(ChannelBuffer src) {
ByteBuffer dst = acquire(src.readableBytes()); ByteBuffer dst = acquire(src.readableBytes());
dst.mark();
src.getBytes(src.readerIndex(), dst); src.getBytes(src.readerIndex(), dst);
dst.rewind(); dst.reset();
return dst; return dst;
} }
final ByteBuffer acquire(int size) { final ByteBuffer acquire(int size) {
for (int i = 0; i < POOL_SIZE; i ++) { ByteBuffer preallocatedBuffer = this.preallocatedBuffer;
SoftReference<ByteBuffer> ref = pool[i]; if (preallocatedBuffer == null) {
if (ref == null) { if (size < preallocatedBufferCapacity) {
continue; return preallocateAndAcquire(size);
} } else {
return ByteBuffer.allocateDirect(size);
ByteBuffer buf = ref.get();
if (buf == null) {
pool[i] = null;
continue;
}
if (buf.capacity() < size) {
continue;
}
pool[i] = null;
buf.rewind();
buf.limit(size);
return buf;
}
ByteBuffer buf = ByteBuffer.allocateDirect(normalizeCapacity(size));
buf.limit(size);
return buf;
}
final void release(ByteBuffer buffer) {
for (int i = 0; i < POOL_SIZE; i ++) {
SoftReference<ByteBuffer> ref = pool[i];
if (ref == null || ref.get() == null) {
pool[i] = new SoftReference<ByteBuffer>(buffer);
return;
} }
} }
// pool is full - replace one if (preallocatedBuffer.remaining() < size) {
final int capacity = buffer.capacity(); if (size > preallocatedBufferCapacity) {
for (int i = 0; i< POOL_SIZE; i ++) { return ByteBuffer.allocateDirect(size);
SoftReference<ByteBuffer> ref = pool[i]; } else {
ByteBuffer pooled = ref.get(); return preallocateAndAcquire(size);
if (pooled == null) {
pool[i] = null;
continue;
}
if (pooled.capacity() < capacity) {
pool[i] = new SoftReference<ByteBuffer>(buffer);
return;
} }
} else {
int nextPos = preallocatedBuffer.position() + size;
ByteBuffer x = preallocatedBuffer.duplicate();
preallocatedBuffer.position(nextPos);
x.limit(nextPos);
return x;
} }
} }
private static final int normalizeCapacity(int capacity) { private final ByteBuffer preallocateAndAcquire(int size) {
// Normalize to multiple of 4096. ByteBuffer preallocatedBuffer = this.preallocatedBuffer =
// Strictly speaking, 4096 should be normalized to 4096, ByteBuffer.allocateDirect(preallocatedBufferCapacity);
// but it becomes 8192 to keep the calculation simplistic. ByteBuffer x = preallocatedBuffer.duplicate();
return (capacity & 0xfffff000) + 0x1000; x.limit(size);
preallocatedBuffer.position(size);
return x;
} }
} }

View File

@ -109,7 +109,6 @@ class NioDatagramChannel extends AbstractChannel
*/ */
MessageEvent currentWriteEvent; MessageEvent currentWriteEvent;
ByteBuffer currentWriteBuffer; ByteBuffer currentWriteBuffer;
boolean currentWriteBufferIsPooled;
/** /**
* Boolean that indicates that write operation is in progress. * Boolean that indicates that write operation is in progress.

View File

@ -521,10 +521,8 @@ class NioDatagramWorker implements Runnable {
ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage(); ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage();
if (origBuf.isDirect()) { if (origBuf.isDirect()) {
channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); channel.currentWriteBuffer = buf = origBuf.toByteBuffer();
channel.currentWriteBufferIsPooled = false;
} else { } else {
channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf); channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf);
channel.currentWriteBufferIsPooled = true;
} }
} else { } else {
buf = channel.currentWriteBuffer; buf = channel.currentWriteBuffer;
@ -554,10 +552,6 @@ class NioDatagramWorker implements Runnable {
if (localWrittenBytes > 0) { if (localWrittenBytes > 0) {
// Successful write - proceed to the next message. // Successful write - proceed to the next message.
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
@ -573,9 +567,6 @@ class NioDatagramWorker implements Runnable {
} catch (final AsynchronousCloseException e) { } catch (final AsynchronousCloseException e) {
// Doesn't need a user attention - ignore. // Doesn't need a user attention - ignore.
} catch (final Throwable t) { } catch (final Throwable t) {
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
@ -704,7 +695,6 @@ class NioDatagramWorker implements Runnable {
// Clean up the stale messages in the write buffer. // Clean up the stale messages in the write buffer.
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
MessageEvent evt = channel.currentWriteEvent; MessageEvent evt = channel.currentWriteEvent;
ByteBuffer buf = channel.currentWriteBuffer;
if (evt != null) { if (evt != null) {
// Create the exception only once to avoid the excessive overhead // Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace. // caused by fillStackTrace.
@ -713,14 +703,10 @@ class NioDatagramWorker implements Runnable {
} else { } else {
cause = new ClosedChannelException(); cause = new ClosedChannelException();
} }
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
buf = null;
evt = null; evt = null;
future.setFailure(cause); future.setFailure(cause);
fireExceptionCaught = true; fireExceptionCaught = true;

View File

@ -72,7 +72,6 @@ class NioSocketChannel extends AbstractChannel
MessageEvent currentWriteEvent; MessageEvent currentWriteEvent;
ByteBuffer currentWriteBuffer; ByteBuffer currentWriteBuffer;
boolean currentWriteBufferIsPooled;
public NioSocketChannel( public NioSocketChannel(
Channel parent, ChannelFactory factory, Channel parent, ChannelFactory factory,

View File

@ -318,6 +318,7 @@ class NioWorker implements Runnable {
final boolean fromPool = !buffer.isDirect(); final boolean fromPool = !buffer.isDirect();
if (fromPool) { if (fromPool) {
directBuffer = directBufferPool.acquire(buffer.writableBytes()); directBuffer = directBufferPool.acquire(buffer.writableBytes());
directBuffer.mark();
} else { } else {
directBuffer = buffer.toByteBuffer(); directBuffer = buffer.toByteBuffer();
} }
@ -339,9 +340,9 @@ class NioWorker implements Runnable {
fireExceptionCaught(channel, t); fireExceptionCaught(channel, t);
} finally { } finally {
if (fromPool) { if (fromPool) {
directBuffer.flip(); directBuffer.limit(directBuffer.position());
directBuffer.reset();
buffer.writeBytes(directBuffer); buffer.writeBytes(directBuffer);
directBufferPool.release(directBuffer);
} else { } else {
// no need to copy: directBuffer is just a view to buffer. // no need to copy: directBuffer is just a view to buffer.
buffer.writerIndex(buffer.writerIndex() + readBytes); buffer.writerIndex(buffer.writerIndex() + readBytes);
@ -463,10 +464,8 @@ class NioWorker implements Runnable {
ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage(); ChannelBuffer origBuf = (ChannelBuffer) evt.getMessage();
if (origBuf.isDirect()) { if (origBuf.isDirect()) {
channel.currentWriteBuffer = buf = origBuf.toByteBuffer(); channel.currentWriteBuffer = buf = origBuf.toByteBuffer();
channel.currentWriteBufferIsPooled = false;
} else { } else {
channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf); channel.currentWriteBuffer = buf = directBufferPool.acquire(origBuf);
channel.currentWriteBufferIsPooled = true;
} }
} else { } else {
buf = channel.currentWriteBuffer; buf = channel.currentWriteBuffer;
@ -483,10 +482,6 @@ class NioWorker implements Runnable {
if (!buf.hasRemaining()) { if (!buf.hasRemaining()) {
// Successful write - proceed to the next message. // Successful write - proceed to the next message.
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
@ -502,9 +497,6 @@ class NioWorker implements Runnable {
} catch (AsynchronousCloseException e) { } catch (AsynchronousCloseException e) {
// Doesn't need a user attention - ignore. // Doesn't need a user attention - ignore.
} catch (Throwable t) { } catch (Throwable t) {
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
@ -624,7 +616,6 @@ class NioWorker implements Runnable {
// Clean up the stale messages in the write buffer. // Clean up the stale messages in the write buffer.
synchronized (channel.writeLock) { synchronized (channel.writeLock) {
MessageEvent evt = channel.currentWriteEvent; MessageEvent evt = channel.currentWriteEvent;
ByteBuffer buf = channel.currentWriteBuffer;
if (evt != null) { if (evt != null) {
// Create the exception only once to avoid the excessive overhead // Create the exception only once to avoid the excessive overhead
// caused by fillStackTrace. // caused by fillStackTrace.
@ -634,14 +625,9 @@ class NioWorker implements Runnable {
cause = new ClosedChannelException(); cause = new ClosedChannelException();
} }
if (channel.currentWriteBufferIsPooled) {
directBufferPool.release(buf);
}
ChannelFuture future = evt.getFuture(); ChannelFuture future = evt.getFuture();
channel.currentWriteEvent = null; channel.currentWriteEvent = null;
channel.currentWriteBuffer = null; channel.currentWriteBuffer = null;
buf = null;
evt = null; evt = null;
future.setFailure(cause); future.setFailure(cause);
fireExceptionCaught = true; fireExceptionCaught = true;