Allow to configure if IOSEQ_ASYNC is used per EventLoopGroup (#10576)

Motivation:

There may be situations when the user dont want to use IOSEQ_ASYNC so we
should allow to configure this

Modifications:

Make it configurable if IOSEQ_ASYNC should be used

Result:

More flexible configuration
This commit is contained in:
Norman Maurer 2020-09-15 16:47:20 +02:00 committed by GitHub
parent 0ef8fa47e5
commit d2219f089e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 29 deletions

View File

@ -56,7 +56,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
private long prevDeadlineNanos = NONE; private long prevDeadlineNanos = NONE;
private boolean pendingWakeup; private boolean pendingWakeup;
IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, boolean ioseqAsync,
RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler); rejectedExecutionHandler);
@ -66,7 +66,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
// TODO: Let's hard code this to 8 IovArrays to keep the memory overhead kind of small. We may want to consider // TODO: Let's hard code this to 8 IovArrays to keep the memory overhead kind of small. We may want to consider
// allow to change this in the future. // allow to change this in the future.
iovArrays = new IovArrays(8); iovArrays = new IovArrays(8);
ringBuffer = Native.createRingBuffer(ringSize, new Runnable() { ringBuffer = Native.createRingBuffer(ringSize, ioseqAsync, new Runnable() {
@Override @Override
public void run() { public void run() {
// Once we submitted its safe to clear the IovArrays and so be able to re-use these. // Once we submitted its safe to clear the IovArrays and so be able to re-use these.

View File

@ -48,48 +48,58 @@ public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
* Create a new instance using the default number of threads and the given {@link ThreadFactory}. * Create a new instance using the default number of threads and the given {@link ThreadFactory}.
*/ */
public IOUringEventLoopGroup(ThreadFactory threadFactory) { public IOUringEventLoopGroup(ThreadFactory threadFactory) {
this(0, threadFactory, 0); this(0, threadFactory, 0, Native.DEFAULT_USE_IOSEQ_ASYNC);
} }
/** /**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}. * Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
*/ */
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) { public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, 0); this(nThreads, threadFactory, 0, Native.DEFAULT_USE_IOSEQ_ASYNC);
}
public IOUringEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, 0);
} }
/** /**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given * Create a new instance using the specified number of threads and the given {@link Executor}.
* maximal size of the used ringbuffer.
*/ */
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int ringSize) { public IOUringEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), ringSize); this(nThreads, executor, 0, Native.DEFAULT_USE_IOSEQ_ASYNC);
} }
public IOUringEventLoopGroup(int nThreads, Executor executor, int ringsize) { /**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory}, the given
* size of the used ringbuffer and
* if <a href=https://manpages.debian.org/unstable/liburing-dev/io_uring_enter.2.en.html>IOSEQ_ASYNC</a> should be
* used for IO operations.
*/
public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int ringSize, boolean iosqeAsync) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), ringSize, iosqeAsync);
}
/**
* Create a new instance using the specified number of threads, the given {@link Executor}, the given
* size of the used ringbuffer and
* if <a href=https://manpages.debian.org/unstable/liburing-dev/io_uring_enter.2.en.html>IOSEQ_ASYNC</a> should be
* used for IO operations.
*/
public IOUringEventLoopGroup(int nThreads, Executor executor, int ringsize, boolean iosqeAsync) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE,
ringsize, RejectedExecutionHandlers.reject()); ringsize, iosqeAsync, RejectedExecutionHandlers.reject());
} }
private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
int ringSize, RejectedExecutionHandler rejectedExecutionHandler) { int ringSize, boolean iosqeAsync, RejectedExecutionHandler rejectedExecutionHandler) {
this(nThreads, executor, chooserFactory, ringSize, rejectedExecutionHandler, null); this(nThreads, executor, chooserFactory, ringSize, iosqeAsync, rejectedExecutionHandler, null);
} }
private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
int ringSize, RejectedExecutionHandler rejectedExecutionHandler, int ringSize, boolean iosqeAsync, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory queueFactory) {
super(nThreads, executor, chooserFactory, ringSize, rejectedExecutionHandler, queueFactory); super(nThreads, executor, chooserFactory, ringSize, iosqeAsync, rejectedExecutionHandler, queueFactory);
} }
//Todo
@Override @Override
protected EventLoop newChild(Executor executor, Object... args) { protected EventLoop newChild(Executor executor, Object... args) {
if (args.length != 3) { if (args.length != 4) {
throw new IllegalArgumentException("Illegal amount of extra arguments"); throw new IllegalArgumentException("Illegal amount of extra arguments");
} }
int ringSize = (Integer) args[0]; int ringSize = (Integer) args[0];
@ -97,8 +107,9 @@ public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup {
if (ringSize == 0) { if (ringSize == 0) {
ringSize = Native.DEFAULT_RING_SIZE; ringSize = Native.DEFAULT_RING_SIZE;
} }
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[1]; boolean iosqeAsync = (Boolean) args[1];
EventLoopTaskQueueFactory taskQueueFactory = (EventLoopTaskQueueFactory) args[2]; RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
return new IOUringEventLoop(this, executor, ringSize, rejectedExecutionHandler, taskQueueFactory); EventLoopTaskQueueFactory taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
return new IOUringEventLoop(this, executor, ringSize, iosqeAsync, rejectedExecutionHandler, taskQueueFactory);
} }
} }

