More efficient handling of incomplete writes.

The problem with the old way was that we always set the OP_WRITE when the buffer could not be written
until the write-spin-count was reached. This means that in some cases the channel was still be writable
but we just was not able to write out the data quick enough. For this cases we should better break out the
write loop and schedule a write to be picked up later in the EventLoop, when other tasks was executed.
The OP_WRITE will only be set if a write actual returned 0 which means there is no more room for writing data
and this we need to wait for the os to notify us.
This commit is contained in:
Norman Maurer 2013-09-13 21:05:08 +02:00
parent 8dc57f6933
commit 3cfcf09af8
2 changed files with 29 additions and 5 deletions

View File

@ -35,6 +35,7 @@ import java.nio.channels.SelectionKey;
* {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes. * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes.
*/ */
public abstract class AbstractNioByteChannel extends AbstractNioChannel { public abstract class AbstractNioByteChannel extends AbstractNioChannel {
private Runnable flushTask;
/** /**
* Create a new instance * Create a new instance
@ -160,6 +161,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
continue; continue;
} }
boolean setOpWrite = false;
boolean done = false; boolean done = false;
long flushedAmount = 0; long flushedAmount = 0;
if (writeSpinCount == -1) { if (writeSpinCount == -1) {
@ -168,6 +170,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
for (int i = writeSpinCount - 1; i >= 0; i --) { for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf); int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) { if (localFlushedAmount == 0) {
setOpWrite = true;
break; break;
} }
@ -183,12 +186,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (done) { if (done) {
in.remove(); in.remove();
} else { } else {
// Did not write completely. incompleteWrite(setOpWrite);
setOpWrite();
break; break;
} }
} else if (msg instanceof FileRegion) { } else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg; FileRegion region = (FileRegion) msg;
boolean setOpWrite = false;
boolean done = false; boolean done = false;
long flushedAmount = 0; long flushedAmount = 0;
if (writeSpinCount == -1) { if (writeSpinCount == -1) {
@ -197,6 +200,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
for (int i = writeSpinCount - 1; i >= 0; i --) { for (int i = writeSpinCount - 1; i >= 0; i --) {
long localFlushedAmount = doWriteFileRegion(region); long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount == 0) { if (localFlushedAmount == 0) {
setOpWrite = true;
break; break;
} }
@ -212,8 +216,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (done) { if (done) {
in.remove(); in.remove();
} else { } else {
// Did not write completely. incompleteWrite(setOpWrite);
setOpWrite();
break; break;
} }
} else { } else {
@ -222,6 +225,25 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
} }
} }
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
setOpWrite();
} else {
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
}
/** /**
* Write a {@link FileRegion} * Write a {@link FileRegion}
* *

View File

@ -245,9 +245,11 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
final SocketChannel ch = javaChannel(); final SocketChannel ch = javaChannel();
long writtenBytes = 0; long writtenBytes = 0;
boolean done = false; boolean done = false;
boolean setOpWrite = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) { if (localWrittenBytes == 0) {
setOpWrite = true;
break; break;
} }
expectedWrittenBytes -= localWrittenBytes; expectedWrittenBytes -= localWrittenBytes;
@ -293,7 +295,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
} }
} }
setOpWrite(); incompleteWrite(setOpWrite);
break; break;
} }
} }