From 90674b4fced8236c611b395269606e4d22239c35 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Thu, 10 Sep 2020 04:25:34 -0700 Subject: [PATCH] Simplify SQE handling (#10544) Motivation SQE handling can be simplified in terms of code and operations performed Modifications - Zero SQE array up front - no need to set never-used fields each time - Fill SQ array up front with corresponding indicies - no need to set each time since they are 1-1 with SQE array entries - Keep local head and tail vars and don't track separate sqe array head/tail - Allocate memory for timespec directly (no need for ByteBuffer) - Avoid some unnecessary casts / type conversions (no need to convert uints to longs) Result Fewer operations, less code --- .../src/main/c/netty_io_uring_native.c | 6 +- .../channel/uring/IOUringSubmissionQueue.java | 323 ++++-------------- 2 files changed, 76 insertions(+), 253 deletions(-) diff --git a/transport-native-io_uring/src/main/c/netty_io_uring_native.c b/transport-native-io_uring/src/main/c/netty_io_uring_native.c index 4dd6a5f287..ed9df7c124 100644 --- a/transport-native-io_uring/src/main/c/netty_io_uring_native.c +++ b/transport-native-io_uring/src/main/c/netty_io_uring_native.c @@ -249,7 +249,7 @@ static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass class, jobject r jclass completionQueueClass = (*env)->GetObjectClass(env, completionQueue); jmethodID submissionQueueArrayAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getSubmissionQueueArrayAddress", "()J"); - jmethodID submissionQueueKringEntriesAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getKRingEntriesAddress", "()J"); + jmethodID submissionQueueRingEntriesMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingEntries", "()I"); jmethodID submissionQueueRingFdMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingFd", "()I"); jmethodID submissionQueueRingAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingAddress", "()J"); jmethodID submissionQueueRingSizeMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingSize", "()I"); @@ -258,7 +258,7 @@ static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass class, jobject r jmethodID completionQueueRingSizeMethodId = (*env)->GetMethodID(env, completionQueueClass, "getRingSize", "()I"); jlong submissionQueueArrayAddress = (*env)->CallLongMethod(env, submissionQueue, submissionQueueArrayAddressMethodId); - jlong submissionQueueKringEntriesAddress = (*env)->CallLongMethod(env, submissionQueue, submissionQueueKringEntriesAddressMethodId); + jint submissionQueueKringEntries = (*env)->CallIntMethod(env, submissionQueue, submissionQueueRingEntriesMethodId); jint submissionQueueRingFd = (*env)->CallIntMethod(env, submissionQueue, submissionQueueRingFdMethodId); jlong submissionQueueRingAddress = (*env)->CallLongMethod(env, submissionQueue, submissionQueueRingAddressMethodId); jint submissionQueueRingSize = (*env)->CallIntMethod(env, submissionQueue, submissionQueueRingSizeMethodId); @@ -266,8 +266,6 @@ static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass class, jobject r jlong completionQueueRingAddress = (*env)->CallLongMethod(env, completionQueue, completionQueueRingAddressMethodId); jint completionQueueRingSize = (*env)->CallIntMethod(env, completionQueue, completionQueueRingSizeMethodId); - unsigned submissionQueueKringEntries = *((unsigned int *) submissionQueueKringEntriesAddress); - munmap((struct io_uring_sqe*) submissionQueueArrayAddress, submissionQueueKringEntries * sizeof(struct io_uring_sqe)); munmap((void*) submissionQueueRingAddress, submissionQueueRingSize); if (((void *) completionQueueRingAddress) && ((void *) completionQueueRingAddress) != ((void *) submissionQueueRingAddress)) { diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 33ce43a972..53204e7004 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -15,20 +15,17 @@ */ package io.netty.channel.uring; -import io.netty.channel.unix.Buffer; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; -import java.nio.ByteBuffer; - import static java.lang.Math.max; import static java.lang.Math.min; final class IOUringSubmissionQueue { private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringSubmissionQueue.class); - private static final int SQE_SIZE = 64; + private static final long SQE_SIZE = 64; private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support? private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec @@ -51,35 +48,30 @@ final class IOUringSubmissionQueue { //these unsigned integer pointers(shared with the kernel) will be changed by the kernel private final long kHeadAddress; private final long kTailAddress; - private final long kRingMaskAddress; - private final long kRingEntriesAddress; private final long fFlagsAdress; private final long kDroppedAddress; private final long arrayAddress; private final long submissionQueueArrayAddress; - private long sqeHead; - private long sqeTail; + private final int ringEntries; + private final int ringMask; // = ringEntries - 1 private final int ringSize; private final long ringAddress; private final int ringFd; private final Runnable submissionCallback; - private final ByteBuffer timeoutMemory; private final long timeoutMemoryAddress; - //private int sqeSubmitCounter; + private int head; + private int tail; IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, - long fFlagsAdress, long kDroppedAddress, long arrayAddress, - long submissionQueueArrayAddress, int ringSize, - long ringAddress, int ringFd, Runnable submissionCallback) { + long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, + int ringSize, long ringAddress, int ringFd, Runnable submissionCallback) { this.kHeadAddress = kHeadAddress; this.kTailAddress = kTailAddress; - this.kRingMaskAddress = kRingMaskAddress; - this.kRingEntriesAddress = kRingEntriesAddress; this.fFlagsAdress = fFlagsAdress; this.kDroppedAddress = kDroppedAddress; this.arrayAddress = arrayAddress; @@ -88,85 +80,62 @@ final class IOUringSubmissionQueue { this.ringAddress = ringAddress; this.ringFd = ringFd; this.submissionCallback = submissionCallback; - timeoutMemory = Buffer.allocateDirectWithNativeOrder(KERNEL_TIMESPEC_SIZE); - timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory); - } + this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress); + this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress); + this.head = PlatformDependent.getIntVolatile(kHeadAddress); + this.tail = PlatformDependent.getIntVolatile(kTailAddress); - public long getSqe() { - long next = sqeTail + 1; - long kRingEntries = toUnsignedLong(PlatformDependent.getInt(kRingEntriesAddress)); + this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE); - //acquire memory barrier - long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); + // Zero the whole SQE array first + PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0); - long sqe = 0; - if ((next - kHead) <= kRingEntries) { - long index = sqeTail & toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); - sqe = SQE_SIZE * index + submissionQueueArrayAddress; - sqeTail = next; + // Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields + long address = arrayAddress; + long sqeFlagsAddress = submissionQueueArrayAddress + SQE_FLAGS_FIELD; + for (int i = 0; i < ringEntries; i++, address += INT_SIZE, sqeFlagsAddress += SQE_SIZE) { + PlatformDependent.putInt(address, i); + // TODO: Make it configurable if we should use this flag or not. + PlatformDependent.putByte(sqeFlagsAddress, (byte) Native.IOSQE_ASYNC); } - return sqe; } - private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress, int length, long offset) { - //Todo cleaner + private boolean enqueueSqe(int op, int rwFlags, int fd, long bufferAddress, int length, long offset) { + boolean submitted = false; + int pending = tail - head; + if (pending == ringEntries) { + submit(); + submitted = true; + } + long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE; + setData(sqe, op, rwFlags, fd, bufferAddress, length, offset); + return submitted; + } + + private void setData(long sqe, int op, int rwFlags, int fd, long bufferAddress, int length, long offset) { //set sqe(submission queue) properties - PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, op); - PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0); + + PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) op); + // These two constants are set up-front + //PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) Native.IOSQE_ASYNC); + //PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0); PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd); PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress); PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length); + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, rwFlags); + long userData = convertToUserData(op, fd, rwFlags); + PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, userData); - //user_data should be same as POLL_LINK fd - if (op == Native.IORING_OP_POLL_REMOVE) { - PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1); - long uData = convertToUserData((byte) Native.IORING_OP_POLL_ADD, fd, pollMask); - PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData); - PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, convertToUserData(op, fd, 0)); - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); - } else { - long uData = convertToUserData(op, fd, pollMask); - PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData); - //c union set Rw-Flags or accept_flags - if (op != Native.IORING_OP_ACCEPT) { - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask); - } else { - //accept_flags set NON_BLOCKING - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, Native.SOCK_NONBLOCK | Native.SOCK_CLOEXEC); - } - } - - // TODO: Make it configurable if we should use this flag or not. - PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) Native.IOSQE_ASYNC); - - // pad field array -> all fields should be zero - long offsetIndex = 0; - for (int i = 0; i < 3; i++) { - PlatformDependent.putLong(sqe + SQE_PAD_FIELD + offsetIndex, 0); - offsetIndex += 8; - } - - logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD)); - logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD)); - logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD)); - logger.trace("Offset: {}", PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD)); + logger.trace("UserDataField: {}", userData); + logger.trace("BufferAddress: {}", bufferAddress); + logger.trace("Length: {}", length); + logger.trace("Offset: {}", offset); } public boolean addTimeout(long nanoSeconds) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } setTimeout(nanoSeconds); - setData(sqe, (byte) Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0); - return submitted; + return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0); } public boolean addPollIn(int fd) { @@ -182,173 +151,54 @@ final class IOUringSubmissionQueue { } private boolean addPoll(int fd, int pollMask) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } - - setData(sqe, (byte) Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0); - return submitted; + return enqueueSqe(Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0); } //return true -> submit() was called public boolean addRead(int fd, long bufferAddress, int pos, int limit) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } - setData(sqe, (byte) Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0); - return submitted; + return enqueueSqe(Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0); } public boolean addWrite(int fd, long bufferAddress, int pos, int limit) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } - setData(sqe, (byte) Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0); - return submitted; + return enqueueSqe(Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0); } public boolean addAccept(int fd) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } - setData(sqe, (byte) Native.IORING_OP_ACCEPT, 0, fd, 0, 0, 0); - return submitted; + return enqueueSqe(Native.IORING_OP_ACCEPT, Native.SOCK_NONBLOCK | Native.SOCK_CLOEXEC, fd, 0, 0, 0); } //fill the address which is associated with server poll link user_data public boolean addPollRemove(int fd, int pollMask) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } - setData(sqe, (byte) Native.IORING_OP_POLL_REMOVE, pollMask, fd, 0, 0, 0); - - return submitted; + return enqueueSqe(Native.IORING_OP_POLL_REMOVE, 0, fd, + convertToUserData(Native.IORING_OP_POLL_ADD, fd, pollMask), 0, 0); } public boolean addConnect(int fd, long socketAddress, long socketAddressLength) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } - setData(sqe, (byte) Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength); - - return submitted; + return enqueueSqe(Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength); } public boolean addWritev(int fd, long iovecArrayAddress, int length) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } - setData(sqe, (byte) Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0); - - return submitted; + return enqueueSqe(Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0); } public boolean addClose(int fd) { - long sqe = 0; - boolean submitted = false; - while (sqe == 0) { - sqe = getSqe(); - - if (sqe == 0) { - submit(); - submitted = true; - } - } - setData(sqe, (byte) Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0); - - return submitted; - } - - private int flushSqe() { - long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress)); - long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); - long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress)); - - logger.trace("Ktail: {}", kTail); - logger.trace("Ktail: {}", kHead); - logger.trace("SqeHead: {}", sqeHead); - logger.trace("SqeTail: {}", sqeTail); - - if (sqeHead == sqeTail) { - return (int) (kTail - kHead); - } - - long toSubmit = sqeTail - sqeHead; - while (toSubmit > 0) { - long index = kTail & kRingMask; - - PlatformDependent.putInt(arrayAddress + index * INT_SIZE, (int) (sqeHead & kRingMask)); - - sqeHead++; - kTail++; - toSubmit--; - } - - //release - PlatformDependent.putIntOrdered(kTailAddress, (int) kTail); - - return (int) (kTail - kHead); + return enqueueSqe(Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0); } public void submit() { - int submitted = flushSqe(); - logger.trace("Submitted: {}", submitted); - if (submitted > 0) { - int ret = Native.ioUringEnter(ringFd, submitted, 0, 0); - if (ret < 0) { - throw new RuntimeException("ioUringEnter syscall"); + int submit = tail - head; + if (submit > 0) { + PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier + int ret = Native.ioUringEnter(ringFd, submit, 0, 0); + head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier + if (ret != submit) { + if (ret < 0) { + throw new RuntimeException("ioUringEnter syscall"); + } + logger.warn("Not all submissions succeeded"); } + submissionCallback.run(); } - submissionCallback.run(); } private void setTimeout(long timeoutNanoSeconds) { @@ -367,42 +217,22 @@ final class IOUringSubmissionQueue { PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds); } - private long convertToUserData(byte op, int fd, int pollMask) { - int opMask = (((short) op) << 16) | (((short) pollMask) & 0xFFFF); + private static long convertToUserData(int op, int fd, int pollMask) { + int opMask = op << 16 | (pollMask & 0xFFFF); return (long) fd << 32 | opMask & 0xFFFFFFFFL; } public long count() { - return sqeTail - toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress)); + return tail - head; } //delete memory public void release() { - Buffer.free(timeoutMemory); + PlatformDependent.freeMemory(timeoutMemoryAddress); } - public long getKHeadAddress() { - return this.kHeadAddress; - } - - public long getKTailAddress() { - return this.kTailAddress; - } - - public long getKRingMaskAddress() { - return this.kRingMaskAddress; - } - - public long getKRingEntriesAddress() { - return this.kRingEntriesAddress; - } - - public long getFFlagsAdress() { - return this.fFlagsAdress; - } - - public long getKDroppedAddress() { - return this.kDroppedAddress; + public int getRingEntries() { + return this.ringEntries; } public long getArrayAddress() { @@ -414,7 +244,7 @@ final class IOUringSubmissionQueue { } public int getRingFd() { - return ringFd; + return this.ringFd; } public int getRingSize() { @@ -425,9 +255,4 @@ final class IOUringSubmissionQueue { return this.ringAddress; } - //Todo Integer.toUnsignedLong -> maven checkstyle error - public static long toUnsignedLong(int x) { - return ((long) x) & 0xffffffffL; - } - }