From 663c44cd45c583d66cf462a3d4f73d2a52178fb9 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 1 Sep 2020 10:49:09 +0200 Subject: [PATCH] Correctly respect RecvByteBufAllocator.Handle when reading Motivation: We need to respect RecvByteBufAllocator.Handle.continueReading() so settings like MAX_MESSAGES_PER_READ are respected. This also ensures that AUTO_READ is correctly working in all cases Modifications: - Correctly respect continueReading(); - Fix IOUringRecvByteAllocatorHandle - Cleanup Result: Correctly handling reading --- .../uring/AbstractIOUringServerChannel.java | 50 +++++++++++-------- .../uring/AbstractIOUringStreamChannel.java | 14 ++---- .../netty/channel/uring/IOUringEventLoop.java | 9 ++-- .../uring/IOUringRecvByteAllocatorHandle.java | 5 -- 4 files changed, 37 insertions(+), 41 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index a5e3752704..8f746fc363 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -56,45 +56,53 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { @Override protected void scheduleRead0() { + final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config()); + allocHandle.attemptedBytesRead(1); + IOUringSubmissionQueue submissionQueue = submissionQueue(); //Todo get network addresses submissionQueue.addAccept(fd().intValue()); submissionQueue.submit(); } - // TODO: Respect MAX_MESSAGES_READ protected void readComplete0(int res) { final IOUringRecvByteAllocatorHandle allocHandle = (IOUringRecvByteAllocatorHandle) unsafe() .recvBufAllocHandle(); final ChannelPipeline pipeline = pipeline(); - if (res >= 0) { - allocHandle.incMessagesRead(1); - try { - Channel channel = newChildChannel(res); - // Register accepted channel for POLLRDHUP - IOUringSubmissionQueue submissionQueue = submissionQueue(); - submissionQueue.addPollRdHup(res); - submissionQueue.submit(); + allocHandle.lastBytesRead(res); - pipeline.fireChannelRead(channel); - } catch (Throwable cause) { + if (res >= 0) { + allocHandle.incMessagesRead(1); + try { + Channel channel = newChildChannel(res); + // Register accepted channel for POLLRDHUP + IOUringSubmissionQueue submissionQueue = submissionQueue(); + submissionQueue.addPollRdHup(res); + submissionQueue.submit(); + + pipeline.fireChannelRead(channel); + if (allocHandle.continueReading()) { + scheduleRead(); + } else { allocHandle.readComplete(); - pipeline.fireExceptionCaught(cause); pipeline.fireChannelReadComplete(); } - if (config().isAutoRead()) { - scheduleRead(); - } - } else { + } catch (Throwable cause) { allocHandle.readComplete(); - // Check if we did fail because there was nothing to accept atm. - if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) { - // Something bad happened. Convert to an exception. - pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res)); - } pipeline.fireChannelReadComplete(); + pipeline.fireExceptionCaught(cause); } + } else { + allocHandle.readComplete(); + pipeline.fireChannelReadComplete(); + // Check if we did fail because there was nothing to accept atm. + if (res != ERRNO_EAGAIN_NEGATIVE && res != ERRNO_EWOULDBLOCK_NEGATIVE) { + // Something bad happened. Convert to an exception. + pipeline.fireExceptionCaught(Errors.newIOException("io_uring accept", res)); + } + } } @Override diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index 645374d90c..3df12dd417 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -202,12 +202,12 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple final ChannelConfig config = config(); final ByteBufAllocator allocator = config.getAllocator(); - final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); + final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = allocHandle.allocate(allocator); IOUringSubmissionQueue submissionQueue = submissionQueue(); - unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes()); + allocHandle.attemptedBytesRead(byteBuf.writableBytes()); assert readBuffer == null; readBuffer = byteBuf; @@ -217,13 +217,11 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple submissionQueue.submit(); } - // TODO: Respect MAX_MESSAGE_READ. + @Override protected void readComplete0(int res) { boolean close = false; - final IOUringRecvByteAllocatorHandle allocHandle = - (IOUringRecvByteAllocatorHandle) unsafe() - .recvBufAllocHandle(); + final IOUringRecvByteAllocatorHandle allocHandle = recvBufAllocHandle(); final ChannelPipeline pipeline = pipeline(); ByteBuf byteBuf = this.readBuffer; this.readBuffer = null; @@ -256,10 +254,9 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple } allocHandle.incMessagesRead(1); - boolean writable = byteBuf.isWritable(); pipeline.fireChannelRead(byteBuf); byteBuf = null; - if (!writable && config().isAutoRead()) { + if (allocHandle.continueReading()) { // Let's schedule another read. scheduleRead(); } else { @@ -269,7 +266,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } - } private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 028636b944..4aa62441ef 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -34,11 +34,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class); //Todo set config ring buffer size - private final int ringSize = 32; - private final int ENOENT = -2; - - //just temporary -> Todo use ErrorsStaticallyReferencedJniMethods like in Epoll - private static final int SOCKET_ERROR_EPIPE = -32; + private static final int ringSize = 32; + private static final int ENOENT = -2; private static final long ETIME = -62; static final long ECANCELED = -125; @@ -263,7 +260,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements try { eventfd.close(); } catch (IOException e) { - e.printStackTrace(); + logger.warn("Failed to close the event fd.", e); } ringBuffer.close(); iovecArrayPool.release(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringRecvByteAllocatorHandle.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringRecvByteAllocatorHandle.java index f1a116bd8b..96bd42bf84 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringRecvByteAllocatorHandle.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringRecvByteAllocatorHandle.java @@ -41,9 +41,4 @@ final class IOUringRecvByteAllocatorHandle extends RecvByteBufAllocator.Delegati public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) { return ((RecvByteBufAllocator.ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier); } - - @Override - public boolean continueReading() { - return false; - } }