diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index c7bee6a05f..b8400bf817 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -315,11 +315,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha // This should never happem, anyway fallback to single write. doWriteSingle((ByteBuf) in.current()); } - - if (iovecArray.count() > 0) { - submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count()); - ioState |= WRITE_SCHEDULED; - } + submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count()); + ioState |= WRITE_SCHEDULED; } else { // We were not be able to create a new iovec, fallback to single write. doWriteSingle((ByteBuf) in.current()); @@ -542,8 +539,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } final void writeComplete(int res) { - ioState &= ~WRITE_SCHEDULED; - ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); if (iovecMemoryAddress != -1) { ((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress); @@ -551,8 +546,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } if (res >= 0) { channelOutboundBuffer.removeBytes(res); + + // We only reset this once we are done with calling removeBytes(...) as otherwise we may trigger a write + // while still removing messages internally in removeBytes(...) which then may corrupt state. + ioState &= ~WRITE_SCHEDULED; doWrite(channelOutboundBuffer); } else { + ioState &= ~WRITE_SCHEDULED; try { if (ioResult("io_uring write", res) == 0) { // We were not able to write everything, let's register for POLLOUT