Allow to customize NIO (channel) select strategies.

Motivation:

Under high throughput/low latency workloads, selector wakeups are
degrading performance when the incoming operations are triggered
from outside of the event loop. This is a common scenario for
"client" applications where the originating input is coming from
application threads rather from the socket attached inside the
event loops.

As a result, it can be desirable to defer the blocking select
so that incoming tasks (write/flush) do not need to wakeup
the selector.

Modifications:

This changeset adds the notion of a generic SelectStrategy which,
based on its contract, allows the implementation to optionally
defer the blocking select based on some custom criteria.

The default implementation resembles the original behaviour, that
is if tasks are in the queue `selectNow()` and move on, and if no
tasks need to be processed go into the blocking select and wait
for wakeup.

The strategy can be customized per `NioEventLoopGroup` in the
constructor.

Result:

High performance client applications are now given the chance to
customize for how long the actual selector blocking should be
deferred by employing a custom select strategy.
This commit is contained in:
Michael Nitschinger 2016-03-09 13:30:08 +01:00 committed by Scott Mitchell
parent 2facb7affd
commit 5d76daf33b
7 changed files with 250 additions and 42 deletions

View File

@ -0,0 +1,29 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
/**
* Represents a supplier of {@code int}-valued results.
*/
public interface IntSupplier {
/**
* Gets a result.
*
* @return a result
*/
int get() throws Exception;
}

View File

@ -0,0 +1,48 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.IntSupplier;
/**
* Select strategy interface.
*
* Provides the ability to control the behavior of the select loop. For example a blocking select
* operation can be delayed or skipped entirely if there are events to process immediately.
*/
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

View File

@ -0,0 +1,27 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
/**
* Factory that creates a new {@link SelectStrategy} every time.
*/
public interface SelectStrategyFactory {
/**
* Creates a new {@link SelectStrategy}.
*/
SelectStrategy newSelectStrategy();
}

View File

@ -0,0 +1,38 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.SelectStrategy;
import io.netty.util.IntSupplier;
/**
* Default {@link SelectStrategy} which triggers the blocking select without backoff if no
* tasks are in the queue to be processed.
*/
final class DefaultSelectStrategy implements SelectStrategy {
public static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() {
// singleton.
}
@Override
public int calculateStrategy(final IntSupplier supplier, final boolean hasTasks)
throws Exception {
return hasTasks ? supplier.get() : SelectStrategy.SELECT;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.nio;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SelectStrategyFactory;
/**
* Select Strategy Factory for the Noop implementation (default).
*/
final class DefaultSelectStrategyFactory implements SelectStrategyFactory {
public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();
private DefaultSelectStrategyFactory() {
// singleton
}
@Override
public SelectStrategy newSelectStrategy() {
return DefaultSelectStrategy.INSTANCE;
}
}

View File

@ -19,7 +19,9 @@ package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopException;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.IntSupplier;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -59,6 +61,13 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// Workaround for JDK NIO bug.
//
// See:
@ -106,17 +115,24 @@ public final class NioEventLoop extends SingleThreadEventLoop {
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy) {
super(parent, executor, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
private Selector openSelector() {
@ -300,13 +316,13 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
selectloop: for (;;) {
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue selectloop;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
@ -339,6 +355,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
@ -591,9 +609,9 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
}
void selectNow() throws IOException {
int selectNow() throws IOException {
try {
selector.selectNow();
return selector.selectNow();
} finally {
// restore wakup state if needed
if (wakenUp.get()) {

View File

@ -18,6 +18,7 @@ package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor;
import java.nio.channels.Selector;
@ -64,12 +65,22 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
*/
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory);
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
super(nThreads, executor, selectorProvider);
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory);
}
/**
@ -94,6 +105,7 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy());
}
}