diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 92b648889e..a7aee9a4c6 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -167,6 +167,17 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } + protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, + boolean addTaskWakesUp, Queue taskQueue, + RejectedExecutionHandler rejectedHandler) { + super(parent); + this.addTaskWakesUp = addTaskWakesUp; + this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; + this.executor = ThreadExecutorMap.apply(executor, this); + this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue"); + rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); + } + /** * @deprecated Please use and override {@link #newTaskQueue(int)}. */ diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index d99f4a5d48..29d71d66a3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -17,6 +17,7 @@ package io.netty.channel.epoll; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategy; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe; @@ -80,8 +81,10 @@ class EpollEventLoop extends SingleThreadEventLoop { private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999; EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, - SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { - super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); + SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, + MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), + rejectedExecutionHandler); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); if (maxEvents == 0) { allowGrowing = true; @@ -140,6 +143,14 @@ class EpollEventLoop extends SingleThreadEventLoop { } } + private static Queue newTaskQueue( + MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + if (queueFactory == null) { + return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); + } + return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS); + } + /** * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}. */ @@ -217,9 +228,13 @@ class EpollEventLoop extends SingleThreadEventLoop { @Override protected Queue newTaskQueue(int maxPendingTasks) { + return newTaskQueue0(maxPendingTasks); + } + + private static Queue newTaskQueue0(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() - : PlatformDependent.newMpscQueue(maxPendingTasks); + : PlatformDependent.newMpscQueue(maxPendingTasks); } /** diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java index acb4212fe9..1a383a78c3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -119,6 +119,13 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler); } + public EpollEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectStrategyFactory selectStrategyFactory, + RejectedExecutionHandler rejectedExecutionHandler, + EventLoopTaskQueueFactory queueFactory) { + super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler, queueFactory); + } + /** * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. @@ -131,7 +138,9 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { + EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new EpollEventLoop(this, executor, (Integer) args[0], - ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); + ((SelectStrategyFactory) args[1]).newSelectStrategy(), + (RejectedExecutionHandler) args[2], queueFactory); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java index f250c272e6..71c56b6d86 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollEventLoopTest.java @@ -51,7 +51,7 @@ public class EpollEventLoopTest extends AbstractSingleThreadEventLoopTest { final EventLoopGroup group = new EpollEventLoop(null, new ThreadPerTaskExecutor(new DefaultThreadFactory(getClass())), 0, - DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(), RejectedExecutionHandlers.reject()) { + DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(), RejectedExecutionHandlers.reject(), null) { @Override void handleLoopException(Throwable t) { capture.set(t); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index a968118a67..567dd67c31 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -17,6 +17,7 @@ package io.netty.channel.kqueue; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategy; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe; @@ -71,8 +72,10 @@ final class KQueueEventLoop extends SingleThreadEventLoop { private volatile int ioRatio = 50; KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, - SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { - super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); + SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, + MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), + rejectedExecutionHandler); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); this.kqueueFd = Native.newKQueue(); if (maxEvents == 0) { @@ -90,6 +93,14 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } } + private static Queue newTaskQueue( + MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + if (queueFactory == null) { + return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); + } + return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS); + } + void add(AbstractKQueueChannel ch) { assert inEventLoop(); AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch); @@ -305,9 +316,13 @@ final class KQueueEventLoop extends SingleThreadEventLoop { @Override protected Queue newTaskQueue(int maxPendingTasks) { + return newTaskQueue0(maxPendingTasks); + } + + private static Queue newTaskQueue0(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() - : PlatformDependent.newMpscQueue(maxPendingTasks); + : PlatformDependent.newMpscQueue(maxPendingTasks); } /** diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java index fe32aa5a7b..77325c4c23 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java @@ -116,6 +116,14 @@ public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup { super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, rejectedExecutionHandler); } + public KQueueEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + SelectStrategyFactory selectStrategyFactory, + RejectedExecutionHandler rejectedExecutionHandler, + EventLoopTaskQueueFactory queueFactory) { + super(nThreads, executor, chooserFactory, 0, selectStrategyFactory, + rejectedExecutionHandler, queueFactory); + } + /** * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. @@ -128,7 +136,10 @@ public final class KQueueEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { + EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; + return new KQueueEventLoop(this, executor, (Integer) args[0], - ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); + ((SelectStrategyFactory) args[1]).newSelectStrategy(), + (RejectedExecutionHandler) args[2], queueFactory); } } diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java index a9bc23dfe9..ee32218471 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java @@ -23,6 +23,7 @@ import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; @@ -96,4 +97,21 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor public ChannelFuture register(Channel channel, ChannelPromise promise) { return next().register(channel, promise); } + + /** + * Factory used to create {@link Queue} instances that will be used to store tasks for an {@link EventLoop}. + * + * Generally speaking the returned {@link Queue} MUST be thread-safe and depending on the {@link EventLoop} + * implementation must be of type {@link java.util.concurrent.BlockingQueue}. + */ + public interface EventLoopTaskQueueFactory { + + /** + * Returns a new {@link Queue} to use. + * @param maxCapacity the maximum amount of elements that can be stored in the {@link Queue} at a given point + * in time. + * @return the new queue. + */ + Queue newTaskQueue(int maxCapacity); + } } diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 1fe2d3fe2b..9abb39fc38 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -59,6 +59,13 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im tailTasks = newTaskQueue(maxPendingTasks); } + protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, + boolean addTaskWakesUp, Queue taskQueue, Queue tailTaskQueue, + RejectedExecutionHandler rejectedExecutionHandler) { + super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler); + tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue"); + } + @Override public EventLoopGroup parent() { return (EventLoopGroup) super.parent(); diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index bb9a1e25cf..edd41a5790 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -19,6 +19,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopException; +import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategy; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.IntSupplier; @@ -131,8 +132,10 @@ public final class NioEventLoop extends SingleThreadEventLoop { private boolean needsToSelectAgain; NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, - SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { - super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); + SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, + MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), + rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } @@ -146,6 +149,14 @@ public final class NioEventLoop extends SingleThreadEventLoop { selectStrategy = strategy; } + private static Queue newTaskQueue( + MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + if (queueFactory == null) { + return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); + } + return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS); + } + private static final class SelectorTuple { final Selector unwrappedSelector; final Selector selector; @@ -265,9 +276,13 @@ public final class NioEventLoop extends SingleThreadEventLoop { @Override protected Queue newTaskQueue(int maxPendingTasks) { + return newTaskQueue0(maxPendingTasks); + } + + private static Queue newTaskQueue0(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() - : PlatformDependent.newMpscQueue(maxPendingTasks); + : PlatformDependent.newMpscQueue(maxPendingTasks); } /** diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index 833b754978..8598e01043 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -101,6 +101,15 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler); } + public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, + final SelectorProvider selectorProvider, + final SelectStrategyFactory selectStrategyFactory, + final RejectedExecutionHandler rejectedExecutionHandler, + final EventLoopTaskQueueFactory taskQueueFactory) { + super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, + rejectedExecutionHandler, taskQueueFactory); + } + /** * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. @@ -123,7 +132,8 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { @Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { + EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null; return new NioEventLoop(this, executor, (SelectorProvider) args[0], - ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); + ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory); } } diff --git a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java index 7db891afc6..99c949b060 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java @@ -17,15 +17,20 @@ package io.netty.channel.nio; import io.netty.channel.AbstractEventLoopTest; import io.netty.channel.Channel; +import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategy; import io.netty.channel.SelectStrategyFactory; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.IntSupplier; +import io.netty.util.concurrent.DefaultEventExecutorChooserFactory; import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.RejectedExecutionHandlers; +import io.netty.util.concurrent.ThreadPerTaskExecutor; import org.hamcrest.core.IsInstanceOf; import org.junit.Ignore; import org.junit.Test; @@ -36,9 +41,12 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; +import java.util.Queue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.*; @@ -281,4 +289,35 @@ public class NioEventLoopTest extends AbstractEventLoopTest { group.shutdownGracefully(); } } + + @Test + public void testCustomQueue() { + final AtomicBoolean called = new AtomicBoolean(); + NioEventLoopGroup group = new NioEventLoopGroup(1, + new ThreadPerTaskExecutor(new DefaultThreadFactory(NioEventLoopGroup.class)), + DefaultEventExecutorChooserFactory.INSTANCE, SelectorProvider.provider(), + DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject(), + new MultithreadEventLoopGroup.EventLoopTaskQueueFactory() { + @Override + public Queue newTaskQueue(int maxCapacity) { + called.set(true); + return new LinkedBlockingQueue(maxCapacity); + } + }); + + final NioEventLoop loop = (NioEventLoop) group.next(); + + try { + loop.submit(new Runnable() { + @Override + public void run() { + // NOOP. + } + }).syncUninterruptibly(); + assertTrue(called.get()); + } finally { + group.shutdownGracefully(); + } + } + }