Exploit blocking FAST_POLL for eventfd reads (#10543)
Motivation If we make eventfd blocking then read can take the place of poll+read Modification Make eventfd blocking, use READ instead of POLLIN, allocating a static 64bit buffer to read into Result Fewer kernel roundtrips for event loop wakeups
This commit is contained in:
parent
d933a9dd56
commit
2316c2ce45
@ -178,8 +178,9 @@ static jint netty_io_uring_enter(JNIEnv *env, jclass class1, jint ring_fd, jint
|
||||
return -err;
|
||||
}
|
||||
|
||||
static jint netty_epoll_native_eventFd(JNIEnv* env, jclass clazz) {
|
||||
jint eventFD = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
|
||||
static jint netty_epoll_native_blocking_event_fd(JNIEnv* env, jclass clazz) {
|
||||
// We use a blocking fd with io_uring FAST_POLL read
|
||||
jint eventFD = eventfd(0, EFD_CLOEXEC);
|
||||
|
||||
if (eventFD < 0) {
|
||||
netty_unix_errors_throwChannelExceptionErrorNo(env, "eventfd() failed: ", errno);
|
||||
@ -210,16 +211,6 @@ static jint netty_io_uring_unregister_event_fd(JNIEnv *env, jclass class1, jint
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void netty_io_uring_eventFdRead(JNIEnv* env, jclass clazz, jint fd) {
|
||||
uint64_t eventfd_t;
|
||||
|
||||
if (eventfd_read(fd, &eventfd_t) != 0) {
|
||||
// something is serious wrong
|
||||
netty_unix_errors_throwRuntimeException(env, "eventfd_read() failed");
|
||||
}
|
||||
}
|
||||
|
||||
static void netty_io_uring_eventFdWrite(JNIEnv* env, jclass clazz, jint fd, jlong value) {
|
||||
uint64_t val;
|
||||
|
||||
@ -438,9 +429,8 @@ static const JNINativeMethod method_table[] = {
|
||||
{"ioUringExit", "(Lio/netty/channel/uring/RingBuffer;)V", (void *) netty_io_uring_ring_buffer_exit},
|
||||
{"createFile", "()I", (void *) netty_create_file},
|
||||
{"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter},
|
||||
{"eventFd", "()I", (void *) netty_epoll_native_eventFd},
|
||||
{"blockingEventFd", "()I", (void *) netty_epoll_native_blocking_event_fd},
|
||||
{"eventFdWrite", "(IJ)V", (void *) netty_io_uring_eventFdWrite },
|
||||
{"eventFdRead", "(I)V", (void *) netty_io_uring_eventFdRead },
|
||||
{"ioUringRegisterEventFd", "(II)I", (void *) netty_io_uring_register_event_fd},
|
||||
{"ioUringUnregisterEventFd", "(I)I", (void *) netty_io_uring_unregister_event_fd}
|
||||
};
|
||||
@ -593,4 +583,4 @@ JNIEXPORT void JNI_OnUnload(JavaVM* vm, void* reserved) {
|
||||
return;
|
||||
}
|
||||
netty_io_uring_native_JNI_OnUnLoad(env);
|
||||
}
|
||||
}
|
||||
|
@ -35,6 +35,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
IOUringCompletionQueue.IOUringCompletionQueueCallback {
|
||||
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);
|
||||
|
||||
private final long eventfdReadBuf = PlatformDependent.allocateMemory(8);
|
||||
|
||||
private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
|
||||
private final RingBuffer ringBuffer;
|
||||
|
||||
@ -69,7 +71,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
}
|
||||
});
|
||||
|
||||
eventfd = Native.newEventFd();
|
||||
eventfd = Native.newBlockingEventFd();
|
||||
logger.trace("New EventLoop: {}", this.toString());
|
||||
}
|
||||
|
||||
@ -122,7 +124,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
final IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
|
||||
// Lets add the eventfd related events before starting to do any real work.
|
||||
submissionQueue.addPollIn(eventfd.intValue());
|
||||
addEventFdRead(submissionQueue);
|
||||
submissionQueue.submit();
|
||||
|
||||
for (;;) {
|
||||
@ -181,23 +183,21 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
|
||||
@Override
|
||||
public boolean handle(int fd, int res, long flags, int op, int pollMask) {
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
final AbstractIOUringChannel channel;
|
||||
if (op == Native.IORING_OP_READ || op == Native.IORING_OP_ACCEPT) {
|
||||
channel = handleRead(fd, res);
|
||||
if (eventfd.intValue() == fd) {
|
||||
channel = null;
|
||||
if (res != Native.ERRNO_ECANCELED_NEGATIVE) {
|
||||
pendingWakeup = false;
|
||||
addEventFdRead(ringBuffer.getIoUringSubmissionQueue());
|
||||
}
|
||||
} else {
|
||||
channel = handleRead(fd, res);
|
||||
}
|
||||
} else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_WRITE) {
|
||||
channel = handleWrite(fd, res);
|
||||
} else if (op == Native.IORING_OP_POLL_ADD) {
|
||||
if (eventfd.intValue() == fd) {
|
||||
if (res == Native.ERRNO_ECANCELED_NEGATIVE) {
|
||||
return true;
|
||||
}
|
||||
channel = null;
|
||||
pendingWakeup = false;
|
||||
handleEventFd(submissionQueue);
|
||||
} else {
|
||||
channel = handlePollAdd(fd, res, pollMask);
|
||||
}
|
||||
channel = handlePollAdd(fd, res, pollMask);
|
||||
} else if (op == Native.IORING_OP_POLL_REMOVE) {
|
||||
if (res == Errors.ERRNO_ENOENT_NEGATIVE) {
|
||||
logger.trace("IORING_POLL_REMOVE not successful");
|
||||
@ -258,13 +258,8 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
return channel;
|
||||
}
|
||||
|
||||
private void handleEventFd(IOUringSubmissionQueue submissionQueue) {
|
||||
// We need to consume the data as otherwise we would see another event
|
||||
// in the completionQueue without
|
||||
// an extra eventfd_write(....)
|
||||
Native.eventFdRead(eventfd.intValue());
|
||||
|
||||
submissionQueue.addPollIn(eventfd.intValue());
|
||||
private void addEventFdRead(IOUringSubmissionQueue submissionQueue) {
|
||||
submissionQueue.addRead(eventfd.intValue(), eventfdReadBuf, 0, 8);
|
||||
}
|
||||
|
||||
private AbstractIOUringChannel handleConnect(int fd, int res) {
|
||||
@ -284,6 +279,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
}
|
||||
ringBuffer.close();
|
||||
iovArrays.release();
|
||||
PlatformDependent.freeMemory(eventfdReadBuf);
|
||||
}
|
||||
|
||||
public RingBuffer getRingBuffer() {
|
||||
|
@ -111,15 +111,13 @@ final class Native {
|
||||
|
||||
public static native void eventFdWrite(int fd, long value);
|
||||
|
||||
public static native void eventFdRead(int fd);
|
||||
|
||||
public static FileDescriptor newEventFd() {
|
||||
return new FileDescriptor(eventFd());
|
||||
public static FileDescriptor newBlockingEventFd() {
|
||||
return new FileDescriptor(blockingEventFd());
|
||||
}
|
||||
|
||||
public static native void ioUringExit(RingBuffer ringBuffer);
|
||||
|
||||
private static native int eventFd();
|
||||
private static native int blockingEventFd();
|
||||
|
||||
// for testing(it is only temporary)
|
||||
public static native int createFile();
|
||||
|
@ -144,7 +144,7 @@ public class NativeTest {
|
||||
assertNotNull(submissionQueue);
|
||||
assertNotNull(completionQueue);
|
||||
|
||||
final FileDescriptor eventFd = Native.newEventFd();
|
||||
final FileDescriptor eventFd = Native.newBlockingEventFd();
|
||||
assertFalse(submissionQueue.addPollIn(eventFd.intValue()));
|
||||
submissionQueue.submit();
|
||||
|
||||
@ -198,7 +198,7 @@ public class NativeTest {
|
||||
}
|
||||
};
|
||||
waitingCqe.start();
|
||||
final FileDescriptor eventFd = Native.newEventFd();
|
||||
final FileDescriptor eventFd = Native.newBlockingEventFd();
|
||||
assertFalse(submissionQueue.addPollIn(eventFd.intValue()));
|
||||
submissionQueue.submit();
|
||||
|
||||
@ -231,7 +231,7 @@ public class NativeTest {
|
||||
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
|
||||
final IOUringCompletionQueue completionQueue = ringBuffer.getIoUringCompletionQueue();
|
||||
|
||||
FileDescriptor eventFd = Native.newEventFd();
|
||||
FileDescriptor eventFd = Native.newBlockingEventFd();
|
||||
submissionQueue.addPollIn(eventFd.intValue());
|
||||
submissionQueue.submit();
|
||||
submissionQueue.addPollRemove(eventFd.intValue(), Native.POLLIN);
|
||||
|
Loading…
x
Reference in New Issue
Block a user