Fix reentrancy bug in io_uring transport implementation related to (#10541)

writes

Motivation:

We need to carefully manage state in terms of writing to guard against
rentrancy problems that could lead to corrupt state in the
ChannelOutboundBuffer

Modifications:

Only reset the flag once removeBytes(...) was called

Result:

No more reentrancy bug related to writes.
This commit is contained in:
Norman Maurer 2020-09-08 08:43:46 +02:00 committed by GitHub
parent 9b296c8034
commit 9da59c3894
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -315,11 +315,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
// This should never happem, anyway fallback to single write. // This should never happem, anyway fallback to single write.
doWriteSingle((ByteBuf) in.current()); doWriteSingle((ByteBuf) in.current());
} }
submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count());
if (iovecArray.count() > 0) { ioState |= WRITE_SCHEDULED;
submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count());
ioState |= WRITE_SCHEDULED;
}
} else { } else {
// We were not be able to create a new iovec, fallback to single write. // We were not be able to create a new iovec, fallback to single write.
doWriteSingle((ByteBuf) in.current()); doWriteSingle((ByteBuf) in.current());
@ -542,8 +539,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
final void writeComplete(int res) { final void writeComplete(int res) {
ioState &= ~WRITE_SCHEDULED;
ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (iovecMemoryAddress != -1) { if (iovecMemoryAddress != -1) {
((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress); ((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress);
@ -551,8 +546,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
} }
if (res >= 0) { if (res >= 0) {
channelOutboundBuffer.removeBytes(res); channelOutboundBuffer.removeBytes(res);
// We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write
// while still removing messages internally in removeBytes(...) which then may corrupt state.
ioState &= ~WRITE_SCHEDULED;
doWrite(channelOutboundBuffer); doWrite(channelOutboundBuffer);
} else { } else {
ioState &= ~WRITE_SCHEDULED;
try { try {
if (ioResult("io_uring write", res) == 0) { if (ioResult("io_uring write", res) == 0) {
// We were not able to write everything, let's register for POLLOUT // We were not able to write everything, let's register for POLLOUT