Correctly handle eventfd in io_uring
Motivation: We use eventfd in our io_uring based transport to wakeup the eventloop. When doing so we need to be careful that we read any data previous written to it. Modification: - Correctly read data that was written to eventfd before submit another event related to it to the submission queue as otherwise we will see another completion event related to it asap - Ensure we not remove the wrong event from the storted event ids (we did remove the wrong before because we reused the Event object) - ensure we only use the submission queue from the EventLoop thread in all cases - add another unit test Result: Wakeups via eventfd work as expected
This commit is contained in:
parent
191f0de6ee
commit
e13cc929dc
@ -201,7 +201,17 @@ static jint netty_io_uring_unregister_event_fd(JNIEnv *env, jclass class1, jint
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void netty_epoll_native_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) {
|
||||
|
||||
static void netty_io_uring_eventFdRead(JNIEnv* env, jclass clazz, jint fd) {
|
||||
uint64_t eventfd_t;
|
||||
|
||||
if (eventfd_read(fd, &eventfd_t) != 0) {
|
||||
// something is serious wrong
|
||||
netty_unix_errors_throwRuntimeException(env, "eventfd_read() failed");
|
||||
}
|
||||
}
|
||||
|
||||
static void netty_io_uring_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) {
|
||||
uint64_t val;
|
||||
|
||||
for (;;) {
|
||||
@ -325,7 +335,8 @@ static const JNINativeMethod method_table[] = {
|
||||
{"createFile", "()I", (void *) netty_create_file},
|
||||
{"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter},
|
||||
{"eventFd", "()I", (void *) netty_epoll_native_eventFd},
|
||||
{"eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite },
|
||||
{"eventFdWrite", "(IJ)V", (void *) netty_io_uring_eventFdWrite },
|
||||
{"eventFdRead", "(I)V", (void *) netty_io_uring_eventFdRead },
|
||||
{"ioUringRegisterEventFd", "(II)I", (void *) netty_io_uring_register_event_fd},
|
||||
{"ioUringUnregisterEventFd", "(I)I", (void *) netty_io_uring_unregister_event_fd}
|
||||
};
|
||||
|
@ -25,10 +25,12 @@ import io.netty.channel.unix.FileDescriptor;
|
||||
import io.netty.util.collection.IntObjectHashMap;
|
||||
import io.netty.util.collection.IntObjectMap;
|
||||
import io.netty.util.collection.LongObjectHashMap;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@ -68,13 +70,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
|
||||
ringBuffer = Native.createRingBuffer(ringSize);
|
||||
eventfd = Native.newEventFd();
|
||||
long eventId = incrementEventIdCounter();
|
||||
Event event = new Event();
|
||||
event.setOp(EventType.POLL_EVENTFD);
|
||||
event.setId(eventId);
|
||||
addNewEvent(event);
|
||||
ringBuffer.getIoUringSubmissionQueue().addPoll(eventId, eventfd.intValue(), event.getOp());
|
||||
ringBuffer.getIoUringSubmissionQueue().submit();
|
||||
logger.trace("New EventLoop: {}", this.toString());
|
||||
}
|
||||
|
||||
@ -125,6 +120,16 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
protected void run() {
|
||||
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
|
||||
final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
|
||||
// Lets add the eventfd related events before starting to do any real work.
|
||||
long eventId = incrementEventIdCounter();
|
||||
Event event = new Event();
|
||||
event.setOp(EventType.POLL_EVENTFD);
|
||||
event.setId(eventId);
|
||||
addNewEvent(event);
|
||||
submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp());
|
||||
submissionQueue.submit();
|
||||
|
||||
for (;;) {
|
||||
logger.trace("Run IOUringEventLoop {}", this.toString());
|
||||
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
|
||||
@ -140,8 +145,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
try {
|
||||
if (curDeadlineNanos != prevDeadlineNanos) {
|
||||
prevDeadlineNanos = curDeadlineNanos;
|
||||
Event event = new Event();
|
||||
long eventId = incrementEventIdCounter();
|
||||
event = new Event();
|
||||
eventId = incrementEventIdCounter();
|
||||
event.setId(eventId);
|
||||
event.setOp(EventType.TIMEOUT);
|
||||
addNewEvent(event);
|
||||
@ -165,10 +170,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
}
|
||||
|
||||
while (ioUringCqe != null) {
|
||||
final Event event = events.get(ioUringCqe.getEventId());
|
||||
event = events.get(ioUringCqe.getEventId());
|
||||
|
||||
if (event != null) {
|
||||
logger.trace("EventType Incoming: " + event.getOp().name());
|
||||
System.err.println("EventType Incoming: " + event.getOp().name());
|
||||
processEvent(ioUringCqe.getRes(), event);
|
||||
}
|
||||
|
||||
@ -196,6 +201,9 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
}
|
||||
|
||||
private void processEvent(final int res, final Event event) {
|
||||
// Remove the id first so we not end up with invalid entries in any cases.
|
||||
this.events.remove(event.getId());
|
||||
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
switch (event.getOp()) {
|
||||
case ACCEPT:
|
||||
@ -307,12 +315,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
break;
|
||||
case POLL_EVENTFD:
|
||||
pendingWakeup = false;
|
||||
//Todo eventId is already used
|
||||
long eventId = incrementEventIdCounter();
|
||||
event.setId(eventId);
|
||||
event.setOp(EventType.POLL_EVENTFD);
|
||||
addNewEvent(event);
|
||||
submissionQueue.addPoll(eventId, eventfd.intValue(), event.getOp());
|
||||
// We need to consume the data as otherwise we would see another event in the completionQueue without
|
||||
// an extra eventfd_write(....)
|
||||
Native.eventFdRead(eventfd.intValue());
|
||||
Event eventfdEvent = new Event();
|
||||
eventfdEvent.setId(incrementEventIdCounter());
|
||||
eventfdEvent.setOp(EventType.POLL_EVENTFD);
|
||||
addNewEvent(eventfdEvent);
|
||||
submissionQueue.addPoll(eventfdEvent.getId(), eventfd.intValue(), eventfdEvent.getOp());
|
||||
// Submit so its picked up
|
||||
submissionQueue.submit();
|
||||
break;
|
||||
@ -329,7 +339,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop {
|
||||
logger.trace("POLL_OUT Res: {}", res);
|
||||
break;
|
||||
}
|
||||
this.events.remove(event.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -87,6 +87,8 @@ final class Native {
|
||||
|
||||
public static native void eventFdWrite(int fd, long value);
|
||||
|
||||
public static native void eventFdRead(int fd);
|
||||
|
||||
public static FileDescriptor newEventFd() {
|
||||
return new FileDescriptor(eventFd());
|
||||
}
|
||||
|
@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.channel.uring;
|
||||
|
||||
import io.netty.channel.EventLoop;
|
||||
import org.junit.Test;
|
||||
|
||||
public class IOUringEventLoopTest {
|
||||
|
||||
@Test
|
||||
public void testSubmitMultipleTasksAndEnsureTheseAreExecuted() throws Exception {
|
||||
IOUringEventLoopGroup group = new IOUringEventLoopGroup(1);
|
||||
try {
|
||||
EventLoop loop = group.next();
|
||||
loop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
}).sync();
|
||||
|
||||
loop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
}).sync();
|
||||
loop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
}).sync();
|
||||
loop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
}).sync();
|
||||
} finally {
|
||||
group.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user