From dd63d1c8d09157c1e6dd968833bf7e85e69f47b5 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 9 Sep 2020 17:18:47 +0200 Subject: [PATCH] Allow to specify a callback that is executed once submit was called and (#10555) use it for clearing the IovArrays Motivation: IOUringSubmissionQueue may call submit() internally when there is no space left in the buffer. Once this is done we can reuse for example IovArrays etc. Because of this its useful to be able to specify a callback that is executed after submission Modifications: - Allow to specify a Runnable that is called once submission was complete - Use this callback to clear the IovArrays Result: IovArrays are automatically cleared on each submit call. --- .../src/main/c/netty_io_uring_native.c | 8 ++++---- .../java/io/netty/channel/uring/IOUring.java | 7 ++++++- .../netty/channel/uring/IOUringEventLoop.java | 14 +++++++------ .../channel/uring/IOUringSubmissionQueue.java | 7 +++++-- .../java/io/netty/channel/uring/Native.java | 20 ++++++++++++++----- .../io/netty/channel/uring/NativeTest.java | 7 ++++++- 6 files changed, 44 insertions(+), 19 deletions(-) diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index aa4bf57e17..2a98021b4e 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -289,7 +289,7 @@ static int nettyBlockingSocket(int domain, int type, int protocol) { return socket(domain, type, protocol); } -static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) { +static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries, jobject submitCallback) { struct io_uring_params p; memset(&p, 0, sizeof(p)); @@ -315,7 +315,7 @@ static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) { (jlong)io_uring_ring.sq.kring_entries, (jlong)io_uring_ring.sq.kflags, (jlong)io_uring_ring.sq.kdropped, (jlong)io_uring_ring.sq.array, (jlong)io_uring_ring.sq.sqes, (jlong)io_uring_ring.sq.ring_sz, - (jlong)io_uring_ring.cq.ring_ptr, (jint)ring_fd); + (jlong)io_uring_ring.cq.ring_ptr, (jint)ring_fd, submitCallback); jobject ioUringCompletionQueue = (*env)->NewObject( env, ioUringCompletionQueueClass, ioUringCommpletionQueueMethodId, @@ -434,7 +434,7 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); static const JNINativeMethod method_table[] = { - {"ioUringSetup", "(I)Lio/netty/channel/uring/RingBuffer;", (void *) netty_io_uring_setup}, + {"ioUringSetup", "(ILjava/lang/Runnable;)Lio/netty/channel/uring/RingBuffer;", (void *) netty_io_uring_setup}, {"ioUringExit", "(Lio/netty/channel/uring/RingBuffer;)V", (void *) netty_io_uring_ring_buffer_exit}, {"createFile", "()I", (void *) netty_create_file}, {"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter}, @@ -540,7 +540,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) { nettyClassName, done); NETTY_LOAD_CLASS(env, ioUringSubmissionQueueClass, nettyClassName, done); NETTY_GET_METHOD(env, ioUringSubmissionQueueClass, - ioUringSubmissionQueueMethodId, "", "(JJJJJJJJIJI)V", + ioUringSubmissionQueueMethodId, "", "(JJJJJJJJIJILjava/lang/Runnable;)V", done); NETTY_PREPEND(packagePrefix, "io/netty/channel/uring/IOUringCompletionQueue", diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java index a7dd706e14..042f55a9ee 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java @@ -33,7 +33,12 @@ final class IOUring { if (unsafeCause == null) { RingBuffer ringBuffer = null; try { - ringBuffer = Native.createRingBuffer(); + ringBuffer = Native.createRingBuffer(new Runnable() { + @Override + public void run() { + // Noop + } + }); } catch (Throwable t) { cause = t; } finally { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 1d927107ed..5c49734ced 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -65,7 +65,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements for (int i = 0; i < iovArrays.length; i++) { iovArrays[i] = new IovArray(); } - ringBuffer = Native.createRingBuffer(); + ringBuffer = Native.createRingBuffer(new Runnable() { + @Override + public void run() { + // Once we submitted its safe to clear the IovArrays and so be able to re-use these. + clearUsedIovArrays(); + } + }); + eventfd = Native.newEventFd(); logger.trace("New EventLoop: {}", this.toString()); } @@ -160,15 +167,11 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements // Always call runAllTasks() as it will also fetch the scheduled tasks that are ready. runAllTasks(); - // Once we submitted its safe to clear the iovArray and so be able to re-use it. submissionQueue.submit(); - clearUsedIovArrays(); try { if (isShuttingDown()) { closeAll(); - // Once we submitted its safe to clear the iovArray and so be able to re-use it. submissionQueue.submit(); - clearUsedIovArrays(); if (confirmShutdown()) { break; @@ -318,7 +321,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements } else { // No array left to use. Submit so we can reuse all of the arrays. ringBuffer.getIoUringSubmissionQueue().submit(); - clearUsedIovArrays(); iovArray = iovArrays[iovArrayIdx]; } assert !iovArray.isFull(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 87a69d0fb0..33ce43a972 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -65,6 +65,7 @@ final class IOUringSubmissionQueue { private final int ringSize; private final long ringAddress; private final int ringFd; + private final Runnable submissionCallback; private final ByteBuffer timeoutMemory; private final long timeoutMemoryAddress; @@ -74,7 +75,7 @@ final class IOUringSubmissionQueue { IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, int ringSize, - long ringAddress, int ringFd) { + long ringAddress, int ringFd, Runnable submissionCallback) { this.kHeadAddress = kHeadAddress; this.kTailAddress = kTailAddress; this.kRingMaskAddress = kRingMaskAddress; @@ -86,7 +87,7 @@ final class IOUringSubmissionQueue { this.ringSize = ringSize; this.ringAddress = ringAddress; this.ringFd = ringFd; - + this.submissionCallback = submissionCallback; timeoutMemory = Buffer.allocateDirectWithNativeOrder(KERNEL_TIMESPEC_SIZE); timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory); } @@ -347,7 +348,9 @@ final class IOUringSubmissionQueue { throw new RuntimeException("ioUringEnter syscall"); } } + submissionCallback.run(); } + private void setTimeout(long timeoutNanoSeconds) { long seconds, nanoSeconds; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index b839dd7eae..d1767f24a2 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -82,16 +82,26 @@ final class Native { static final int IORING_ENTER_GETEVENTS = NativeStaticallyReferencedJniMethods.ioringEnterGetevents(); static final int IOSQE_ASYNC = NativeStaticallyReferencedJniMethods.iosqeAsync(); - public static RingBuffer createRingBuffer(int ringSize) { + static RingBuffer createRingBuffer(int ringSize) { //Todo throw Exception if it's null - return ioUringSetup(ringSize); + return ioUringSetup(ringSize, new Runnable() { + @Override + public void run() { + // Noop + } + }); } - public static RingBuffer createRingBuffer() { - return createRingBuffer(DEFAULT_RING_SIZE); + static RingBuffer createRingBuffer(int ringSize, Runnable submissionCallback) { + //Todo throw Exception if it's null + return ioUringSetup(ringSize, submissionCallback); } - private static native RingBuffer ioUringSetup(int entries); + static RingBuffer createRingBuffer(Runnable submissionCallback) { + return createRingBuffer(DEFAULT_RING_SIZE, submissionCallback); + } + + private static native RingBuffer ioUringSetup(int entries, Runnable submissionCallback); public static native int ioUringEnter(int ringFd, int toSubmit, int minComplete, int flags); diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index d9b28f9d5f..873a11d573 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -216,7 +216,12 @@ public class NativeTest { @Test public void ioUringExitTest() { - RingBuffer ringBuffer = Native.createRingBuffer(); + RingBuffer ringBuffer = Native.createRingBuffer(new Runnable() { + @Override + public void run() { + // Noop + } + }); ringBuffer.close(); }