EPOLL SelectStrategy

Motivation:
NIO now supports a pluggable select strategy, but EPOLL currently doesn't support this. We should strive for feature parity for EPOLL.

Modifications:
- Add SelectStrategy to EPOLL transport.

Result:
EPOLL transport supports SelectStategy.
This commit is contained in:
Scott Mitchell 2016-03-25 12:44:51 -07:00
parent 5d76daf33b
commit 0c839d9e0a
6 changed files with 104 additions and 73 deletions

View File

@ -17,11 +17,14 @@ package io.netty.channel.epoll;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
import io.netty.channel.unix.FileDescriptor;
import io.netty.util.IntSupplier;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -55,12 +58,20 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private final boolean allowGrowing;
private final EpollEventArray events;
private final IovArray iovArray = new IovArray();
private final SelectStrategy selectStrategy;
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return Native.epollWait(epollFd.intValue(), events, 0);
}
};
private volatile int wakenUp;
private volatile int ioRatio = 50;
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents) {
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy) {
super(parent, executor, false);
selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
if (maxEvents == 0) {
allowGrowing = true;
events = new EpollEventArray(4096);
@ -208,65 +219,66 @@ final class EpollEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1;
try {
int ready;
if (hasTasks()) {
// Non blocking just return what is ready directly without block
ready = Native.epollWait(epollFd.intValue(), events, 0);
} else {
ready = epollWait(oldWakenUp);
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp == 1) {
Native.eventFdWrite(eventFd.intValue(), 1L);
}
if (wakenUp == 1) {
Native.eventFdWrite(eventFd.intValue(), 1L);
}
default:
// fallthrough
}
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
if (ready > 0) {
processReady(events, ready);
if (strategy > 0) {
processReady(events, strategy);
}
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
if (ready > 0) {
processReady(events, ready);
if (strategy > 0) {
processReady(events, strategy);
}
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (allowGrowing && ready == events.length()) {
if (allowGrowing && strategy == events.length()) {
//increase the size of the array as we needed the whole space for the events
events.increase();
}

View File

@ -16,8 +16,10 @@
package io.netty.channel.epoll;
import io.netty.channel.EventLoop;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.Executor;
@ -40,7 +42,15 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
*/
public EpollEventLoopGroup(int nThreads) {
this(nThreads, null);
this(nThreads, (ThreadFactory) null);
}
/**
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public EpollEventLoopGroup(int nThreads, SelectStrategyFactory selectStrategyFactory) {
this(nThreads, null, selectStrategyFactory);
}
/**
@ -51,16 +61,36 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
this(nThreads, threadFactory, 0);
}
/**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectStrategyFactory selectStrategyFactory) {
this(nThreads, threadFactory, 0, selectStrategyFactory);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
*
* @deprecated Use {@link #EpollEventLoopGroup(int)}, {@link #EpollEventLoopGroup(int)} or
* {@link #EpollEventLoopGroup(int, ThreadFactory)}
* @deprecated Use {@link #EpollEventLoopGroup(int)} or {@link #EpollEventLoopGroup(int, ThreadFactory)}
*/
@Deprecated
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) {
super(nThreads, threadFactory, maxEventsAtOnce);
this(nThreads, threadFactory, maxEventsAtOnce, DefaultSelectStrategyFactory.INSTANCE);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
*
* @deprecated Use {@link #EpollEventLoopGroup(int)}, {@link #EpollEventLoopGroup(int, ThreadFactory)}, or
* {@link #EpollEventLoopGroup(int, SelectStrategyFactory)}
*/
@Deprecated
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce,
SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, maxEventsAtOnce, selectStrategyFactory);
}
/**
@ -75,6 +105,7 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new EpollEventLoop(this, executor, (Integer) args[0]);
return new EpollEventLoop(this, executor, (Integer) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy());
}
}

View File

@ -13,26 +13,20 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
package io.netty.channel;
import io.netty.channel.SelectStrategy;
import io.netty.util.IntSupplier;
/**
* Default {@link SelectStrategy} which triggers the blocking select without backoff if no
* tasks are in the queue to be processed.
* Default select strategy.
*/
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
public static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() {
// singleton.
}
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(final IntSupplier supplier, final boolean hasTasks)
throws Exception {
return hasTasks ? supplier.get() : SelectStrategy.SELECT;
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}

View File

@ -13,21 +13,15 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SelectStrategyFactory;
package io.netty.channel;
/**
* Select Strategy Factory for the Noop implementation (default).
* Factory which uses the default select strategy.
*/
final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();
private DefaultSelectStrategyFactory() {
// singleton
}
private DefaultSelectStrategyFactory() { }
@Override
public SelectStrategy newSelectStrategy() {

View File

@ -15,7 +15,6 @@
*/
package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopException;
@ -316,11 +315,11 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
selectloop: for (;;) {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue selectloop;
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));

View File

@ -17,6 +17,7 @@ package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor;