From 5170838e795e008330966030b584a9b2b12d5c62 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Sun, 28 Sep 2008 12:51:50 +0000 Subject: [PATCH] Added direct buffer support (disabled by default and can't be enabled without recompilation for now - should be fixed in 3.1.) --- .../netty/channel/socket/nio/NioWorker.java | 75 ++++++++++++++++++- 1 file changed, 71 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 8bbee4bcda..383b09c8ad 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -32,6 +32,7 @@ import java.nio.channels.ScatteringByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -65,6 +66,7 @@ class NioWorker implements Runnable { InternalLoggerFactory.getInstance(NioWorker.class); private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL; + private static final boolean USE_DIRECT_BUFFER = false; // Hard-coded for now private final int bossId; private final int id; @@ -74,8 +76,7 @@ class NioWorker implements Runnable { volatile Selector selector; final AtomicBoolean wakenUp = new AtomicBoolean(); final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); - final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue(); - //final ConcurrentFastQueue taskQueue = new ConcurrentFastQueue(); + final Queue taskQueue = new ConcurrentLinkedQueue(); NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; @@ -254,7 +255,16 @@ class NioWorker implements Runnable { } if (k.isReadable()) { - read(k); + + // TODO Replace ReceiveBufferSizePredictor with + // ChannelBufferAllocator and let user specify it per + // Channel. (Netty 3.1) + + if (USE_DIRECT_BUFFER) { + readIntoDirectBuffer(k); + } else { + readIntoHeapBuffer(k); + } } if (!k.isValid()) { @@ -268,12 +278,13 @@ class NioWorker implements Runnable { } } - private static void read(SelectionKey k) { + private static void readIntoHeapBuffer(SelectionKey k) { ScatteringByteChannel ch = (ScatteringByteChannel) k.channel(); NioSocketChannel channel = (NioSocketChannel) k.attachment(); ReceiveBufferSizePredictor predictor = channel.getConfig().getReceiveBufferSizePredictor(); + ChannelBuffer buf = ChannelBuffers.buffer(predictor.nextReceiveBufferSize()); int ret = 0; @@ -306,6 +317,62 @@ class NioWorker implements Runnable { } } + private ChannelBuffer preallocatedDirectBuffer; + + private static void readIntoDirectBuffer(SelectionKey k) { + ScatteringByteChannel ch = (ScatteringByteChannel) k.channel(); + NioSocketChannel channel = (NioSocketChannel) k.attachment(); + + ReceiveBufferSizePredictor predictor = + channel.getConfig().getReceiveBufferSizePredictor(); + + ChannelBuffer preallocatedDirectBuffer = channel.getWorker().preallocatedDirectBuffer; + NioWorker worker = channel.getWorker(); + worker.preallocatedDirectBuffer = null; + + if (preallocatedDirectBuffer == null) { + preallocatedDirectBuffer = ChannelBuffers.directBuffer(1048576); + } + + int ret = 0; + int readBytes = 0; + boolean failure = true; + try { + while ((ret = preallocatedDirectBuffer.writeBytes(ch, preallocatedDirectBuffer.writableBytes())) > 0) { + readBytes += ret; + if (!preallocatedDirectBuffer.writable()) { + break; + } + } + failure = false; + } catch (AsynchronousCloseException e) { + // Can happen, and doesn't need a user attention. + } catch (Throwable t) { + fireExceptionCaught(channel, t); + } + + if (readBytes > 0) { + // Update the predictor. + predictor.previousReceiveBufferSize(readBytes); + + // Fire the event. + ChannelBuffer slice = preallocatedDirectBuffer.slice( + preallocatedDirectBuffer.readerIndex(), + preallocatedDirectBuffer.readableBytes()); + preallocatedDirectBuffer.readerIndex(preallocatedDirectBuffer.writerIndex()); + if (preallocatedDirectBuffer.writable()) { + worker.preallocatedDirectBuffer = preallocatedDirectBuffer; + } + fireMessageReceived(channel, slice); + } else if (readBytes == 0) { + worker.preallocatedDirectBuffer = preallocatedDirectBuffer; + } + + if (ret < 0 || failure) { + close(k); + } + } + private static void write(SelectionKey k) { NioSocketChannel ch = (NioSocketChannel) k.attachment(); write(ch, false);