[#4205] Correctly set EPOLLOUT flag whe writeBytes(...) was not able to write everything
Motivation: writeBytes(...) missed to set EPOLLOUT flag when not all bytes were written. This could lead to have the EpollEventLoop not try to flush the remaining bytes once the socket becomes writable again. Modifications: - Move setting EPOLLOUT flag logic to one point so we are sure we always do it. - Move OP_WRITE flag logic to one point as well. Result: Correctly try to write pending data if socket becomes writable again.
This commit is contained in:
parent
0915b1b215
commit
7961138f52
@ -282,9 +282,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
}
|
}
|
||||||
} while (offset < end && localWrittenBytes > 0);
|
} while (offset < end && localWrittenBytes > 0);
|
||||||
}
|
}
|
||||||
if (!done) {
|
|
||||||
setFlag(Native.EPOLLOUT);
|
|
||||||
}
|
|
||||||
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
|
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
|
||||||
return done;
|
return done;
|
||||||
}
|
}
|
||||||
@ -328,9 +325,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
|
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
|
||||||
if (!done) {
|
|
||||||
setFlag(Native.EPOLLOUT);
|
|
||||||
}
|
|
||||||
return done;
|
return done;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -373,9 +367,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
|
|
||||||
if (done) {
|
if (done) {
|
||||||
in.remove();
|
in.remove();
|
||||||
} else {
|
|
||||||
// Returned EAGAIN need to set EPOLLOUT
|
|
||||||
setFlag(Native.EPOLLOUT);
|
|
||||||
}
|
}
|
||||||
return done;
|
return done;
|
||||||
}
|
}
|
||||||
@ -389,12 +380,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
if (msgCount == 0) {
|
if (msgCount == 0) {
|
||||||
// Wrote all messages.
|
// Wrote all messages.
|
||||||
clearFlag(Native.EPOLLOUT);
|
clearFlag(Native.EPOLLOUT);
|
||||||
break;
|
// Return here so we not set the EPOLLOUT flag.
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
|
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
|
||||||
if (msgCount > 1 && in.current() instanceof ByteBuf) {
|
if (msgCount > 1 && in.current() instanceof ByteBuf) {
|
||||||
if (!doWriteMultiple(in, writeSpinCount)) {
|
if (!doWriteMultiple(in, writeSpinCount)) {
|
||||||
|
// Break the loop and so set EPOLLOUT flag.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -403,10 +396,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
|
|||||||
// listeners.
|
// listeners.
|
||||||
} else { // msgCount == 1
|
} else { // msgCount == 1
|
||||||
if (!doWriteSingle(in, writeSpinCount)) {
|
if (!doWriteSingle(in, writeSpinCount)) {
|
||||||
|
// Break the loop and so set EPOLLOUT flag.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
|
||||||
|
// when it can accept more data.
|
||||||
|
setFlag(Native.EPOLLOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
|
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
|
||||||
|
@ -178,12 +178,14 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
|||||||
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
|
||||||
int writeSpinCount = -1;
|
int writeSpinCount = -1;
|
||||||
|
|
||||||
|
boolean setOpWrite = false;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
Object msg = in.current();
|
Object msg = in.current();
|
||||||
if (msg == null) {
|
if (msg == null) {
|
||||||
// Wrote all messages.
|
// Wrote all messages.
|
||||||
clearOpWrite();
|
clearOpWrite();
|
||||||
break;
|
// Directly return here so incompleteWrite(...) is not called.
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg instanceof ByteBuf) {
|
if (msg instanceof ByteBuf) {
|
||||||
@ -194,7 +196,6 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean setOpWrite = false;
|
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
long flushedAmount = 0;
|
long flushedAmount = 0;
|
||||||
if (writeSpinCount == -1) {
|
if (writeSpinCount == -1) {
|
||||||
@ -219,13 +220,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
|||||||
if (done) {
|
if (done) {
|
||||||
in.remove();
|
in.remove();
|
||||||
} else {
|
} else {
|
||||||
incompleteWrite(setOpWrite);
|
// Break the loop and so incompleteWrite(...) is called.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else if (msg instanceof FileRegion) {
|
} else if (msg instanceof FileRegion) {
|
||||||
FileRegion region = (FileRegion) msg;
|
FileRegion region = (FileRegion) msg;
|
||||||
boolean done = region.transfered() >= region.count();
|
boolean done = region.transfered() >= region.count();
|
||||||
boolean setOpWrite = false;
|
|
||||||
|
|
||||||
if (!done) {
|
if (!done) {
|
||||||
long flushedAmount = 0;
|
long flushedAmount = 0;
|
||||||
@ -253,7 +253,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
|||||||
if (done) {
|
if (done) {
|
||||||
in.remove();
|
in.remove();
|
||||||
} else {
|
} else {
|
||||||
incompleteWrite(setOpWrite);
|
// Break the loop and so incompleteWrite(...) is called.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -261,6 +261,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
|||||||
throw new Error();
|
throw new Error();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
incompleteWrite(setOpWrite);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
Loading…
x
Reference in New Issue
Block a user