[#2667] Write until EAGAIN in native transport and only call setEpollOut() in this case

Motivation:

In the previous fix for #2667 I did introduce a bit overhead by calling setEpollOut() too often.

Modification:

Only call setEpollOut() if really needed and remove unused code.

Result:

Less overhead when saturate network.
This commit is contained in:
Norman Maurer 2014-07-18 20:31:19 +02:00
parent 7e61538790
commit aa66f556e5

View File

@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit;
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
private final EpollSocketChannelConfig config;
private Runnable flushTask;
/**
* The future of the current connection attempt. If not null, subsequent
@ -113,7 +112,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.remove();
return true;
}
boolean setEpollOut = false;
boolean done = false;
long writtenBytes = 0;
if (buf.nioBufferCount() == 1) {
@ -131,11 +129,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
} else {
setEpollOut = true;
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
}
updateOutboundBuffer(in, writtenBytes, 1, done, setEpollOut);
updateOutboundBuffer(in, writtenBytes, 1, done);
return done;
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
@ -149,7 +148,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
int addressCnt = in.addressCount();
long expectedWrittenBytes = in.addressSize();
boolean done = false;
boolean setEpollOut = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + addressCnt;
@ -159,7 +157,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt);
if (localWrittenBytes == 0) {
setEpollOut = true;
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break loop;
}
expectedWrittenBytes -= localWrittenBytes;
@ -187,7 +186,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
}
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut);
updateOutboundBuffer(in, writtenBytes, msgCount, done);
return done;
}
@ -200,7 +199,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
boolean done = false;
boolean setEpollOut = false;
long writtenBytes = 0;
int offset = 0;
int end = offset + nioBufferCnt;
@ -210,7 +208,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt);
if (localWrittenBytes == 0) {
setEpollOut = true;
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break loop;
}
expectedWrittenBytes -= localWrittenBytes;
@ -223,9 +222,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (bytes > localWrittenBytes) {
buffer.position(pos + (int) localWrittenBytes);
// incomplete write
// As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again
setEpollOut();
break;
} else {
offset++;
@ -240,12 +236,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
}
}
}
updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut);
updateOutboundBuffer(in, writtenBytes, msgCount, done);
return done;
}
private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount,
boolean done, boolean setEpollOut) {
boolean done) {
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
@ -275,25 +271,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
break;
}
}
incompleteWrite(setEpollOut);
}
}
private void incompleteWrite(boolean setEpollOut) {
// Did not write completely.
if (setEpollOut) {
setEpollOut();
} else {
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
unsafe().flush();
}
};
}
eventLoop().execute(flushTask);
}
}
@ -304,7 +281,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
* @return amount the amount of written bytes
*/
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
boolean setOpWrite = false;
boolean done = false;
long flushedAmount = 0;
@ -312,7 +288,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
long expected = region.count() - region.position();
long localFlushedAmount = Native.sendfile(fd, region, region.transfered(), expected);
if (localFlushedAmount == 0) {
setOpWrite = true;
// Returned EAGAIN need to set EPOLLOUT
setEpollOut();
break;
}
@ -320,9 +297,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (region.transfered() >= region.count()) {
done = true;
break;
} else {
// As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again
setEpollOut();
}
}
@ -330,8 +304,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
}
return done;
}