diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 1270e84bc5..c5105786c8 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -253,6 +253,16 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return !taskQueue.isEmpty(); } + /** + * Returns {@code true} if a scheduled task is ready for processing by {@link #runAllTasks()} or + * {@link #runAllTasks(long)}. + */ + protected boolean hasScheduledTasks() { + assert inEventLoop(); + ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); + return delayedTask != null && delayedTask.deadlineNanos() <= ScheduledFutureTask.nanoTime(); + } + /** * Return the number of tasks that are pending for processing. * diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 037415d5d7..62a64cee44 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -54,7 +54,6 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final long[] events; private int id; - private int oldWakenUp; private boolean overflown; @SuppressWarnings("unused") @@ -176,7 +175,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { this.ioRatio = ioRatio; } - private int epollWait() { + private int epollWait(boolean oldWakenUp) { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); @@ -195,10 +194,11 @@ final class EpollEventLoop extends SingleThreadEventLoop { int selectedKeys = Native.epollWait(epollFd, events, (int) timeoutMillis); selectCnt ++; - if (selectedKeys != 0 || oldWakenUp == 1 || wakenUp == 1 || hasTasks()) { - // Selected something, - // waken up by user, or - // the task queue has a pending task. + if (selectedKeys != 0 || oldWakenUp || wakenUp == 1 || hasTasks() || hasScheduledTasks()) { + // - Selected something, + // - waken up by user, or + // - the task queue has a pending task. + // - a scheduled task is ready for processing return selectedKeys; } currentTimeNanos = System.nanoTime(); @@ -209,14 +209,14 @@ final class EpollEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { - oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0); + boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1; try { int ready; if (hasTasks()) { // Non blocking just return what is ready directly without block ready = Native.epollWait(epollFd, events, 0); } else { - ready = epollWait(); + ready = epollWait(oldWakenUp); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 79a024e8ed..2c4ce07481 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -20,7 +20,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.EventLoopException; import io.netty.channel.SingleThreadEventLoop; -import io.netty.channel.nio.AbstractNioChannel.NioUnsafe; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -40,6 +39,7 @@ import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -105,7 +105,6 @@ public final class NioEventLoop extends SingleThreadEventLoop { * waken up. */ private final AtomicBoolean wakenUp = new AtomicBoolean(); - private boolean oldWakenUp; private volatile int ioRatio = 50; private int cancelledKeys; @@ -302,12 +301,12 @@ public final class NioEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { - oldWakenUp = wakenUp.getAndSet(false); + boolean oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { - select(); + select(oldWakenUp); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up @@ -496,7 +495,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { - final NioUnsafe unsafe = ch.unsafe(); + final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); @@ -603,7 +602,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } - private void select() throws IOException { + private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; @@ -622,10 +621,11 @@ public final class NioEventLoop extends SingleThreadEventLoop { int selectedKeys = selector.select(timeoutMillis); selectCnt ++; - if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) { - // Selected something, - // waken up by user, or - // the task queue has a pending task. + if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { + // - Selected something, + // - waken up by user, or + // - the task queue has a pending task. + // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { @@ -642,7 +642,12 @@ public final class NioEventLoop extends SingleThreadEventLoop { selectCnt = 1; break; } - if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && + + long time = System.nanoTime(); + if ((time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis)) >= currentTimeNanos) { + // timeoutMillis elapsed without anything selected. + selectCnt = 1; + } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. @@ -659,7 +664,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { break; } - currentTimeNanos = System.nanoTime(); + currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {