From 3b35976559365534e63b746cfb7f796b8733723f Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 3 Sep 2020 14:27:45 +0200 Subject: [PATCH] Fix bug related to reset the RecvByteBufAllocator.Handle on each read Motivation: We should only reset the RecvByteBufAllocator.Handle when a new "read loop" starts. Otherwise the handle will not be able to correctly limit reads. Modifications: - Move reset(...) call into pollIn(...) - Remove all @Ignore Result: The whole testsuite passes --- .../channel/uring/AbstractIOUringChannel.java | 12 ++++++++++-- .../uring/AbstractIOUringServerChannel.java | 1 - .../uring/AbstractIOUringStreamChannel.java | 7 +------ .../uring/IOUringSocketHalfClosedTest.java | 15 --------------- .../uring/IOUringSocketReadPendingTest.java | 9 --------- .../io/netty/channel/unix/FileDescriptor.java | 1 - 6 files changed, 11 insertions(+), 34 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index 838034d65e..59a6d011e1 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -467,7 +467,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha final void pollRdHup(int res) { if (isActive()) { if ((ioState & READ_SCHEDULED) == 0) { - scheduleRead(); + scheduleFirstRead(); } } else { // Just to be safe make sure the input marked as closed. @@ -482,10 +482,18 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha ioState &= ~POLL_IN_SCHEDULED; if ((ioState & READ_SCHEDULED) == 0) { - scheduleRead(); + scheduleFirstRead(); } } + private void scheduleFirstRead() { + // This is a new "read loop" so we need to reset the allocHandle. + final ChannelConfig config = config(); + final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config); + scheduleRead(); + } + protected final void scheduleRead() { ioState |= READ_SCHEDULED; scheduleRead0(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 4d1cd505ba..049d06bd5e 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -53,7 +53,6 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple @Override protected void scheduleRead0() { final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); - allocHandle.reset(config()); allocHandle.attemptedBytesRead(1); IOUringSubmissionQueue submissionQueue = submissionQueue(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index feb64c2f38..6a7f8b712c 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -210,13 +210,8 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple @Override protected void scheduleRead0() { - final ChannelConfig config = config(); - - final ByteBufAllocator allocator = config.getAllocator(); final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); - allocHandle.reset(config); - - ByteBuf byteBuf = allocHandle.allocate(allocator); + ByteBuf byteBuf = allocHandle.allocate(alloc()); IOUringSubmissionQueue submissionQueue = submissionQueue(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketHalfClosedTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketHalfClosedTest.java index bd468198fb..e94c0338c6 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketHalfClosedTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketHalfClosedTest.java @@ -19,8 +19,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.testsuite.transport.TestsuitePermutation; import io.netty.testsuite.transport.socket.SocketHalfClosedTest; -import org.junit.Ignore; -import org.junit.Test; import java.util.List; @@ -29,17 +27,4 @@ public class IOUringSocketHalfClosedTest extends SocketHalfClosedTest { protected List> newFactories() { return IOUringSocketTestPermutation.INSTANCE.socket(); } - - @Ignore("FIX ME") - @Test - public void testHalfClosureOnlyOneEventWhenAutoRead() throws Throwable { - super.testHalfClosureOnlyOneEventWhenAutoRead(); - } - - - @Ignore("FIX ME") - @Test - public void testAutoCloseFalseDoesShutdownOutput() throws Throwable { - super.testAutoCloseFalseDoesShutdownOutput(); - } } diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketReadPendingTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketReadPendingTest.java index 5bb1c73796..d67a5886e0 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketReadPendingTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringSocketReadPendingTest.java @@ -19,8 +19,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.testsuite.transport.TestsuitePermutation; import io.netty.testsuite.transport.socket.SocketReadPendingTest; -import org.junit.Ignore; -import org.junit.Test; import java.util.List; @@ -29,11 +27,4 @@ public class IOUringSocketReadPendingTest extends SocketReadPendingTest { protected List> newFactories() { return IOUringSocketTestPermutation.INSTANCE.socket(); } - - @Ignore("FIX ME") - @Test - @Override - public void testReadPendingIsResetAfterEachRead() throws Throwable { - super.testReadPendingIsResetAfterEachRead(); - } } diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java index 66aa029d1e..0b0a55ee58 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/FileDescriptor.java @@ -201,7 +201,6 @@ public class FileDescriptor { } static boolean isClosed(int state) { - System.out.println("State: " + state); return (state & STATE_CLOSED_MASK) != 0; }