View File

@ -67,7 +67,8 @@ final class IOUringSubmissionQueue {
IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress,
int ringSize, long ringAddress, int ringFd, Runnable submissionCallback) { int ringSize, long ringAddress, int ringFd,
boolean iosqeAsync, Runnable submissionCallback) {
this.kHeadAddress = kHeadAddress; this.kHeadAddress = kHeadAddress;
this.kTailAddress = kTailAddress; this.kTailAddress = kTailAddress;
this.fFlagsAdress = fFlagsAdress; this.fFlagsAdress = fFlagsAdress;
@ -91,10 +92,10 @@ final class IOUringSubmissionQueue {
// Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields // Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields
long address = arrayAddress; long address = arrayAddress;
long sqeFlagsAddress = submissionQueueArrayAddress + SQE_FLAGS_FIELD; long sqeFlagsAddress = submissionQueueArrayAddress + SQE_FLAGS_FIELD;
byte flag = iosqeAsync ? (byte) Native.IOSQE_ASYNC : 0;
for (int i = 0; i < ringEntries; i++, address += INT_SIZE, sqeFlagsAddress += SQE_SIZE) { for (int i = 0; i < ringEntries; i++, address += INT_SIZE, sqeFlagsAddress += SQE_SIZE) {
PlatformDependent.putInt(address, i); PlatformDependent.putInt(address, i);
// TODO: Make it configurable if we should use this flag or not. PlatformDependent.putByte(sqeFlagsAddress, flag);
PlatformDependent.putByte(sqeFlagsAddress, (byte) Native.IOSQE_ASYNC);
} }
} }

View File

@ -31,6 +31,7 @@ import java.util.Locale;
final class Native { final class Native {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class);
static final int DEFAULT_RING_SIZE = Math.max(64, SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096)); static final int DEFAULT_RING_SIZE = Math.max(64, SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096));
static final boolean DEFAULT_USE_IOSEQ_ASYNC = true;
static { static {
Selector selector = null; Selector selector = null;
@ -82,7 +83,7 @@ final class Native {
static final int IOSQE_ASYNC = NativeStaticallyReferencedJniMethods.iosqeAsync(); static final int IOSQE_ASYNC = NativeStaticallyReferencedJniMethods.iosqeAsync();
static RingBuffer createRingBuffer(int ringSize) { static RingBuffer createRingBuffer(int ringSize) {
return createRingBuffer(ringSize, new Runnable() { return createRingBuffer(ringSize, DEFAULT_USE_IOSEQ_ASYNC, new Runnable() {
@Override @Override
public void run() { public void run() {
// Noop // Noop
@ -90,7 +91,7 @@ final class Native {
}); });
} }
static RingBuffer createRingBuffer(int ringSize, Runnable submissionCallback) { static RingBuffer createRingBuffer(int ringSize, boolean iosqeAsync, Runnable submissionCallback) {
long[][] values = ioUringSetup(ringSize); long[][] values = ioUringSetup(ringSize);
assert values.length == 2; assert values.length == 2;
long[] submissionQueueArgs = values[0]; long[] submissionQueueArgs = values[0];
@ -107,6 +108,7 @@ final class Native {
(int) submissionQueueArgs[8], (int) submissionQueueArgs[8],
submissionQueueArgs[9], submissionQueueArgs[9],
(int) submissionQueueArgs[10], (int) submissionQueueArgs[10],
iosqeAsync,
submissionCallback); submissionCallback);
long[] completionQueueArgs = values[1]; long[] completionQueueArgs = values[1];
assert completionQueueArgs.length == 9; assert completionQueueArgs.length == 9;
@ -124,7 +126,7 @@ final class Native {
} }
static RingBuffer createRingBuffer(Runnable submissionCallback) { static RingBuffer createRingBuffer(Runnable submissionCallback) {
return createRingBuffer(DEFAULT_RING_SIZE, submissionCallback); return createRingBuffer(DEFAULT_RING_SIZE, DEFAULT_USE_IOSEQ_ASYNC, submissionCallback);
} }
private static native long[][] ioUringSetup(int entries); private static native long[][] ioUringSetup(int entries);