Tail tasks queue: configure separately from tasks queue (#11400)

Motivation:

IO transports (primarily epoll, but also applies to kqueue, nio) cant be configured with separate tail tasks queue factory -
instead single queue factory is used for both normal tasks and tail tasks.

Modifications:

Add constructor accepting tail EventLoopTaskQueueFactory to aforementioned transports

Result:

IO transports can be configured with separate tail tasks
This commit is contained in:
Maksym Ostroverkhov 2021-07-01 11:51:10 +03:00 committed by GitHub
parent 20e4ccbd33
commit 2abe20a6b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 128 additions and 20 deletions

View File

@ -86,8 +86,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler); rejectedExecutionHandler);
selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
if (maxEvents == 0) { if (maxEvents == 0) {

View File

@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory; import io.netty.channel.SelectStrategyFactory;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers; import io.netty.util.concurrent.RejectedExecutionHandlers;
@ -134,6 +135,28 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler, queueFactory); super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler, queueFactory);
} }
/**
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if default one should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param selectStrategyFactory the {@link SelectStrategyFactory} to use.
* @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use.
* @param taskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
* {@link SingleThreadEventLoop#execute(Runnable)},
* or {@code null} if default one should be used.
* @param tailTaskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
* {@link SingleThreadEventLoop#executeAfterEventLoopIteration(Runnable)},
* or {@code null} if default one should be used.
*/
public EpollEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectStrategyFactory selectStrategyFactory,
RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory,
EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler, taskQueueFactory,
tailTaskQueueFactory);
}
/** /**
* @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts. * @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts.
*/ */
@ -146,9 +169,21 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
@Override @Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception { protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; Integer maxEvents = (Integer) args[0];
return new EpollEventLoop(this, executor, (Integer) args[0], SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
((SelectStrategyFactory) args[1]).newSelectStrategy(), RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
(RejectedExecutionHandler) args[2], queueFactory); EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new EpollEventLoop(this, executor, maxEvents,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
} }
} }

View File

@ -15,13 +15,13 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest;
import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel; import io.netty.channel.ServerChannel;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.testsuite.transport.AbstractSingleThreadEventLoopTest;
import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.RejectedExecutionHandlers; import io.netty.util.concurrent.RejectedExecutionHandlers;
@ -61,7 +61,8 @@ public class EpollEventLoopTest extends AbstractSingleThreadEventLoopTest {
final EventLoopGroup group = new EpollEventLoop(null, final EventLoopGroup group = new EpollEventLoop(null,
new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass())), 0, new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass())), 0,
DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(), RejectedExecutionHandlers.reject(), null) { DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(), RejectedExecutionHandlers.reject(),
null, null) {
@Override @Override
void handleLoopException(Throwable t) { void handleLoopException(Throwable t) {
capture.set(t); capture.set(t);

View File

@ -73,8 +73,8 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler); rejectedExecutionHandler);
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
this.kqueueFd = Native.newKQueue(); this.kqueueFd = Native.newKQueue();

View File

@ -20,6 +20,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory; import io.netty.channel.SelectStrategyFactory;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.concurrent.RejectedExecutionHandler;
@ -133,6 +134,28 @@ public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup {
rejectedExecutionHandler, queueFactory); rejectedExecutionHandler, queueFactory);
} }
/**
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if default one should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param selectStrategyFactory the {@link SelectStrategyFactory} to use.
* @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use.
* @param taskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
* {@link SingleThreadEventLoop#execute(Runnable)},
* or {@code null} if default one should be used.
* @param tailTaskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
* {@link SingleThreadEventLoop#executeAfterEventLoopIteration(Runnable)},
* or {@code null} if default one should be used.
*/
public KQueueEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectStrategyFactory selectStrategyFactory,
RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory,
EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler, taskQueueFactory,
tailTaskQueueFactory);
}
/** /**
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
@ -145,10 +168,21 @@ public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup {
@Override @Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception { protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; Integer maxEvents = (Integer) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
return new KQueueEventLoop(this, executor, (Integer) args[0], int argsLength = args.length;
((SelectStrategyFactory) args[1]).newSelectStrategy(), if (argsLength > 3) {
(RejectedExecutionHandler) args[2], queueFactory); taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new KQueueEventLoop(this, executor, maxEvents,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
} }
} }

View File

@ -134,8 +134,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler); rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");

View File

@ -16,11 +16,12 @@
package io.netty.channel.nio; package io.netty.channel.nio;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory; import io.netty.channel.SelectStrategyFactory;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.concurrent.RejectedExecutionHandler;
@ -119,6 +120,30 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
rejectedExecutionHandler, taskQueueFactory); rejectedExecutionHandler, taskQueueFactory);
} }
/**
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if default one should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param selectorProvider the {@link SelectorProvider} to use.
* @param selectStrategyFactory the {@link SelectStrategyFactory} to use.
* @param rejectedExecutionHandler the {@link RejectedExecutionHandler} to use.
* @param taskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
* {@link SingleThreadEventLoop#execute(Runnable)},
* or {@code null} if default one should be used.
* @param tailTaskQueueFactory the {@link EventLoopTaskQueueFactory} to use for
* {@link SingleThreadEventLoop#executeAfterEventLoopIteration(Runnable)},
* or {@code null} if default one should be used.
*/
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
SelectorProvider selectorProvider,
SelectStrategyFactory selectStrategyFactory,
RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory,
EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
/** /**
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
@ -141,8 +166,21 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override @Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception { protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; SelectorProvider selectorProvider = (SelectorProvider) args[0];
return new NioEventLoop(this, executor, (SelectorProvider) args[0], SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
} }
} }