From d2219f089e0264b2cd89e94e441e9f98ceb47108 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 15 Sep 2020 16:47:20 +0200 Subject: [PATCH] Allow to configure if IOSEQ_ASYNC is used per EventLoopGroup (#10576) Motivation: There may be situations when the user dont want to use IOSEQ_ASYNC so we should allow to configure this Modifications: Make it configurable if IOSEQ_ASYNC should be used Result: More flexible configuration --- .../netty/channel/uring/IOUringEventLoop.java | 4 +- .../channel/uring/IOUringEventLoopGroup.java | 53 +++++++++++-------- .../channel/uring/IOUringSubmissionQueue.java | 7 +-- .../java/io/netty/channel/uring/Native.java | 8 +-- 4 files changed, 43 insertions(+), 29 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 9bf0cfb121..459fd328d5 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -56,7 +56,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements private long prevDeadlineNanos = NONE; private boolean pendingWakeup; - IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, + IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, boolean ioseqAsync, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); @@ -66,7 +66,7 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements // TODO: Let's hard code this to 8 IovArrays to keep the memory overhead kind of small. We may want to consider // allow to change this in the future. iovArrays = new IovArrays(8); - ringBuffer = Native.createRingBuffer(ringSize, new Runnable() { + ringBuffer = Native.createRingBuffer(ringSize, ioseqAsync, new Runnable() { @Override public void run() { // Once we submitted its safe to clear the IovArrays and so be able to re-use these. diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java index 2597fdf33e..aad9d65190 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java @@ -48,48 +48,58 @@ public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup { * Create a new instance using the default number of threads and the given {@link ThreadFactory}. */ public IOUringEventLoopGroup(ThreadFactory threadFactory) { - this(0, threadFactory, 0); + this(0, threadFactory, 0, Native.DEFAULT_USE_IOSEQ_ASYNC); } /** * Create a new instance using the specified number of threads and the given {@link ThreadFactory}. */ public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) { - this(nThreads, threadFactory, 0); - } - - public IOUringEventLoopGroup(int nThreads, Executor executor) { - this(nThreads, executor, 0); + this(nThreads, threadFactory, 0, Native.DEFAULT_USE_IOSEQ_ASYNC); } /** - * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given - * maximal size of the used ringbuffer. + * Create a new instance using the specified number of threads and the given {@link Executor}. */ - public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int ringSize) { - this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), ringSize); + public IOUringEventLoopGroup(int nThreads, Executor executor) { + this(nThreads, executor, 0, Native.DEFAULT_USE_IOSEQ_ASYNC); } - public IOUringEventLoopGroup(int nThreads, Executor executor, int ringsize) { + /** + * Create a new instance using the specified number of threads, the given {@link ThreadFactory}, the given + * size of the used ringbuffer and + * if IOSEQ_ASYNC should be + * used for IO operations. + */ + public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int ringSize, boolean iosqeAsync) { + this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), ringSize, iosqeAsync); + } + + /** + * Create a new instance using the specified number of threads, the given {@link Executor}, the given + * size of the used ringbuffer and + * if IOSEQ_ASYNC should be + * used for IO operations. + */ + public IOUringEventLoopGroup(int nThreads, Executor executor, int ringsize, boolean iosqeAsync) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, - ringsize, RejectedExecutionHandlers.reject()); + ringsize, iosqeAsync, RejectedExecutionHandlers.reject()); } private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, - int ringSize, RejectedExecutionHandler rejectedExecutionHandler) { - this(nThreads, executor, chooserFactory, ringSize, rejectedExecutionHandler, null); + int ringSize, boolean iosqeAsync, RejectedExecutionHandler rejectedExecutionHandler) { + this(nThreads, executor, chooserFactory, ringSize, iosqeAsync, rejectedExecutionHandler, null); } private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, - int ringSize, RejectedExecutionHandler rejectedExecutionHandler, + int ringSize, boolean iosqeAsync, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { - super(nThreads, executor, chooserFactory, ringSize, rejectedExecutionHandler, queueFactory); + super(nThreads, executor, chooserFactory, ringSize, iosqeAsync, rejectedExecutionHandler, queueFactory); } - //Todo @Override protected EventLoop newChild(Executor executor, Object... args) { - if (args.length != 3) { + if (args.length != 4) { throw new IllegalArgumentException("Illegal amount of extra arguments"); } int ringSize = (Integer) args[0]; @@ -97,8 +107,9 @@ public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup { if (ringSize == 0) { ringSize = Native.DEFAULT_RING_SIZE; } - RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[1]; - EventLoopTaskQueueFactory taskQueueFactory = (EventLoopTaskQueueFactory) args[2]; - return new IOUringEventLoop(this, executor, ringSize, rejectedExecutionHandler, taskQueueFactory); + boolean iosqeAsync = (Boolean) args[1]; + RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; + EventLoopTaskQueueFactory taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; + return new IOUringEventLoop(this, executor, ringSize, iosqeAsync, rejectedExecutionHandler, taskQueueFactory); } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index fa683f1116..a2d5be7001 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -67,7 +67,8 @@ final class IOUringSubmissionQueue { IOUringSubmissionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress, long fFlagsAdress, long kDroppedAddress, long arrayAddress, long submissionQueueArrayAddress, - int ringSize, long ringAddress, int ringFd, Runnable submissionCallback) { + int ringSize, long ringAddress, int ringFd, + boolean iosqeAsync, Runnable submissionCallback) { this.kHeadAddress = kHeadAddress; this.kTailAddress = kTailAddress; this.fFlagsAdress = fFlagsAdress; @@ -91,10 +92,10 @@ final class IOUringSubmissionQueue { // Fill SQ array indices (1-1 with SQE array) and set nonzero constant SQE fields long address = arrayAddress; long sqeFlagsAddress = submissionQueueArrayAddress + SQE_FLAGS_FIELD; + byte flag = iosqeAsync ? (byte) Native.IOSQE_ASYNC : 0; for (int i = 0; i < ringEntries; i++, address += INT_SIZE, sqeFlagsAddress += SQE_SIZE) { PlatformDependent.putInt(address, i); - // TODO: Make it configurable if we should use this flag or not. - PlatformDependent.putByte(sqeFlagsAddress, (byte) Native.IOSQE_ASYNC); + PlatformDependent.putByte(sqeFlagsAddress, flag); } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index a443beca65..9c7bda4844 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -31,6 +31,7 @@ import java.util.Locale; final class Native { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class); static final int DEFAULT_RING_SIZE = Math.max(64, SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096)); + static final boolean DEFAULT_USE_IOSEQ_ASYNC = true; static { Selector selector = null; @@ -82,7 +83,7 @@ final class Native { static final int IOSQE_ASYNC = NativeStaticallyReferencedJniMethods.iosqeAsync(); static RingBuffer createRingBuffer(int ringSize) { - return createRingBuffer(ringSize, new Runnable() { + return createRingBuffer(ringSize, DEFAULT_USE_IOSEQ_ASYNC, new Runnable() { @Override public void run() { // Noop @@ -90,7 +91,7 @@ final class Native { }); } - static RingBuffer createRingBuffer(int ringSize, Runnable submissionCallback) { + static RingBuffer createRingBuffer(int ringSize, boolean iosqeAsync, Runnable submissionCallback) { long[][] values = ioUringSetup(ringSize); assert values.length == 2; long[] submissionQueueArgs = values[0]; @@ -107,6 +108,7 @@ final class Native { (int) submissionQueueArgs[8], submissionQueueArgs[9], (int) submissionQueueArgs[10], + iosqeAsync, submissionCallback); long[] completionQueueArgs = values[1]; assert completionQueueArgs.length == 9; @@ -124,7 +126,7 @@ final class Native { } static RingBuffer createRingBuffer(Runnable submissionCallback) { - return createRingBuffer(DEFAULT_RING_SIZE, submissionCallback); + return createRingBuffer(DEFAULT_RING_SIZE, DEFAULT_USE_IOSEQ_ASYNC, submissionCallback); } private static native long[][] ioUringSetup(int entries);