Merge pull request #29 from normanmaurer/handle_reset
Fix bug related to reset the RecvByteBufAllocator.Handle on each read
This commit is contained in:
commit
4c294908bf
@ -467,7 +467,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
final void pollRdHup(int res) {
|
final void pollRdHup(int res) {
|
||||||
if (isActive()) {
|
if (isActive()) {
|
||||||
if ((ioState & READ_SCHEDULED) == 0) {
|
if ((ioState & READ_SCHEDULED) == 0) {
|
||||||
scheduleRead();
|
scheduleFirstRead();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Just to be safe make sure the input marked as closed.
|
// Just to be safe make sure the input marked as closed.
|
||||||
@ -482,10 +482,18 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
|
|||||||
ioState &= ~POLL_IN_SCHEDULED;
|
ioState &= ~POLL_IN_SCHEDULED;
|
||||||
|
|
||||||
if ((ioState & READ_SCHEDULED) == 0) {
|
if ((ioState & READ_SCHEDULED) == 0) {
|
||||||
scheduleRead();
|
scheduleFirstRead();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void scheduleFirstRead() {
|
||||||
|
// This is a new "read loop" so we need to reset the allocHandle.
|
||||||
|
final ChannelConfig config = config();
|
||||||
|
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
||||||
|
allocHandle.reset(config);
|
||||||
|
scheduleRead();
|
||||||
|
}
|
||||||
|
|
||||||
protected final void scheduleRead() {
|
protected final void scheduleRead() {
|
||||||
ioState |= READ_SCHEDULED;
|
ioState |= READ_SCHEDULED;
|
||||||
scheduleRead0();
|
scheduleRead0();
|
||||||
|
@ -53,7 +53,6 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple
|
|||||||
@Override
|
@Override
|
||||||
protected void scheduleRead0() {
|
protected void scheduleRead0() {
|
||||||
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
||||||
allocHandle.reset(config());
|
|
||||||
allocHandle.attemptedBytesRead(1);
|
allocHandle.attemptedBytesRead(1);
|
||||||
|
|
||||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||||
|
@ -210,13 +210,8 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void scheduleRead0() {
|
protected void scheduleRead0() {
|
||||||
final ChannelConfig config = config();
|
|
||||||
|
|
||||||
final ByteBufAllocator allocator = config.getAllocator();
|
|
||||||
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
|
||||||
allocHandle.reset(config);
|
ByteBuf byteBuf = allocHandle.allocate(alloc());
|
||||||
|
|
||||||
ByteBuf byteBuf = allocHandle.allocate(allocator);
|
|
||||||
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
IOUringSubmissionQueue submissionQueue = submissionQueue();
|
||||||
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
|
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
|
||||||
|
|
||||||
|
@ -19,8 +19,6 @@ import io.netty.bootstrap.Bootstrap;
|
|||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||||
import io.netty.testsuite.transport.socket.SocketHalfClosedTest;
|
import io.netty.testsuite.transport.socket.SocketHalfClosedTest;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -29,17 +27,4 @@ public class IOUringSocketHalfClosedTest extends SocketHalfClosedTest {
|
|||||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||||
return IOUringSocketTestPermutation.INSTANCE.socket();
|
return IOUringSocketTestPermutation.INSTANCE.socket();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("FIX ME")
|
|
||||||
@Test
|
|
||||||
public void testHalfClosureOnlyOneEventWhenAutoRead() throws Throwable {
|
|
||||||
super.testHalfClosureOnlyOneEventWhenAutoRead();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Ignore("FIX ME")
|
|
||||||
@Test
|
|
||||||
public void testAutoCloseFalseDoesShutdownOutput() throws Throwable {
|
|
||||||
super.testAutoCloseFalseDoesShutdownOutput();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -19,8 +19,6 @@ import io.netty.bootstrap.Bootstrap;
|
|||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
import io.netty.testsuite.transport.TestsuitePermutation;
|
import io.netty.testsuite.transport.TestsuitePermutation;
|
||||||
import io.netty.testsuite.transport.socket.SocketReadPendingTest;
|
import io.netty.testsuite.transport.socket.SocketReadPendingTest;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@ -29,11 +27,4 @@ public class IOUringSocketReadPendingTest extends SocketReadPendingTest {
|
|||||||
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
|
||||||
return IOUringSocketTestPermutation.INSTANCE.socket();
|
return IOUringSocketTestPermutation.INSTANCE.socket();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("FIX ME")
|
|
||||||
@Test
|
|
||||||
@Override
|
|
||||||
public void testReadPendingIsResetAfterEachRead() throws Throwable {
|
|
||||||
super.testReadPendingIsResetAfterEachRead();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -201,7 +201,6 @@ public class FileDescriptor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static boolean isClosed(int state) {
|
static boolean isClosed(int state) {
|
||||||
System.out.println("State: " + state);
|
|
||||||
return (state & STATE_CLOSED_MASK) != 0;
|
return (state & STATE_CLOSED_MASK) != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user