From 8e86bf60a85587fe9e02ac6dddb5a29c5be1f55f Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 11 Sep 2020 08:34:57 +0200 Subject: [PATCH] Cleanup IOUringEventLoopGroup construction and allow to specify the ring (#10563) Motivation: We should allow to specify the ringsize when constructing the IOUringEventLoopGroup and also be constistent with the rest of the EventLoopGroup implementations Modifications: - Cleanup constructors - Make ringSize configurable Result: Cleaner code and more flexible in terms of configuration --- .../netty/channel/uring/IOUringEventLoop.java | 19 +++- .../channel/uring/IOUringEventLoopGroup.java | 104 +++++------------- .../java/io/netty/channel/uring/Native.java | 5 +- 3 files changed, 46 insertions(+), 82 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index f8c397bec7..415d6adc3f 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -15,13 +15,14 @@ */ package io.netty.channel.uring; -import io.netty.channel.EventLoopGroup; +import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.unix.Errors; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.IovArray; import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectMap; +import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -55,15 +56,17 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements private long prevDeadlineNanos = NONE; private boolean pendingWakeup; - IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { - super(parent, executor, addTaskWakesUp); + IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, + RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) { + super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), + rejectedExecutionHandler); // Ensure that we load all native bits as otherwise it may fail when try to use native methods in IovArray IOUring.ensureAvailability(); // TODO: Let's hard code this to 8 IovArrays to keep the memory overhead kind of small. We may want to consider // allow to change this in the future. iovArrays = new IovArrays(8); - ringBuffer = Native.createRingBuffer(new Runnable() { + ringBuffer = Native.createRingBuffer(ringSize, new Runnable() { @Override public void run() { // Once we submitted its safe to clear the IovArrays and so be able to re-use these. @@ -75,6 +78,14 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements logger.trace("New EventLoop: {}", this.toString()); } + private static Queue newTaskQueue( + EventLoopTaskQueueFactory queueFactory) { + if (queueFactory == null) { + return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); + } + return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS); + } + @Override protected Queue newTaskQueue(int maxPendingTasks) { return newTaskQueue0(maxPendingTasks); diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java index 1b7d01ed7d..2597fdf33e 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoopGroup.java @@ -15,14 +15,15 @@ */ package io.netty.channel.uring; -import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.channel.SelectStrategyFactory; +import io.netty.util.concurrent.DefaultEventExecutorChooserFactory; import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.concurrent.RejectedExecutionHandlers; +import io.netty.util.concurrent.ThreadPerTaskExecutor; +import io.netty.util.internal.ObjectUtil; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; @@ -46,105 +47,58 @@ public final class IOUringEventLoopGroup extends MultithreadEventLoopGroup { /** * Create a new instance using the default number of threads and the given {@link ThreadFactory}. */ - @SuppressWarnings("deprecation") public IOUringEventLoopGroup(ThreadFactory threadFactory) { this(0, threadFactory, 0); } - /** - * Create a new instance using the specified number of threads and the default {@link ThreadFactory}. - */ - @SuppressWarnings("deprecation") - public IOUringEventLoopGroup(int nThreads, SelectStrategyFactory selectStrategyFactory) { - this(nThreads, (ThreadFactory) null, selectStrategyFactory); - } - /** * Create a new instance using the specified number of threads and the given {@link ThreadFactory}. */ - @SuppressWarnings("deprecation") public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, 0); } public IOUringEventLoopGroup(int nThreads, Executor executor) { - this(nThreads, executor, DefaultSelectStrategyFactory.INSTANCE); - } - - /** - * Create a new instance using the specified number of threads and the given {@link ThreadFactory}. - */ - @SuppressWarnings("deprecation") - public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, - SelectStrategyFactory selectStrategyFactory) { - this(nThreads, threadFactory, 0, selectStrategyFactory); + this(nThreads, executor, 0); } /** * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given - * maximal amount of epoll events to handle per epollWait(...). - * - * @deprecated Use {@link #IOUringEventLoopGroup(int)} or {@link #IOUringEventLoopGroup(int, ThreadFactory)} + * maximal size of the used ringbuffer. */ - @Deprecated - public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) { - this(nThreads, threadFactory, maxEventsAtOnce, DefaultSelectStrategyFactory.INSTANCE); + public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int ringSize) { + this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), ringSize); } - /** - * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given - * maximal amount of epoll events to handle per epollWait(...). - * - * @deprecated Use {@link #IOUringEventLoopGroup(int)}, {@link #IOUringEventLoopGroup(int, ThreadFactory)}, or - * {@link #IOUringEventLoopGroup(int, SelectStrategyFactory)} - */ - @Deprecated - public IOUringEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce, - SelectStrategyFactory selectStrategyFactory) { - super(nThreads, threadFactory, maxEventsAtOnce, selectStrategyFactory, RejectedExecutionHandlers.reject()); + public IOUringEventLoopGroup(int nThreads, Executor executor, int ringsize) { + this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, + ringsize, RejectedExecutionHandlers.reject()); } - public IOUringEventLoopGroup(int nThreads, Executor executor, SelectStrategyFactory selectStrategyFactory) { - super(nThreads, executor, 0, selectStrategyFactory, RejectedExecutionHandlers.reject()); + private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + int ringSize, RejectedExecutionHandler rejectedExecutionHandler) { + this(nThreads, executor, chooserFactory, ringSize, rejectedExecutionHandler, null); } - public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, - SelectStrategyFactory selectStrategyFactory) { - super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, RejectedExecutionHandlers.reject()); - } - - public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, - SelectStrategyFactory selectStrategyFactory, - RejectedExecutionHandler rejectedExecutionHandler) { - super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler); - } - - public IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, - SelectStrategyFactory selectStrategyFactory, - RejectedExecutionHandler rejectedExecutionHandler, - EventLoopTaskQueueFactory queueFactory) { - super(nThreads, executor, chooserFactory, 0, - selectStrategyFactory, rejectedExecutionHandler, queueFactory); - } - - /** - * @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts. - */ - @Deprecated - public void setIoRatio(int ioRatio) { - if (ioRatio <= 0 || ioRatio > 100) { - throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); - } + private IOUringEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + int ringSize, RejectedExecutionHandler rejectedExecutionHandler, + EventLoopTaskQueueFactory queueFactory) { + super(nThreads, executor, chooserFactory, ringSize, rejectedExecutionHandler, queueFactory); } //Todo @Override - protected EventLoop newChild(Executor executor, Object... args) throws Exception { - //EventLoopTaskQueueFactory queueFactory = args.length == 4? (EventLoopTaskQueueFactory) args[3] : null; -// return new IOUringEventLoop(this, executor, (Integer) args[0], -// ((SelectStrategyFactory) args[1]).newSelectStrategy(), -// (RejectedExecutionHandler) args[2], queueFactory); - - return new IOUringEventLoop(this, executor, false); + protected EventLoop newChild(Executor executor, Object... args) { + if (args.length != 3) { + throw new IllegalArgumentException("Illegal amount of extra arguments"); + } + int ringSize = (Integer) args[0]; + ObjectUtil.checkPositiveOrZero(ringSize, "ringSize"); + if (ringSize == 0) { + ringSize = Native.DEFAULT_RING_SIZE; + } + RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[1]; + EventLoopTaskQueueFactory taskQueueFactory = (EventLoopTaskQueueFactory) args[2]; + return new IOUringEventLoop(this, executor, ringSize, rejectedExecutionHandler, taskQueueFactory); } } diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java index 5bcae81e8d..3072f59717 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/Native.java @@ -30,10 +30,9 @@ import java.util.Locale; final class Native { private static final InternalLogger logger = InternalLoggerFactory.getInstance(Native.class); - // Todo expose this via the EventLoopGroup constructor as well. - private static final int DEFAULT_RING_SIZE = SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096); + static final int DEFAULT_RING_SIZE = Math.max(64, SystemPropertyUtil.getInt("io.netty.uring.ringSize", 4096)); - static { + static { Selector selector = null; try { // We call Selector.open() as this will under the hood cause IOUtil to be loaded.