Make EventLoopTaskQueueFactory a top-level interface

Motivation:

c9aaa93d83 added the ability to specify an EventLoopTaskQueueFactory but did place it under MultithreadEventLoopGroup while not really belongs there.

Modifications:

Make EventLoopTaskQueueFactory a top-level interface

Result:

More logical code layout.
This commit is contained in:
Norman Maurer 2019-06-22 07:38:03 +02:00
parent 2c99fc0f12
commit 517a93d87d
9 changed files with 49 additions and 27 deletions

View File

@ -17,7 +17,7 @@ package io.netty.channel.epoll;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.SelectStrategy; import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe; import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
@ -82,7 +82,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler); rejectedExecutionHandler);
selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
@ -144,7 +144,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
private static Queue<Runnable> newTaskQueue( private static Queue<Runnable> newTaskQueue(
MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) { if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.epoll;
import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory; import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;

View File

@ -17,7 +17,7 @@ package io.netty.channel.kqueue;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.SelectStrategy; import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe; import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
@ -73,7 +73,7 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler); rejectedExecutionHandler);
selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy"); selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
@ -94,7 +94,7 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
} }
private static Queue<Runnable> newTaskQueue( private static Queue<Runnable> newTaskQueue(
MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) { if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
} }

View File

@ -17,6 +17,7 @@ package io.netty.channel.kqueue;
import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory; import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;

View File

@ -0,0 +1,35 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.Queue;
/**
* Factory used to create {@link Queue} instances that will be used to store tasks for an {@link EventLoop}.
*
* Generally speaking the returned {@link Queue} MUST be thread-safe and depending on the {@link EventLoop}
* implementation must be of type {@link java.util.concurrent.BlockingQueue}.
*/
public interface EventLoopTaskQueueFactory {
/**
* Returns a new {@link Queue} to use.
* @param maxCapacity the maximum amount of elements that can be stored in the {@link Queue} at a given point
* in time.
* @return the new queue.
*/
Queue<Runnable> newTaskQueue(int maxCapacity);
}

View File

@ -98,20 +98,4 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor
return next().register(channel, promise); return next().register(channel, promise);
} }
/**
* Factory used to create {@link Queue} instances that will be used to store tasks for an {@link EventLoop}.
*
* Generally speaking the returned {@link Queue} MUST be thread-safe and depending on the {@link EventLoop}
* implementation must be of type {@link java.util.concurrent.BlockingQueue}.
*/
public interface EventLoopTaskQueueFactory {
/**
* Returns a new {@link Queue} to use.
* @param maxCapacity the maximum amount of elements that can be stored in the {@link Queue} at a given point
* in time.
* @return the new queue.
*/
Queue<Runnable> newTaskQueue(int maxCapacity);
}
} }

View File

@ -19,7 +19,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopException; import io.netty.channel.EventLoopException;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.SelectStrategy; import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.IntSupplier; import io.netty.util.IntSupplier;
@ -133,7 +133,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler); rejectedExecutionHandler);
if (selectorProvider == null) { if (selectorProvider == null) {
@ -150,7 +150,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
private static Queue<Runnable> newTaskQueue( private static Queue<Runnable> newTaskQueue(
MultithreadEventLoopGroup.EventLoopTaskQueueFactory queueFactory) { EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) { if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS); return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
} }

View File

@ -18,6 +18,7 @@ package io.netty.channel.nio;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory; import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;

View File

@ -20,7 +20,7 @@ import io.netty.channel.Channel;
import io.netty.channel.DefaultSelectStrategyFactory; import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.SelectStrategy; import io.netty.channel.SelectStrategy;
import io.netty.channel.SelectStrategyFactory; import io.netty.channel.SelectStrategyFactory;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
@ -297,7 +297,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
new ThreadPerTaskExecutor(new DefaultThreadFactory(NioEventLoopGroup.class)), new ThreadPerTaskExecutor(new DefaultThreadFactory(NioEventLoopGroup.class)),
DefaultEventExecutorChooserFactory.INSTANCE, SelectorProvider.provider(), DefaultEventExecutorChooserFactory.INSTANCE, SelectorProvider.provider(),
DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject(), DefaultSelectStrategyFactory.INSTANCE, RejectedExecutionHandlers.reject(),
new MultithreadEventLoopGroup.EventLoopTaskQueueFactory() { new EventLoopTaskQueueFactory() {
@Override @Override
public Queue<Runnable> newTaskQueue(int maxCapacity) { public Queue<Runnable> newTaskQueue(int maxCapacity) {
called.set(true); called.set(true);