Fix bug related to reset the RecvByteBufAllocator.Handle on each read

Motivation:

We should only reset the RecvByteBufAllocator.Handle when a new "read
loop" starts. Otherwise the handle will not be able to correctly limit
reads.

Modifications:

- Move reset(...) call into pollIn(...)
- Remove all @Ignore

Result:

The whole testsuite passes
This commit is contained in:
Norman Maurer 2020-09-03 14:27:45 +02:00
parent 1440b4fa0c
commit 3b35976559
6 changed files with 11 additions and 34 deletions

View File

@ -467,7 +467,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
final void pollRdHup(int res) {
if (isActive()) {
if ((ioState & READ_SCHEDULED) == 0) {
scheduleRead();
scheduleFirstRead();
}
} else {
// Just to be safe make sure the input marked as closed.
@ -482,10 +482,18 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
ioState &= ~POLL_IN_SCHEDULED;
if ((ioState & READ_SCHEDULED) == 0) {
scheduleRead();
scheduleFirstRead();
}
}
private void scheduleFirstRead() {
// This is a new "read loop" so we need to reset the allocHandle.
final ChannelConfig config = config();
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
scheduleRead();
}
protected final void scheduleRead() {
ioState |= READ_SCHEDULED;
scheduleRead0();

View File

@ -53,7 +53,6 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
@Override
protected void scheduleRead0() {
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config());
allocHandle.attemptedBytesRead(1);
IOUringSubmissionQueue submissionQueue = submissionQueue();

View File

@ -210,13 +210,8 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
@Override
protected void scheduleRead0() {
final ChannelConfig config = config();
final ByteBufAllocator allocator = config.getAllocator();
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = allocHandle.allocate(allocator);
ByteBuf byteBuf = allocHandle.allocate(alloc());
IOUringSubmissionQueue submissionQueue = submissionQueue();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());

View File

@ -19,8 +19,6 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketHalfClosedTest;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
@ -29,17 +27,4 @@ public class IOUringSocketHalfClosedTest extends SocketHalfClosedTest {
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return IOUringSocketTestPermutation.INSTANCE.socket();
}
@Ignore("FIX ME")
@Test
public void testHalfClosureOnlyOneEventWhenAutoRead() throws Throwable {
super.testHalfClosureOnlyOneEventWhenAutoRead();
}
@Ignore("FIX ME")
@Test
public void testAutoCloseFalseDoesShutdownOutput() throws Throwable {
super.testAutoCloseFalseDoesShutdownOutput();
}
}

View File

@ -19,8 +19,6 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.SocketReadPendingTest;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
@ -29,11 +27,4 @@ public class IOUringSocketReadPendingTest extends SocketReadPendingTest {
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return IOUringSocketTestPermutation.INSTANCE.socket();
}
@Ignore("FIX ME")
@Test
@Override
public void testReadPendingIsResetAfterEachRead() throws Throwable {
super.testReadPendingIsResetAfterEachRead();
}
}

View File

@ -201,7 +201,6 @@ public class FileDescriptor {
}
static boolean isClosed(int state) {
System.out.println("State: " + state);
return (state & STATE_CLOSED_MASK) != 0;
}