diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index a31313f99c..e9622964be 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -170,7 +170,13 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { */ protected Runnable pollTask() { assert inEventLoop(); - return taskQueue.poll(); + for (;;) { + Runnable task = taskQueue.poll(); + if (task == WAKEUP_TASK) { + continue; + } + return task; + } } /** @@ -230,27 +236,65 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { /** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. + * + * @return {@code true} if and only if at least one task was run */ protected boolean runAllTasks() { - boolean ran = false; + Runnable task = pollTask(); + if (task == null) { + return false; + } + for (;;) { - final Runnable task = pollTask(); - if (task == null) { - break; - } - - if (task == WAKEUP_TASK) { - continue; - } - try { task.run(); - ran = true; } catch (Throwable t) { logger.warn("A task raised an exception.", t); } + + task = pollTask(); + if (task == null) { + return true; + } } - return ran; + } + + /** + * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running + * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}. + */ + protected boolean runAllTasks(long timeoutNanos) { + Runnable task = pollTask(); + if (task == null) { + return false; + } + + final long deadline = System.nanoTime() + timeoutNanos; + long runTasks = 0; + for (;;) { + try { + task.run(); + } catch (Throwable t) { + logger.warn("A task raised an exception.", t); + } + + runTasks ++; + + // Check timeout every 64 tasks because System.nanoTime() is relatively expensive. + // XXX: Hard-coded value - will make it configurable if it is really a problem. + if ((runTasks & 0x40) == 0) { + if (System.nanoTime() >= deadline) { + break; + } + } + + task = pollTask(); + if (task == null) { + break; + } + } + + return true; } /** diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index b33fe1267d..b8d8051f4c 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -18,10 +18,11 @@ package io.netty.channel.nio; import io.netty.channel.Channel; import io.netty.channel.ChannelException; -import io.netty.util.concurrent.TaskScheduler; import io.netty.channel.EventLoopException; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.nio.AbstractNioChannel.NioUnsafe; +import io.netty.util.concurrent.TaskScheduler; +import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -39,6 +40,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -54,8 +56,36 @@ public final class NioEventLoop extends SingleThreadEventLoop { private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class); - static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. + private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. + private static final boolean EPOLL_BUG_WORKAROUND = + SystemPropertyUtil.getBoolean("io.netty.epollBugWorkaround", false); + + private static final long SELECT_TIMEOUT = SystemPropertyUtil.getLong("io.netty.selectTimeout", 500); + private static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT); + + // Workaround for JDK NIO bug. + // + // See: + // - http://bugs.sun.com/view_bug.do?bug_id=6427854 + // - https://github.com/netty/netty/issues/203 + static { + String key = "sun.nio.ch.bugLevel"; + try { + String buglevel = System.getProperty(key); + if (buglevel == null) { + System.setProperty(key, ""); + } + } catch (SecurityException e) { + if (logger.isDebugEnabled()) { + logger.debug("Unable to get/set System Property '" + key + '\'', e); + } + } + if (logger.isDebugEnabled()) { + logger.debug("Using select timeout of " + SELECT_TIMEOUT); + logger.debug("Epoll-bug workaround enabled = " + EPOLL_BUG_WORKAROUND); + } + } /** * The NIO {@link Selector}. */ @@ -71,6 +101,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { */ private final AtomicBoolean wakenUp = new AtomicBoolean(); + private volatile int ioRatio = 50; private int cancelledKeys; private boolean cleanedCancelledKeys; @@ -148,6 +179,28 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } + /** + * Returns the percentage of the desired amount of time spent for I/O in the event loop. + */ + public int getIoRatio() { + return ioRatio; + } + + /** + * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is + * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. + */ + public void setIoRatio(int ioRatio) { + if (ioRatio <= 0 || ioRatio >= 100) { + throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio < 100)"); + } + this.ioRatio = ioRatio; + } + + /** + * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work + * around the infamous epoll 100% CPU bug. + */ public void rebuildSelector() { if (!inEventLoop()) { execute(new Runnable() { @@ -224,85 +277,91 @@ public final class NioEventLoop extends SingleThreadEventLoop { @Override protected void run() { + // use 80% of the timeout for measure + final long minSelectTimeout = SELECT_TIMEOUT_NANOS / 100 * 80; + Selector selector = this.selector; int selectReturnsImmediately = 0; - // use 80% of the timeout for measure - long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS / 100 * 80; - for (;;) { wakenUp.set(false); try { - long beforeSelect = System.nanoTime(); - int selected = SelectorUtil.select(selector); + if (hasTasks()) { + selectNow(); + } else { + long beforeSelect = System.nanoTime(); + int selected = select(); + if (EPOLL_BUG_WORKAROUND) { + if (selected == 0) { + long timeBlocked = System.nanoTime() - beforeSelect; + if (timeBlocked < minSelectTimeout) { + // returned before the minSelectTimeout elapsed with nothing select. + // this may be the cause of the jdk epoll(..) bug, so increment the counter + // which we use later to see if its really the jdk bug. + selectReturnsImmediately ++; + } else { + selectReturnsImmediately = 0; + } + if (selectReturnsImmediately == 10) { + // The selector returned immediately for 10 times in a row, + // so recreate one selector as it seems like we hit the + // famous epoll(..) jdk bug. + rebuildSelector(); + selector = this.selector; + selectReturnsImmediately = 0; - if (SelectorUtil.EPOLL_BUG_WORKAROUND) { - if (selected == 0) { - long timeBlocked = System.nanoTime() - beforeSelect; - if (timeBlocked < minSelectTimeout) { - // returned before the minSelectTimeout elapsed with nothing select. - // this may be the cause of the jdk epoll(..) bug, so increment the counter - // which we use later to see if its really the jdk bug. - selectReturnsImmediately ++; + // try to select again + continue; + } } else { + // reset counter selectReturnsImmediately = 0; } - if (selectReturnsImmediately == 10) { - // The selector returned immediately for 10 times in a row, - // so recreate one selector as it seems like we hit the - // famous epoll(..) jdk bug. - rebuildSelector(); - selector = this.selector; - selectReturnsImmediately = 0; - - // try to select again - continue; - } - } else { - // reset counter - selectReturnsImmediately = 0; } - } - // 'wakenUp.compareAndSet(false, true)' is always evaluated - // before calling 'selector.wakeup()' to reduce the wake-up - // overhead. (Selector.wakeup() is an expensive operation.) - // - // However, there is a race condition in this approach. - // The race condition is triggered when 'wakenUp' is set to - // true too early. - // - // 'wakenUp' is set to true too early if: - // 1) Selector is waken up between 'wakenUp.set(false)' and - // 'selector.select(...)'. (BAD) - // 2) Selector is waken up between 'selector.select(...)' and - // 'if (wakenUp.get()) { ... }'. (OK) - // - // In the first case, 'wakenUp' is set to true and the - // following 'selector.select(...)' will wake up immediately. - // Until 'wakenUp' is set to false again in the next round, - // 'wakenUp.compareAndSet(false, true)' will fail, and therefore - // any attempt to wake up the Selector will fail, too, causing - // the following 'selector.select(...)' call to block - // unnecessarily. - // - // To fix this problem, we wake up the selector again if wakenUp - // is true immediately after selector.select(...). - // It is inefficient in that it wakes up the selector for both - // the first case (BAD - wake-up required) and the second case - // (OK - no wake-up required). + // 'wakenUp.compareAndSet(false, true)' is always evaluated + // before calling 'selector.wakeup()' to reduce the wake-up + // overhead. (Selector.wakeup() is an expensive operation.) + // + // However, there is a race condition in this approach. + // The race condition is triggered when 'wakenUp' is set to + // true too early. + // + // 'wakenUp' is set to true too early if: + // 1) Selector is waken up between 'wakenUp.set(false)' and + // 'selector.select(...)'. (BAD) + // 2) Selector is waken up between 'selector.select(...)' and + // 'if (wakenUp.get()) { ... }'. (OK) + // + // In the first case, 'wakenUp' is set to true and the + // following 'selector.select(...)' will wake up immediately. + // Until 'wakenUp' is set to false again in the next round, + // 'wakenUp.compareAndSet(false, true)' will fail, and therefore + // any attempt to wake up the Selector will fail, too, causing + // the following 'selector.select(...)' call to block + // unnecessarily. + // + // To fix this problem, we wake up the selector again if wakenUp + // is true immediately after selector.select(...). + // It is inefficient in that it wakes up the selector for both + // the first case (BAD - wake-up required) and the second case + // (OK - no wake-up required). - if (wakenUp.get()) { - selector.wakeup(); + if (wakenUp.get()) { + selector.wakeup(); + } } cancelledKeys = 0; + final long ioStartTime = System.nanoTime(); processSelectedKeys(); selector = this.selector; + final long ioTime = System.nanoTime() - ioStartTime; - runAllTasks(); + final int ioRatio = this.ioRatio; + runAllTasks(ioTime * (100 - ioRatio) / ioRatio); selector = this.selector; if (isShutdown()) { @@ -312,8 +371,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } } catch (Throwable t) { - logger.warn( - "Unexpected exception in the selector loop.", t); + logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. @@ -341,7 +399,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; cleanedCancelledKeys = true; - SelectorUtil.cleanupKeys(selector); + cleanupKeys(); } } @@ -475,7 +533,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } private void closeAll() { - SelectorUtil.cleanupKeys(selector); + cleanupKeys(); Set keys = selector.keys(); Collection channels = new ArrayList(keys.size()); for (SelectionKey k: keys) { @@ -521,4 +579,27 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } } + + private int select() throws IOException { + try { + return selector.select(); + } catch (CancelledKeyException e) { + if (logger.isDebugEnabled()) { + logger.debug( + CancelledKeyException.class.getSimpleName() + + " raised by a Selector - JDK bug?", e); + } + // Harmless exception - log anyway + } + + return -1; + } + + void cleanupKeys() { + try { + selector.selectNow(); + } catch (Throwable t) { + logger.warn("Failed to update SelectionKeys.", t); + } + } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java index 2f491876f4..d0068bb672 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoopGroup.java @@ -16,9 +16,9 @@ package io.netty.channel.nio; import io.netty.channel.Channel; -import io.netty.util.concurrent.TaskScheduler; -import io.netty.util.concurrent.EventExecutor; import io.netty.channel.MultithreadEventLoopGroup; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.TaskScheduler; import java.nio.channels.Selector; import java.nio.channels.spi.SelectorProvider; @@ -62,6 +62,16 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup { super(nThreads, threadFactory, selectorProvider); } + /** + * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is + * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. + */ + public void setIoRatio(int ioRatio) { + for (EventExecutor e: children()) { + ((NioEventLoop) e).setIoRatio(ioRatio); + } + } + /** * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work * around the infamous epoll 100% CPU bug. diff --git a/transport/src/main/java/io/netty/channel/nio/SelectorUtil.java b/transport/src/main/java/io/netty/channel/nio/SelectorUtil.java deleted file mode 100644 index 2a22785045..0000000000 --- a/transport/src/main/java/io/netty/channel/nio/SelectorUtil.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2012 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.nio; - -import io.netty.util.internal.SystemPropertyUtil; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.io.IOException; -import java.nio.channels.CancelledKeyException; -import java.nio.channels.Selector; -import java.util.concurrent.TimeUnit; - -/** - * Utility class for operate on a {@link Selector} - */ -final class SelectorUtil { - private static final InternalLogger logger = - InternalLoggerFactory.getInstance(SelectorUtil.class); - static final long DEFAULT_SELECT_TIMEOUT = 500; - static final long SELECT_TIMEOUT = - SystemPropertyUtil.getLong("io.netty.selectTimeout", DEFAULT_SELECT_TIMEOUT); - static final long SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(SELECT_TIMEOUT); - static final boolean EPOLL_BUG_WORKAROUND = - SystemPropertyUtil.getBoolean("io.netty.epollBugWorkaround", false); - - // Workaround for JDK NIO bug. - // - // See: - // - http://bugs.sun.com/view_bug.do?bug_id=6427854 - // - https://github.com/netty/netty/issues/203 - static { - String key = "sun.nio.ch.bugLevel"; - try { - String buglevel = System.getProperty(key); - if (buglevel == null) { - System.setProperty(key, ""); - } - } catch (SecurityException e) { - if (logger.isDebugEnabled()) { - logger.debug("Unable to get/set System Property '" + key + '\'', e); - } - } - if (logger.isDebugEnabled()) { - logger.debug("Using select timeout of " + SELECT_TIMEOUT); - logger.debug("Epoll-bug workaround enabled = " + EPOLL_BUG_WORKAROUND); - } - } - - static int select(Selector selector) throws IOException { - try { - return selector.select(SELECT_TIMEOUT); - } catch (CancelledKeyException e) { - if (logger.isDebugEnabled()) { - logger.debug( - CancelledKeyException.class.getSimpleName() + - " raised by a Selector - JDK bug?", e); - } - // Harmless exception - log anyway - } - return -1; - } - - static void cleanupKeys(Selector selector) { - try { - selector.selectNow(); - } catch (Throwable t) { - logger.warn("Failed to update SelectionKeys.", t); - } - } - - private SelectorUtil() { - // Unused - } -}