From 2316c2ce4587d9ccb25d1bc2dcbf85f4c4e9518d Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Wed, 9 Sep 2020 22:37:39 -0700 Subject: [PATCH] Exploit blocking FAST_POLL for eventfd reads (#10543) Motivation If we make eventfd blocking then read can take the place of poll+read Modification Make eventfd blocking, use READ instead of POLLIN, allocating a static 64bit buffer to read into Result Fewer kernel roundtrips for event loop wakeups --- .../src/main/c/netty_io_uring_native.c | 20 +++------- .../netty/channel/uring/IOUringEventLoop.java | 38 +++++++++---------- .../java/io/netty/channel/uring/Native.java | 8 ++-- .../io/netty/channel/uring/NativeTest.java | 6 +-- 4 files changed, 28 insertions(+), 44 deletions(-) diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index 2a98021b4e..4dd6a5f287 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -178,8 +178,9 @@ static jint netty_io_uring_enter(JNIEnv *env, jclass class1, jint ring_fd, jint return -err; } -static jint netty_epoll_native_eventFd(JNIEnv* env, jclass clazz) { - jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); +static jint netty_epoll_native_blocking_event_fd(JNIEnv* env, jclass clazz) { + // We use a blocking fd with io_uring FAST_POLL read + jint eventFD = eventfd(0, EFD_CLOEXEC); if (eventFD < 0) { netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", errno); @@ -210,16 +211,6 @@ static jint netty_io_uring_unregister_event_fd(JNIEnv *env, jclass class1, jint return 0; } - -static void netty_io_uring_eventFdRead(JNIEnv* env, jclass clazz, jint fd) { - uint64_t eventfd_t; - - if (eventfd_read(fd, &eventfd_t) != 0) { - // something is serious wrong - netty_unix_errors_throwRuntimeException(env, "eventfd_read() failed"); - } -} - static void netty_io_uring_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) { uint64_t val; @@ -438,9 +429,8 @@ static const JNINativeMethod method_table[] = { {"ioUringExit", "(Lio/netty/channel/uring/RingBuffer;)V", (void *) netty_io_uring_ring_buffer_exit}, {"createFile", "()I", (void *) netty_create_file}, {"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter}, - {"eventFd", "()I", (void *) netty_epoll_native_eventFd}, + {"blockingEventFd", "()I", (void *) netty_epoll_native_blocking_event_fd}, {"eventFdWrite", "(IJ)V", (void *) netty_io_uring_eventFdWrite }, - {"eventFdRead", "(I)V", (void *) netty_io_uring_eventFdRead }, {"ioUringRegisterEventFd", "(II)I", (void *) netty_io_uring_register_event_fd}, {"ioUringUnregisterEventFd", "(I)I", (void *) netty_io_uring_unregister_event_fd} }; @@ -593,4 +583,4 @@ JNIEXPORT void JNI_OnUnload(JavaVM* vm, void* reserved) { return; } netty_io_uring_native_JNI_OnUnLoad(env); -} \ No newline at end of file +} diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 577a2a733d..f8c397bec7 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -35,6 +35,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements IOUringCompletionQueue.IOUringCompletionQueueCallback { private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class); + private final long eventfdReadBuf = PlatformDependent.allocateMemory(8); + private final IntObjectMap channels = new IntObjectHashMap(4096); private final RingBuffer ringBuffer; @@ -69,7 +71,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements } }); - eventfd = Native.newEventFd(); + eventfd = Native.newBlockingEventFd(); logger.trace("New EventLoop: {}", this.toString()); } @@ -122,7 +124,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); // Lets add the eventfd related events before starting to do any real work. - submissionQueue.addPollIn(eventfd.intValue()); + addEventFdRead(submissionQueue); submissionQueue.submit(); for (;;) { @@ -181,23 +183,21 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements @Override public boolean handle(int fd, int res, long flags, int op, int pollMask) { - IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); final AbstractIOUringChannel channel; if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) { - channel = handleRead(fd, res); + if (eventfd.intValue() == fd) { + channel = null; + if (res != Native.ERRNO_ECANCELED_NEGATIVE) { + pendingWakeup = false; + addEventFdRead(ringBuffer.getIoUringSubmissionQueue()); + } + } else { + channel = handleRead(fd, res); + } } else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) { channel = handleWrite(fd, res); } else if (op == Native.IORING_OP_POLL_ADD) { - if (eventfd.intValue() == fd) { - if (res == Native.ERRNO_ECANCELED_NEGATIVE) { - return true; - } - channel = null; - pendingWakeup = false; - handleEventFd(submissionQueue); - } else { - channel = handlePollAdd(fd, res, pollMask); - } + channel = handlePollAdd(fd, res, pollMask); } else if (op == Native.IORING_OP_POLL_REMOVE) { if (res == Errors.ERRNO_ENOENT_NEGATIVE) { logger.trace("IORING_POLL_REMOVE not successful"); @@ -258,13 +258,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements return channel; } - private void handleEventFd(IOUringSubmissionQueue submissionQueue) { - // We need to consume the data as otherwise we would see another event - // in the completionQueue without - // an extra eventfd_write(....) - Native.eventFdRead(eventfd.intValue()); - - submissionQueue.addPollIn(eventfd.intValue()); + private void addEventFdRead(IOUringSubmissionQueue submissionQueue) { + submissionQueue.addRead(eventfd.intValue(), eventfdReadBuf, 0, 8); } private AbstractIOUringChannel handleConnect(int fd, int res) { @@ -284,6 +279,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements } ringBuffer.close(); iovArrays.release(); + PlatformDependent.freeMemory(eventfdReadBuf); } public RingBuffer getRingBuffer() { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index d1767f24a2..5bcae81e8d 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -111,15 +111,13 @@ final class Native { public static native void eventFdWrite(int fd, long value); - public static native void eventFdRead(int fd); - - public static FileDescriptor newEventFd() { - return new FileDescriptor(eventFd()); + public static FileDescriptor newBlockingEventFd() { + return new FileDescriptor(blockingEventFd()); } public static native void ioUringExit(RingBuffer ringBuffer); - private static native int eventFd(); + private static native int blockingEventFd(); // for testing(it is only temporary) public static native int createFile(); diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index 873a11d573..17367fc3fa 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -144,7 +144,7 @@ public class NativeTest { assertNotNull(submissionQueue); assertNotNull(completionQueue); - final FileDescriptor eventFd = Native.newEventFd(); + final FileDescriptor eventFd = Native.newBlockingEventFd(); assertFalse(submissionQueue.addPollIn(eventFd.intValue())); submissionQueue.submit(); @@ -198,7 +198,7 @@ public class NativeTest { } }; waitingCqe.start(); - final FileDescriptor eventFd = Native.newEventFd(); + final FileDescriptor eventFd = Native.newBlockingEventFd(); assertFalse(submissionQueue.addPollIn(eventFd.intValue())); submissionQueue.submit(); @@ -231,7 +231,7 @@ public class NativeTest { IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue(); - FileDescriptor eventFd = Native.newEventFd(); + FileDescriptor eventFd = Native.newBlockingEventFd(); submissionQueue.addPollIn(eventFd.intValue()); submissionQueue.submit(); submissionQueue.addPollRemove(eventFd.intValue(), Native.POLLIN);