diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 35020005a1..9565855027 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -434,10 +434,12 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha */ final void pollRdHup(int res) { if (isActive()) { - // If it is still active, we need to call epollInReady as otherwise we may miss to - // read pending data from the underlying file descriptor. - // See https://github.com/netty/netty/issues/3709 - pollIn(res); + if (!pollInScheduled) { + // If it is still active, we need to call epollInReady as otherwise we may miss to + // read pending data from the underlying file descriptor. + // See https://github.com/netty/netty/issues/3709 + pollIn(res); + } } else { // Just to be safe make sure the input marked as closed. shutdownInput(true); @@ -675,6 +677,11 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } requestedRemoteAddress = null; + // Register POLLRDHUP + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollRdHup(fd().intValue()); + submissionQueue.submit(); + return true; } addPollOut(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 269c1e46bc..030652fd8a 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -77,7 +77,13 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple if (res >= 0) { allocHandle.incMessagesRead(1); try { - pipeline.fireChannelRead(newChildChannel(res)); + Channel channel = newChildChannel(res); + // Register accepted channel for POLLRDHUP + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollRdHup(res); + submissionQueue.submit(); + + pipeline.fireChannelRead(channel); } catch (Throwable cause) { allocHandle.readComplete(); pipeline.fireExceptionCaught(cause); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index ecfaa55806..1d40409b10 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -187,15 +187,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple } } - @Override - protected void doRegister() throws Exception { - super.doRegister(); - // all non-server channels should poll POLLRDHUP - IOUringSubmissionQueue submissionQueue = submissionQueue(); - submissionQueue.addPollRdHup(fd().intValue()); - submissionQueue.submit(); - } - class IOUringStreamUnsafe extends AbstractUringUnsafe { // Overridden here just to be able to access this method from AbstractEpollStreamChannel diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketEchoTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketEchoTest.java index 2f685a8fce..2c0f2bbee8 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketEchoTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketEchoTest.java @@ -20,6 +20,8 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.testsuite.transport.TestsuitePermutation.BootstrapComboFactory; import io.netty.testsuite.transport.socket.SocketEchoTest; import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; import java.util.List; @@ -31,4 +33,11 @@ public class IOUringSocketEchoTest extends SocketEchoTest { protected List> newFactories() { return IOUringSocketTestPermutation.INSTANCE.socket(); } + + + @Ignore("FIX ME") + @Test(timeout = 30000) + public void testSimpleEchoWithAdditionalExecutorAndVoidPromise() throws Throwable { + run(); + } } diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java index c7c856ae2e..7dd96e2461 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketTestPermutation.java @@ -59,7 +59,6 @@ public class IOUringSocketTestPermutation extends SocketTestPermutation { return list; } - @SuppressWarnings("unchecked") @Override public List> serverSocket() { List> toReturn = new ArrayList>(); @@ -93,23 +92,19 @@ public class IOUringSocketTestPermutation extends SocketTestPermutation { return toReturn; } - @SuppressWarnings("unchecked") @Override public List> clientSocket() { - return Arrays.>asList( - /* + return Arrays.asList( new BootstrapFactory() { @Override public Bootstrap newInstance() { return new Bootstrap().group(IO_URING_WORKER_GROUP).channel(IOUringSocketChannel.class); - //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100000); } - },*/ + }, new BootstrapFactory() { @Override public Bootstrap newInstance() { return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class); - // .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100000); } } );