diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 6b52151dbf..656c57847b 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -102,10 +102,6 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } scheduledTaskQueue.clearIgnoringIndexes(); - - // calling minimumDelayScheduledTaskRemoved was considered here to give a chance for EventLoop - // implementations the opportunity to cleanup any timerFds, but this is currently only called when the EventLoop - // is being shutdown, so the timerFd and associated polling mechanism will be destroyed anyways. } /** @@ -120,24 +116,15 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. */ protected final Runnable pollScheduledTask(long nanoTime) { - Queue> scheduledTaskQueue = this.scheduledTaskQueue; - return scheduledTaskQueue != null ? pollScheduledTask(scheduledTaskQueue, nanoTime, true) : null; - } - - final Runnable pollScheduledTask(Queue> scheduledTaskQueue, long nanoTime, - boolean notifyMinimumDeadlineRemoved) { - assert scheduledTaskQueue != null; assert inEventLoop(); - ScheduledFutureTask scheduledTask = scheduledTaskQueue.peek(); - if (scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime) { - scheduledTaskQueue.poll(); - if (notifyMinimumDeadlineRemoved) { - minimumDelayScheduledTaskRemoved(scheduledTask, scheduledTask.deadlineNanos()); - } - return scheduledTask; + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + if (scheduledTask == null || scheduledTask.deadlineNanos() - nanoTime > 0) { + return null; } - return null; + scheduledTaskQueue.remove(); + return scheduledTask; } /** @@ -269,29 +256,17 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut final void removeScheduled(final ScheduledFutureTask task) { if (inEventLoop()) { - removedSchedule0(task); + scheduledTaskQueue().removeTyped(task); } else { executeScheduledRunnable(new Runnable() { @Override public void run() { - removedSchedule0(task); + scheduledTaskQueue().removeTyped(task); } }, false, task.deadlineNanos()); } } - private void removedSchedule0(final ScheduledFutureTask task) { - if (scheduledTaskQueue == null || task == null) { - return; - } - if (scheduledTaskQueue.peek() == task) { - scheduledTaskQueue.poll(); - minimumDelayScheduledTaskRemoved(task, task.deadlineNanos()); - } else { - scheduledTaskQueue.removeTyped(task); - } - } - /** * Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing * a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this @@ -301,16 +276,9 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut * action * @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed. */ - protected void executeScheduledRunnable(Runnable runnable, + void executeScheduledRunnable(Runnable runnable, @SuppressWarnings("unused") boolean isAddition, @SuppressWarnings("unused") long deadlineNanos) { execute(runnable); } - - /** - * The next task to expire (e.g. minimum delay) has been removed from the scheduled priority queue. - */ - protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task, - @SuppressWarnings("unused") long deadlineNanos) { - } } diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index cf635f7216..4d77da11a7 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -73,12 +73,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx // Do nothing. } }; - private static final Runnable BOOKEND_TASK = new Runnable() { - @Override - public void run() { - // Do nothing. - } - }; private static final AtomicIntegerFieldUpdater STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); @@ -184,6 +178,33 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } + /** + * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission. + * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to + * process the scheduled task (if not already awake). + *

+ * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with + * the same value after the scheduled task is enqueued, providing another opportunity + * to wake the {@link EventExecutor} thread if required. + * + * @param deadlineNanos deadline of the to-be-scheduled task + * relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise + */ + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return true; + } + + /** + * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false. + * + * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise + */ + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + return true; + } + /** * @deprecated Please use and override {@link #newTaskQueue(int)}. */ @@ -225,10 +246,9 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx protected static Runnable pollTaskFrom(Queue taskQueue) { for (;;) { Runnable task = taskQueue.poll(); - if (task == WAKEUP_TASK) { - continue; + if (task != WAKEUP_TASK) { + return task; } - return task; } } @@ -293,35 +313,35 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx return true; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); - Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, true); - while (scheduledTask != null) { + for (;;) { + Runnable scheduledTask = pollScheduledTask(nanoTime); + if (scheduledTask == null) { + return true; + } if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue.add((ScheduledFutureTask) scheduledTask); return false; } - scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false); } - return true; } /** * @return {@code true} if at least one scheduled task was executed. */ - private boolean executeExpiredScheduledTasks(boolean notifyMinimumDeadlineRemoved) { + private boolean executeExpiredScheduledTasks() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return false; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); - Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, notifyMinimumDeadlineRemoved); - if (scheduledTask != null) { - do { - safeExecute(scheduledTask); - scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false); - } while (scheduledTask != null); - return true; + Runnable scheduledTask = pollScheduledTask(nanoTime); + if (scheduledTask == null) { + return false; } - return false; + do { + safeExecute(scheduledTask); + } while ((scheduledTask = pollScheduledTask(nanoTime)) != null); + return true; } /** @@ -414,17 +434,13 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx */ protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) { assert inEventLoop(); - // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued - // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe. - boolean ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue); - boolean ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(true); + boolean ranAtLeastOneTask; int drainAttempt = 0; - while ((ranAtLeastOneExecutorTask || ranAtLeastOneScheduledTask) && ++drainAttempt < maxDrainAttempts) { + do { // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe. - ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue); - ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(false); - } + ranAtLeastOneTask = runExistingTasksFrom(taskQueue) | executeExpiredScheduledTasks(); + } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts); if (drainAttempt > 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); @@ -461,46 +477,17 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx * @return {@code true} if at least {@link Runnable#run()} was called. */ private boolean runExistingTasksFrom(Queue taskQueue) { - return taskQueue.offer(BOOKEND_TASK) ? runExistingTasksUntilBookend(taskQueue) - : runExistingTasksUntilMaxTasks(taskQueue); - } - - private boolean runExistingTasksUntilBookend(Queue taskQueue) { Runnable task = pollTaskFrom(taskQueue); - // null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, and - // null elements are not permitted to be inserted into the queue. - if (task == BOOKEND_TASK) { - return false; - } - for (;;) { - safeExecute(task); - task = pollTaskFrom(taskQueue); - // null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, - // and null elements are not permitted to be inserted into the queue. - if (task == BOOKEND_TASK) { - return true; - } - } - } - - private boolean runExistingTasksUntilMaxTasks(Queue taskQueue) { - Runnable task = pollTaskFrom(taskQueue); - // BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted into - // the queue, and if was previously inserted we always drain all the elements from queue including BOOKEND_TASK. if (task == null) { return false; } - int i = 0; - do { + int remaining = Math.min(maxPendingTasks, taskQueue.size()); + safeExecute(task); + // Use taskQueue.poll() directly rather than pollTaskFrom() since the latter may + // silently consume more than one item from the queue (skips over WAKEUP_TASK instances) + while (remaining-- > 0 && (task = taskQueue.poll()) != null) { safeExecute(task); - task = pollTaskFrom(taskQueue); - // BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted - // into the queue, and if was previously inserted we always drain all the elements from queue including - // BOOKEND_TASK. - if (task == null) { - return true; - } - } while (++i < maxPendingTasks); + } return true; } @@ -607,6 +594,25 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } + @Override + final void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) { + // Don't wakeup if this is a removal task or if beforeScheduledTaskSubmitted returns false + if (isAddition && beforeScheduledTaskSubmitted(deadlineNanos)) { + super.executeScheduledRunnable(runnable, isAddition, deadlineNanos); + } else { + super.executeScheduledRunnable(new NonWakeupRunnable() { + @Override + public void run() { + runnable.run(); + } + }, isAddition, deadlineNanos); + // Second hook after scheduling to facilitate race-avoidance + if (isAddition && afterScheduledTaskSubmitted(deadlineNanos)) { + wakeup(false); + } + } + } + @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; @@ -954,8 +960,21 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx return threadProperties; } - protected boolean wakesUpForTask(@SuppressWarnings("unused") Runnable task) { - return true; + /** + * Marker interface for {@link Runnable} to indicate that it should be queued for execution + * but does not need to run immediately. The default implementation of + * {@link SingleThreadEventExecutor#wakesUpForTask(Runnable)} uses this to avoid waking up + * the {@link EventExecutor} thread when not necessary. + */ + protected interface NonWakeupRunnable extends Runnable { } + + /** + * Can be overridden to control which tasks require waking the {@link EventExecutor} thread + * if it is waiting so that they can be run immediately. The default implementation + * decides based on whether the task implements {@link NonWakeupRunnable}. + */ + protected boolean wakesUpForTask(Runnable task) { + return !(task instanceof NonWakeupRunnable); } protected static void reject() { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 98b69bbb4d..187265d4f7 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -47,10 +47,6 @@ import static java.lang.Math.min; */ class EpollEventLoop extends SingleThreadEventLoop { private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class); - /** - * The maximum deadline value before overlap occurs on the time source. - */ - private static final long MAXIMUM_DEADLINE = initialNanoTime() - 1; static { // Ensure JNI is initialized by the time this class is loaded by this time! @@ -59,10 +55,14 @@ class EpollEventLoop extends SingleThreadEventLoop { } /** + * When in epollWait(), this mirrors the currently-set deadline of the timerFd. A negative value + * means that the event loop is awake, which blocks rescheduling activity by other threads. + * It is restored to the real timerFd expiry time again prior to entering epollWait(). + * * Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting * with the time source (e.g. calling System.nanoTime()) which can be expensive. */ - private final AtomicLong nextDeadlineNanos = new AtomicLong(MAXIMUM_DEADLINE); + private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L); private final AtomicInteger wakenUp = new AtomicInteger(); private final FileDescriptor epollFd; private final FileDescriptor eventFd; @@ -181,31 +181,18 @@ class EpollEventLoop extends SingleThreadEventLoop { } @Override - protected void executeScheduledRunnable(Runnable runnable, boolean isAddition, long deadlineNanos) { - if (isAddition) { - try { - trySetTimerFd(deadlineNanos); - } catch (IOException cause) { - throw new RejectedExecutionException(cause); - } - } - // else this is a removal of scheduled task and we could attempt to detect if this task was responsible - // for the next delay, and find the next lowest delay in the queue to re-set the timer. However this - // is not practical for the following reasons: - // 1. The data structure is a PriorityQueue, and the scheduled task has not yet been removed. This means - // we would have to add/remove the head element to find the "next timeout". - // 2. We are not on the EventLoop thread, and the PriorityQueue is not thread safe. We could attempt - // to do (1) if we are on the EventLoop but when the EventLoop wakes up it checks if the timeout changes - // when it is woken up and before it calls epoll_wait again and adjusts the timer accordingly. - // The result is we wait until we are in the EventLoop and doing the actual removal, and also processing - // regular polling in the EventLoop too. - - execute(runnable); + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return false; // don't wake event loop } @Override - protected boolean wakesUpForScheduledRunnable() { - return false; + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + try { + trySetTimerFd(deadlineNanos); + } catch (IOException e) { + throw new RejectedExecutionException(e); + } + return false; // don't wake event loop } @Override @@ -218,19 +205,22 @@ class EpollEventLoop extends SingleThreadEventLoop { private void trySetTimerFd(long candidateNextDeadline) throws IOException { for (;;) { long nextDeadline = nextDeadlineNanos.get(); - if (nextDeadline - candidateNextDeadline <= 0) { - break; + if (nextDeadline <= candidateNextDeadline) { + // This includes case where nextDeadline is negative (event loop is awake) + return; } if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) { - setTimerFd(deadlineToDelayNanos(candidateNextDeadline)); - // We are setting the timerFd outside of the EventLoop so it is possible that we raced with another call - // to set the timer and temporarily increased the value, in which case we should set it back to the - // lower value. - nextDeadline = nextDeadlineNanos.get(); - if (nextDeadline - candidateNextDeadline < 0) { - setTimerFd(deadlineToDelayNanos(nextDeadline)); + // We must serialize calls to setTimerFd to avoid the set of a later deadline + // racing with a sooner one and overwriting it. A second check of nextDeadlineNanos + // is made within the sync block to avoid having the CAS within the sync + synchronized (nextDeadlineNanos) { + nextDeadline = nextDeadlineNanos.get(); + if (nextDeadline == candidateNextDeadline || + (nextDeadline + Long.MAX_VALUE + 1) == candidateNextDeadline) { + setTimerFd(deadlineToDelayNanos(candidateNextDeadline)); + } } - break; + return; } } } @@ -250,39 +240,25 @@ class EpollEventLoop extends SingleThreadEventLoop { } } - private void checkScheduleTaskQueueForNewDelay() throws IOException { - final long deadlineNanos = nextScheduledTaskDeadlineNanos(); - if (deadlineNanos != -1) { - trySetTimerFd(deadlineNanos); + private long checkScheduleTaskQueueForNewDelay(long timerFdDeadline) throws IOException { + assert nextDeadlineNanos.get() < 0; + final long nextTaskDeadlineNanos = nextScheduledTaskDeadlineNanos(); + if (nextTaskDeadlineNanos == -1 || nextTaskDeadlineNanos >= timerFdDeadline) { + // Just restore to preexisting timerFd value, update not needed + nextDeadlineNanos.lazySet(timerFdDeadline); + } else { + synchronized (nextDeadlineNanos) { + // Shorter delay required than current timerFd setting, update it + nextDeadlineNanos.lazySet(timerFdDeadline = nextTaskDeadlineNanos); + setTimerFd(deadlineToDelayNanos(timerFdDeadline)); + } } + return timerFdDeadline; // Don't disarm the timerFd even if there are no more queued tasks. Since we are setting timerFd from outside // the EventLoop it is possible that another thread has set the timer and we may miss a wakeup if we disarm // the timer here. Instead we wait for the timer wakeup on the EventLoop and clear state for the next timer. } - @Override - protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task, - @SuppressWarnings("unused") long deadlineNanos) { - // It is OK to reset nextDeadlineNanos here because we are in the event loop thread, and the event loop is - // guaranteed to transfer all pending ScheduledFutureTasks from the executor queue to the scheduled - // PriorityQueue and we will set the next expiration time. If another thread races with this thread inserting a - // ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue insertion is - // visible to the event loop thread. - // - // Assume the current minimum timer is delayNanos = 10 - // Thread A -> execute(ScheduledFutureTask(delayNanos = 12)), - // add ScheduledFutureTask to the executor queue - // fail to set nextDeadlineNanos, so no call to setTimerFd is made - // EventLoop -> minimumDelayScheduledTaskRemoved(10), - // set nextDeadlineNanos to MAXIMUM_DEADLINE - // ... process more tasks ... - // drain all the tasks from the executor queue, see that 12 is the next delay, call setTimerFd - nextDeadlineNanos.set(MAXIMUM_DEADLINE); - - // Note that we don't actually call setTimerFd here, we don't want to interrupt the actual timerFd and let - // the end of the EventLoop determine what the timerFd value should be (after execute queue is drained). - } - @Override protected void wakeup(boolean inEventLoop) { if (!inEventLoop && wakenUp.getAndSet(1) == 0) { @@ -395,6 +371,7 @@ class EpollEventLoop extends SingleThreadEventLoop { @Override protected void run() { + long timerFdDeadline = Long.MAX_VALUE; for (;;) { try { processPendingChannelFlags(); @@ -412,28 +389,34 @@ class EpollEventLoop extends SingleThreadEventLoop { wakenUp.set(0); } if (!hasTasks()) { - strategy = epollWait(); + // When we are in the EventLoop we don't bother setting the timerFd for each + // scheduled task, but instead defer the processing until the end of the EventLoop + // (next wait) to reduce the timerFd modifications. + timerFdDeadline = checkScheduleTaskQueueForNewDelay(timerFdDeadline); + try { + strategy = epollWait(); + } finally { + // This getAndAdd will change the raw value of nextDeadlineNanos to be negative + // which will block any *new* timerFd mods by other threads while also "preserving" + // its last value to avoid disrupting a possibly-concurrent setTimerFd call + // (so that we can know the timerFd really did/will get updated to the read value). + timerFdDeadline = nextDeadlineNanos.getAndAdd(Long.MAX_VALUE + 1); + // The value of nextDeadlineNanos is now guaranteed to be negative + } } // fallthrough default: } try { - processReady(events, strategy); - } finally { - try { - // Note the timerFd code depends upon running all the tasks on each event loop run. This is so - // we can get an accurate "next wakeup time" after the event loop run completes. - runAllTasks(); - } finally { - // No need to drainScheduledQueue() after the fact, because all in event loop scheduling results - // in direct addition to the scheduled priority queue. - - // When we are in the EventLoop we don't bother setting the timerFd for each scheduled task, but - // instead defer the processing until the end of the EventLoop to reduce the timerFd - // modifications. - checkScheduleTaskQueueForNewDelay(); + if (processReady(events, strategy)) { + // Polled events include timerFd expiry; conservatively assume that no timer is set + timerFdDeadline = Long.MAX_VALUE; } + } finally { + runAllTasks(); + // No need to drainScheduledQueue() after the fact, because all in event loop scheduling results + // in direct addition to the scheduled priority queue. } if (allowGrowing && strategy == events.length()) { //increase the size of the array as we needed the whole space for the events @@ -487,7 +470,9 @@ class EpollEventLoop extends SingleThreadEventLoop { } } - private void processReady(EpollEventArray events, int ready) { + // Returns true if a timerFd event was encountered + private boolean processReady(EpollEventArray events, int ready) { + boolean timerFired = false; for (int i = 0; i < ready; ++i) { final int fd = events.fd(i); if (fd == eventFd.intValue()) { @@ -495,27 +480,7 @@ class EpollEventLoop extends SingleThreadEventLoop { // // See also https://stackoverflow.com/a/12492308/1074097 } else if (fd == timerFd.intValue()) { - // consume wakeup event, necessary because the timer is added with ET mode. - Native.timerFdRead(fd); - - // The timer is normalized, monotonically increasing, and the next value is always set to the minimum - // value of the pending timers. When the timer fires we can unset the timer value. - // Worst case another thread races with this thread and we set another timer event while processing - // this timer event and we get a duplicate wakeup some time in the future. - // - // This works because we always drain all ScheduledFutureTasks from the executor queue to the scheduled - // PriorityQueue and we will set the next expiration time. If another thread races with this thread - // inserting a ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue - // insertion is visible to the event loop thread. - // - // Assume the current minimum timer is nextDeadlineNanos = 10 - // Thread A -> execute(ScheduledFutureTask(delayNanos = 12)), - // add ScheduledFutureTask to the executor queue - // fail to set nextDeadlineNanos, so no call to setTimerFd is made - // EventLoop -> process timer wakeup here, set nextDeadlineNanos to MAXIMUM_DEADLINE - // ... process more tasks ... - // drain all the tasks from executor queue, see 12 is the next delay, call setTimerFd - nextDeadlineNanos.set(MAXIMUM_DEADLINE); + timerFired = true; } else { final long ev = events.events(i); @@ -569,6 +534,7 @@ class EpollEventLoop extends SingleThreadEventLoop { } } } + return timerFired; } @Override diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java new file mode 100644 index 0000000000..f66a55e2c7 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java @@ -0,0 +1,72 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.unix.FileDescriptor; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class EpollTest { + + @Test + public void testIsAvailable() { + assertTrue(Epoll.isAvailable()); + } + + // Testcase for https://github.com/netty/netty/issues/8444 + @Test(timeout = 5000) + public void testEpollWaitWithTimeOutMinusOne() throws Exception { + final EpollEventArray eventArray = new EpollEventArray(8); + try { + final FileDescriptor epoll = Native.newEpollCreate(); + final FileDescriptor timerFd = Native.newTimerFd(); + final FileDescriptor eventfd = Native.newEventFd(); + Native.epollCtlAdd(epoll.intValue(), timerFd.intValue(), Native.EPOLLIN); + Native.epollCtlAdd(epoll.intValue(), eventfd.intValue(), Native.EPOLLIN); + + final AtomicReference ref = new AtomicReference(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + assertEquals(1, Native.epollWait(epoll, eventArray, false)); + // This should have been woken up because of eventfd_write. + assertEquals(eventfd.intValue(), eventArray.fd(0)); + } catch (Throwable cause) { + ref.set(cause); + } + } + }); + t.start(); + t.join(1000); + assertTrue(t.isAlive()); + Native.eventFdWrite(eventfd.intValue(), 1); + + t.join(); + assertNull(ref.get()); + epoll.close(); + timerFd.close(); + eventfd.close(); + } finally { + eventArray.free(); + } + } +} diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 7eff9e85fa..d0bc086cce 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -102,20 +102,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return promise; } - @Override - protected void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) { - super.executeScheduledRunnable(wakesUpForScheduledRunnable() ? runnable : new NonWakeupRunnable() { - @Override - public void run() { - runnable.run(); - } - }, isAddition, deadlineNanos); - } - - protected boolean wakesUpForScheduledRunnable() { - return true; - } - /** * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration. * @@ -149,11 +135,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return tailTasks.remove(ObjectUtil.checkNotNull(task, "task")); } - @Override - protected boolean wakesUpForTask(Runnable task) { - return !(task instanceof NonWakeupRunnable); - } - @Override protected void afterRunningAllTasks() { runAllTasksFrom(tailTasks); @@ -182,5 +163,5 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im /** * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. */ - interface NonWakeupRunnable extends Runnable { } + interface NonWakeupRunnable extends SingleThreadEventExecutor.NonWakeupRunnable { } } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 8dd7609b41..ace430afc9 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -124,6 +124,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { * waken up. */ private final AtomicBoolean wakenUp = new AtomicBoolean(); + private volatile long nextWakeupTime = Long.MAX_VALUE; private final SelectStrategy selectStrategy; @@ -763,6 +764,16 @@ public final class NioEventLoop extends SingleThreadEventLoop { } } + @Override + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return deadlineNanos < nextWakeupTime; + } + + @Override + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + return deadlineNanos < nextWakeupTime; + } + Selector unwrappedSelector() { return unwrappedSelector; } @@ -785,6 +796,11 @@ public final class NioEventLoop extends SingleThreadEventLoop { long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); + long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime(); + if (nextWakeupTime != normalizedDeadlineNanos) { + nextWakeupTime = normalizedDeadlineNanos; + } + for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) {