diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 59a6d011e1..2f227a04c6 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -226,7 +226,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha ioState &= ~POLL_OUT_SCHEDULED; } submissionQueue.addPollRemove(socket.intValue(), Native.POLLRDHUP); - submissionQueue.submit(); } // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a @@ -313,7 +312,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha if (iovecArray.count() > 0) { submissionQueue().addWritev(socket.intValue(), iovecMemoryAddress, iovecArray.count()); - submissionQueue().submit(); ioState |= WRITE_SCHEDULED; } } else { @@ -327,7 +325,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addWrite(socket.intValue(), buf.memoryAddress(), buf.readerIndex(), buf.writerIndex()); - submissionQueue.submit(); ioState |= WRITE_SCHEDULED; } @@ -337,7 +334,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha ioState |= POLL_OUT_SCHEDULED; IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollOut(socket.intValue()); - submissionQueue.submit(); } abstract class AbstractUringUnsafe extends AbstractUnsafe { @@ -379,7 +375,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha // Register POLLRDHUP IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollRdHup(fd().intValue()); - submissionQueue.submit(); // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel. // We still need to ensure we call fireChannelActive() in this case. @@ -450,7 +445,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha ioState |= POLL_IN_SCHEDULED; IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollIn(socket.intValue()); - submissionQueue.submit(); } final void readComplete(int res) { @@ -610,8 +604,6 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha socket.initAddress(address.address(), address.scopeId(), inetSocketAddress.getPort(), remoteAddressMemoryAddress); final IOUringSubmissionQueue ioUringSubmissionQueue = submissionQueue(); ioUringSubmissionQueue.addConnect(socket.intValue(), remoteAddressMemoryAddress, SOCK_ADDR_LEN); - ioUringSubmissionQueue.submit(); - } catch (Throwable t) { closeIfClosed(); promise.tryFailure(annotateConnectException(t, remoteAddress)); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 049d06bd5e..e95ece8970 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -58,7 +58,6 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple IOUringSubmissionQueue submissionQueue = submissionQueue(); //Todo get network addresses submissionQueue.addAccept(fd().intValue()); - submissionQueue.submit(); } protected void readComplete0(int res) { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index ca8bba5d50..327710e544 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -198,7 +198,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple // Register for POLLRDHUP if this channel is already considered active. IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollRdHup(fd().intValue()); - submissionQueue.submit(); } } @@ -224,7 +223,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), byteBuf.writerIndex(), byteBuf.capacity()); - submissionQueue.submit(); } @Override diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 9741088ad3..646b1b824f 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -34,8 +34,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCompletionQueue.IOUringCompletionQueueCallback { private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class); - //Todo set config ring buffer size - private static final int ringSize = 32; private static final long ETIME = -62; static final long ECANCELED = -125; @@ -59,7 +57,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { super(parent, executor, addTaskWakesUp); - ringBuffer = Native.createRingBuffer(ringSize); + ringBuffer = Native.createRingBuffer(); eventfd = Native.newEventFd(); logger.trace("New EventLoop: {}", this.toString()); iovecArrayPool = new IovecArrayPool(); @@ -155,9 +153,11 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements // Always call runAllTasks() as it will also fetch the scheduled tasks that are ready. runAllTasks(); + submissionQueue.submit(); try { if (isShuttingDown()) { closeAll(); + submissionQueue.submit(); if (confirmShutdown()) { break; } @@ -239,8 +239,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements Native.eventFdRead(eventfd.intValue()); submissionQueue.addPollIn(eventfd.intValue()); - // Submit so its picked up - submissionQueue.submit(); } private void handleConnect(int fd, int res) { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index 85c3fc618c..3d7572b94b 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -31,7 +31,8 @@ import java.util.Locale; final class Native { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class); - private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 32); + // Todo expose this via the EventLoopGroup constructor as well. + private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096); static { Selector selector = null; diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketHalfClosedTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketHalfClosedTest.java index e94c0338c6..e69ffb1808 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketHalfClosedTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketHalfClosedTest.java @@ -19,6 +19,10 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.testsuite.transport.TestsuitePermutation; import io.netty.testsuite.transport.socket.SocketHalfClosedTest; +import io.netty.util.internal.PlatformDependent; +import org.junit.Assume; +import org.junit.Ignore; +import org.junit.Test; import java.util.List; @@ -27,4 +31,12 @@ public class IOUringSocketHalfClosedTest extends SocketHalfClosedTest { protected List> newFactories() { return IOUringSocketTestPermutation.INSTANCE.socket(); } + + @Ignore + @Test + public void testAutoCloseFalseDoesShutdownOutput() throws Throwable { + // This test only works on Linux / BSD / MacOS as we assume some semantics that are not true for Windows. + Assume.assumeFalse(PlatformDependent.isWindows()); + run(); + } }