Simplify EventLoop abstractions for timed scheduled tasks (#9470)

Motivation

The epoll transport was updated in #7834 to decouple setting of the
timerFd from the event loop, so that scheduling delayed tasks does not
require waking up epoll_wait. To achieve this, new overridable hooks
were added in the AbstractScheduledEventExecutor and
SingleThreadEventExecutor superclasses.

However, the minimumDelayScheduledTaskRemoved hook has no current
purpose and I can't envisage a _practical_ need for it. Removing
it would reduce complexity and avoid supporting this specific
API indefinitely. We can add something similar later if needed
but the opposite is not true.

There also isn't a _nice_ way to use the abstractions for
wakeup-avoidance optimizations in other EventLoops that don't have a
decoupled timer.

This PR replaces executeScheduledRunnable and
wakesUpForScheduledRunnable
with two new methods before/afterFutureTaskScheduled that have slightly
different semantics:
 - They only apply to additions; given the current internals there's no
practical use for removals
 - They allow per-submission wakeup decisions via a boolean return val,
which makes them easier to exploit from other existing EL impls (e.g.
NIO/KQueue)
 - They are subjectively "cleaner", taking just the deadline parameter
and not exposing Runnables
 - For current EL/queue impls, only the "after" hook is really needed,
but specialized blocking queue impls can conditionally wake on task
submission (I have one lined up)

Also included are further optimization/simplification/fixes to the
timerFd manipulation logic.

Modifications

- Remove AbstractScheduledEventExecutor#minimumDelayScheduledTaskRemoved()
and supporting methods
- Uplift NonWakeupRunnable and corresponding default wakesUpForTask()
impl from SingleThreadEventLoop to SingleThreadEventExecutor
- Change executeScheduledRunnable() to be package-private, and have a
final impl in SingleThreadEventExecutor which triggers new overridable
hooks before/afterFutureTaskScheduled()
- Remove unnecessary use of bookend tasks while draining the task queue
- Use new hooks to add simpler wake-up avoidance optimization to
NioEventLoop (primarily to demonstrate utility/simplicity)
- Reinstate removed EpollTest class

In EpollEventLoop:
 - Refactor to use only the new afterFutureTaskScheduled() hook for
updating timerFd
 - Fix setTimerFd race condition using a monitor
 - Set nextDeadlineNanos to a negative value while the EL is awake and
use this to block timer changes from outside the EL. Restore the
known-set value prior to sleeping, updating timerFd first if necessary
 - Don't read from timerFd when processing expiry event

Result

- Cleaner API for integrating with different EL/queue timing impls
- Fixed race condition to avoid missing scheduled wakeups
- Eliminate unnecessary timerFd updates while EL is awake, and
unnecessary expired timerFd reads
- Avoid unnecessary scheduled-task wakeups when using NIO transport

I did not yet further explore the suggestion of using
TFD_TIMER_ABSTIME for the timerFd.
This commit is contained in:
Nick Hill 2019-08-21 03:34:22 -07:00 committed by Norman Maurer
parent cb739b2619
commit a22d4ba859
6 changed files with 251 additions and 229 deletions

View File

@ -102,10 +102,6 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
} }
scheduledTaskQueue.clearIgnoringIndexes(); scheduledTaskQueue.clearIgnoringIndexes();
// calling minimumDelayScheduledTaskRemoved was considered here to give a chance for EventLoop
// implementations the opportunity to cleanup any timerFds, but this is currently only called when the EventLoop
// is being shutdown, so the timerFd and associated polling mechanism will be destroyed anyways.
} }
/** /**
@ -120,25 +116,16 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
* You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
*/ */
protected final Runnable pollScheduledTask(long nanoTime) { protected final Runnable pollScheduledTask(long nanoTime) {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
return scheduledTaskQueue != null ? pollScheduledTask(scheduledTaskQueue, nanoTime, true) : null;
}
final Runnable pollScheduledTask(Queue<ScheduledFutureTask<?>> scheduledTaskQueue, long nanoTime,
boolean notifyMinimumDeadlineRemoved) {
assert scheduledTaskQueue != null;
assert inEventLoop(); assert inEventLoop();
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue.peek(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime) { ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
scheduledTaskQueue.poll(); if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) {
if (notifyMinimumDeadlineRemoved) {
minimumDelayScheduledTaskRemoved(scheduledTask, scheduledTask.deadlineNanos());
}
return scheduledTask;
}
return null; return null;
} }
scheduledTaskQueue.remove();
return scheduledTask;
}
/** /**
* Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
@ -269,29 +256,17 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
final void removeScheduled(final ScheduledFutureTask<?> task) { final void removeScheduled(final ScheduledFutureTask<?> task) {
if (inEventLoop()) { if (inEventLoop()) {
removedSchedule0(task); scheduledTaskQueue().removeTyped(task);
} else { } else {
executeScheduledRunnable(new Runnable() { executeScheduledRunnable(new Runnable() {
@Override @Override
public void run() { public void run() {
removedSchedule0(task); scheduledTaskQueue().removeTyped(task);
} }
}, false, task.deadlineNanos()); }, false, task.deadlineNanos());
} }
} }
private void removedSchedule0(final ScheduledFutureTask<?> task) {
if (scheduledTaskQueue == null || task == null) {
return;
}
if (scheduledTaskQueue.peek() == task) {
scheduledTaskQueue.poll();
minimumDelayScheduledTaskRemoved(task, task.deadlineNanos());
} else {
scheduledTaskQueue.removeTyped(task);
}
}
/** /**
* Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing * Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing
* a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this * a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this
@ -301,16 +276,9 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
* action * action
* @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed. * @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed.
*/ */
protected void executeScheduledRunnable(Runnable runnable, void executeScheduledRunnable(Runnable runnable,
@SuppressWarnings("unused") boolean isAddition, @SuppressWarnings("unused") boolean isAddition,
@SuppressWarnings("unused") long deadlineNanos) { @SuppressWarnings("unused") long deadlineNanos) {
execute(runnable); execute(runnable);
} }
/**
* The next task to expire (e.g. minimum delay) has been removed from the scheduled priority queue.
*/
protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task,
@SuppressWarnings("unused") long deadlineNanos) {
}
} }

