diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 1a88c7a8c2..334455e162 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -56,14 +56,18 @@ import static io.netty.channel.unix.UnixChannelUtil.*; import static io.netty.util.internal.ObjectUtil.*; abstract class AbstractIOUringChannel extends AbstractChannel implements UnixChannel { - static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIOUringChannel.class); + private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractIOUringChannel.class); private static final ChannelMetadata METADATA = new ChannelMetadata(false); final LinuxSocket socket; protected volatile boolean active; - private boolean pollInPending = false; - private boolean pollOutPending = false; - private boolean writeScheduled = false; - private boolean readScheduled = false; + + // Different masks for outstanding I/O operations. + private static final int POLL_IN_SCHEDULED = 1; + private static final int POLL_OUT_SCHEDULED = 1 << 2; + private static final int WRITE_SCHEDULED = 1 << 3; + private static final int READ_SCHEDULED = 1 << 4; + private int ioState; + boolean inputClosedSeenErrorOnRead; static final int SOCK_ADDR_LEN = 128; @@ -211,13 +215,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha active = false; IOUringSubmissionQueue submissionQueue = submissionQueue(); - if (pollInPending) { + if ((ioState & POLL_IN_SCHEDULED) != 0) { submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_IN); - pollInPending = false; + ioState &= ~POLL_IN_SCHEDULED; } - if (pollOutPending) { + if ((ioState & POLL_OUT_SCHEDULED) != 0) { submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_OUT); - pollOutPending = false; + ioState &= ~POLL_OUT_SCHEDULED; } submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_RDHUP); submissionQueue.submit(); @@ -266,14 +270,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha @Override protected void doBeginRead() { final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); - if (!pollInPending) { + if ((ioState & POLL_IN_SCHEDULED) == 0) { unsafe.schedulePollIn(); } } @Override protected void doWrite(ChannelOutboundBuffer in) { - if (writeScheduled) { + if ((ioState & WRITE_SCHEDULED) != 0) { return; } @@ -307,7 +311,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha if (iovecArray.count() > 0) { submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count()); submissionQueue().submit(); - writeScheduled = true; + ioState |= WRITE_SCHEDULED; } } else { // We were not be able to create a new iovec, fallback to single write. @@ -321,13 +325,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), buf.writerIndex()); submissionQueue.submit(); - writeScheduled = true; + ioState |= WRITE_SCHEDULED; } //POLLOUT private void addPollOut() { - assert !pollOutPending; - pollOutPending = true; + assert (ioState & POLL_OUT_SCHEDULED) == 0; + ioState |= POLL_OUT_SCHEDULED; IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollOut(socket.intValue()); submissionQueue.submit(); @@ -341,7 +345,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. - if (!pollOutPending) { + if ((ioState & POLL_OUT_SCHEDULED) == 0) { super.flush0(); } } @@ -433,18 +437,18 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } void schedulePollIn() { - assert !pollInPending; + assert (ioState & POLL_IN_SCHEDULED) == 0; if (!isActive() || shouldBreakIoUringInReady(config())) { return; } - pollInPending = true; + ioState |= POLL_IN_SCHEDULED; IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollIn(socket.intValue()); submissionQueue.submit(); } final void readComplete(int res) { - readScheduled = false; + ioState &= ~READ_SCHEDULED; readComplete0(res); } @@ -456,7 +460,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha */ final void pollRdHup(int res) { if (isActive()) { - if (!readScheduled) { + if ((ioState & READ_SCHEDULED) == 0) { scheduleRead(); } } else { @@ -469,15 +473,15 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha * Called once POLLIN event is ready to be processed */ final void pollIn(int res) { - pollInPending = false; + ioState &= ~POLL_IN_SCHEDULED; - if (!readScheduled) { + if ((ioState & READ_SCHEDULED) == 0) { scheduleRead(); } } protected final void scheduleRead() { - readScheduled = true; + ioState |= READ_SCHEDULED; scheduleRead0(); } @@ -487,7 +491,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha * Called once POLLOUT event is ready to be processed */ final void pollOut(int res) { - pollOutPending = false; + ioState &= ~POLL_OUT_SCHEDULED; // pending connect if (connectPromise != null) { @@ -524,7 +528,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } final void writeComplete(int res) { - writeScheduled = false; + ioState &= ~WRITE_SCHEDULED; ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); if (iovecMemoryAddress != -1) {