From 517a93d87d5c65e3c4bedf4a77183583991f32e1 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 22 Jun 2019 07:38:03 +0200 Subject: [PATCH] Make EventLoopTaskQueueFactory a top-level interface Motivation: c9aaa93d83b5b571dbc733d2632232db82b3d884 added the ability to specify an EventLoopTaskQueueFactory but did place it under MultithreadEventLoopGroup while not really belongs there. Modifications: Make EventLoopTaskQueueFactory a top-level interface Result: More logical code layout. --- .../netty/channel/epoll/EpollEventLoop.java | 6 ++-- .../channel/epoll/EpollEventLoopGroup.java | 1 + .../netty/channel/kqueue/KQueueEventLoop.java | 6 ++-- .../channel/kqueue/KQueueEventLoopGroup.java | 1 + .../channel/EventLoopTaskQueueFactory.java | 35 +++++++++++++++++++ .../channel/MultithreadEventLoopGroup.java | 16 --------- .../io/netty/channel/nio/NioEventLoop.java | 6 ++-- .../netty/channel/nio/NioEventLoopGroup.java | 1 + .../netty/channel/nio/NioEventLoopTest.java | 4 +-- 9 files changed, 49 insertions(+), 27 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/EventLoopTaskQueueFactory.java diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 29d71d66a3..27bc1d1065 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -17,7 +17,7 @@ package io.netty.channel.epoll; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.SelectStrategy; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe; @@ -82,7 +82,7 @@ class EpollEventLoop extends SingleThreadEventLoop { EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, - MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); @@ -144,7 +144,7 @@ class EpollEventLoop extends SingleThreadEventLoop { } private static Queue newTaskQueue( - MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + EventLoopTaskQueueFactory queueFactory) { if (queueFactory == null) { return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java index 1a383a78c3..85c5d5c158 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -18,6 +18,7 @@ package io.netty.channel.epoll; import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; +import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategyFactory; import io.netty.util.concurrent.EventExecutor; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index 567dd67c31..253d9e5c7b 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -17,7 +17,7 @@ package io.netty.channel.kqueue; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.SelectStrategy; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe; @@ -73,7 +73,7 @@ final class KQueueEventLoop extends SingleThreadEventLoop { KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, - MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); @@ -94,7 +94,7 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } private static Queue newTaskQueue( - MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + EventLoopTaskQueueFactory queueFactory) { if (queueFactory == null) { return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java index 77325c4c23..52ba0d9b7c 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoopGroup.java @@ -17,6 +17,7 @@ package io.netty.channel.kqueue; import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategyFactory; import io.netty.util.concurrent.EventExecutor; diff --git a/transport/src/main/java/io/netty/channel/EventLoopTaskQueueFactory.java b/transport/src/main/java/io/netty/channel/EventLoopTaskQueueFactory.java new file mode 100644 index 0000000000..c2788da6ed --- /dev/null +++ b/transport/src/main/java/io/netty/channel/EventLoopTaskQueueFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel; + +import java.util.Queue; + +/** + * Factory used to create {@link Queue} instances that will be used to store tasks for an {@link EventLoop}. + * + * Generally speaking the returned {@link Queue} MUST be thread-safe and depending on the {@link EventLoop} + * implementation must be of type {@link java.util.concurrent.BlockingQueue}. + */ +public interface EventLoopTaskQueueFactory { + + /** + * Returns a new {@link Queue} to use. + * @param maxCapacity the maximum amount of elements that can be stored in the {@link Queue} at a given point + * in time. + * @return the new queue. + */ + Queue newTaskQueue(int maxCapacity); +} diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java index ee32218471..e2e3878822 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java @@ -98,20 +98,4 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor return next().register(channel, promise); } - /** - * Factory used to create {@link Queue} instances that will be used to store tasks for an {@link EventLoop}. - * - * Generally speaking the returned {@link Queue} MUST be thread-safe and depending on the {@link EventLoop} - * implementation must be of type {@link java.util.concurrent.BlockingQueue}. - */ - public interface EventLoopTaskQueueFactory { - - /** - * Returns a new {@link Queue} to use. - * @param maxCapacity the maximum amount of elements that can be stored in the {@link Queue} at a given point - * in time. - * @return the new queue. - */ - Queue newTaskQueue(int maxCapacity); - } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index edd41a5790..e4eb8019de 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -19,7 +19,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopException; -import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.SelectStrategy; import io.netty.channel.SingleThreadEventLoop; import io.netty.util.IntSupplier; @@ -133,7 +133,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, - MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + EventLoopTaskQueueFactory queueFactory) { super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler); if (selectorProvider == null) { @@ -150,7 +150,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } private static Queue newTaskQueue( - MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { + EventLoopTaskQueueFactory queueFactory) { if (queueFactory == null) { return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index 8598e01043..2dcf55126c 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -18,6 +18,7 @@ package io.netty.channel.nio; import io.netty.channel.Channel; import io.netty.channel.EventLoop; import io.netty.channel.DefaultSelectStrategyFactory; +import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategyFactory; import io.netty.util.concurrent.EventExecutor; diff --git a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java index 99c949b060..c18759dfb7 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java @@ -20,7 +20,7 @@ import io.netty.channel.Channel; import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.SelectStrategy; import io.netty.channel.SelectStrategyFactory; import io.netty.channel.socket.ServerSocketChannel; @@ -297,7 +297,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest { new ThreadPerTaskExecutor(new DefaultThreadFactory(NioEventLoopGroup.class)), DefaultEventExecutorChooserFactory.INSTANCE, SelectorProvider.provider(), DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject(), - new MultithreadEventLoopGroup.EventLoopTaskQueueFactory() { + new EventLoopTaskQueueFactory() { @Override public Queue newTaskQueue(int maxCapacity) { called.set(true);