From aa66f556e54be58654f0dea387f48cab1a2b263f Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 18 Jul 2014 20:31:19 +0200 Subject: [PATCH] [#2667] Write until EAGAIN in native transport and only call setEpollOut() in this case Motivation: In the previous fix for #2667 I did introduce a bit overhead by calling setEpollOut() too often. Modification: Only call setEpollOut() if really needed and remove unused code. Result: Less overhead when saturate network. --- .../channel/epoll/EpollSocketChannel.java | 52 +++++-------------- 1 file changed, 12 insertions(+), 40 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index fce2a21d28..45f37ede61 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit; public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { private final EpollSocketChannelConfig config; - private Runnable flushTask; /** * The future of the current connection attempt. If not null, subsequent @@ -113,7 +112,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So in.remove(); return true; } - boolean setEpollOut = false; boolean done = false; long writtenBytes = 0; if (buf.nioBufferCount() == 1) { @@ -131,11 +129,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } else { - setEpollOut = true; + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); break; } } - updateOutboundBuffer(in, writtenBytes, 1, done, setEpollOut); + updateOutboundBuffer(in, writtenBytes, 1, done); return done; } else { ByteBuffer[] nioBuffers = buf.nioBuffers(); @@ -149,7 +148,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So int addressCnt = in.addressCount(); long expectedWrittenBytes = in.addressSize(); boolean done = false; - boolean setEpollOut = false; long writtenBytes = 0; int offset = 0; int end = offset + addressCnt; @@ -159,7 +157,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So long localWrittenBytes = Native.writevAddresses(fd, addresses, offset, cnt); if (localWrittenBytes == 0) { - setEpollOut = true; + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); break loop; } expectedWrittenBytes -= localWrittenBytes; @@ -187,7 +186,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } } - updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut); + updateOutboundBuffer(in, writtenBytes, msgCount, done); return done; } @@ -200,7 +199,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes) throws IOException { boolean done = false; - boolean setEpollOut = false; long writtenBytes = 0; int offset = 0; int end = offset + nioBufferCnt; @@ -210,7 +208,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); if (localWrittenBytes == 0) { - setEpollOut = true; + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); break loop; } expectedWrittenBytes -= localWrittenBytes; @@ -223,9 +222,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (bytes > localWrittenBytes) { buffer.position(pos + (int) localWrittenBytes); // incomplete write - - // As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again - setEpollOut(); break; } else { offset++; @@ -240,12 +236,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } } } - updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut); + updateOutboundBuffer(in, writtenBytes, msgCount, done); return done; } private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, - boolean done, boolean setEpollOut) { + boolean done) { if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { @@ -275,25 +271,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } - incompleteWrite(setEpollOut); - } - } - private void incompleteWrite(boolean setEpollOut) { - // Did not write completely. - if (setEpollOut) { - setEpollOut(); - } else { - // Schedule flush again later so other tasks can be picked up in the meantime - Runnable flushTask = this.flushTask; - if (flushTask == null) { - flushTask = this.flushTask = new Runnable() { - @Override - public void run() { - unsafe().flush(); - } - }; - } - eventLoop().execute(flushTask); } } @@ -304,7 +281,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So * @return amount the amount of written bytes */ private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { - boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; @@ -312,7 +288,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So long expected = region.count() - region.position(); long localFlushedAmount = Native.sendfile(fd, region, region.transfered(), expected); if (localFlushedAmount == 0) { - setOpWrite = true; + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); break; } @@ -320,9 +297,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (region.transfered() >= region.count()) { done = true; break; - } else { - // As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again - setEpollOut(); } } @@ -330,8 +304,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (done) { in.remove(); - } else { - incompleteWrite(setOpWrite); } return done; }