View File

@ -73,12 +73,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
// Do nothing. // Do nothing.
} }
}; };
private static final Runnable BOOKEND_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
@ -184,6 +178,33 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
} }
/**
* Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission.
* Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to
* process the scheduled task (if not already awake).
* <p>
* If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with
* the same value <i>after</i> the scheduled task is enqueued, providing another opportunity
* to wake the {@link EventExecutor} thread if required.
*
* @param deadlineNanos deadline of the to-be-scheduled task
* relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
return true;
}
/**
* See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
*
* @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
return true;
}
/** /**
* @deprecated Please use and override {@link #newTaskQueue(int)}. * @deprecated Please use and override {@link #newTaskQueue(int)}.
*/ */
@ -225,12 +246,11 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) { protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) { for (;;) {
Runnable task = taskQueue.poll(); Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) { if (task != WAKEUP_TASK) {
continue;
}
return task; return task;
} }
} }
}
/** /**
* Take the next {@link Runnable} from the task queue and so will block if no task is currently present. * Take the next {@link Runnable} from the task queue and so will block if no task is currently present.
@ -293,36 +313,36 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
return true; return true;
} }
long nanoTime = AbstractScheduledEventExecutor.nanoTime(); long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, true); for (;;) {
while (scheduledTask != null) { Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return true;
}
if (!taskQueue.offer(scheduledTask)) { if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false; return false;
} }
scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false);
} }
return true;
} }
/** /**
* @return {@code true} if at least one scheduled task was executed. * @return {@code true} if at least one scheduled task was executed.
*/ */
private boolean executeExpiredScheduledTasks(boolean notifyMinimumDeadlineRemoved) { private boolean executeExpiredScheduledTasks() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return false; return false;
} }
long nanoTime = AbstractScheduledEventExecutor.nanoTime(); long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, notifyMinimumDeadlineRemoved); Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask != null) { if (scheduledTask == null) {
return false;
}
do { do {
safeExecute(scheduledTask); safeExecute(scheduledTask);
scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false); } while ((scheduledTask = pollScheduledTask(nanoTime)) != null);
} while (scheduledTask != null);
return true; return true;
} }
return false;
}
/** /**
* @see Queue#peek() * @see Queue#peek()
@ -414,17 +434,13 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
*/ */
protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) { protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
assert inEventLoop(); assert inEventLoop();
// We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued boolean ranAtLeastOneTask;
// here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
boolean ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue);
boolean ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(true);
int drainAttempt = 0; int drainAttempt = 0;
while ((ranAtLeastOneExecutorTask || ranAtLeastOneScheduledTask) && ++drainAttempt < maxDrainAttempts) { do {
// We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
// here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe. // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue); ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks();
ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(false); } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
}
if (drainAttempt > 0) { if (drainAttempt > 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime(); lastExecutionTime = ScheduledFutureTask.nanoTime();
@ -461,46 +477,17 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
* @return {@code true} if at least {@link Runnable#run()} was called. * @return {@code true} if at least {@link Runnable#run()} was called.
*/ */
private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) { private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
return taskQueue.offer(BOOKEND_TASK) ? runExistingTasksUntilBookend(taskQueue)
: runExistingTasksUntilMaxTasks(taskQueue);
}
private boolean runExistingTasksUntilBookend(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue); Runnable task = pollTaskFrom(taskQueue);
// null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, and
// null elements are not permitted to be inserted into the queue.
if (task == BOOKEND_TASK) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
// null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue,
// and null elements are not permitted to be inserted into the queue.
if (task == BOOKEND_TASK) {
return true;
}
}
}
private boolean runExistingTasksUntilMaxTasks(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
// BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted into
// the queue, and if was previously inserted we always drain all the elements from queue including BOOKEND_TASK.
if (task == null) { if (task == null) {
return false; return false;
} }
int i = 0; int remaining = Math.min(maxPendingTasks, taskQueue.size());
do { safeExecute(task);
// Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may
// silently consume more than one item from the queue (skips over WAKEUP_TASK instances)
while (remaining-- > 0 && (task = taskQueue.poll()) != null) {
safeExecute(task); safeExecute(task);
task = pollTaskFrom(taskQueue);
// BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted
// into the queue, and if was previously inserted we always drain all the elements from queue including
// BOOKEND_TASK.
if (task == null) {
return true;
} }
} while (++i < maxPendingTasks);
return true; return true;
} }
@ -607,6 +594,25 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
} }
} }
@Override
final void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) {
// Don't wakeup if this is a removal task or if beforeScheduledTaskSubmitted returns false
if (isAddition && beforeScheduledTaskSubmitted(deadlineNanos)) {
super.executeScheduledRunnable(runnable, isAddition, deadlineNanos);
} else {
super.executeScheduledRunnable(new NonWakeupRunnable() {
@Override
public void run() {
runnable.run();
}
}, isAddition, deadlineNanos);
// Second hook after scheduling to facilitate race-avoidance
if (isAddition && afterScheduledTaskSubmitted(deadlineNanos)) {
wakeup(false);
}
}
}
@Override @Override
public boolean inEventLoop(Thread thread) { public boolean inEventLoop(Thread thread) {
return thread == this.thread; return thread == this.thread;
@ -954,8 +960,21 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
return threadProperties; return threadProperties;
} }
protected boolean wakesUpForTask(@SuppressWarnings("unused") Runnable task) { /**
return true; * Marker interface for {@link Runnable} to indicate that it should be queued for execution
* but does not need to run immediately. The default implementation of
* {@link SingleThreadEventExecutor#wakesUpForTask(Runnable)} uses this to avoid waking up
* the {@link EventExecutor} thread when not necessary.
*/
protected interface NonWakeupRunnable extends Runnable { }
/**
* Can be overridden to control which tasks require waking the {@link EventExecutor} thread
* if it is waiting so that they can be run immediately. The default implementation
* decides based on whether the task implements {@link NonWakeupRunnable}.
*/
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);
} }
protected static void reject() { protected static void reject() {

View File

@ -47,10 +47,6 @@ import static java.lang.Math.min;
*/ */
class EpollEventLoop extends SingleThreadEventLoop { class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
/**
* The maximum deadline value before overlap occurs on the time source.
*/
private static final long MAXIMUM_DEADLINE = initialNanoTime() - 1;
static { static {
// Ensure JNI is initialized by the time this class is loaded by this time! // Ensure JNI is initialized by the time this class is loaded by this time!
@ -59,10 +55,14 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
/** /**
* When in epollWait(), this mirrors the currently-set deadline of the timerFd. A negative value
* means that the event loop is awake, which blocks rescheduling activity by other threads.
* It is restored to the real timerFd expiry time again prior to entering epollWait().
*
* Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting * Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting
* with the time source (e.g. calling System.nanoTime()) which can be expensive. * with the time source (e.g. calling System.nanoTime()) which can be expensive.
*/ */
private final AtomicLong nextDeadlineNanos = new AtomicLong(MAXIMUM_DEADLINE); private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L);
private final AtomicInteger wakenUp = new AtomicInteger(); private final AtomicInteger wakenUp = new AtomicInteger();
private final FileDescriptor epollFd; private final FileDescriptor epollFd;
private final FileDescriptor eventFd; private final FileDescriptor eventFd;
@ -181,31 +181,18 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
@Override @Override
protected void executeScheduledRunnable(Runnable runnable, boolean isAddition, long deadlineNanos) { protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
if (isAddition) { return false; // don't wake event loop
try {
trySetTimerFd(deadlineNanos);
} catch (IOException cause) {
throw new RejectedExecutionException(cause);
}
}
// else this is a removal of scheduled task and we could attempt to detect if this task was responsible
// for the next delay, and find the next lowest delay in the queue to re-set the timer. However this
// is not practical for the following reasons:
// 1. The data structure is a PriorityQueue, and the scheduled task has not yet been removed. This means
// we would have to add/remove the head element to find the "next timeout".
// 2. We are not on the EventLoop thread, and the PriorityQueue is not thread safe. We could attempt
// to do (1) if we are on the EventLoop but when the EventLoop wakes up it checks if the timeout changes
// when it is woken up and before it calls epoll_wait again and adjusts the timer accordingly.
// The result is we wait until we are in the EventLoop and doing the actual removal, and also processing
// regular polling in the EventLoop too.
execute(runnable);
} }
@Override @Override
protected boolean wakesUpForScheduledRunnable() { protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
return false; try {
trySetTimerFd(deadlineNanos);
} catch (IOException e) {
throw new RejectedExecutionException(e);
}
return false; // don't wake event loop
} }
@Override @Override
@ -218,19 +205,22 @@ class EpollEventLoop extends SingleThreadEventLoop {
private void trySetTimerFd(long candidateNextDeadline) throws IOException { private void trySetTimerFd(long candidateNextDeadline) throws IOException {
for (;;) { for (;;) {
long nextDeadline = nextDeadlineNanos.get(); long nextDeadline = nextDeadlineNanos.get();
if (nextDeadline - candidateNextDeadline <= 0) { if (nextDeadline <= candidateNextDeadline) {
break; // This includes case where nextDeadline is negative (event loop is awake)
return;
} }
if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) { if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) {
setTimerFd(deadlineToDelayNanos(candidateNextDeadline)); // We must serialize calls to setTimerFd to avoid the set of a later deadline
// We are setting the timerFd outside of the EventLoop so it is possible that we raced with another call // racing with a sooner one and overwriting it. A second check of nextDeadlineNanos
// to set the timer and temporarily increased the value, in which case we should set it back to the // is made within the sync block to avoid having the CAS within the sync
// lower value. synchronized (nextDeadlineNanos) {
nextDeadline = nextDeadlineNanos.get(); nextDeadline = nextDeadlineNanos.get();
if (nextDeadline - candidateNextDeadline < 0) { if (nextDeadline == candidateNextDeadline ||
setTimerFd(deadlineToDelayNanos(nextDeadline)); (nextDeadline + Long.MAX_VALUE + 1) == candidateNextDeadline) {
setTimerFd(deadlineToDelayNanos(candidateNextDeadline));
} }
break; }
return;
} }
} }
} }
@ -250,39 +240,25 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
} }
private void checkScheduleTaskQueueForNewDelay() throws IOException { private long checkScheduleTaskQueueForNewDelay(long timerFdDeadline) throws IOException {
final long deadlineNanos = nextScheduledTaskDeadlineNanos(); assert nextDeadlineNanos.get() < 0;
if (deadlineNanos != -1) { final long nextTaskDeadlineNanos = nextScheduledTaskDeadlineNanos();
trySetTimerFd(deadlineNanos); if (nextTaskDeadlineNanos == -1 || nextTaskDeadlineNanos >= timerFdDeadline) {
// Just restore to preexisting timerFd value, update not needed
nextDeadlineNanos.lazySet(timerFdDeadline);
} else {
synchronized (nextDeadlineNanos) {
// Shorter delay required than current timerFd setting, update it
nextDeadlineNanos.lazySet(timerFdDeadline = nextTaskDeadlineNanos);
setTimerFd(deadlineToDelayNanos(timerFdDeadline));
} }
}
return timerFdDeadline;
// Don't disarm the timerFd even if there are no more queued tasks. Since we are setting timerFd from outside // Don't disarm the timerFd even if there are no more queued tasks. Since we are setting timerFd from outside
// the EventLoop it is possible that another thread has set the timer and we may miss a wakeup if we disarm // the EventLoop it is possible that another thread has set the timer and we may miss a wakeup if we disarm
// the timer here. Instead we wait for the timer wakeup on the EventLoop and clear state for the next timer. // the timer here. Instead we wait for the timer wakeup on the EventLoop and clear state for the next timer.
} }
@Override
protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task,
@SuppressWarnings("unused") long deadlineNanos) {
// It is OK to reset nextDeadlineNanos here because we are in the event loop thread, and the event loop is
// guaranteed to transfer all pending ScheduledFutureTasks from the executor queue to the scheduled
// PriorityQueue and we will set the next expiration time. If another thread races with this thread inserting a
// ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue insertion is
// visible to the event loop thread.
//
// Assume the current minimum timer is delayNanos = 10
// Thread A -> execute(ScheduledFutureTask(delayNanos = 12)),
// add ScheduledFutureTask to the executor queue
// fail to set nextDeadlineNanos, so no call to setTimerFd is made
// EventLoop -> minimumDelayScheduledTaskRemoved(10),
// set nextDeadlineNanos to MAXIMUM_DEADLINE
// ... process more tasks ...
// drain all the tasks from the executor queue, see that 12 is the next delay, call setTimerFd
nextDeadlineNanos.set(MAXIMUM_DEADLINE);
// Note that we don't actually call setTimerFd here, we don't want to interrupt the actual timerFd and let
// the end of the EventLoop determine what the timerFd value should be (after execute queue is drained).
}
@Override @Override
protected void wakeup(boolean inEventLoop) { protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) { if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
@ -395,6 +371,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void run() { protected void run() {
long timerFdDeadline = Long.MAX_VALUE;
for (;;) { for (;;) {
try { try {
processPendingChannelFlags(); processPendingChannelFlags();
@ -412,28 +389,34 @@ class EpollEventLoop extends SingleThreadEventLoop {
wakenUp.set(0); wakenUp.set(0);
} }
if (!hasTasks()) { if (!hasTasks()) {
// When we are in the EventLoop we don't bother setting the timerFd for each
// scheduled task, but instead defer the processing until the end of the EventLoop
// (next wait) to reduce the timerFd modifications.
timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline);
try {
strategy = epollWait(); strategy = epollWait();
} finally {
// This getAndAdd will change the raw value of nextDeadlineNanos to be negative
// which will block any *new* timerFd mods by other threads while also "preserving"
// its last value to avoid disrupting a possibly-concurrent setTimerFd call
// (so that we can know the timerFd really did/will get updated to the read value).
timerFdDeadline = nextDeadlineNanos.getAndAdd(Long.MAX_VALUE + 1);
// The value of nextDeadlineNanos is now guaranteed to be negative
}
} }
// fallthrough // fallthrough
default: default:
} }
try { try {
processReady(events, strategy); if (processReady(events, strategy)) {
// Polled events include timerFd expiry; conservatively assume that no timer is set
timerFdDeadline = Long.MAX_VALUE;
}
} finally { } finally {
try {
// Note the timerFd code depends upon running all the tasks on each event loop run. This is so
// we can get an accurate "next wakeup time" after the event loop run completes.
runAllTasks(); runAllTasks();
} finally {
// No need to drainScheduledQueue() after the fact, because all in event loop scheduling results // No need to drainScheduledQueue() after the fact, because all in event loop scheduling results
// in direct addition to the scheduled priority queue. // in direct addition to the scheduled priority queue.
// When we are in the EventLoop we don't bother setting the timerFd for each scheduled task, but
// instead defer the processing until the end of the EventLoop to reduce the timerFd
// modifications.
checkScheduleTaskQueueForNewDelay();
}
} }
if (allowGrowing && strategy == events.length()) { if (allowGrowing && strategy == events.length()) {
//increase the size of the array as we needed the whole space for the events //increase the size of the array as we needed the whole space for the events
@ -487,7 +470,9 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
} }
private void processReady(EpollEventArray events, int ready) { // Returns true if a timerFd event was encountered
private boolean processReady(EpollEventArray events, int ready) {
boolean timerFired = false;
for (int i = 0; i < ready; ++i) { for (int i = 0; i < ready; ++i) {
final int fd = events.fd(i); final int fd = events.fd(i);
if (fd == eventFd.intValue()) { if (fd == eventFd.intValue()) {
@ -495,27 +480,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
// //
// See also https://stackoverflow.com/a/12492308/1074097 // See also https://stackoverflow.com/a/12492308/1074097
} else if (fd == timerFd.intValue()) { } else if (fd == timerFd.intValue()) {
// consume wakeup event, necessary because the timer is added with ET mode. timerFired = true;
Native.timerFdRead(fd);
// The timer is normalized, monotonically increasing, and the next value is always set to the minimum
// value of the pending timers. When the timer fires we can unset the timer value.
// Worst case another thread races with this thread and we set another timer event while processing
// this timer event and we get a duplicate wakeup some time in the future.
//
// This works because we always drain all ScheduledFutureTasks from the executor queue to the scheduled
// PriorityQueue and we will set the next expiration time. If another thread races with this thread
// inserting a ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue
// insertion is visible to the event loop thread.
//
// Assume the current minimum timer is nextDeadlineNanos = 10
// Thread A -> execute(ScheduledFutureTask(delayNanos = 12)),
// add ScheduledFutureTask to the executor queue
// fail to set nextDeadlineNanos, so no call to setTimerFd is made
// EventLoop -> process timer wakeup here, set nextDeadlineNanos to MAXIMUM_DEADLINE
// ... process more tasks ...
// drain all the tasks from executor queue, see 12 is the next delay, call setTimerFd
nextDeadlineNanos.set(MAXIMUM_DEADLINE);
} else { } else {
final long ev = events.events(i); final long ev = events.events(i);
@ -569,6 +534,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
} }
} }
return timerFired;
} }
@Override @Override

View File

@ -0,0 +1,72 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.unix.FileDescriptor;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class EpollTest {
@Test
public void testIsAvailable() {
assertTrue(Epoll.isAvailable());
}
// Testcase for https://github.com/netty/netty/issues/8444
@Test(timeout = 5000)
public void testEpollWaitWithTimeOutMinusOne() throws Exception {
final EpollEventArray eventArray = new EpollEventArray(8);
try {
final FileDescriptor epoll = Native.newEpollCreate();
final FileDescriptor timerFd = Native.newTimerFd();
final FileDescriptor eventfd = Native.newEventFd();
Native.epollCtlAdd(epoll.intValue(), timerFd.intValue(), Native.EPOLLIN);
Native.epollCtlAdd(epoll.intValue(), eventfd.intValue(), Native.EPOLLIN);
final AtomicReference<Throwable> ref = new AtomicReference<Throwable>();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
assertEquals(1, Native.epollWait(epoll, eventArray, false));
// This should have been woken up because of eventfd_write.
assertEquals(eventfd.intValue(), eventArray.fd(0));
} catch (Throwable cause) {
ref.set(cause);
}
}
});
t.start();
t.join(1000);
assertTrue(t.isAlive());
Native.eventFdWrite(eventfd.intValue(), 1);
t.join();
assertNull(ref.get());
epoll.close();
timerFd.close();
eventfd.close();
} finally {
eventArray.free();
}
}
}

