Added direct buffer support (disabled by default and can't be enabled without recompilation for now - should be fixed in 3.1.)

This commit is contained in:
Trustin Lee 2008-09-28 12:51:50 +00:00
parent b64124efd6
commit 5170838e79

View File

@ -32,6 +32,7 @@ import java.nio.channels.ScatteringByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@ -65,6 +66,7 @@ class NioWorker implements Runnable {
InternalLoggerFactory.getInstance(NioWorker.class);
private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
private static final boolean USE_DIRECT_BUFFER = false; // Hard-coded for now
private final int bossId;
private final int id;
@ -74,8 +76,7 @@ class NioWorker implements Runnable {
volatile Selector selector;
final AtomicBoolean wakenUp = new AtomicBoolean();
final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
final ConcurrentLinkedQueue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
//final ConcurrentFastQueue<Runnable> taskQueue = new ConcurrentFastQueue<Runnable>();
final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
NioWorker(int bossId, int id, Executor executor) {
this.bossId = bossId;
@ -254,7 +255,16 @@ class NioWorker implements Runnable {
}
if (k.isReadable()) {
read(k);
// TODO Replace ReceiveBufferSizePredictor with
// ChannelBufferAllocator and let user specify it per
// Channel. (Netty 3.1)
if (USE_DIRECT_BUFFER) {
readIntoDirectBuffer(k);
} else {
readIntoHeapBuffer(k);
}
}
if (!k.isValid()) {
@ -268,12 +278,13 @@ class NioWorker implements Runnable {
}
}
private static void read(SelectionKey k) {
private static void readIntoHeapBuffer(SelectionKey k) {
ScatteringByteChannel ch = (ScatteringByteChannel) k.channel();
NioSocketChannel channel = (NioSocketChannel) k.attachment();
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
ChannelBuffer buf = ChannelBuffers.buffer(predictor.nextReceiveBufferSize());
int ret = 0;
@ -306,6 +317,62 @@ class NioWorker implements Runnable {
}
}
private ChannelBuffer preallocatedDirectBuffer;
private static void readIntoDirectBuffer(SelectionKey k) {
ScatteringByteChannel ch = (ScatteringByteChannel) k.channel();
NioSocketChannel channel = (NioSocketChannel) k.attachment();
ReceiveBufferSizePredictor predictor =
channel.getConfig().getReceiveBufferSizePredictor();
ChannelBuffer preallocatedDirectBuffer = channel.getWorker().preallocatedDirectBuffer;
NioWorker worker = channel.getWorker();
worker.preallocatedDirectBuffer = null;
if (preallocatedDirectBuffer == null) {
preallocatedDirectBuffer = ChannelBuffers.directBuffer(1048576);
}
int ret = 0;
int readBytes = 0;
boolean failure = true;
try {
while ((ret = preallocatedDirectBuffer.writeBytes(ch, preallocatedDirectBuffer.writableBytes())) > 0) {
readBytes += ret;
if (!preallocatedDirectBuffer.writable()) {
break;
}
}
failure = false;
} catch (AsynchronousCloseException e) {
// Can happen, and doesn't need a user attention.
} catch (Throwable t) {
fireExceptionCaught(channel, t);
}
if (readBytes > 0) {
// Update the predictor.
predictor.previousReceiveBufferSize(readBytes);
// Fire the event.
ChannelBuffer slice = preallocatedDirectBuffer.slice(
preallocatedDirectBuffer.readerIndex(),
preallocatedDirectBuffer.readableBytes());
preallocatedDirectBuffer.readerIndex(preallocatedDirectBuffer.writerIndex());
if (preallocatedDirectBuffer.writable()) {
worker.preallocatedDirectBuffer = preallocatedDirectBuffer;
}
fireMessageReceived(channel, slice);
} else if (readBytes == 0) {
worker.preallocatedDirectBuffer = preallocatedDirectBuffer;
}
if (ret < 0 || failure) {
close(k);
}
}
private static void write(SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment();
write(ch, false);