diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index d8093d2649..be98778a2f 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -201,7 +201,17 @@ static jint netty_io_uring_unregister_event_fd(JNIEnv *env, jclass class1, jint return 0; } -static void netty_epoll_native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) { + +static void netty_io_uring_eventFdRead(JNIEnv* env, jclass clazz, jint fd) { + uint64_t eventfd_t; + + if (eventfd_read(fd, &eventfd_t) != 0) { + // something is serious wrong + netty_unix_errors_throwRuntimeException(env, "eventfd_read() failed"); + } +} + +static void netty_io_uring_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) { uint64_t val; for (;;) { @@ -325,7 +335,8 @@ static const JNINativeMethod method_table[] = { {"createFile", "()I", (void *) netty_create_file}, {"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter}, {"eventFd", "()I", (void *) netty_epoll_native_eventFd}, - {"eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite }, + {"eventFdWrite", "(IJ)V", (void *) netty_io_uring_eventFdWrite }, + {"eventFdRead", "(I)V", (void *) netty_io_uring_eventFdRead }, {"ioUringRegisterEventFd", "(II)I", (void *) netty_io_uring_register_event_fd}, {"ioUringUnregisterEventFd", "(I)I", (void *) netty_io_uring_unregister_event_fd} }; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index c05013da72..fb1ab9c84e 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -25,10 +25,12 @@ import io.netty.channel.unix.FileDescriptor; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.LongObjectHashMap; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; +import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; @@ -68,13 +70,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop { ringBuffer = Native.createRingBuffer(ringSize); eventfd = Native.newEventFd(); - long eventId = incrementEventIdCounter(); - Event event = new Event(); - event.setOp(EventType.POLL_EVENTFD); - event.setId(eventId); - addNewEvent(event); - ringBuffer.getIoUringSubmissionQueue().addPoll(eventId, eventfd.intValue(), event.getOp()); - ringBuffer.getIoUringSubmissionQueue().submit(); logger.trace("New EventLoop: {}", this.toString()); } @@ -125,6 +120,16 @@ final class IOUringEventLoop extends SingleThreadEventLoop { protected void run() { final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue(); final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); + + // Lets add the eventfd related events before starting to do any real work. + long eventId = incrementEventIdCounter(); + Event event = new Event(); + event.setOp(EventType.POLL_EVENTFD); + event.setId(eventId); + addNewEvent(event); + submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp()); + submissionQueue.submit(); + for (;;) { logger.trace("Run IOUringEventLoop {}", this.toString()); long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); @@ -140,8 +145,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop { try { if (curDeadlineNanos != prevDeadlineNanos) { prevDeadlineNanos = curDeadlineNanos; - Event event = new Event(); - long eventId = incrementEventIdCounter(); + event = new Event(); + eventId = incrementEventIdCounter(); event.setId(eventId); event.setOp(EventType.TIMEOUT); addNewEvent(event); @@ -165,10 +170,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop { } while (ioUringCqe != null) { - final Event event = events.get(ioUringCqe.getEventId()); + event = events.get(ioUringCqe.getEventId()); if (event != null) { - logger.trace("EventType Incoming: " + event.getOp().name()); + System.err.println("EventType Incoming: " + event.getOp().name()); processEvent(ioUringCqe.getRes(), event); } @@ -196,6 +201,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop { } private void processEvent(final int res, final Event event) { + // Remove the id first so we not end up with invalid entries in any cases. + this.events.remove(event.getId()); + IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue(); switch (event.getOp()) { case ACCEPT: @@ -307,12 +315,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop { break; case POLL_EVENTFD: pendingWakeup = false; - //Todo eventId is already used - long eventId = incrementEventIdCounter(); - event.setId(eventId); - event.setOp(EventType.POLL_EVENTFD); - addNewEvent(event); - submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp()); + // We need to consume the data as otherwise we would see another event in the completionQueue without + // an extra eventfd_write(....) + Native.eventFdRead(eventfd.intValue()); + Event eventfdEvent = new Event(); + eventfdEvent.setId(incrementEventIdCounter()); + eventfdEvent.setOp(EventType.POLL_EVENTFD); + addNewEvent(eventfdEvent); + submissionQueue.addPoll(eventfdEvent.getId(), eventfd.intValue(), eventfdEvent.getOp()); // Submit so its picked up submissionQueue.submit(); break; @@ -329,7 +339,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop { logger.trace("POLL_OUT Res: {}", res); break; } - this.events.remove(event.getId()); } @Override diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index 6bbec45bea..e8a79b24a9 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -87,6 +87,8 @@ final class Native { public static native void eventFdWrite(int fd, long value); + public static native void eventFdRead(int fd); + public static FileDescriptor newEventFd() { return new FileDescriptor(eventFd()); } diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringEventLoopTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringEventLoopTest.java new file mode 100644 index 0000000000..4f179b3e7e --- /dev/null +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/IOUringEventLoopTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.uring; + +import io.netty.channel.EventLoop; +import org.junit.Test; + +public class IOUringEventLoopTest { + + @Test + public void testSubmitMultipleTasksAndEnsureTheseAreExecuted() throws Exception { + IOUringEventLoopGroup group = new IOUringEventLoopGroup(1); + try { + EventLoop loop = group.next(); + loop.submit(new Runnable() { + @Override + public void run() { + + } + }).sync(); + + loop.submit(new Runnable() { + @Override + public void run() { + + } + }).sync(); + loop.submit(new Runnable() { + @Override + public void run() { + + } + }).sync(); + loop.submit(new Runnable() { + @Override + public void run() { + + } + }).sync(); + } finally { + group.shutdownGracefully(); + } + } +}