View File

@ -102,20 +102,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return promise; return promise;
} }
@Override
protected void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) {
super.executeScheduledRunnable(wakesUpForScheduledRunnable() ? runnable : new NonWakeupRunnable() {
@Override
public void run() {
runnable.run();
}
}, isAddition, deadlineNanos);
}
protected boolean wakesUpForScheduledRunnable() {
return true;
}
/** /**
* Adds a task to be run once at the end of next (or current) {@code eventloop} iteration. * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
* *
@ -149,11 +135,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return tailTasks.remove(ObjectUtil.checkNotNull(task, "task")); return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
} }
@Override
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);
}
@Override @Override
protected void afterRunningAllTasks() { protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks); runAllTasksFrom(tailTasks);
@ -182,5 +163,5 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
/** /**
* Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
*/ */
interface NonWakeupRunnable extends Runnable { } interface NonWakeupRunnable extends SingleThreadEventExecutor.NonWakeupRunnable { }
} }

View File

@ -124,6 +124,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
* waken up. * waken up.
*/ */
private final AtomicBoolean wakenUp = new AtomicBoolean(); private final AtomicBoolean wakenUp = new AtomicBoolean();
private volatile long nextWakeupTime = Long.MAX_VALUE;
private final SelectStrategy selectStrategy; private final SelectStrategy selectStrategy;
@ -763,6 +764,16 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
} }
@Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
return deadlineNanos < nextWakeupTime;
}
@Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
return deadlineNanos < nextWakeupTime;
}
Selector unwrappedSelector() { Selector unwrappedSelector() {
return unwrappedSelector; return unwrappedSelector;
} }
@ -785,6 +796,11 @@ public final class NioEventLoop extends SingleThreadEventLoop {
long currentTimeNanos = System.nanoTime(); long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}
for (;;) { for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) { if (timeoutMillis <= 0) {