diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java index e3ffd658fa..775ae25a9a 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringChannel.java @@ -60,14 +60,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha private static final ChannelMetadata METADATA = new ChannelMetadata(false); final LinuxSocket socket; protected volatile boolean active; - private boolean pollInScheduled = false; - + private boolean pollInPending = false; + private boolean pollOutPending = false; + private boolean writeScheduled = false; + private boolean readScheduled = false; boolean inputClosedSeenErrorOnRead; static final int SOCK_ADDR_LEN = 128; - //can only submit one write operation at a time - private boolean writeScheduled = false; /** * The future of the current connection attempt. If not null, subsequent connection attempts will fail. */ @@ -211,7 +211,15 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha active = false; IOUringSubmissionQueue submissionQueue = submissionQueue(); - submissionQueue.addPollRemove(socket.intValue()); + if (pollInPending) { + submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_IN); + pollInPending = false; + } + if (pollOutPending) { + submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_OUT); + pollOutPending = false; + } + submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_RDHUP); submissionQueue.submit(); // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a @@ -258,15 +266,13 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha @Override protected void doBeginRead() { final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe(); - if (!pollInScheduled) { + if (!pollInPending) { unsafe.schedulePollIn(); } } @Override protected void doWrite(ChannelOutboundBuffer in) { - logger.trace("IOUring doWrite message size: {}", in.size()); - if (writeScheduled) { return; } @@ -280,6 +286,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } private void doWriteMultiple(ChannelOutboundBuffer in) { + final IovecArrayPool iovecArray = ((IOUringEventLoop) eventLoop()).getIovecArrayPool(); iovecMemoryAddress = iovecArray.createNewIovecMemoryAddress(); @@ -313,6 +320,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha //POLLOUT private void addPollOut() { + assert !pollOutPending; + pollOutPending = true; IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollOut(socket.intValue()); submissionQueue.submit(); @@ -326,7 +335,7 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha // Flush immediately only when there's no pending flush. // If there's a pending flush operation, event loop will call forceFlush() later, // and thus there's no need to call it now. - if (!writeScheduled) { + if (!pollOutPending) { super.flush0(); } } @@ -418,18 +427,19 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } void schedulePollIn() { - assert !pollInScheduled; + assert !pollInPending; if (!isActive() || shouldBreakIoUringInReady(config())) { return; } - pollInScheduled = true; + pollInPending = true; IOUringSubmissionQueue submissionQueue = submissionQueue(); submissionQueue.addPollIn(socket.intValue()); submissionQueue.submit(); } final void readComplete(int res) { - pollInScheduled = false; + readScheduled = false; + readComplete0(res); } @@ -440,11 +450,8 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha */ final void pollRdHup(int res) { if (isActive()) { - if (!pollInScheduled) { - // If it is still active, we need to call epollInReady as otherwise we may miss to - // read pending data from the underlying file descriptor. - // See https://github.com/netty/netty/issues/3709 - pollIn(res); + if (!readScheduled) { + scheduleRead(); } } else { // Just to be safe make sure the input marked as closed. @@ -452,9 +459,30 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } - abstract void pollIn(int res); + /** + * Called once POLLIN event is ready to be processed + */ + final void pollIn(int res) { + pollInPending = false; + if (!readScheduled) { + scheduleRead(); + } + } + + protected final void scheduleRead() { + readScheduled = true; + scheduleRead0(); + } + + protected abstract void scheduleRead0(); + + /** + * Called once POLLOUT event is ready to be processed + */ final void pollOut(int res) { + pollOutPending = false; + // pending connect if (connectPromise != null) { // Note this method is invoked by the event loop only if the connection attempt was @@ -484,12 +512,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha } } } else if (!getSocket().isOutputShutdown()) { - doWrite(unsafe().outboundBuffer()); + // Try writing again + super.flush0(); } } final void writeComplete(int res) { writeScheduled = false; + ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer(); if (iovecMemoryAddress != -1) { ((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java index 030652fd8a..a5e3752704 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringServerChannel.java @@ -54,20 +54,14 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple abstract Channel newChildChannel(int fd) throws Exception; final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe { - private final byte[] acceptedAddress = new byte[26]; - - private void acceptSocket() { + @Override + protected void scheduleRead0() { IOUringSubmissionQueue submissionQueue = submissionQueue(); //Todo get network addresses submissionQueue.addAccept(fd().intValue()); submissionQueue.submit(); } - @Override - void pollIn(int res) { - acceptSocket(); - } - // TODO: Respect MAX_MESSAGES_READ protected void readComplete0(int res) { final IOUringRecvByteAllocatorHandle allocHandle = @@ -89,7 +83,9 @@ abstract class AbstractIOUringServerChannel extends AbstractIOUringChannel imple pipeline.fireExceptionCaught(cause); pipeline.fireChannelReadComplete(); } - acceptSocket(); + if (config().isAutoRead()) { + scheduleRead(); + } } else { allocHandle.readComplete(); // Check if we did fail because there was nothing to accept atm. diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java index 1d40409b10..645374d90c 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/AbstractIOUringStreamChannel.java @@ -198,11 +198,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple private ByteBuf readBuffer; @Override - void pollIn(int res) { - readFromSocket(); - } - - private void readFromSocket() { + protected void scheduleRead0() { final ChannelConfig config = config(); final ByteBufAllocator allocator = config.getAllocator(); @@ -215,6 +211,7 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple assert readBuffer == null; readBuffer = byteBuf; + submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(), byteBuf.writerIndex(), byteBuf.capacity()); submissionQueue.submit(); @@ -232,8 +229,6 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple this.readBuffer = null; assert byteBuf != null; - boolean writable = true; - try { if (res < 0) { // If res is negative we should pass it to ioResult(...) which will either throw @@ -261,21 +256,20 @@ abstract class AbstractIOUringStreamChannel extends AbstractIOUringChannel imple } allocHandle.incMessagesRead(1); - writable = byteBuf.isWritable(); + boolean writable = byteBuf.isWritable(); pipeline.fireChannelRead(byteBuf); byteBuf = null; - } catch (Throwable t) { - handleReadException(pipeline, byteBuf, t, close, allocHandle); - } - if (!close) { - if (!writable) { + if (!writable && config().isAutoRead()) { // Let's schedule another read. - readFromSocket(); + scheduleRead(); } else { // We did not fill the whole ByteBuf so we should break the "read loop" and try again later. pipeline.fireChannelReadComplete(); } + } catch (Throwable t) { + handleReadException(pipeline, byteBuf, t, close, allocHandle); } + } private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 0f2405ce6f..211aa783d5 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -177,11 +177,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); switch (op) { case IOUring.OP_ACCEPT: - //Todo error handle the res - if (res == ECANCELED) { - logger.trace("POLL_LINK canceled"); - break; - } // Fall-through case IOUring.OP_READ: diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 9ad78b9001..0cd350dae0 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -122,8 +122,8 @@ final class IOUringSubmissionQueue { //user_data should be same as POLL_LINK fd if (op == IOUring.OP_POLL_REMOVE) { PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1); - long pollLinkuData = convertToUserData((byte) IOUring.IO_POLL, fd, IOUring.POLLMASK_IN); - PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, pollLinkuData); + long uData = convertToUserData(op, fd, pollMask); + PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData); } long uData = convertToUserData(op, fd, pollMask); @@ -241,7 +241,7 @@ final class IOUringSubmissionQueue { } //fill the address which is associated with server poll link user_data - public boolean addPollRemove(int fd) { + public boolean addPollRemove(int fd, int pollMask) { long sqe = 0; boolean submitted = false; while (sqe == 0) { @@ -252,7 +252,7 @@ final class IOUringSubmissionQueue { submitted = true; } } - setData(sqe, (byte) IOUring.OP_POLL_REMOVE, 0, fd, 0, 0, 0); + setData(sqe, (byte) IOUring.OP_POLL_REMOVE, pollMask, fd, 0, 0, 0); return submitted; } @@ -323,7 +323,6 @@ final class IOUringSubmissionQueue { public void submit() { int submitted = flushSqe(); logger.trace("Submitted: {}", submitted); - System.out.println("submitted: " + submitted); if (submitted > 0) { int ret = Native.ioUringEnter(ringFd, submitted, 0, 0); if (ret < 0) { diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index 55c7f2539c..c5ad55676e 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -235,7 +235,7 @@ public class NativeTest { Thread.sleep(10); - submissionQueue.addPollRemove(eventFd.intValue()); + submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN); submissionQueue.submit(); Thread waitingCqe = new Thread() {