From 7961138f5278380da0397df5efb2863dad0b2333 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 10 Sep 2015 21:23:23 +0200 Subject: [PATCH] [#4205] Correctly set EPOLLOUT flag whe writeBytes(...) was not able to write everything Motivation: writeBytes(...) missed to set EPOLLOUT flag when not all bytes were written. This could lead to have the EpollEventLoop not try to flush the remaining bytes once the socket becomes writable again. Modifications: - Move setting EPOLLOUT flag logic to one point so we are sure we always do it. - Move OP_WRITE flag logic to one point as well. Result: Correctly try to write pending data if socket becomes writable again. --- .../epoll/AbstractEpollStreamChannel.java | 17 +++++++---------- .../channel/nio/AbstractNioByteChannel.java | 11 ++++++----- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 774cc20acc..3411cc4ae9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -282,9 +282,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } } while (offset < end && localWrittenBytes > 0); } - if (!done) { - setFlag(Native.EPOLLOUT); - } in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); return done; } @@ -328,9 +325,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); - if (!done) { - setFlag(Native.EPOLLOUT); - } return done; } @@ -373,9 +367,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (done) { in.remove(); - } else { - // Returned EAGAIN need to set EPOLLOUT - setFlag(Native.EPOLLOUT); } return done; } @@ -389,12 +380,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (msgCount == 0) { // Wrote all messages. clearFlag(Native.EPOLLOUT); - break; + // Return here so we not set the EPOLLOUT flag. + return; } // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. if (msgCount > 1 && in.current() instanceof ByteBuf) { if (!doWriteMultiple(in, writeSpinCount)) { + // Break the loop and so set EPOLLOUT flag. break; } @@ -403,10 +396,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { // listeners. } else { // msgCount == 1 if (!doWriteSingle(in, writeSpinCount)) { + // Break the loop and so set EPOLLOUT flag. break; } } } + // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up + // when it can accept more data. + setFlag(Native.EPOLLOUT); } protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 8e58b51163..342d67fba6 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -178,12 +178,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; + boolean setOpWrite = false; for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. clearOpWrite(); - break; + // Directly return here so incompleteWrite(...) is not called. + return; } if (msg instanceof ByteBuf) { @@ -194,7 +196,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { continue; } - boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { @@ -219,13 +220,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (done) { in.remove(); } else { - incompleteWrite(setOpWrite); + // Break the loop and so incompleteWrite(...) is called. break; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean done = region.transfered() >= region.count(); - boolean setOpWrite = false; if (!done) { long flushedAmount = 0; @@ -253,7 +253,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (done) { in.remove(); } else { - incompleteWrite(setOpWrite); + // Break the loop and so incompleteWrite(...) is called. break; } } else { @@ -261,6 +261,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { throw new Error(); } } + incompleteWrite(setOpWrite); } @Override