Allow to specify a callback that is executed once submit was called and (#10555)
use it for clearing the IovArrays Motivation: IOUringSubmissionQueue may call submit() internally when there is no space left in the buffer. Once this is done we can reuse for example IovArrays etc. Because of this its useful to be able to specify a callback that is executed after submission Modifications: - Allow to specify a Runnable that is called once submission was complete - Use this callback to clear the IovArrays Result: IovArrays are automatically cleared on each submit call.
This commit is contained in:
parent
044ec159b9
commit
dd63d1c8d0
@ -289,7 +289,7 @@ static int nettyBlockingSocket(int domain, int type, int protocol) {
|
||||
return socket(domain, type, protocol);
|
||||
}
|
||||
|
||||
static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) {
|
||||
static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries, jobject submitCallback) {
|
||||
struct io_uring_params p;
|
||||
memset(&p, 0, sizeof(p));
|
||||
|
||||
@ -315,7 +315,7 @@ static jobject netty_io_uring_setup(JNIEnv *env, jclass class1, jint entries) {
|
||||
(jlong)io_uring_ring.sq.kring_entries, (jlong)io_uring_ring.sq.kflags,
|
||||
(jlong)io_uring_ring.sq.kdropped, (jlong)io_uring_ring.sq.array,
|
||||
(jlong)io_uring_ring.sq.sqes, (jlong)io_uring_ring.sq.ring_sz,
|
||||
(jlong)io_uring_ring.cq.ring_ptr, (jint)ring_fd);
|
||||
(jlong)io_uring_ring.cq.ring_ptr, (jint)ring_fd, submitCallback);
|
||||
|
||||
jobject ioUringCompletionQueue = (*env)->NewObject(
|
||||
env, ioUringCompletionQueueClass, ioUringCommpletionQueueMethodId,
|
||||
@ -434,7 +434,7 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = {
|
||||
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
|
||||
|
||||
static const JNINativeMethod method_table[] = {
|
||||
{"ioUringSetup", "(I)Lio/netty/channel/uring/RingBuffer;", (void *) netty_io_uring_setup},
|
||||
{"ioUringSetup", "(ILjava/lang/Runnable;)Lio/netty/channel/uring/RingBuffer;", (void *) netty_io_uring_setup},
|
||||
{"ioUringExit", "(Lio/netty/channel/uring/RingBuffer;)V", (void *) netty_io_uring_ring_buffer_exit},
|
||||
{"createFile", "()I", (void *) netty_create_file},
|
||||
{"ioUringEnter", "(IIII)I", (void *)netty_io_uring_enter},
|
||||
@ -540,7 +540,7 @@ JNIEXPORT jint JNI_OnLoad(JavaVM *vm, void *reserved) {
|
||||
nettyClassName, done);
|
||||
NETTY_LOAD_CLASS(env, ioUringSubmissionQueueClass, nettyClassName, done);
|
||||
NETTY_GET_METHOD(env, ioUringSubmissionQueueClass,
|
||||
ioUringSubmissionQueueMethodId, "<init>", "(JJJJJJJJIJI)V",
|
||||
ioUringSubmissionQueueMethodId, "<init>", "(JJJJJJJJIJILjava/lang/Runnable;)V",
|
||||
done);
|
||||
|
||||
NETTY_PREPEND(packagePrefix, "io/netty/channel/uring/IOUringCompletionQueue",
|
||||
|
@ -33,7 +33,12 @@ final class IOUring {
|
||||
if (unsafeCause == null) {
|
||||
RingBuffer ringBuffer = null;
|
||||
try {
|
||||
ringBuffer = Native.createRingBuffer();
|
||||
ringBuffer = Native.createRingBuffer(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Noop
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
cause = t;
|
||||
} finally {
|
||||
|
@ -65,7 +65,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
for (int i = 0; i < iovArrays.length; i++) {
|
||||
iovArrays[i] = new IovArray();
|
||||
}
|
||||
ringBuffer = Native.createRingBuffer();
|
||||
ringBuffer = Native.createRingBuffer(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Once we submitted its safe to clear the IovArrays and so be able to re-use these.
|
||||
clearUsedIovArrays();
|
||||
}
|
||||
});
|
||||
|
||||
eventfd = Native.newEventFd();
|
||||
logger.trace("New EventLoop: {}", this.toString());
|
||||
}
|
||||
@ -160,15 +167,11 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
// Always call runAllTasks() as it will also fetch the scheduled tasks that are ready.
|
||||
runAllTasks();
|
||||
|
||||
// Once we submitted its safe to clear the iovArray and so be able to re-use it.
|
||||
submissionQueue.submit();
|
||||
clearUsedIovArrays();
|
||||
try {
|
||||
if (isShuttingDown()) {
|
||||
closeAll();
|
||||
// Once we submitted its safe to clear the iovArray and so be able to re-use it.
|
||||
submissionQueue.submit();
|
||||
clearUsedIovArrays();
|
||||
|
||||
if (confirmShutdown()) {
|
||||
break;
|
||||
@ -318,7 +321,6 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
|
||||
} else {
|
||||
// No array left to use. Submit so we can reuse all of the arrays.
|
||||
ringBuffer.getIoUringSubmissionQueue().submit();
|
||||
clearUsedIovArrays();
|
||||
iovArray = iovArrays[iovArrayIdx];
|
||||
}
|
||||
assert !iovArray.isFull();
|
||||
|
@ -65,6 +65,7 @@ final class IOUringSubmissionQueue {
|
||||
private final int ringSize;
|
||||
private final long ringAddress;
|
||||
private final int ringFd;
|
||||
private final Runnable submissionCallback;
|
||||
|
||||
private final ByteBuffer timeoutMemory;
|
||||
private final long timeoutMemoryAddress;
|
||||
@ -74,7 +75,7 @@ final class IOUringSubmissionQueue {
|
||||
IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
|
||||
long fFlagsAdress, long kDroppedAddress, long arrayAddress,
|
||||
long submissionQueueArrayAddress, int ringSize,
|
||||
long ringAddress, int ringFd) {
|
||||
long ringAddress, int ringFd, Runnable submissionCallback) {
|
||||
this.kHeadAddress = kHeadAddress;
|
||||
this.kTailAddress = kTailAddress;
|
||||
this.kRingMaskAddress = kRingMaskAddress;
|
||||
@ -86,7 +87,7 @@ final class IOUringSubmissionQueue {
|
||||
this.ringSize = ringSize;
|
||||
this.ringAddress = ringAddress;
|
||||
this.ringFd = ringFd;
|
||||
|
||||
this.submissionCallback = submissionCallback;
|
||||
timeoutMemory = Buffer.allocateDirectWithNativeOrder(KERNEL_TIMESPEC_SIZE);
|
||||
timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
|
||||
}
|
||||
@ -347,7 +348,9 @@ final class IOUringSubmissionQueue {
|
||||
throw new RuntimeException("ioUringEnter syscall");
|
||||
}
|
||||
}
|
||||
submissionCallback.run();
|
||||
}
|
||||
|
||||
private void setTimeout(long timeoutNanoSeconds) {
|
||||
long seconds, nanoSeconds;
|
||||
|
||||
|
@ -82,16 +82,26 @@ final class Native {
|
||||
static final int IORING_ENTER_GETEVENTS = NativeStaticallyReferencedJniMethods.ioringEnterGetevents();
|
||||
static final int IOSQE_ASYNC = NativeStaticallyReferencedJniMethods.iosqeAsync();
|
||||
|
||||
public static RingBuffer createRingBuffer(int ringSize) {
|
||||
static RingBuffer createRingBuffer(int ringSize) {
|
||||
//Todo throw Exception if it's null
|
||||
return ioUringSetup(ringSize);
|
||||
return ioUringSetup(ringSize, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Noop
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static RingBuffer createRingBuffer() {
|
||||
return createRingBuffer(DEFAULT_RING_SIZE);
|
||||
static RingBuffer createRingBuffer(int ringSize, Runnable submissionCallback) {
|
||||
//Todo throw Exception if it's null
|
||||
return ioUringSetup(ringSize, submissionCallback);
|
||||
}
|
||||
|
||||
private static native RingBuffer ioUringSetup(int entries);
|
||||
static RingBuffer createRingBuffer(Runnable submissionCallback) {
|
||||
return createRingBuffer(DEFAULT_RING_SIZE, submissionCallback);
|
||||
}
|
||||
|
||||
private static native RingBuffer ioUringSetup(int entries, Runnable submissionCallback);
|
||||
|
||||
public static native int ioUringEnter(int ringFd, int toSubmit, int minComplete, int flags);
|
||||
|
||||
|
@ -216,7 +216,12 @@ public class NativeTest {
|
||||
|
||||
@Test
|
||||
public void ioUringExitTest() {
|
||||
RingBuffer ringBuffer = Native.createRingBuffer();
|
||||
RingBuffer ringBuffer = Native.createRingBuffer(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Noop
|
||||
}
|
||||
});
|
||||
ringBuffer.close();
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user