Allow to specify a custom EventExecutorChooserFactory. Related to [#1230]
Motivation: Sometimes it may be benefitially for an user to specify a custom algorithm when choose the next EventExecutor/EventLoop. Modifications: Allow to specify a custom EventExecutorChooseFactory that allows to customize algorithm. Result: More flexible api.
This commit is contained in:
parent
7137d22994
commit
b461c9d54c
@ -0,0 +1,73 @@
|
||||
/*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
|
||||
*/
|
||||
@UnstableApi
|
||||
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
|
||||
|
||||
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
|
||||
|
||||
private DefaultEventExecutorChooserFactory() { }
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public EventExecutorChooser newChooser(EventExecutor[] executors) {
|
||||
if (isPowerOfTwo(executors.length)) {
|
||||
return new PowerOfTowEventExecutorChooser(executors);
|
||||
} else {
|
||||
return new GenericEventExecutorChooser(executors);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isPowerOfTwo(int val) {
|
||||
return (val & -val) == val;
|
||||
}
|
||||
|
||||
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
|
||||
private final AtomicInteger idx = new AtomicInteger();
|
||||
private final EventExecutor[] executors;
|
||||
|
||||
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
|
||||
this.executors = executors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventExecutor next() {
|
||||
return executors[idx.getAndIncrement() & executors.length - 1];
|
||||
}
|
||||
}
|
||||
|
||||
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
|
||||
private final AtomicInteger idx = new AtomicInteger();
|
||||
private final EventExecutor[] executors;
|
||||
|
||||
GenericEventExecutorChooser(EventExecutor[] executors) {
|
||||
this.executors = executors;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventExecutor next() {
|
||||
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import io.netty.util.internal.UnstableApi;
|
||||
|
||||
/**
|
||||
* Factory that creates new {@link EventExecutorChooser}s.
|
||||
*/
|
||||
@UnstableApi
|
||||
public interface EventExecutorChooserFactory {
|
||||
|
||||
/**
|
||||
* Returns a new {@link EventExecutorChooser}.
|
||||
*/
|
||||
EventExecutorChooser newChooser(EventExecutor[] executors);
|
||||
|
||||
/**
|
||||
* Chooses the next {@link EventExecutor} to use.
|
||||
*/
|
||||
@UnstableApi
|
||||
interface EventExecutorChooser {
|
||||
|
||||
/**
|
||||
* Returns the new {@link EventExecutor} to use.
|
||||
*/
|
||||
EventExecutor next();
|
||||
}
|
||||
}
|
@ -32,10 +32,9 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
|
||||
|
||||
private final EventExecutor[] children;
|
||||
private final Set<EventExecutor> readonlyChildren;
|
||||
private final AtomicInteger childIndex = new AtomicInteger();
|
||||
private final AtomicInteger terminatedChildren = new AtomicInteger();
|
||||
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
|
||||
private final EventExecutorChooser chooser;
|
||||
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
|
||||
|
||||
/**
|
||||
* Create a new instance.
|
||||
@ -56,6 +55,19 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
|
||||
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
|
||||
*/
|
||||
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
|
||||
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new instance.
|
||||
*
|
||||
* @param nThreads the number of threads that will be used by this instance.
|
||||
* @param executor the Executor to use, or {@code null} if the default should be used.
|
||||
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
|
||||
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
|
||||
*/
|
||||
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
|
||||
EventExecutorChooserFactory chooserFactory, Object... args) {
|
||||
if (nThreads <= 0) {
|
||||
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
|
||||
}
|
||||
@ -65,11 +77,6 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
|
||||
}
|
||||
|
||||
children = new EventExecutor[nThreads];
|
||||
if (isPowerOfTwo(children.length)) {
|
||||
chooser = new PowerOfTwoEventExecutorChooser();
|
||||
} else {
|
||||
chooser = new GenericEventExecutorChooser();
|
||||
}
|
||||
|
||||
for (int i = 0; i < nThreads; i ++) {
|
||||
boolean success = false;
|
||||
@ -101,6 +108,8 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
|
||||
}
|
||||
}
|
||||
|
||||
chooser = chooserFactory.newChooser(children);
|
||||
|
||||
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
|
||||
@Override
|
||||
public void operationComplete(Future<Object> future) throws Exception {
|
||||
@ -216,26 +225,4 @@ public abstract class MultithreadEventExecutorGroup extends AbstractEventExecuto
|
||||
}
|
||||
return isTerminated();
|
||||
}
|
||||
|
||||
private static boolean isPowerOfTwo(int val) {
|
||||
return (val & -val) == val;
|
||||
}
|
||||
|
||||
private interface EventExecutorChooser {
|
||||
EventExecutor next();
|
||||
}
|
||||
|
||||
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
|
||||
@Override
|
||||
public EventExecutor next() {
|
||||
return children[childIndex.getAndIncrement() & children.length - 1];
|
||||
}
|
||||
}
|
||||
|
||||
private final class GenericEventExecutorChooser implements EventExecutorChooser {
|
||||
@Override
|
||||
public EventExecutor next() {
|
||||
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.SelectStrategyFactory;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.EventExecutorChooserFactory;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
@ -101,6 +102,11 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
super(nThreads, executor, 0, selectStrategyFactory);
|
||||
}
|
||||
|
||||
public EpollEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
|
||||
SelectStrategyFactory selectStrategyFactory) {
|
||||
super(nThreads, executor, chooserFactory, 0, selectStrategyFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
|
||||
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
|
||||
|
@ -16,6 +16,7 @@
|
||||
package io.netty.channel;
|
||||
|
||||
import io.netty.util.concurrent.DefaultThreadFactory;
|
||||
import io.netty.util.concurrent.EventExecutorChooserFactory;
|
||||
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
|
||||
import io.netty.util.internal.SystemPropertyUtil;
|
||||
import io.netty.util.internal.logging.InternalLogger;
|
||||
@ -57,6 +58,15 @@ public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutor
|
||||
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
|
||||
}
|
||||
|
||||
/**
|
||||
* @see {@link MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
|
||||
* EventExecutorChooserFactory, Object...)}
|
||||
*/
|
||||
protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
|
||||
Object... args) {
|
||||
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ThreadFactory newDefaultThreadFactory() {
|
||||
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
|
||||
|
@ -21,6 +21,7 @@ import io.netty.channel.DefaultSelectStrategyFactory;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.SelectStrategyFactory;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.EventExecutorChooserFactory;
|
||||
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
@ -84,6 +85,12 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
|
||||
super(nThreads, executor, selectorProvider, selectStrategyFactory);
|
||||
}
|
||||
|
||||
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
|
||||
final SelectorProvider selectorProvider,
|
||||
final SelectStrategyFactory selectStrategyFactory) {
|
||||
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
|
||||
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
|
||||
|
Loading…
Reference in New Issue
Block a user