Simplify SQE handling (#10544)

Motivation

SQE handling can be simplified in terms of code and operations
performed

Modifications

- Zero SQE array up front - no need to set never-used fields each time
- Fill SQ array up front with corresponding indicies - no need to set
each time since they are 1-1 with SQE array entries
- Keep local head and tail vars and don't track separate sqe array
head/tail
- Allocate memory for timespec directly (no need for ByteBuffer)
- Avoid some unnecessary casts / type conversions (no need to convert
uints to longs)

Result

Fewer operations, less code
This commit is contained in:
Nick Hill 2020-09-10 04:25:34 -07:00 committed by GitHub
parent 2316c2ce45
commit 90674b4fce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 253 deletions

View File

@ -249,7 +249,7 @@ static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass class, jobject r
jclass completionQueueClass = (*env)->GetObjectClass(env, completionQueue);
jmethodID submissionQueueArrayAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getSubmissionQueueArrayAddress", "()J");
jmethodID submissionQueueKringEntriesAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getKRingEntriesAddress", "()J");
jmethodID submissionQueueRingEntriesMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingEntries", "()I");
jmethodID submissionQueueRingFdMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingFd", "()I");
jmethodID submissionQueueRingAddressMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingAddress", "()J");
jmethodID submissionQueueRingSizeMethodId = (*env)->GetMethodID(env, submissionQueueClass, "getRingSize", "()I");
@ -258,7 +258,7 @@ static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass class, jobject r
jmethodID completionQueueRingSizeMethodId = (*env)->GetMethodID(env, completionQueueClass, "getRingSize", "()I");
jlong submissionQueueArrayAddress = (*env)->CallLongMethod(env, submissionQueue, submissionQueueArrayAddressMethodId);
jlong submissionQueueKringEntriesAddress = (*env)->CallLongMethod(env, submissionQueue, submissionQueueKringEntriesAddressMethodId);
jint submissionQueueKringEntries = (*env)->CallIntMethod(env, submissionQueue, submissionQueueRingEntriesMethodId);
jint submissionQueueRingFd = (*env)->CallIntMethod(env, submissionQueue, submissionQueueRingFdMethodId);
jlong submissionQueueRingAddress = (*env)->CallLongMethod(env, submissionQueue, submissionQueueRingAddressMethodId);
jint submissionQueueRingSize = (*env)->CallIntMethod(env, submissionQueue, submissionQueueRingSizeMethodId);
@ -266,8 +266,6 @@ static void netty_io_uring_ring_buffer_exit(JNIEnv *env, jclass class, jobject r
jlong completionQueueRingAddress = (*env)->CallLongMethod(env, completionQueue, completionQueueRingAddressMethodId);
jint completionQueueRingSize = (*env)->CallIntMethod(env, completionQueue, completionQueueRingSizeMethodId);
unsigned submissionQueueKringEntries = *((unsigned int *) submissionQueueKringEntriesAddress);
munmap((struct io_uring_sqe*) submissionQueueArrayAddress, submissionQueueKringEntries * sizeof(struct io_uring_sqe));
munmap((void*) submissionQueueRingAddress, submissionQueueRingSize);
if (((void *) completionQueueRingAddress) && ((void *) completionQueueRingAddress) != ((void *) submissionQueueRingAddress)) {

View File

@ -15,20 +15,17 @@
*/
package io.netty.channel.uring;
import io.netty.channel.unix.Buffer;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer;
import static java.lang.Math.max;
import static java.lang.Math.min;
final class IOUringSubmissionQueue {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringSubmissionQueue.class);
private static final int SQE_SIZE = 64;
private static final long SQE_SIZE = 64;
private static final int INT_SIZE = Integer.BYTES; //no 32 Bit support?
private static final int KERNEL_TIMESPEC_SIZE = 16; //__kernel_timespec
@ -51,35 +48,30 @@ final class IOUringSubmissionQueue {
//these unsigned integer pointers(shared with the kernel) will be changed by the kernel
private final long kHeadAddress;
private final long kTailAddress;
private final long kRingMaskAddress;
private final long kRingEntriesAddress;
private final long fFlagsAdress;
private final long kDroppedAddress;
private final long arrayAddress;
private final long submissionQueueArrayAddress;
private long sqeHead;
private long sqeTail;
private final int ringEntries;
private final int ringMask; // = ringEntries - 1
private final int ringSize;
private final long ringAddress;
private final int ringFd;
private final Runnable submissionCallback;
private final ByteBuffer timeoutMemory;
private final long timeoutMemoryAddress;
//private int sqeSubmitCounter;
private int head;
private int tail;
IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
long fFlagsAdress, long kDroppedAddress, long arrayAddress,
long submissionQueueArrayAddress, int ringSize,
long ringAddress, int ringFd, Runnable submissionCallback) {
long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress,
int ringSize, long ringAddress, int ringFd, Runnable submissionCallback) {
this.kHeadAddress = kHeadAddress;
this.kTailAddress = kTailAddress;
this.kRingMaskAddress = kRingMaskAddress;
this.kRingEntriesAddress = kRingEntriesAddress;
this.fFlagsAdress = fFlagsAdress;
this.kDroppedAddress = kDroppedAddress;
this.arrayAddress = arrayAddress;
@ -88,85 +80,62 @@ final class IOUringSubmissionQueue {
this.ringAddress = ringAddress;
this.ringFd = ringFd;
this.submissionCallback = submissionCallback;
timeoutMemory = Buffer.allocateDirectWithNativeOrder(KERNEL_TIMESPEC_SIZE);
timeoutMemoryAddress = Buffer.memoryAddress(timeoutMemory);
}
this.ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
this.ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
this.head = PlatformDependent.getIntVolatile(kHeadAddress);
this.tail = PlatformDependent.getIntVolatile(kTailAddress);
public long getSqe() {
long next = sqeTail + 1;
long kRingEntries = toUnsignedLong(PlatformDependent.getInt(kRingEntriesAddress));
this.timeoutMemoryAddress = PlatformDependent.allocateMemory(KERNEL_TIMESPEC_SIZE);
//acquire memory barrier
long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
// Zero the whole SQE array first
PlatformDependent.setMemory(submissionQueueArrayAddress, ringEntries * SQE_SIZE, (byte) 0);
long sqe = 0;
if ((next - kHead) <= kRingEntries) {
long index = sqeTail & toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress));
sqe = SQE_SIZE * index + submissionQueueArrayAddress;
sqeTail = next;
// Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields
long address = arrayAddress;
long sqeFlagsAddress = submissionQueueArrayAddress + SQE_FLAGS_FIELD;
for (int i = 0; i < ringEntries; i++, address += INT_SIZE, sqeFlagsAddress += SQE_SIZE) {
PlatformDependent.putInt(address, i);
// TODO: Make it configurable if we should use this flag or not.
PlatformDependent.putByte(sqeFlagsAddress, (byte) Native.IOSQE_ASYNC);
}
return sqe;
}
private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress, int length, long offset) {
//Todo cleaner
private boolean enqueueSqe(int op, int rwFlags, int fd, long bufferAddress, int length, long offset) {
boolean submitted = false;
int pending = tail - head;
if (pending == ringEntries) {
submit();
submitted = true;
}
long sqe = submissionQueueArrayAddress + (tail++ & ringMask) * SQE_SIZE;
setData(sqe, op, rwFlags, fd, bufferAddress, length, offset);
return submitted;
}
private void setData(long sqe, int op, int rwFlags, int fd, long bufferAddress, int length, long offset) {
//set sqe(submission queue) properties
PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, op);
PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0);
PlatformDependent.putByte(sqe + SQE_OP_CODE_FIELD, (byte) op);
// These two constants are set up-front
//PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) Native.IOSQE_ASYNC);
//PlatformDependent.putShort(sqe + SQE_IOPRIO_FIELD, (short) 0);
PlatformDependent.putInt(sqe + SQE_FD_FIELD, fd);
PlatformDependent.putLong(sqe + SQE_OFFSET_FIELD, offset);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, bufferAddress);
PlatformDependent.putInt(sqe + SQE_LEN_FIELD, length);
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, rwFlags);
long userData = convertToUserData(op, fd, rwFlags);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, userData);
//user_data should be same as POLL_LINK fd
if (op == Native.IORING_OP_POLL_REMOVE) {
PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1);
long uData = convertToUserData((byte) Native.IORING_OP_POLL_ADD, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, convertToUserData(op, fd, 0));
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
} else {
long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData);
//c union set Rw-Flags or accept_flags
if (op != Native.IORING_OP_ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
} else {
//accept_flags set NON_BLOCKING
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, Native.SOCK_NONBLOCK | Native.SOCK_CLOEXEC);
}
}
// TODO: Make it configurable if we should use this flag or not.
PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) Native.IOSQE_ASYNC);
// pad field array -> all fields should be zero
long offsetIndex = 0;
for (int i = 0; i < 3; i++) {
PlatformDependent.putLong(sqe + SQE_PAD_FIELD + offsetIndex, 0);
offsetIndex += 8;
}
logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD));
logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD));
logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD));
logger.trace("Offset: {}", PlatformDependent.getLong(sqe + SQE_OFFSET_FIELD));
logger.trace("UserDataField: {}", userData);
logger.trace("BufferAddress: {}", bufferAddress);
logger.trace("Length: {}", length);
logger.trace("Offset: {}", offset);
}
public boolean addTimeout(long nanoSeconds) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setTimeout(nanoSeconds);
setData(sqe, (byte) Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0);
return submitted;
return enqueueSqe(Native.IORING_OP_TIMEOUT, 0, -1, timeoutMemoryAddress, 1, 0);
}
public boolean addPollIn(int fd) {
@ -182,173 +151,54 @@ final class IOUringSubmissionQueue {
}
private boolean addPoll(int fd, int pollMask) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setData(sqe, (byte) Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0);
return submitted;
return enqueueSqe(Native.IORING_OP_POLL_ADD, pollMask, fd, 0, 0, 0);
}
//return true -> submit() was called
public boolean addRead(int fd, long bufferAddress, int pos, int limit) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setData(sqe, (byte) Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0);
return submitted;
return enqueueSqe(Native.IORING_OP_READ, 0, fd, bufferAddress + pos, limit - pos, 0);
}
public boolean addWrite(int fd, long bufferAddress, int pos, int limit) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setData(sqe, (byte) Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0);
return submitted;
return enqueueSqe(Native.IORING_OP_WRITE, 0, fd, bufferAddress + pos, limit - pos, 0);
}
public boolean addAccept(int fd) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setData(sqe, (byte) Native.IORING_OP_ACCEPT, 0, fd, 0, 0, 0);
return submitted;
return enqueueSqe(Native.IORING_OP_ACCEPT, Native.SOCK_NONBLOCK | Native.SOCK_CLOEXEC, fd, 0, 0, 0);
}
//fill the address which is associated with server poll link user_data
public boolean addPollRemove(int fd, int pollMask) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setData(sqe, (byte) Native.IORING_OP_POLL_REMOVE, pollMask, fd, 0, 0, 0);
return submitted;
return enqueueSqe(Native.IORING_OP_POLL_REMOVE, 0, fd,
convertToUserData(Native.IORING_OP_POLL_ADD, fd, pollMask), 0, 0);
}
public boolean addConnect(int fd, long socketAddress, long socketAddressLength) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setData(sqe, (byte) Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength);
return submitted;
return enqueueSqe(Native.IORING_OP_CONNECT, 0, fd, socketAddress, 0, socketAddressLength);
}
public boolean addWritev(int fd, long iovecArrayAddress, int length) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setData(sqe, (byte) Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0);
return submitted;
return enqueueSqe(Native.IORING_OP_WRITEV, 0, fd, iovecArrayAddress, length, 0);
}
public boolean addClose(int fd) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
sqe = getSqe();
if (sqe == 0) {
submit();
submitted = true;
}
}
setData(sqe, (byte) Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0);
return submitted;
}
private int flushSqe() {
long kTail = toUnsignedLong(PlatformDependent.getInt(kTailAddress));
long kHead = toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
long kRingMask = toUnsignedLong(PlatformDependent.getInt(kRingMaskAddress));
logger.trace("Ktail: {}", kTail);
logger.trace("Ktail: {}", kHead);
logger.trace("SqeHead: {}", sqeHead);
logger.trace("SqeTail: {}", sqeTail);
if (sqeHead == sqeTail) {
return (int) (kTail - kHead);
}
long toSubmit = sqeTail - sqeHead;
while (toSubmit > 0) {
long index = kTail & kRingMask;
PlatformDependent.putInt(arrayAddress + index * INT_SIZE, (int) (sqeHead & kRingMask));
sqeHead++;
kTail++;
toSubmit--;
}
//release
PlatformDependent.putIntOrdered(kTailAddress, (int) kTail);
return (int) (kTail - kHead);
return enqueueSqe(Native.IORING_OP_CLOSE, 0, fd, 0, 0, 0);
}
public void submit() {
int submitted = flushSqe();
logger.trace("Submitted: {}", submitted);
if (submitted > 0) {
int ret = Native.ioUringEnter(ringFd, submitted, 0, 0);
if (ret < 0) {
throw new RuntimeException("ioUringEnter syscall");
int submit = tail - head;
if (submit > 0) {
PlatformDependent.putIntOrdered(kTailAddress, tail); // release memory barrier
int ret = Native.ioUringEnter(ringFd, submit, 0, 0);
head = PlatformDependent.getIntVolatile(kHeadAddress); // acquire memory barrier
if (ret != submit) {
if (ret < 0) {
throw new RuntimeException("ioUringEnter syscall");
}
logger.warn("Not all submissions succeeded");
}
submissionCallback.run();
}
submissionCallback.run();
}
private void setTimeout(long timeoutNanoSeconds) {
@ -367,42 +217,22 @@ final class IOUringSubmissionQueue {
PlatformDependent.putLong(timeoutMemoryAddress + KERNEL_TIMESPEC_TV_NSEC_FIELD, nanoSeconds);
}
private long convertToUserData(byte op, int fd, int pollMask) {
int opMask = (((short) op) << 16) | (((short) pollMask) & 0xFFFF);
private static long convertToUserData(int op, int fd, int pollMask) {
int opMask = op << 16 | (pollMask & 0xFFFF);
return (long) fd << 32 | opMask & 0xFFFFFFFFL;
}
public long count() {
return sqeTail - toUnsignedLong(PlatformDependent.getIntVolatile(kHeadAddress));
return tail - head;
}
//delete memory
public void release() {
Buffer.free(timeoutMemory);
PlatformDependent.freeMemory(timeoutMemoryAddress);
}
public long getKHeadAddress() {
return this.kHeadAddress;
}
public long getKTailAddress() {
return this.kTailAddress;
}
public long getKRingMaskAddress() {
return this.kRingMaskAddress;
}
public long getKRingEntriesAddress() {
return this.kRingEntriesAddress;
}
public long getFFlagsAdress() {
return this.fFlagsAdress;
}
public long getKDroppedAddress() {
return this.kDroppedAddress;
public int getRingEntries() {
return this.ringEntries;
}
public long getArrayAddress() {
@ -414,7 +244,7 @@ final class IOUringSubmissionQueue {
}
public int getRingFd() {
return ringFd;
return this.ringFd;
}
public int getRingSize() {
@ -425,9 +255,4 @@ final class IOUringSubmissionQueue {
return this.ringAddress;
}
//Todo Integer.toUnsignedLong -> maven checkstyle error
public static long toUnsignedLong(int x) {
return ((long) x) & 0xffffffffL;
}
}