Improved write performance by reducing the lame write attempts on the channel whose send buffer is full already

This commit is contained in:
Trustin Lee 2010-02-09 03:41:12 +00:00
parent 194fd79c6b
commit 0857f398e0
4 changed files with 35 additions and 18 deletions

View File

@ -114,7 +114,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBuffer.offer(event);
assert offered;
channel.worker.write(channel, true);
channel.worker.writeFromUserCode(channel);
}
}

View File

@ -134,7 +134,7 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
NioSocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBuffer.offer(event);
assert offered;
channel.worker.write(channel, true);
channel.worker.writeFromUserCode(channel);
}
}

View File

@ -67,7 +67,8 @@ class NioSocketChannel extends AbstractChannel
final Queue<MessageEvent> writeBuffer = new WriteBuffer();
final AtomicInteger writeBufferSize = new AtomicInteger();
final AtomicInteger highWaterMarkCounter = new AtomicInteger();
volatile boolean inWriteNowLoop;
boolean inWriteNowLoop;
boolean writeSuspended;
MessageEvent currentWriteEvent;
ByteBuffer currentWriteBuffer;
@ -257,7 +258,7 @@ class NioSocketChannel extends AbstractChannel
public void run() {
writeTaskInTaskQueue.set(false);
worker.write(NioSocketChannel.this, false);
worker.writeFromTaskLoop(NioSocketChannel.this);
}
}
}

View File

@ -281,7 +281,7 @@ class NioWorker implements Runnable {
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
write(k);
writeFromSelectorLoop(k);
}
} catch (CancelledKeyException e) {
close(k);
@ -364,31 +364,45 @@ class NioWorker implements Runnable {
return true;
}
private void write(SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment();
write(ch, false);
}
private void close(SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment();
close(ch, succeededFuture(ch));
}
void write(final NioSocketChannel channel, boolean mightNeedWakeup) {
void writeFromUserCode(final NioSocketChannel channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
return;
}
if (mightNeedWakeup && scheduleWriteIfNecessary(channel)) {
if (scheduleWriteIfNecessary(channel)) {
return;
}
// From here, we are sure Thread.currentThread() == workerThread.
if (channel.writeSuspended) {
return;
}
if (channel.inWriteNowLoop) {
scheduleWriteIfNecessary(channel);
} else {
writeNow(channel, channel.getConfig().getWriteSpinCount());
// Need to update the queue?
return;
}
write0(channel);
}
void writeFromTaskLoop(final NioSocketChannel ch) {
if (!ch.writeSuspended) {
write0(ch);
}
}
void writeFromSelectorLoop(final SelectionKey k) {
NioSocketChannel ch = (NioSocketChannel) k.attachment();
ch.writeSuspended = false;
write0(ch);
}
private boolean scheduleWriteIfNecessary(final NioSocketChannel channel) {
@ -426,15 +440,15 @@ class NioWorker implements Runnable {
return false;
}
private void writeNow(NioSocketChannel channel, int writeSpinCount) {
private void write0(NioSocketChannel channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
int writtenBytes = 0;
Queue<MessageEvent> writeBuffer = channel.writeBuffer;
final Queue<MessageEvent> writeBuffer = channel.writeBuffer;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
@ -443,6 +457,7 @@ class NioWorker implements Runnable {
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
@ -482,6 +497,7 @@ class NioWorker implements Runnable {
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;
break;
}
} catch (AsynchronousCloseException e) {