diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index aa4bf57e17..2a98021b4e 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -289,7 +289,7 @@ static int nettyBlockingSocket(int domain, int type, int protocol) { return socket(domain, type, protocol); } -static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) { +static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries, jobject submitCallback) { struct io_uring_params p; memset(&p, 0, sizeof(p)); @@ -315,7 +315,7 @@ static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) { (jlong)io_uring_ring.sq.kring_entries, (jlong)io_uring_ring.sq.kflags, (jlong)io_uring_ring.sq.kdropped, (jlong)io_uring_ring.sq.array, (jlong)io_uring_ring.sq.sqes, (jlong)io_uring_ring.sq.ring_sz, - (jlong)io_uring_ring.cq.ring_ptr, (jint)ring_fd); + (jlong)io_uring_ring.cq.ring_ptr, (jint)ring_fd, submitCallback); jobject ioUringCompletionQueue = (*env)->NewObject( env, ioUringCompletionQueueClass, ioUringCommpletionQueueMethodId, @@ -434,7 +434,7 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); static const JNINativeMethod method_table[] = { - {"ioUringSetup", "(I)Lio/netty/channel/uring/RingBuffer;", (void *) netty_io_uring_setup}, + {"ioUringSetup", "(ILjava/lang/Runnable;)Lio/netty/channel/uring/RingBuffer;", (void *) netty_io_uring_setup}, {"ioUringExit", "(Lio/netty/channel/uring/RingBuffer;)V", (void *) netty_io_uring_ring_buffer_exit}, {"createFile", "()I", (void *) netty_create_file}, {"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter}, @@ -540,7 +540,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) { nettyClassName, done); NETTY_LOAD_CLASS(env, ioUringSubmissionQueueClass, nettyClassName, done); NETTY_GET_METHOD(env, ioUringSubmissionQueueClass, - ioUringSubmissionQueueMethodId, "", "(JJJJJJJJIJI)V", + ioUringSubmissionQueueMethodId, "", "(JJJJJJJJIJILjava/lang/Runnable;)V", done); NETTY_PREPEND(packagePrefix, "io/netty/channel/uring/IOUringCompletionQueue", diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java index a7dd706e14..042f55a9ee 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUring.java @@ -33,7 +33,12 @@ final class IOUring { if (unsafeCause == null) { RingBuffer ringBuffer = null; try { - ringBuffer = Native.createRingBuffer(); + ringBuffer = Native.createRingBuffer(new Runnable() { + @Override + public void run() { + // Noop + } + }); } catch (Throwable t) { cause = t; } finally { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 1d927107ed..5c49734ced 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -65,7 +65,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements for (int i = 0; i < iovArrays.length; i++) { iovArrays[i] = new IovArray(); } - ringBuffer = Native.createRingBuffer(); + ringBuffer = Native.createRingBuffer(new Runnable() { + @Override + public void run() { + // Once we submitted its safe to clear the IovArrays and so be able to re-use these. + clearUsedIovArrays(); + } + }); + eventfd = Native.newEventFd(); logger.trace("New EventLoop: {}", this.toString()); } @@ -160,15 +167,11 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements // Always call runAllTasks() as it will also fetch the scheduled tasks that are ready. runAllTasks(); - // Once we submitted its safe to clear the iovArray and so be able to re-use it. submissionQueue.submit(); - clearUsedIovArrays(); try { if (isShuttingDown()) { closeAll(); - // Once we submitted its safe to clear the iovArray and so be able to re-use it. submissionQueue.submit(); - clearUsedIovArrays(); if (confirmShutdown()) { break; @@ -318,7 +321,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements } else { // No array left to use. Submit so we can reuse all of the arrays. ringBuffer.getIoUringSubmissionQueue().submit(); - clearUsedIovArrays(); iovArray = iovArrays[iovArrayIdx]; } assert !iovArray.isFull(); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 87a69d0fb0..33ce43a972 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -65,6 +65,7 @@ final class IOUringSubmissionQueue { private final int ringSize; private final long ringAddress; private final int ringFd; + private final Runnable submissionCallback; private final ByteBuffer timeoutMemory; private final long timeoutMemoryAddress; @@ -74,7 +75,7 @@ final class IOUringSubmissionQueue { IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, int ringSize, - long ringAddress, int ringFd) { + long ringAddress, int ringFd, Runnable submissionCallback) { this.kHeadAddress = kHeadAddress; this.kTailAddress = kTailAddress; this.kRingMaskAddress = kRingMaskAddress; @@ -86,7 +87,7 @@ final class IOUringSubmissionQueue { this.ringSize = ringSize; this.ringAddress = ringAddress; this.ringFd = ringFd; - + this.submissionCallback = submissionCallback; timeoutMemory = Buffer.allocateDirectWithNativeOrder(KERNEL_TIMESPEC_SIZE); timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory); } @@ -347,7 +348,9 @@ final class IOUringSubmissionQueue { throw new RuntimeException("ioUringEnter syscall"); } } + submissionCallback.run(); } + private void setTimeout(long timeoutNanoSeconds) { long seconds, nanoSeconds; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index b839dd7eae..d1767f24a2 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -82,16 +82,26 @@ final class Native { static final int IORING_ENTER_GETEVENTS = NativeStaticallyReferencedJniMethods.ioringEnterGetevents(); static final int IOSQE_ASYNC = NativeStaticallyReferencedJniMethods.iosqeAsync(); - public static RingBuffer createRingBuffer(int ringSize) { + static RingBuffer createRingBuffer(int ringSize) { //Todo throw Exception if it's null - return ioUringSetup(ringSize); + return ioUringSetup(ringSize, new Runnable() { + @Override + public void run() { + // Noop + } + }); } - public static RingBuffer createRingBuffer() { - return createRingBuffer(DEFAULT_RING_SIZE); + static RingBuffer createRingBuffer(int ringSize, Runnable submissionCallback) { + //Todo throw Exception if it's null + return ioUringSetup(ringSize, submissionCallback); } - private static native RingBuffer ioUringSetup(int entries); + static RingBuffer createRingBuffer(Runnable submissionCallback) { + return createRingBuffer(DEFAULT_RING_SIZE, submissionCallback); + } + + private static native RingBuffer ioUringSetup(int entries, Runnable submissionCallback); public static native int ioUringEnter(int ringFd, int toSubmit, int minComplete, int flags); diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index d9b28f9d5f..873a11d573 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -216,7 +216,12 @@ public class NativeTest { @Test public void ioUringExitTest() { - RingBuffer ringBuffer = Native.createRingBuffer(); + RingBuffer ringBuffer = Native.createRingBuffer(new Runnable() { + @Override + public void run() { + // Noop + } + }); ringBuffer.close(); }