From 2abe20a6b5f53a5e8c508b6abb9eeed9562cb5d7 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Thu, 1 Jul 2021 11:51:10 +0300 Subject: [PATCH] Tail tasks queue: configure separately from tasks queue (#11400) Motivation: IO transports (primarily epoll, but also applies to kqueue, nio) cant be configured with separate tail tasks queue factory - instead single queue factory is used for both normal tasks and tail tasks. Modifications: Add constructor accepting tail EventLoopTaskQueueFactory to aforementioned transports Result: IO transports can be configured with separate tail tasks --- .../netty/channel/epoll/EpollEventLoop.java | 4 +- .../channel/epoll/EpollEventLoopGroup.java | 43 +++++++++++++++-- .../channel/epoll/EpollEventLoopTest.java | 5 +- .../netty/channel/kqueue/KQueueEventLoop.java | 4 +- .../channel/kqueue/KQueueEventLoopGroup.java | 42 +++++++++++++++-- .../io/netty/channel/nio/NioEventLoop.java | 4 +- .../netty/channel/nio/NioEventLoopGroup.java | 46 +++++++++++++++++-- 7 files changed, 128 insertions(+), 20 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index c9cb838ed8..c3b2f6016a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -86,8 +86,8 @@ class EpollEventLoop extends SingleThreadEventLoop { EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, - EventLoopTaskQueueFactory queueFactory) { - super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), + EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { + super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); if (maxEvents == 0) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java index 80de7fe0fa..ee1467d914 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategyFactory; +import io.netty.channel.SingleThreadEventLoop; import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.concurrent.RejectedExecutionHandlers; @@ -134,6 +135,28 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler, queueFactory); } + /** + * @param nThreads the number of threads that will be used by this instance. + * @param executor the Executor to use, or {@code null} if default one should be used. + * @param chooserFactory the {@link EventExecutorChooserFactory} to use. + * @param selectStrategyFactory the {@link SelectStrategyFactory} to use. + * @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use. + * @param taskQueueFactory the {@link EventLoopTaskQueueFactory} to use for + * {@link SingleThreadEventLoop#execute(Runnable)}, + * or {@code null} if default one should be used. + * @param tailTaskQueueFactory the {@link EventLoopTaskQueueFactory} to use for + * {@link SingleThreadEventLoop#executeAfterEventLoopIteration(Runnable)}, + * or {@code null} if default one should be used. + */ + public EpollEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectStrategyFactory selectStrategyFactory, + RejectedExecutionHandler rejectedExecutionHandler, + EventLoopTaskQueueFactory taskQueueFactory, + EventLoopTaskQueueFactory tailTaskQueueFactory) { + super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler, taskQueueFactory, + tailTaskQueueFactory); + } + /** * @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts. */ @@ -146,9 +169,21 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { - EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; - return new EpollEventLoop(this, executor, (Integer) args[0], - ((SelectStrategyFactory) args[1]).newSelectStrategy(), - (RejectedExecutionHandler) args[2], queueFactory); + Integer maxEvents = (Integer) args[0]; + SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; + RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; + EventLoopTaskQueueFactory taskQueueFactory = null; + EventLoopTaskQueueFactory tailTaskQueueFactory = null; + + int argsLength = args.length; + if (argsLength > 3) { + taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; + } + if (argsLength > 4) { + tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; + } + return new EpollEventLoop(this, executor, maxEvents, + selectStrategyFactory.newSelectStrategy(), + rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index 02e55fd6aa..f3d42ae3f2 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -15,13 +15,13 @@ */ package io.netty.channel.epoll; +import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.unix.FileDescriptor; -import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.RejectedExecutionHandlers; @@ -61,7 +61,8 @@ public class EpollEventLoopTest extends AbstractSingleThreadEventLoopTest { final EventLoopGroup group = new EpollEventLoop(null, new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass())), 0, - DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(), RejectedExecutionHandlers.reject(), null) { + DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(), RejectedExecutionHandlers.reject(), + null, null) { @Override void handleLoopException(Throwable t) { capture.set(t); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index 6791768a7d..cdd57b30ae 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -73,8 +73,8 @@ final class KQueueEventLoop extends SingleThreadEventLoop { KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, - EventLoopTaskQueueFactory queueFactory) { - super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), + EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { + super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); this.kqueueFd = Native.newKQueue(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java index 1be9de7574..30c726da50 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java @@ -20,6 +20,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategyFactory; +import io.netty.channel.SingleThreadEventLoop; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.RejectedExecutionHandler; @@ -133,6 +134,28 @@ public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup { rejectedExecutionHandler, queueFactory); } + /** + * @param nThreads the number of threads that will be used by this instance. + * @param executor the Executor to use, or {@code null} if default one should be used. + * @param chooserFactory the {@link EventExecutorChooserFactory} to use. + * @param selectStrategyFactory the {@link SelectStrategyFactory} to use. + * @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use. + * @param taskQueueFactory the {@link EventLoopTaskQueueFactory} to use for + * {@link SingleThreadEventLoop#execute(Runnable)}, + * or {@code null} if default one should be used. + * @param tailTaskQueueFactory the {@link EventLoopTaskQueueFactory} to use for + * {@link SingleThreadEventLoop#executeAfterEventLoopIteration(Runnable)}, + * or {@code null} if default one should be used. + */ + public KQueueEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectStrategyFactory selectStrategyFactory, + RejectedExecutionHandler rejectedExecutionHandler, + EventLoopTaskQueueFactory taskQueueFactory, + EventLoopTaskQueueFactory tailTaskQueueFactory) { + super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler, taskQueueFactory, + tailTaskQueueFactory); + } + /** * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. @@ -145,10 +168,21 @@ public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { - EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; + Integer maxEvents = (Integer) args[0]; + SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; + RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; + EventLoopTaskQueueFactory taskQueueFactory = null; + EventLoopTaskQueueFactory tailTaskQueueFactory = null; - return new KQueueEventLoop(this, executor, (Integer) args[0], - ((SelectStrategyFactory) args[1]).newSelectStrategy(), - (RejectedExecutionHandler) args[2], queueFactory); + int argsLength = args.length; + if (argsLength > 3) { + taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; + } + if (argsLength > 4) { + tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; + } + return new KQueueEventLoop(this, executor, maxEvents, + selectStrategyFactory.newSelectStrategy(), + rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 5acdff64d5..719037d941 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -134,8 +134,8 @@ public final class NioEventLoop extends SingleThreadEventLoop { NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, - EventLoopTaskQueueFactory queueFactory) { - super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), + EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { + super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index 89f7b40bf1..9854e9c03c 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -16,11 +16,12 @@ package io.netty.channel.nio; import io.netty.channel.Channel; -import io.netty.channel.EventLoop; import io.netty.channel.DefaultSelectStrategyFactory; +import io.netty.channel.EventLoop; import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategyFactory; +import io.netty.channel.SingleThreadEventLoop; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.RejectedExecutionHandler; @@ -119,6 +120,30 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { rejectedExecutionHandler, taskQueueFactory); } + /** + * @param nThreads the number of threads that will be used by this instance. + * @param executor the Executor to use, or {@code null} if default one should be used. + * @param chooserFactory the {@link EventExecutorChooserFactory} to use. + * @param selectorProvider the {@link SelectorProvider} to use. + * @param selectStrategyFactory the {@link SelectStrategyFactory} to use. + * @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use. + * @param taskQueueFactory the {@link EventLoopTaskQueueFactory} to use for + * {@link SingleThreadEventLoop#execute(Runnable)}, + * or {@code null} if default one should be used. + * @param tailTaskQueueFactory the {@link EventLoopTaskQueueFactory} to use for + * {@link SingleThreadEventLoop#executeAfterEventLoopIteration(Runnable)}, + * or {@code null} if default one should be used. + */ + public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectorProvider selectorProvider, + SelectStrategyFactory selectStrategyFactory, + RejectedExecutionHandler rejectedExecutionHandler, + EventLoopTaskQueueFactory taskQueueFactory, + EventLoopTaskQueueFactory tailTaskQueueFactory) { + super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, + rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); + } + /** * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. @@ -141,8 +166,21 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { - EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; - return new NioEventLoop(this, executor, (SelectorProvider) args[0], - ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); + SelectorProvider selectorProvider = (SelectorProvider) args[0]; + SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; + RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; + EventLoopTaskQueueFactory taskQueueFactory = null; + EventLoopTaskQueueFactory tailTaskQueueFactory = null; + + int argsLength = args.length; + if (argsLength > 3) { + taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; + } + if (argsLength > 4) { + tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; + } + return new NioEventLoop(this, executor, selectorProvider, + selectStrategyFactory.newSelectStrategy(), + rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); } }