From a22d4ba859b115d353b4cea1af581b987249adf6 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Wed, 21 Aug 2019 03:34:22 -0700 Subject: [PATCH] Simplify EventLoop abstractions for timed scheduled tasks (#9470) Motivation The epoll transport was updated in #7834 to decouple setting of the timerFd from the event loop, so that scheduling delayed tasks does not require waking up epoll_wait. To achieve this, new overridable hooks were added in the AbstractScheduledEventExecutor and SingleThreadEventExecutor superclasses. However, the minimumDelayScheduledTaskRemoved hook has no current purpose and I can't envisage a _practical_ need for it. Removing it would reduce complexity and avoid supporting this specific API indefinitely. We can add something similar later if needed but the opposite is not true. There also isn't a _nice_ way to use the abstractions for wakeup-avoidance optimizations in other EventLoops that don't have a decoupled timer. This PR replaces executeScheduledRunnable and wakesUpForScheduledRunnable with two new methods before/afterFutureTaskScheduled that have slightly different semantics: - They only apply to additions; given the current internals there's no practical use for removals - They allow per-submission wakeup decisions via a boolean return val, which makes them easier to exploit from other existing EL impls (e.g. NIO/KQueue) - They are subjectively "cleaner", taking just the deadline parameter and not exposing Runnables - For current EL/queue impls, only the "after" hook is really needed, but specialized blocking queue impls can conditionally wake on task submission (I have one lined up) Also included are further optimization/simplification/fixes to the timerFd manipulation logic. Modifications - Remove AbstractScheduledEventExecutor#minimumDelayScheduledTaskRemoved() and supporting methods - Uplift NonWakeupRunnable and corresponding default wakesUpForTask() impl from SingleThreadEventLoop to SingleThreadEventExecutor - Change executeScheduledRunnable() to be package-private, and have a final impl in SingleThreadEventExecutor which triggers new overridable hooks before/afterFutureTaskScheduled() - Remove unnecessary use of bookend tasks while draining the task queue - Use new hooks to add simpler wake-up avoidance optimization to NioEventLoop (primarily to demonstrate utility/simplicity) - Reinstate removed EpollTest class In EpollEventLoop: - Refactor to use only the new afterFutureTaskScheduled() hook for updating timerFd - Fix setTimerFd race condition using a monitor - Set nextDeadlineNanos to a negative value while the EL is awake and use this to block timer changes from outside the EL. Restore the known-set value prior to sleeping, updating timerFd first if necessary - Don't read from timerFd when processing expiry event Result - Cleaner API for integrating with different EL/queue timing impls - Fixed race condition to avoid missing scheduled wakeups - Eliminate unnecessary timerFd updates while EL is awake, and unnecessary expired timerFd reads - Avoid unnecessary scheduled-task wakeups when using NIO transport I did not yet further explore the suggestion of using TFD_TIMER_ABSTIME for the timerFd. --- .../AbstractScheduledEventExecutor.java | 50 +----- .../concurrent/SingleThreadEventExecutor.java | 153 +++++++++------- .../netty/channel/epoll/EpollEventLoop.java | 168 +++++++----------- .../io/netty/channel/epoll/EpollTest.java | 72 ++++++++ .../netty/channel/SingleThreadEventLoop.java | 21 +-- .../io/netty/channel/nio/NioEventLoop.java | 16 ++ 6 files changed, 251 insertions(+), 229 deletions(-) create mode 100644 transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 6b52151dbf..656c57847b 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -102,10 +102,6 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } scheduledTaskQueue.clearIgnoringIndexes(); - - // calling minimumDelayScheduledTaskRemoved was considered here to give a chance for EventLoop - // implementations the opportunity to cleanup any timerFds, but this is currently only called when the EventLoop - // is being shutdown, so the timerFd and associated polling mechanism will be destroyed anyways. } /** @@ -120,24 +116,15 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. */ protected final Runnable pollScheduledTask(long nanoTime) { - Queue> scheduledTaskQueue = this.scheduledTaskQueue; - return scheduledTaskQueue != null ? pollScheduledTask(scheduledTaskQueue, nanoTime, true) : null; - } - - final Runnable pollScheduledTask(Queue> scheduledTaskQueue, long nanoTime, - boolean notifyMinimumDeadlineRemoved) { - assert scheduledTaskQueue != null; assert inEventLoop(); - ScheduledFutureTask scheduledTask = scheduledTaskQueue.peek(); - if (scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime) { - scheduledTaskQueue.poll(); - if (notifyMinimumDeadlineRemoved) { - minimumDelayScheduledTaskRemoved(scheduledTask, scheduledTask.deadlineNanos()); - } - return scheduledTask; + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) { + return null; } - return null; + scheduledTaskQueue.remove(); + return scheduledTask; } /** @@ -269,29 +256,17 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut final void removeScheduled(final ScheduledFutureTask task) { if (inEventLoop()) { - removedSchedule0(task); + scheduledTaskQueue().removeTyped(task); } else { executeScheduledRunnable(new Runnable() { @Override public void run() { - removedSchedule0(task); + scheduledTaskQueue().removeTyped(task); } }, false, task.deadlineNanos()); } } - private void removedSchedule0(final ScheduledFutureTask task) { - if (scheduledTaskQueue == null || task == null) { - return; - } - if (scheduledTaskQueue.peek() == task) { - scheduledTaskQueue.poll(); - minimumDelayScheduledTaskRemoved(task, task.deadlineNanos()); - } else { - scheduledTaskQueue.removeTyped(task); - } - } - /** * Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing * a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this @@ -301,16 +276,9 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut * action * @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed. */ - protected void executeScheduledRunnable(Runnable runnable, + void executeScheduledRunnable(Runnable runnable, @SuppressWarnings("unused") boolean isAddition, @SuppressWarnings("unused") long deadlineNanos) { execute(runnable); } - - /** - * The next task to expire (e.g. minimum delay) has been removed from the scheduled priority queue. - */ - protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task, - @SuppressWarnings("unused") long deadlineNanos) { - } } diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index cf635f7216..4d77da11a7 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -73,12 +73,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx // Do nothing. } }; - private static final Runnable BOOKEND_TASK = new Runnable() { - @Override - public void run() { - // Do nothing. - } - }; private static final AtomicIntegerFieldUpdater STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); @@ -184,6 +178,33 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } + /** + * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission. + * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to + * process the scheduled task (if not already awake). + *

+ * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with + * the same value after the scheduled task is enqueued, providing another opportunity + * to wake the {@link EventExecutor} thread if required. + * + * @param deadlineNanos deadline of the to-be-scheduled task + * relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise + */ + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return true; + } + + /** + * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false. + * + * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise + */ + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + return true; + } + /** * @deprecated Please use and override {@link #newTaskQueue(int)}. */ @@ -225,10 +246,9 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx protected static Runnable pollTaskFrom(Queue taskQueue) { for (;;) { Runnable task = taskQueue.poll(); - if (task == WAKEUP_TASK) { - continue; + if (task != WAKEUP_TASK) { + return task; } - return task; } } @@ -293,35 +313,35 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx return true; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); - Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, true); - while (scheduledTask != null) { + for (;;) { + Runnable scheduledTask = pollScheduledTask(nanoTime); + if (scheduledTask == null) { + return true; + } if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue.add((ScheduledFutureTask) scheduledTask); return false; } - scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false); } - return true; } /** * @return {@code true} if at least one scheduled task was executed. */ - private boolean executeExpiredScheduledTasks(boolean notifyMinimumDeadlineRemoved) { + private boolean executeExpiredScheduledTasks() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return false; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); - Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, notifyMinimumDeadlineRemoved); - if (scheduledTask != null) { - do { - safeExecute(scheduledTask); - scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false); - } while (scheduledTask != null); - return true; + Runnable scheduledTask = pollScheduledTask(nanoTime); + if (scheduledTask == null) { + return false; } - return false; + do { + safeExecute(scheduledTask); + } while ((scheduledTask = pollScheduledTask(nanoTime)) != null); + return true; } /** @@ -414,17 +434,13 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx */ protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) { assert inEventLoop(); - // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued - // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe. - boolean ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue); - boolean ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(true); + boolean ranAtLeastOneTask; int drainAttempt = 0; - while ((ranAtLeastOneExecutorTask || ranAtLeastOneScheduledTask) && ++drainAttempt < maxDrainAttempts) { + do { // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe. - ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue); - ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(false); - } + ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks(); + } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts); if (drainAttempt > 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); @@ -461,46 +477,17 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx * @return {@code true} if at least {@link Runnable#run()} was called. */ private boolean runExistingTasksFrom(Queue taskQueue) { - return taskQueue.offer(BOOKEND_TASK) ? runExistingTasksUntilBookend(taskQueue) - : runExistingTasksUntilMaxTasks(taskQueue); - } - - private boolean runExistingTasksUntilBookend(Queue taskQueue) { Runnable task = pollTaskFrom(taskQueue); - // null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, and - // null elements are not permitted to be inserted into the queue. - if (task == BOOKEND_TASK) { - return false; - } - for (;;) { - safeExecute(task); - task = pollTaskFrom(taskQueue); - // null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, - // and null elements are not permitted to be inserted into the queue. - if (task == BOOKEND_TASK) { - return true; - } - } - } - - private boolean runExistingTasksUntilMaxTasks(Queue taskQueue) { - Runnable task = pollTaskFrom(taskQueue); - // BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted into - // the queue, and if was previously inserted we always drain all the elements from queue including BOOKEND_TASK. if (task == null) { return false; } - int i = 0; - do { + int remaining = Math.min(maxPendingTasks, taskQueue.size()); + safeExecute(task); + // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may + // silently consume more than one item from the queue (skips over WAKEUP_TASK instances) + while (remaining-- > 0 && (task = taskQueue.poll()) != null) { safeExecute(task); - task = pollTaskFrom(taskQueue); - // BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted - // into the queue, and if was previously inserted we always drain all the elements from queue including - // BOOKEND_TASK. - if (task == null) { - return true; - } - } while (++i < maxPendingTasks); + } return true; } @@ -607,6 +594,25 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } + @Override + final void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) { + // Don't wakeup if this is a removal task or if beforeScheduledTaskSubmitted returns false + if (isAddition && beforeScheduledTaskSubmitted(deadlineNanos)) { + super.executeScheduledRunnable(runnable, isAddition, deadlineNanos); + } else { + super.executeScheduledRunnable(new NonWakeupRunnable() { + @Override + public void run() { + runnable.run(); + } + }, isAddition, deadlineNanos); + // Second hook after scheduling to facilitate race-avoidance + if (isAddition && afterScheduledTaskSubmitted(deadlineNanos)) { + wakeup(false); + } + } + } + @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; @@ -954,8 +960,21 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx return threadProperties; } - protected boolean wakesUpForTask(@SuppressWarnings("unused") Runnable task) { - return true; + /** + * Marker interface for {@link Runnable} to indicate that it should be queued for execution + * but does not need to run immediately. The default implementation of + * {@link SingleThreadEventExecutor#wakesUpForTask(Runnable)} uses this to avoid waking up + * the {@link EventExecutor} thread when not necessary. + */ + protected interface NonWakeupRunnable extends Runnable { } + + /** + * Can be overridden to control which tasks require waking the {@link EventExecutor} thread + * if it is waiting so that they can be run immediately. The default implementation + * decides based on whether the task implements {@link NonWakeupRunnable}. + */ + protected boolean wakesUpForTask(Runnable task) { + return !(task instanceof NonWakeupRunnable); } protected static void reject() { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 98b69bbb4d..187265d4f7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -47,10 +47,6 @@ import static java.lang.Math.min; */ class EpollEventLoop extends SingleThreadEventLoop { private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class); - /** - * The maximum deadline value before overlap occurs on the time source. - */ - private static final long MAXIMUM_DEADLINE = initialNanoTime() - 1; static { // Ensure JNI is initialized by the time this class is loaded by this time! @@ -59,10 +55,14 @@ class EpollEventLoop extends SingleThreadEventLoop { } /** + * When in epollWait(), this mirrors the currently-set deadline of the timerFd. A negative value + * means that the event loop is awake, which blocks rescheduling activity by other threads. + * It is restored to the real timerFd expiry time again prior to entering epollWait(). + * * Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting * with the time source (e.g. calling System.nanoTime()) which can be expensive. */ - private final AtomicLong nextDeadlineNanos = new AtomicLong(MAXIMUM_DEADLINE); + private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L); private final AtomicInteger wakenUp = new AtomicInteger(); private final FileDescriptor epollFd; private final FileDescriptor eventFd; @@ -181,31 +181,18 @@ class EpollEventLoop extends SingleThreadEventLoop { } @Override - protected void executeScheduledRunnable(Runnable runnable, boolean isAddition, long deadlineNanos) { - if (isAddition) { - try { - trySetTimerFd(deadlineNanos); - } catch (IOException cause) { - throw new RejectedExecutionException(cause); - } - } - // else this is a removal of scheduled task and we could attempt to detect if this task was responsible - // for the next delay, and find the next lowest delay in the queue to re-set the timer. However this - // is not practical for the following reasons: - // 1. The data structure is a PriorityQueue, and the scheduled task has not yet been removed. This means - // we would have to add/remove the head element to find the "next timeout". - // 2. We are not on the EventLoop thread, and the PriorityQueue is not thread safe. We could attempt - // to do (1) if we are on the EventLoop but when the EventLoop wakes up it checks if the timeout changes - // when it is woken up and before it calls epoll_wait again and adjusts the timer accordingly. - // The result is we wait until we are in the EventLoop and doing the actual removal, and also processing - // regular polling in the EventLoop too. - - execute(runnable); + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return false; // don't wake event loop } @Override - protected boolean wakesUpForScheduledRunnable() { - return false; + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + try { + trySetTimerFd(deadlineNanos); + } catch (IOException e) { + throw new RejectedExecutionException(e); + } + return false; // don't wake event loop } @Override @@ -218,19 +205,22 @@ class EpollEventLoop extends SingleThreadEventLoop { private void trySetTimerFd(long candidateNextDeadline) throws IOException { for (;;) { long nextDeadline = nextDeadlineNanos.get(); - if (nextDeadline - candidateNextDeadline <= 0) { - break; + if (nextDeadline <= candidateNextDeadline) { + // This includes case where nextDeadline is negative (event loop is awake) + return; } if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) { - setTimerFd(deadlineToDelayNanos(candidateNextDeadline)); - // We are setting the timerFd outside of the EventLoop so it is possible that we raced with another call - // to set the timer and temporarily increased the value, in which case we should set it back to the - // lower value. - nextDeadline = nextDeadlineNanos.get(); - if (nextDeadline - candidateNextDeadline < 0) { - setTimerFd(deadlineToDelayNanos(nextDeadline)); + // We must serialize calls to setTimerFd to avoid the set of a later deadline + // racing with a sooner one and overwriting it. A second check of nextDeadlineNanos + // is made within the sync block to avoid having the CAS within the sync + synchronized (nextDeadlineNanos) { + nextDeadline = nextDeadlineNanos.get(); + if (nextDeadline == candidateNextDeadline || + (nextDeadline + Long.MAX_VALUE + 1) == candidateNextDeadline) { + setTimerFd(deadlineToDelayNanos(candidateNextDeadline)); + } } - break; + return; } } } @@ -250,39 +240,25 @@ class EpollEventLoop extends SingleThreadEventLoop { } } - private void checkScheduleTaskQueueForNewDelay() throws IOException { - final long deadlineNanos = nextScheduledTaskDeadlineNanos(); - if (deadlineNanos != -1) { - trySetTimerFd(deadlineNanos); + private long checkScheduleTaskQueueForNewDelay(long timerFdDeadline) throws IOException { + assert nextDeadlineNanos.get() < 0; + final long nextTaskDeadlineNanos = nextScheduledTaskDeadlineNanos(); + if (nextTaskDeadlineNanos == -1 || nextTaskDeadlineNanos >= timerFdDeadline) { + // Just restore to preexisting timerFd value, update not needed + nextDeadlineNanos.lazySet(timerFdDeadline); + } else { + synchronized (nextDeadlineNanos) { + // Shorter delay required than current timerFd setting, update it + nextDeadlineNanos.lazySet(timerFdDeadline = nextTaskDeadlineNanos); + setTimerFd(deadlineToDelayNanos(timerFdDeadline)); + } } + return timerFdDeadline; // Don't disarm the timerFd even if there are no more queued tasks. Since we are setting timerFd from outside // the EventLoop it is possible that another thread has set the timer and we may miss a wakeup if we disarm // the timer here. Instead we wait for the timer wakeup on the EventLoop and clear state for the next timer. } - @Override - protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task, - @SuppressWarnings("unused") long deadlineNanos) { - // It is OK to reset nextDeadlineNanos here because we are in the event loop thread, and the event loop is - // guaranteed to transfer all pending ScheduledFutureTasks from the executor queue to the scheduled - // PriorityQueue and we will set the next expiration time. If another thread races with this thread inserting a - // ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue insertion is - // visible to the event loop thread. - // - // Assume the current minimum timer is delayNanos = 10 - // Thread A -> execute(ScheduledFutureTask(delayNanos = 12)), - // add ScheduledFutureTask to the executor queue - // fail to set nextDeadlineNanos, so no call to setTimerFd is made - // EventLoop -> minimumDelayScheduledTaskRemoved(10), - // set nextDeadlineNanos to MAXIMUM_DEADLINE - // ... process more tasks ... - // drain all the tasks from the executor queue, see that 12 is the next delay, call setTimerFd - nextDeadlineNanos.set(MAXIMUM_DEADLINE); - - // Note that we don't actually call setTimerFd here, we don't want to interrupt the actual timerFd and let - // the end of the EventLoop determine what the timerFd value should be (after execute queue is drained). - } - @Override protected void wakeup(boolean inEventLoop) { if (!inEventLoop && wakenUp.getAndSet(1) == 0) { @@ -395,6 +371,7 @@ class EpollEventLoop extends SingleThreadEventLoop { @Override protected void run() { + long timerFdDeadline = Long.MAX_VALUE; for (;;) { try { processPendingChannelFlags(); @@ -412,28 +389,34 @@ class EpollEventLoop extends SingleThreadEventLoop { wakenUp.set(0); } if (!hasTasks()) { - strategy = epollWait(); + // When we are in the EventLoop we don't bother setting the timerFd for each + // scheduled task, but instead defer the processing until the end of the EventLoop + // (next wait) to reduce the timerFd modifications. + timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline); + try { + strategy = epollWait(); + } finally { + // This getAndAdd will change the raw value of nextDeadlineNanos to be negative + // which will block any *new* timerFd mods by other threads while also "preserving" + // its last value to avoid disrupting a possibly-concurrent setTimerFd call + // (so that we can know the timerFd really did/will get updated to the read value). + timerFdDeadline = nextDeadlineNanos.getAndAdd(Long.MAX_VALUE + 1); + // The value of nextDeadlineNanos is now guaranteed to be negative + } } // fallthrough default: } try { - processReady(events, strategy); - } finally { - try { - // Note the timerFd code depends upon running all the tasks on each event loop run. This is so - // we can get an accurate "next wakeup time" after the event loop run completes. - runAllTasks(); - } finally { - // No need to drainScheduledQueue() after the fact, because all in event loop scheduling results - // in direct addition to the scheduled priority queue. - - // When we are in the EventLoop we don't bother setting the timerFd for each scheduled task, but - // instead defer the processing until the end of the EventLoop to reduce the timerFd - // modifications. - checkScheduleTaskQueueForNewDelay(); + if (processReady(events, strategy)) { + // Polled events include timerFd expiry; conservatively assume that no timer is set + timerFdDeadline = Long.MAX_VALUE; } + } finally { + runAllTasks(); + // No need to drainScheduledQueue() after the fact, because all in event loop scheduling results + // in direct addition to the scheduled priority queue. } if (allowGrowing && strategy == events.length()) { //increase the size of the array as we needed the whole space for the events @@ -487,7 +470,9 @@ class EpollEventLoop extends SingleThreadEventLoop { } } - private void processReady(EpollEventArray events, int ready) { + // Returns true if a timerFd event was encountered + private boolean processReady(EpollEventArray events, int ready) { + boolean timerFired = false; for (int i = 0; i < ready; ++i) { final int fd = events.fd(i); if (fd == eventFd.intValue()) { @@ -495,27 +480,7 @@ class EpollEventLoop extends SingleThreadEventLoop { // // See also https://stackoverflow.com/a/12492308/1074097 } else if (fd == timerFd.intValue()) { - // consume wakeup event, necessary because the timer is added with ET mode. - Native.timerFdRead(fd); - - // The timer is normalized, monotonically increasing, and the next value is always set to the minimum - // value of the pending timers. When the timer fires we can unset the timer value. - // Worst case another thread races with this thread and we set another timer event while processing - // this timer event and we get a duplicate wakeup some time in the future. - // - // This works because we always drain all ScheduledFutureTasks from the executor queue to the scheduled - // PriorityQueue and we will set the next expiration time. If another thread races with this thread - // inserting a ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue - // insertion is visible to the event loop thread. - // - // Assume the current minimum timer is nextDeadlineNanos = 10 - // Thread A -> execute(ScheduledFutureTask(delayNanos = 12)), - // add ScheduledFutureTask to the executor queue - // fail to set nextDeadlineNanos, so no call to setTimerFd is made - // EventLoop -> process timer wakeup here, set nextDeadlineNanos to MAXIMUM_DEADLINE - // ... process more tasks ... - // drain all the tasks from executor queue, see 12 is the next delay, call setTimerFd - nextDeadlineNanos.set(MAXIMUM_DEADLINE); + timerFired = true; } else { final long ev = events.events(i); @@ -569,6 +534,7 @@ class EpollEventLoop extends SingleThreadEventLoop { } } } + return timerFired; } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java new file mode 100644 index 0000000000..f66a55e2c7 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java @@ -0,0 +1,72 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.unix.FileDescriptor; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class EpollTest { + + @Test + public void testIsAvailable() { + assertTrue(Epoll.isAvailable()); + } + + // Testcase for https://github.com/netty/netty/issues/8444 + @Test(timeout = 5000) + public void testEpollWaitWithTimeOutMinusOne() throws Exception { + final EpollEventArray eventArray = new EpollEventArray(8); + try { + final FileDescriptor epoll = Native.newEpollCreate(); + final FileDescriptor timerFd = Native.newTimerFd(); + final FileDescriptor eventfd = Native.newEventFd(); + Native.epollCtlAdd(epoll.intValue(), timerFd.intValue(), Native.EPOLLIN); + Native.epollCtlAdd(epoll.intValue(), eventfd.intValue(), Native.EPOLLIN); + + final AtomicReference ref = new AtomicReference(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + assertEquals(1, Native.epollWait(epoll, eventArray, false)); + // This should have been woken up because of eventfd_write. + assertEquals(eventfd.intValue(), eventArray.fd(0)); + } catch (Throwable cause) { + ref.set(cause); + } + } + }); + t.start(); + t.join(1000); + assertTrue(t.isAlive()); + Native.eventFdWrite(eventfd.intValue(), 1); + + t.join(); + assertNull(ref.get()); + epoll.close(); + timerFd.close(); + eventfd.close(); + } finally { + eventArray.free(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 7eff9e85fa..d0bc086cce 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -102,20 +102,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return promise; } - @Override - protected void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) { - super.executeScheduledRunnable(wakesUpForScheduledRunnable() ? runnable : new NonWakeupRunnable() { - @Override - public void run() { - runnable.run(); - } - }, isAddition, deadlineNanos); - } - - protected boolean wakesUpForScheduledRunnable() { - return true; - } - /** * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration. * @@ -149,11 +135,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return tailTasks.remove(ObjectUtil.checkNotNull(task, "task")); } - @Override - protected boolean wakesUpForTask(Runnable task) { - return !(task instanceof NonWakeupRunnable); - } - @Override protected void afterRunningAllTasks() { runAllTasksFrom(tailTasks); @@ -182,5 +163,5 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im /** * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. */ - interface NonWakeupRunnable extends Runnable { } + interface NonWakeupRunnable extends SingleThreadEventExecutor.NonWakeupRunnable { } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 8dd7609b41..ace430afc9 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -124,6 +124,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { * waken up. */ private final AtomicBoolean wakenUp = new AtomicBoolean(); + private volatile long nextWakeupTime = Long.MAX_VALUE; private final SelectStrategy selectStrategy; @@ -763,6 +764,16 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } + @Override + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return deadlineNanos < nextWakeupTime; + } + + @Override + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + return deadlineNanos < nextWakeupTime; + } + Selector unwrappedSelector() { return unwrappedSelector; } @@ -785,6 +796,11 @@ public final class NioEventLoop extends SingleThreadEventLoop { long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); + long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime(); + if (nextWakeupTime != normalizedDeadlineNanos) { + nextWakeupTime = normalizedDeadlineNanos; + } + for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) {