EPOLL SelectStrategy

Motivation:
NIO now supports a pluggable select strategy, but EPOLL currently doesn't support this. We should strive for feature parity for EPOLL.

Modifications:
- Add SelectStrategy to EPOLL transport.

Result:
EPOLL transport supports SelectStategy.
This commit is contained in:
Scott Mitchell 2016-03-25 12:44:51 -07:00
parent 24cb673468
commit a6d6a15ce6
6 changed files with 108 additions and 75 deletions

View File

@ -17,11 +17,14 @@ package io.netty.channel.epoll;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe; import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.FileDescriptor;
import io.netty.util.IntSupplier;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -55,13 +58,21 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private final boolean allowGrowing; private final boolean allowGrowing;
private final EpollEventArray events; private final EpollEventArray events;
private final IovArray iovArray = new IovArray(); private final IovArray iovArray = new IovArray();
private final SelectStrategy selectStrategy;
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return Native.epollWait(epollFd.intValue(), events, 0);
}
};
@SuppressWarnings("unused") @SuppressWarnings("unused")
private volatile int wakenUp; private volatile int wakenUp;
private volatile int ioRatio = 50; private volatile int ioRatio = 50;
EpollEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, int maxEvents) { EpollEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, int maxEvents, SelectStrategy strategy) {
super(parent, threadFactory, false); super(parent, threadFactory, false);
selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
if (maxEvents == 0) { if (maxEvents == 0) {
allowGrowing = true; allowGrowing = true;
events = new EpollEventArray(4096); events = new EpollEventArray(4096);
@ -209,65 +220,66 @@ final class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void run() { protected void run() {
for (;;) { for (;;) {
boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1;
try { try {
int ready; int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
if (hasTasks()) { switch (strategy) {
// Non blocking just return what is ready directly without block case SelectStrategy.CONTINUE:
ready = Native.epollWait(epollFd.intValue(), events, 0); continue;
} else { case SelectStrategy.SELECT:
ready = epollWait(oldWakenUp); strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
// 'wakenUp.compareAndSet(false, true)' is always evaluated // 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up // before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.) // overhead. (Selector.wakeup() is an expensive operation.)
// //
// However, there is a race condition in this approach. // However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to // The race condition is triggered when 'wakenUp' is set to
// true too early. // true too early.
// //
// 'wakenUp' is set to true too early if: // 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and // 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD) // 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and // 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK) // 'if (wakenUp.get()) { ... }'. (OK)
// //
// In the first case, 'wakenUp' is set to true and the // In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately. // following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round, // Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing // any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block // the following 'selector.select(...)' call to block
// unnecessarily. // unnecessarily.
// //
// To fix this problem, we wake up the selector again if wakenUp // To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...). // is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both // It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case // the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required). // (OK - no wake-up required).
if (wakenUp == 1) { if (wakenUp == 1) {
Native.eventFdWrite(eventFd.intValue(), 1L); Native.eventFdWrite(eventFd.intValue(), 1L);
} }
default:
// fallthrough
} }
final int ioRatio = this.ioRatio; final int ioRatio = this.ioRatio;
if (ioRatio == 100) { if (ioRatio == 100) {
if (ready > 0) { if (strategy > 0) {
processReady(events, ready); processReady(events, strategy);
} }
runAllTasks(); runAllTasks();
} else { } else {
final long ioStartTime = System.nanoTime(); final long ioStartTime = System.nanoTime();
if (ready > 0) { if (strategy > 0) {
processReady(events, ready); processReady(events, strategy);
} }
final long ioTime = System.nanoTime() - ioStartTime; final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} }
if (allowGrowing && ready == events.length()) { if (allowGrowing && strategy == events.length()) {
//increase the size of the array as we needed the whole space for the events //increase the size of the array as we needed the whole space for the events
events.increase(); events.increase();
} }

View File

@ -15,8 +15,10 @@
*/ */
package io.netty.channel.epoll; package io.netty.channel.epoll;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -38,7 +40,15 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}. * Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
*/ */
public EpollEventLoopGroup(int nThreads) { public EpollEventLoopGroup(int nThreads) {
this(nThreads, null); this(nThreads, (ThreadFactory) null);
}
/**
* Create a new instance using the specified number of threads and the default {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public EpollEventLoopGroup(int nThreads, SelectStrategyFactory selectStrategyFactory) {
this(nThreads, null, selectStrategyFactory);
} }
/** /**
@ -49,16 +59,36 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
this(nThreads, threadFactory, 0); this(nThreads, threadFactory, 0);
} }
/**
* Create a new instance using the specified number of threads and the given {@link ThreadFactory}.
*/
@SuppressWarnings("deprecation")
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectStrategyFactory selectStrategyFactory) {
this(nThreads, threadFactory, 0, selectStrategyFactory);
}
/** /**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given * Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...). * maximal amount of epoll events to handle per epollWait(...).
* *
* @deprecated Use {@link #EpollEventLoopGroup(int)}, {@link #EpollEventLoopGroup(int)} or * @deprecated Use {@link #EpollEventLoopGroup(int)} or {@link #EpollEventLoopGroup(int, ThreadFactory)}
* {@link #EpollEventLoopGroup(int, ThreadFactory)}
*/ */
@Deprecated @Deprecated
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) { public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce) {
super(nThreads, threadFactory, maxEventsAtOnce); this(nThreads, threadFactory, maxEventsAtOnce, DefaultSelectStrategyFactory.INSTANCE);
}
/**
* Create a new instance using the specified number of threads, the given {@link ThreadFactory} and the given
* maximal amount of epoll events to handle per epollWait(...).
*
* @deprecated Use {@link #EpollEventLoopGroup(int)}, {@link #EpollEventLoopGroup(int, ThreadFactory)}, or
* {@link #EpollEventLoopGroup(int, SelectStrategyFactory)}
*/
@Deprecated
public EpollEventLoopGroup(int nThreads, ThreadFactory threadFactory, int maxEventsAtOnce,
SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, maxEventsAtOnce, selectStrategyFactory);
} }
/** /**
@ -73,6 +103,7 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
@Override @Override
protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception { protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
return new EpollEventLoop(this, threadFactory, (Integer) args[0]); return new EpollEventLoop(this, threadFactory, (Integer) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy());
} }
} }

View File

@ -13,26 +13,20 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.channel.nio; package io.netty.channel;
import io.netty.channel.SelectStrategy;
import io.netty.util.IntSupplier; import io.netty.util.IntSupplier;
/** /**
* Default {@link SelectStrategy} which triggers the blocking select without backoff if no * Default select strategy.
* tasks are in the queue to be processed.
*/ */
final class DefaultSelectStrategy implements SelectStrategy { final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
public static final SelectStrategy INSTANCE = new DefaultSelectStrategy(); private DefaultSelectStrategy() { }
private DefaultSelectStrategy() {
// singleton.
}
@Override @Override
public int calculateStrategy(final IntSupplier supplier, final boolean hasTasks) public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
return hasTasks ? supplier.get() : SelectStrategy.SELECT;
} }
} }

View File

@ -13,21 +13,15 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package io.netty.channel.nio; package io.netty.channel;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SelectStrategyFactory;
/** /**
* Select Strategy Factory for the Noop implementation (default). * Factory which uses the default select strategy.
*/ */
final class DefaultSelectStrategyFactory implements SelectStrategyFactory { public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory(); public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();
private DefaultSelectStrategyFactory() { private DefaultSelectStrategyFactory() { }
// singleton
}
@Override @Override
public SelectStrategy newSelectStrategy() { public SelectStrategy newSelectStrategy() {

View File

@ -15,11 +15,10 @@
*/ */
package io.netty.channel.nio; package io.netty.channel.nio;
import io.netty.channel.SelectStrategy;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopException; import io.netty.channel.EventLoopException;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe; import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
import io.netty.util.IntSupplier; import io.netty.util.IntSupplier;
@ -27,6 +26,7 @@ import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
@ -316,11 +316,11 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void run() { protected void run() {
selectloop: for (;;) { for (;;) {
try { try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE: case SelectStrategy.CONTINUE:
continue selectloop; continue;
case SelectStrategy.SELECT: case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); select(wakenUp.getAndSet(false));

View File

@ -15,10 +15,12 @@
*/ */
package io.netty.channel.nio; package io.netty.channel.nio;
import io.netty.channel.SelectStrategyFactory;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.DefaultSelectStrategyFactory;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;