diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 774cc20acc..3411cc4ae9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -282,9 +282,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } } while (offset < end && localWrittenBytes > 0); } - if (!done) { - setFlag(Native.EPOLLOUT); - } in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); return done; } @@ -328,9 +325,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); - if (!done) { - setFlag(Native.EPOLLOUT); - } return done; } @@ -373,9 +367,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (done) { in.remove(); - } else { - // Returned EAGAIN need to set EPOLLOUT - setFlag(Native.EPOLLOUT); } return done; } @@ -389,12 +380,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (msgCount == 0) { // Wrote all messages. clearFlag(Native.EPOLLOUT); - break; + // Return here so we not set the EPOLLOUT flag. + return; } // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. if (msgCount > 1 && in.current() instanceof ByteBuf) { if (!doWriteMultiple(in, writeSpinCount)) { + // Break the loop and so set EPOLLOUT flag. break; } @@ -403,10 +396,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { // listeners. } else { // msgCount == 1 if (!doWriteSingle(in, writeSpinCount)) { + // Break the loop and so set EPOLLOUT flag. break; } } } + // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up + // when it can accept more data. + setFlag(Native.EPOLLOUT); } protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 8e58b51163..342d67fba6 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -178,12 +178,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; + boolean setOpWrite = false; for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. clearOpWrite(); - break; + // Directly return here so incompleteWrite(...) is not called. + return; } if (msg instanceof ByteBuf) { @@ -194,7 +196,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { continue; } - boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { @@ -219,13 +220,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (done) { in.remove(); } else { - incompleteWrite(setOpWrite); + // Break the loop and so incompleteWrite(...) is called. break; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean done = region.transfered() >= region.count(); - boolean setOpWrite = false; if (!done) { long flushedAmount = 0; @@ -253,7 +253,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (done) { in.remove(); } else { - incompleteWrite(setOpWrite); + // Break the loop and so incompleteWrite(...) is called. break; } } else { @@ -261,6 +261,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { throw new Error(); } } + incompleteWrite(setOpWrite); } @Override