Allow to customize NIO (channel) select strategies.

Motivation:

Under high throughput/low latency workloads, selector wakeups are
degrading performance when the incoming operations are triggered
from outside of the event loop. This is a common scenario for
"client" applications where the originating input is coming from
application threads rather from the socket attached inside the
event loops.

As a result, it can be desirable to defer the blocking select
so that incoming tasks (write/flush) do not need to wakeup
the selector.

Modifications:

This changeset adds the notion of a generic SelectStrategy which,
based on its contract, allows the implementation to optionally
defer the blocking select based on some custom criteria.

The default implementation resembles the original behaviour, that
is if tasks are in the queue `selectNow()` and move on, and if no
tasks need to be processed go into the blocking select and wait
for wakeup.

The strategy can be customized per `NioEventLoopGroup` in the
constructor.

Result:

High performance client applications are now given the chance to
customize for how long the actual selector blocking should be
deferred by employing a custom select strategy.
This commit is contained in:
Michael Nitschinger 2016-03-09 13:30:08 +01:00 committed by Scott Mitchell
parent 949bfdf8c5
commit 24cb673468
7 changed files with 244 additions and 43 deletions

View File

@ -0,0 +1,29 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
/**
* Represents a supplier of {@code int}-valued results.
*/
public interface IntSupplier {
/**
* Gets a result.
*
* @return a result
*/
int get() throws Exception;
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.IntSupplier;
/**
* Select strategy interface.
*
* Provides the ability to control the behavior of the select loop. For example a blocking select
* operation can be delayed or skipped entirely if there are events to process immediately.
*/
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
/**
* Factory that creates a new {@link SelectStrategy} every time.
*/
public interface SelectStrategyFactory {
/**
* Creates a new {@link SelectStrategy}.
*/
SelectStrategy newSelectStrategy();
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.SelectStrategy;
import io.netty.util.IntSupplier;
/**
* Default {@link SelectStrategy} which triggers the blocking select without backoff if no
* tasks are in the queue to be processed.
*/
final class DefaultSelectStrategy implements SelectStrategy {
public static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() {
// singleton.
}
@Override
public int calculateStrategy(final IntSupplier supplier, final boolean hasTasks)
throws Exception {
return hasTasks ? supplier.get() : SelectStrategy.SELECT;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SelectStrategyFactory;
/**
* Select Strategy Factory for the Noop implementation (default).
*/
final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();
private DefaultSelectStrategyFactory() {
// singleton
}
@Override
public SelectStrategy newSelectStrategy() {
return DefaultSelectStrategy.INSTANCE;
}
}

View File

@ -16,16 +16,17 @@
package io.netty.channel.nio; package io.netty.channel.nio;
import io.netty.channel.SelectStrategy;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopException; import io.netty.channel.EventLoopException;
import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe; import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
import io.netty.util.IntSupplier;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
@ -60,6 +61,13 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3; private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD; private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// Workaround for JDK NIO bug. // Workaround for JDK NIO bug.
// //
// See: // See:
@ -107,17 +115,24 @@ public final class NioEventLoop extends SingleThreadEventLoop {
*/ */
private final AtomicBoolean wakenUp = new AtomicBoolean(); private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50; private volatile int ioRatio = 50;
private int cancelledKeys; private int cancelledKeys;
private boolean needsToSelectAgain; private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider,
SelectStrategy strategy) {
super(parent, threadFactory, false); super(parent, threadFactory, false);
if (selectorProvider == null) { if (selectorProvider == null) {
throw new NullPointerException("selectorProvider"); throw new NullPointerException("selectorProvider");
} }
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider; provider = selectorProvider;
selector = openSelector(); selector = openSelector();
selectStrategy = strategy;
} }
private Selector openSelector() { private Selector openSelector() {
@ -301,45 +316,47 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void run() { protected void run() {
for (;;) { selectloop: for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try { try {
if (hasTasks()) { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
selectNow(); case SelectStrategy.CONTINUE:
} else { continue selectloop;
select(oldWakenUp); case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated // 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up // before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.) // overhead. (Selector.wakeup() is an expensive operation.)
// //
// However, there is a race condition in this approach. // However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to // The race condition is triggered when 'wakenUp' is set to
// true too early. // true too early.
// //
// 'wakenUp' is set to true too early if: // 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and // 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD) // 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and // 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK) // 'if (wakenUp.get()) { ... }'. (OK)
// //
// In the first case, 'wakenUp' is set to true and the // In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately. // following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round, // Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing // any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block // the following 'selector.select(...)' call to block
// unnecessarily. // unnecessarily.
// //
// To fix this problem, we wake up the selector again if wakenUp // To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...). // is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both // It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case // the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required). // (OK - no wake-up required).
if (wakenUp.get()) { if (wakenUp.get()) {
selector.wakeup(); selector.wakeup();
} }
default:
// fallthrough
} }
cancelledKeys = 0; cancelledKeys = 0;
@ -592,9 +609,9 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
} }
void selectNow() throws IOException { int selectNow() throws IOException {
try { try {
selector.selectNow(); return selector.selectNow();
} finally { } finally {
// restore wakup state if needed // restore wakup state if needed
if (wakenUp.get()) { if (wakenUp.get()) {

View File

@ -15,10 +15,10 @@
*/ */
package io.netty.channel.nio; package io.netty.channel.nio;
import io.netty.channel.SelectStrategyFactory;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -58,7 +58,12 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
*/ */
public NioEventLoopGroup( public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider); this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory);
} }
/** /**
@ -84,6 +89,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override @Override
protected EventExecutor newChild( protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception { ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy());
} }
} }