[#4205] Correctly set EPOLLOUT flag whe writeBytes(...) was not able to write everything

Motivation:

writeBytes(...) missed to set EPOLLOUT flag when not all bytes were written. This could lead to have the EpollEventLoop not try to flush the remaining bytes once the socket becomes writable again.

Modifications:

- Move setting EPOLLOUT flag logic to one point so we are sure we always do it.
- Move OP_WRITE flag logic to one point as well.

Result:

Correctly try to write pending data if socket becomes writable again.
This commit is contained in:
Norman Maurer 2015-09-10 21:23:23 +02:00
parent cac51ab8d6
commit 076d4ed514
2 changed files with 13 additions and 15 deletions

View File

@ -281,9 +281,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
} while (offset < end && localWrittenBytes > 0);
}
if (!done) {
setFlag(Native.EPOLLOUT);
}
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
return done;
}
@ -327,9 +324,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
if (!done) {
setFlag(Native.EPOLLOUT);
}
return done;
}
@ -372,9 +366,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
if (done) {
in.remove();
} else {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
}
return done;
}
@ -388,12 +379,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
if (msgCount == 0) {
// Wrote all messages.
clearFlag(Native.EPOLLOUT);
break;
// Return here so we not set the EPOLLOUT flag.
return;
}
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in, writeSpinCount)) {
// Break the loop and so set EPOLLOUT flag.
break;
}
@ -402,10 +395,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// listeners.
} else { // msgCount == 1
if (!doWriteSingle(in, writeSpinCount)) {
// Break the loop and so set EPOLLOUT flag.
break;
}
}
}
// Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
// when it can accept more data.
setFlag(Native.EPOLLOUT);
}
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {

View File

@ -152,12 +152,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
boolean setOpWrite = false;
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
clearOpWrite();
break;
// Directly return here so incompleteWrite(...) is not called.
return;
}
if (msg instanceof ByteBuf) {
@ -168,7 +170,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
continue;
}
boolean setOpWrite = false;
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
@ -193,13 +194,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
boolean done = region.transfered() >= region.count();
boolean setOpWrite = false;
if (!done) {
long flushedAmount = 0;
@ -227,7 +227,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
// Break the loop and so incompleteWrite(...) is called.
break;
}
} else {
@ -235,6 +235,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
throw new Error();
}
}
incompleteWrite(setOpWrite);
}
@Override