diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 211aa783d5..028636b944 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -54,10 +54,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements // other value T when EL is waiting with wakeup scheduled at time T private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE); private final FileDescriptor eventfd; + private final IovecArrayPool iovecArrayPool; private long prevDeadlineNanos = NONE; private boolean pendingWakeup; - private IovecArrayPool iovecArrayPool; IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { super(parent, executor, addTaskWakesUp); @@ -203,9 +203,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements break; case IOUring.IO_POLL: - //Todo error handle the res if (res == ECANCELED) { - logger.trace("POLL_LINK canceled"); + logger.trace("IO_POLL cancelled"); break; } if (eventfd.intValue() == fd) { @@ -230,9 +229,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements case IOUring.OP_POLL_REMOVE: if (res == ENOENT) { - System.out.println(("POLL_REMOVE OPERATION not permitted")); + logger.trace("POLL_REMOVE not successful"); } else if (res == 0) { - System.out.println(("POLL_REMOVE OPERATION successful")); + logger.trace("POLL_REMOVE successful"); } break; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 0cd350dae0..ef28d475ef 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -122,22 +122,25 @@ final class IOUringSubmissionQueue { //user_data should be same as POLL_LINK fd if (op == IOUring.OP_POLL_REMOVE) { PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1); - long uData = convertToUserData(op, fd, pollMask); + long uData = convertToUserData((byte) IOUring.IO_POLL, fd, pollMask); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData); + PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, convertToUserData(op, fd, 0)); + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); + } else { + long uData = convertToUserData(op, fd, pollMask); + PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData); + //c union set Rw-Flags or accept_flags + if (op != IOUring.OP_ACCEPT) { + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask); + } else { + //accept_flags set NON_BLOCKING + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC); + } } - long uData = convertToUserData(op, fd, pollMask); - PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData); - PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); - //c union set Rw-Flags or accept_flags - if (op != IOUring.OP_ACCEPT) { - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); - } else { - //accept_flags set NON_BLOCKING - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC); - } + // pad field array -> all fields should be zero long offsetIndex = 0; @@ -146,10 +149,6 @@ final class IOUringSubmissionQueue { offsetIndex += 8; } - if (pollMask != 0) { - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask); - } - logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD)); logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD)); logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD)); diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index c5ad55676e..1fa8f9cb05 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -20,6 +20,7 @@ import org.junit.BeforeClass; import org.junit.Test; import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicReference; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; @@ -187,17 +188,13 @@ public class NativeTest { @Override public void run() { assertTrue(completionQueue.ioUringWaitCqe()); - try { - assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { - @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { - assertEquals(1, res); - return true; - } - })); - } catch (Exception e) { - e.printStackTrace(); - } + assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + assertEquals(1, res); + return true; + } + })); } }; waitingCqe.start(); @@ -232,47 +229,46 @@ public class NativeTest { FileDescriptor eventFd = Native.newEventFd(); submissionQueue.addPollIn(eventFd.intValue()); submissionQueue.submit(); - - Thread.sleep(10); - submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN); submissionQueue.submit(); + final AtomicReference errorRef = new AtomicReference(); Thread waitingCqe = new Thread() { + private final IOUringCompletionQueue.IOUringCompletionQueueCallback verifyCallback = + new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + if (op == IOUring.IO_POLL) { + assertEquals(IOUringEventLoop.ECANCELED, res); + } else if (op == IOUring.OP_POLL_REMOVE) { + assertEquals(0, res); + } else { + fail("op " + op); + } + return false; + } + }; + @Override public void run() { - assertTrue(completionQueue.ioUringWaitCqe()); try { - assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { - @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { - assertEquals(IOUringEventLoop.ECANCELED, res); - assertEquals(IOUring.IO_POLL, op); - return true; - } - })); - } catch (Exception e) { - e.printStackTrace(); - } - try { - assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { - @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { - assertEquals(0, res); - assertEquals(IOUring.OP_POLL_REMOVE, op); - return true; - } - })); - } catch (Exception e) { - e.printStackTrace(); + assertTrue(completionQueue.ioUringWaitCqe()); + assertEquals(1, completionQueue.process(verifyCallback)); + assertTrue(completionQueue.ioUringWaitCqe()); + assertEquals(1, completionQueue.process(verifyCallback)); + } catch (AssertionError error) { + errorRef.set(error); } } }; waitingCqe.start(); - waitingCqe.join(); try { eventFd.close(); + AssertionError error = errorRef.get(); + if (error != null) { + throw error; + } } finally { ringBuffer.close(); }