diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 1b8da9224d..d928f3ce12 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -49,7 +49,6 @@ import java.util.concurrent.TimeUnit; public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { private final EpollSocketChannelConfig config; - private Runnable flushTask; /** * The future of the current connection attempt. If not null, subsequent @@ -110,7 +109,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So in.remove(); return true; } - boolean setEpollOut = false; boolean done = false; long writtenBytes = 0; if (buf.nioBufferCount() == 1) { @@ -128,11 +126,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } else { - setEpollOut = true; + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); break; } } - updateOutboundBuffer(in, writtenBytes, 1, done, setEpollOut); + updateOutboundBuffer(in, writtenBytes, 1, done); return done; } else { ByteBuffer[] nioBuffers = buf.nioBuffers(); @@ -149,7 +148,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes) throws IOException { boolean done = false; - boolean setEpollOut = false; long writtenBytes = 0; int offset = 0; int end = offset + nioBufferCnt; @@ -159,7 +157,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So long localWrittenBytes = Native.writev(fd, nioBuffers, offset, cnt); if (localWrittenBytes == 0) { - setEpollOut = true; + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); break loop; } expectedWrittenBytes -= localWrittenBytes; @@ -172,9 +171,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (bytes > localWrittenBytes) { buffer.position(pos + (int) localWrittenBytes); // incomplete write - - // As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again - setEpollOut(); break; } else { offset++; @@ -189,12 +185,12 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So } } } - updateOutboundBuffer(in, writtenBytes, msgCount, done, setEpollOut); + updateOutboundBuffer(in, writtenBytes, msgCount, done); return done; } private void updateOutboundBuffer(ChannelOutboundBuffer in, long writtenBytes, int msgCount, - boolean done, boolean setEpollOut) { + boolean done) { if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { @@ -224,26 +220,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So break; } } - incompleteWrite(setEpollOut); - } - } - - private void incompleteWrite(boolean setEpollOut) { - // Did not write completely. - if (setEpollOut) { - setEpollOut(); - } else { - // Schedule flush again later so other tasks can be picked up in the meantime - Runnable flushTask = this.flushTask; - if (flushTask == null) { - flushTask = this.flushTask = new Runnable() { - @Override - public void run() { - unsafe().flush(); - } - }; - } - eventLoop().execute(flushTask); } } @@ -254,7 +230,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So * @return amount the amount of written bytes */ private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { - boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; @@ -262,7 +237,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So long expected = region.count() - region.position(); long localFlushedAmount = Native.sendfile(fd, region, region.transfered(), expected); if (localFlushedAmount == 0) { - setOpWrite = true; + // Returned EAGAIN need to set EPOLLOUT + setEpollOut(); break; } @@ -270,9 +246,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (region.transfered() >= region.count()) { done = true; break; - } else { - // As we use edge-triggered we need to set EPOLLOUT as otherwise we may not get notified again - setEpollOut(); } } @@ -280,8 +253,6 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (done) { in.remove(); - } else { - incompleteWrite(setOpWrite); } return done; }