From 36b80c25f76b766a823198d35e944842dcdb1714 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 2 Jul 2014 07:07:38 +0200 Subject: [PATCH] Correctly return from selector loop one a scheduled task is ready for processing Motivation: We use the nanoTime of the scheduledTasks to calculate the milli-seconds to wait for a select operation to select something. Once these elapsed we check if there was something selected or some task is ready for processing. Unfortunally we not take into account scheduled tasks here so the selection loop will continue if only scheduled tasks are ready for processing. This will delay the execution of these tasks. Modification: - Check if a scheduled task is ready after selecting - also make a tiny change in NioEventLoop to not trigger a rebuild if nothing was selected because the timeout was reached a few times in a row. Result: Execute scheduled tasks on time. --- .../concurrent/SingleThreadEventExecutor.java | 10 +++++++ .../netty/channel/epoll/EpollEventLoop.java | 16 +++++----- .../io/netty/channel/nio/NioEventLoop.java | 29 +++++++++++-------- 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 1270e84bc5..c5105786c8 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -253,6 +253,16 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor { return !taskQueue.isEmpty(); } + /** + * Returns {@code true} if a scheduled task is ready for processing by {@link #runAllTasks()} or + * {@link #runAllTasks(long)}. + */ + protected boolean hasScheduledTasks() { + assert inEventLoop(); + ScheduledFutureTask delayedTask = delayedTaskQueue.peek(); + return delayedTask != null && delayedTask.deadlineNanos() <= ScheduledFutureTask.nanoTime(); + } + /** * Return the number of tasks that are pending for processing. * diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 037415d5d7..62a64cee44 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -54,7 +54,6 @@ final class EpollEventLoop extends SingleThreadEventLoop { private final long[] events; private int id; - private int oldWakenUp; private boolean overflown; @SuppressWarnings("unused") @@ -176,7 +175,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { this.ioRatio = ioRatio; } - private int epollWait() { + private int epollWait(boolean oldWakenUp) { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); @@ -195,10 +194,11 @@ final class EpollEventLoop extends SingleThreadEventLoop { int selectedKeys = Native.epollWait(epollFd, events, (int) timeoutMillis); selectCnt ++; - if (selectedKeys != 0 || oldWakenUp == 1 || wakenUp == 1 || hasTasks()) { - // Selected something, - // waken up by user, or - // the task queue has a pending task. + if (selectedKeys != 0 || oldWakenUp || wakenUp == 1 || hasTasks() || hasScheduledTasks()) { + // - Selected something, + // - waken up by user, or + // - the task queue has a pending task. + // - a scheduled task is ready for processing return selectedKeys; } currentTimeNanos = System.nanoTime(); @@ -209,14 +209,14 @@ final class EpollEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { - oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0); + boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1; try { int ready; if (hasTasks()) { // Non blocking just return what is ready directly without block ready = Native.epollWait(epollFd, events, 0); } else { - ready = epollWait(); + ready = epollWait(oldWakenUp); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 79a024e8ed..2c4ce07481 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -20,7 +20,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.EventLoopException; import io.netty.channel.SingleThreadEventLoop; -import io.netty.channel.nio.AbstractNioChannel.NioUnsafe; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; @@ -40,6 +39,7 @@ import java.util.Iterator; import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -105,7 +105,6 @@ public final class NioEventLoop extends SingleThreadEventLoop { * waken up. */ private final AtomicBoolean wakenUp = new AtomicBoolean(); - private boolean oldWakenUp; private volatile int ioRatio = 50; private int cancelledKeys; @@ -302,12 +301,12 @@ public final class NioEventLoop extends SingleThreadEventLoop { @Override protected void run() { for (;;) { - oldWakenUp = wakenUp.getAndSet(false); + boolean oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { - select(); + select(oldWakenUp); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up @@ -496,7 +495,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { - final NioUnsafe unsafe = ch.unsafe(); + final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); @@ -603,7 +602,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } - private void select() throws IOException { + private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; @@ -622,10 +621,11 @@ public final class NioEventLoop extends SingleThreadEventLoop { int selectedKeys = selector.select(timeoutMillis); selectCnt ++; - if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) { - // Selected something, - // waken up by user, or - // the task queue has a pending task. + if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { + // - Selected something, + // - waken up by user, or + // - the task queue has a pending task. + // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { @@ -642,7 +642,12 @@ public final class NioEventLoop extends SingleThreadEventLoop { selectCnt = 1; break; } - if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && + + long time = System.nanoTime(); + if ((time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis)) >= currentTimeNanos) { + // timeoutMillis elapsed without anything selected. + selectCnt = 1; + } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. @@ -659,7 +664,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { break; } - currentTimeNanos = System.nanoTime(); + currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {