[#2667] Write until EAGAIN in native transport and only call setEpollOut() in this case

Motivation:

In the previous fix for #2667 I did introduce a bit overhead by calling setEpollOut() too often.

Modification:

Only call setEpollOut() if really needed and remove unused code.

Result:

Less overhead when saturate network.
This commit is contained in:
Norman Maurer 2014-07-18 20:31:19 +02:00
parent 65718db3ec
commit 3d1560e61b

View File

@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit;
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
private final EpollSocketChannelConfig config; private final EpollSocketChannelConfig config;
private Runnable flushTask;
/** /**
* The future of the current connection attempt. If not null, subsequent * The future of the current connection attempt. If not null, subsequent
@ -113,7 +112,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.remove(); in.remove();
return true; return true;
} }
boolean setEpollOut = false;
boolean done = false; boolean done = false;
long writtenBytes = 0; long writtenBytes = 0;
if (buf.nioBufferCount() == 1) { if (buf.nioBufferCount() == 1) {
@ -131,11 +129,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break; break;
} }
} else { } else {
setEpollOut = true; // Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break; break;
} }
} }
updateOutboundBuffer(in, writtenBytes, 1, done, setEpollOut); updateOutboundBuffer(in, writtenBytes, 1, done);
return done; return done;
} else { } else {
ByteBuffer[] nioBuffers = buf.nioBuffers(); ByteBuffer[] nioBuffers = buf.nioBuffers();
@ -149,7 +148,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
int addressCnt = in.addressCount(); int addressCnt = in.addressCount();
long expectedWrittenBytes = in.addressSize(); long expectedWrittenBytes = in.addressSize();
boolean done = false; boolean done = false;
boolean setEpollOut = false;
long writtenBytes = 0; long writtenBytes = 0;
int offset = 0; int offset = 0;
int end = offset + addressCnt; int end = offset + addressCnt;
@ -159,7 +157,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt); long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt);
if (localWrittenBytes == 0) { if (localWrittenBytes == 0) {
setEpollOut = true; // Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break loop; break loop;
} }
expectedWrittenBytes -= localWrittenBytes; expectedWrittenBytes -= localWrittenBytes;
@ -187,7 +186,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} }
} }
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut); updateOutboundBuffer(in, writtenBytes, msgCount, done);
return done; return done;
} }
@ -200,7 +199,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers, ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException { int nioBufferCnt, long expectedWrittenBytes) throws IOException {
boolean done = false; boolean done = false;
boolean setEpollOut = false;
long writtenBytes = 0; long writtenBytes = 0;
int offset = 0; int offset = 0;
int end = offset + nioBufferCnt; int end = offset + nioBufferCnt;
@ -210,7 +208,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt);
if (localWrittenBytes == 0) { if (localWrittenBytes == 0) {
setEpollOut = true; // Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break loop; break loop;
} }
expectedWrittenBytes -= localWrittenBytes; expectedWrittenBytes -= localWrittenBytes;
@ -223,9 +222,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (bytes > localWrittenBytes) { if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes); buffer.position(pos + (int) localWrittenBytes);
// incomplete write // incomplete write
// As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again
setEpollOut();
break; break;
} else { } else {
offset++; offset++;
@ -240,12 +236,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
} }
} }
} }
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut); updateOutboundBuffer(in, writtenBytes, msgCount, done);
return done; return done;
} }
private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
boolean done, boolean setEpollOut) { boolean done) {
if (done) { if (done) {
// Release all buffers // Release all buffers
for (int i = msgCount; i > 0; i --) { for (int i = msgCount; i > 0; i --) {
@ -275,25 +271,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break; break;
} }
} }
incompleteWrite(setEpollOut);
}
}
private void incompleteWrite(boolean setEpollOut) {
// Did not write completely.
if (setEpollOut) {
setEpollOut();
} else {
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
unsafe().flush();
}
};
}
eventLoop().execute(flushTask);
} }
} }
@ -304,7 +281,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
* @return amount the amount of written bytes * @return amount the amount of written bytes
*/ */
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
boolean setOpWrite = false;
boolean done = false; boolean done = false;
long flushedAmount = 0; long flushedAmount = 0;
@ -312,7 +288,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
long expected = region.count() - region.position(); long expected = region.count() - region.position();
long localFlushedAmount = Native.sendfile(fd, region, region.transfered(), expected); long localFlushedAmount = Native.sendfile(fd, region, region.transfered(), expected);
if (localFlushedAmount == 0) { if (localFlushedAmount == 0) {
setOpWrite = true; // Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break; break;
} }
@ -320,9 +297,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (region.transfered() >= region.count()) { if (region.transfered() >= region.count()) {
done = true; done = true;
break; break;
} else {
// As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again
setEpollOut();
} }
} }
@ -330,8 +304,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (done) { if (done) {
in.remove(); in.remove();
} else {
incompleteWrite(setOpWrite);
} }
return done; return done;
} }