diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 41430c708c..6b52151dbf 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit; * Abstract base class for {@link EventExecutor}s that want to support scheduling. */ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { - private static final Comparator> SCHEDULED_FUTURE_TASK_COMPARATOR = new Comparator>() { @Override @@ -51,6 +50,24 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut return ScheduledFutureTask.nanoTime(); } + /** + * Given an arbitrary deadline {@code deadlineNanos}, calculate the number of nano seconds from now + * {@code deadlineNanos} would expire. + * @param deadlineNanos An arbitrary deadline in nano seconds. + * @return the number of nano seconds from now {@code deadlineNanos} would expire. + */ + protected static long deadlineToDelayNanos(long deadlineNanos) { + return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos); + } + + /** + * The initial value used for delay and computations based upon a monatomic time source. + * @return initial value used for delay and computations based upon a monatomic time source. + */ + protected static long initialNanoTime() { + return ScheduledFutureTask.initialNanoTime(); + } + PriorityQueue> scheduledTaskQueue() { if (scheduledTaskQueue == null) { scheduledTaskQueue = new DefaultPriorityQueue>( @@ -85,6 +102,10 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } scheduledTaskQueue.clearIgnoringIndexes(); + + // calling minimumDelayScheduledTaskRemoved was considered here to give a chance for EventLoop + // implementations the opportunity to cleanup any timerFds, but this is currently only called when the EventLoop + // is being shutdown, so the timerFd and associated polling mechanism will be destroyed anyways. } /** @@ -99,16 +120,21 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. */ protected final Runnable pollScheduledTask(long nanoTime) { + Queue> scheduledTaskQueue = this.scheduledTaskQueue; + return scheduledTaskQueue != null ? pollScheduledTask(scheduledTaskQueue, nanoTime, true) : null; + } + + final Runnable pollScheduledTask(Queue> scheduledTaskQueue, long nanoTime, + boolean notifyMinimumDeadlineRemoved) { + assert scheduledTaskQueue != null; assert inEventLoop(); - Queue> scheduledTaskQueue = this.scheduledTaskQueue; - ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); - if (scheduledTask == null) { - return null; - } - - if (scheduledTask.deadlineNanos() <= nanoTime) { - scheduledTaskQueue.remove(); + ScheduledFutureTask scheduledTask = scheduledTaskQueue.peek(); + if (scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime) { + scheduledTaskQueue.poll(); + if (notifyMinimumDeadlineRemoved) { + minimumDelayScheduledTaskRemoved(scheduledTask, scheduledTask.deadlineNanos()); + } return scheduledTask; } return null; @@ -118,28 +144,29 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut * Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled. */ protected final long nextScheduledTaskNano() { - Queue> scheduledTaskQueue = this.scheduledTaskQueue; - ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); - if (scheduledTask == null) { - return -1; - } - return Math.max(0, scheduledTask.deadlineNanos() - nanoTime()); + ScheduledFutureTask scheduledTask = peekScheduledTask(); + return scheduledTask != null ? Math.max(0, scheduledTask.deadlineNanos() - nanoTime()) : -1; + } + + /** + * Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1} + * if no task is scheduled. + */ + protected final long nextScheduledTaskDeadlineNanos() { + ScheduledFutureTask scheduledTask = peekScheduledTask(); + return scheduledTask != null ? scheduledTask.deadlineNanos() : -1; } final ScheduledFutureTask peekScheduledTask() { Queue> scheduledTaskQueue = this.scheduledTaskQueue; - if (scheduledTaskQueue == null) { - return null; - } - return scheduledTaskQueue.peek(); + return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null; } /** * Returns {@code true} if a scheduled task is ready for processing. */ protected final boolean hasScheduledTasks() { - Queue> scheduledTaskQueue = this.scheduledTaskQueue; - ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); + ScheduledFutureTask scheduledTask = peekScheduledTask(); return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); } @@ -225,16 +252,16 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut // NOOP } - ScheduledFuture schedule(final ScheduledFutureTask task) { + private ScheduledFuture schedule(final ScheduledFutureTask task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { - execute(new Runnable() { + executeScheduledRunnable(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } - }); + }, true, task.deadlineNanos()); } return task; @@ -242,14 +269,48 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut final void removeScheduled(final ScheduledFutureTask task) { if (inEventLoop()) { - scheduledTaskQueue().removeTyped(task); + removedSchedule0(task); } else { - execute(new Runnable() { + executeScheduledRunnable(new Runnable() { @Override public void run() { - removeScheduled(task); + removedSchedule0(task); } - }); + }, false, task.deadlineNanos()); } } + + private void removedSchedule0(final ScheduledFutureTask task) { + if (scheduledTaskQueue == null || task == null) { + return; + } + if (scheduledTaskQueue.peek() == task) { + scheduledTaskQueue.poll(); + minimumDelayScheduledTaskRemoved(task, task.deadlineNanos()); + } else { + scheduledTaskQueue.removeTyped(task); + } + } + + /** + * Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing + * a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this + * method. + * @param runnable The {@link Runnable} to execute which will add or remove a scheduled action + * @param isAddition {@code true} if the {@link Runnable} will add an action, {@code false} if it will remove an + * action + * @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed. + */ + protected void executeScheduledRunnable(Runnable runnable, + @SuppressWarnings("unused") boolean isAddition, + @SuppressWarnings("unused") long deadlineNanos) { + execute(runnable); + } + + /** + * The next task to expire (e.g. minimum delay) has been removed from the scheduled priority queue. + */ + protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task, + @SuppressWarnings("unused") long deadlineNanos) { + } } diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index 1eaa7b9276..1d1403e3f0 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -40,6 +40,10 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; } + static long initialNanoTime() { + return START_TIME; + } + private final long id = nextTaskId.getAndIncrement(); private long deadlineNanos; /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ @@ -85,7 +89,11 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu } public long delayNanos() { - return Math.max(0, deadlineNanos() - nanoTime()); + return deadlineToDelayNanos(deadlineNanos()); + } + + static long deadlineToDelayNanos(long deadlineNanos) { + return Math.max(0, deadlineNanos - nanoTime()); } public long delayNanos(long currentTimeNanos) { @@ -111,9 +119,8 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu return 1; } else if (id < that.id) { return -1; - } else if (id == that.id) { - throw new Error(); } else { + assert id != that.id; return 1; } } @@ -132,11 +139,10 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu if (!isCancelled()) { task.call(); if (!executor().isShutdown()) { - long p = periodNanos; - if (p > 0) { - deadlineNanos += p; + if (periodNanos > 0) { + deadlineNanos += periodNanos; } else { - deadlineNanos = nanoTime() - p; + deadlineNanos = nanoTime() - periodNanos; } if (!isCancelled()) { // scheduledTaskQueue can never be null as we lazy init it before submit the task! diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index a7aee9a4c6..cf635f7216 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -73,6 +73,12 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx // Do nothing. } }; + private static final Runnable BOOKEND_TASK = new Runnable() { + @Override + public void run() { + // Do nothing. + } + }; private static final AtomicIntegerFieldUpdater STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state"); @@ -283,19 +289,41 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } private boolean fetchFromScheduledTaskQueue() { + if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { + return true; + } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); - Runnable scheduledTask = pollScheduledTask(nanoTime); + Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, true); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. - scheduledTaskQueue().add((ScheduledFutureTask) scheduledTask); + scheduledTaskQueue.add((ScheduledFutureTask) scheduledTask); return false; } - scheduledTask = pollScheduledTask(nanoTime); + scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false); } return true; } + /** + * @return {@code true} if at least one scheduled task was executed. + */ + private boolean executeExpiredScheduledTasks(boolean notifyMinimumDeadlineRemoved) { + if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { + return false; + } + long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, notifyMinimumDeadlineRemoved); + if (scheduledTask != null) { + do { + safeExecute(scheduledTask); + scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false); + } while (scheduledTask != null); + return true; + } + return false; + } + /** * @see Queue#peek() */ @@ -376,6 +404,36 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx return ranAtLeastOne; } + /** + * Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty, + * or {@code maxDrainAttempts} has been exceeded. + * @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent + * continuous task execution and scheduling from preventing the EventExecutor thread to + * make progress and return to the selector mechanism to process inbound I/O events. + * @return {@code true} if at least one task was run. + */ + protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) { + assert inEventLoop(); + // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued + // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe. + boolean ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue); + boolean ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(true); + int drainAttempt = 0; + while ((ranAtLeastOneExecutorTask || ranAtLeastOneScheduledTask) && ++drainAttempt < maxDrainAttempts) { + // We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued + // here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe. + ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue); + ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(false); + } + + if (drainAttempt > 0) { + lastExecutionTime = ScheduledFutureTask.nanoTime(); + } + afterRunningAllTasks(); + + return drainAttempt > 0; + } + /** * Runs all tasks from the passed {@code taskQueue}. * @@ -397,6 +455,55 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } + /** + * What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}. + * @param taskQueue the task queue to drain. + * @return {@code true} if at least {@link Runnable#run()} was called. + */ + private boolean runExistingTasksFrom(Queue taskQueue) { + return taskQueue.offer(BOOKEND_TASK) ? runExistingTasksUntilBookend(taskQueue) + : runExistingTasksUntilMaxTasks(taskQueue); + } + + private boolean runExistingTasksUntilBookend(Queue taskQueue) { + Runnable task = pollTaskFrom(taskQueue); + // null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, and + // null elements are not permitted to be inserted into the queue. + if (task == BOOKEND_TASK) { + return false; + } + for (;;) { + safeExecute(task); + task = pollTaskFrom(taskQueue); + // null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, + // and null elements are not permitted to be inserted into the queue. + if (task == BOOKEND_TASK) { + return true; + } + } + } + + private boolean runExistingTasksUntilMaxTasks(Queue taskQueue) { + Runnable task = pollTaskFrom(taskQueue); + // BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted into + // the queue, and if was previously inserted we always drain all the elements from queue including BOOKEND_TASK. + if (task == null) { + return false; + } + int i = 0; + do { + safeExecute(task); + task = pollTaskFrom(taskQueue); + // BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted + // into the queue, and if was previously inserted we always drain all the elements from queue including + // BOOKEND_TASK. + if (task == null) { + return true; + } + } while (++i < maxPendingTasks); + return true; + } + /** * Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}. @@ -443,6 +550,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx */ @UnstableApi protected void afterRunningAllTasks() { } + /** * Returns the amount of time left until the scheduled task with the closest dead line is executed. */ @@ -846,8 +954,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx return threadProperties; } - @SuppressWarnings("unused") - protected boolean wakesUpForTask(Runnable task) { + protected boolean wakesUpForTask(@SuppressWarnings("unused") Runnable task) { return true; } diff --git a/transport-native-epoll/src/main/c/netty_epoll_native.c b/transport-native-epoll/src/main/c/netty_epoll_native.c index 3f08c341cf..fc0c883b37 100644 --- a/transport-native-epoll/src/main/c/netty_epoll_native.c +++ b/transport-native-epoll/src/main/c/netty_epoll_native.c @@ -186,6 +186,31 @@ static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) { return efd; } +static void netty_epoll_native_timerFdSetTime(JNIEnv* env, jclass clazz, jint timerFd, jint tvSec, jint tvNsec) { + struct itimerspec ts; + memset(&ts.it_interval, 0, sizeof(struct timespec)); + ts.it_value.tv_sec = tvSec; + ts.it_value.tv_nsec = tvNsec; + if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) { + netty_unix_errors_throwIOExceptionErrorNo(env, "timerfd_settime() failed: ", errno); + } +} + +static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jboolean immediatePoll) { + struct epoll_event *ev = (struct epoll_event*) (intptr_t) address; + const int timeout = immediatePoll ? 0 : -1; + int result, err; + + do { + result = epoll_wait(efd, ev, len, timeout); + if (result >= 0) { + return result; + } + } while((err = errno) == EINTR); + return -err; +} + +// This method is deprecated! static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) { struct epoll_event *ev = (struct epoll_event*) (intptr_t) address; int result, err; @@ -429,8 +454,10 @@ static const JNINativeMethod fixed_method_table[] = { { "eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite }, { "eventFdRead", "(I)V", (void *) netty_epoll_native_eventFdRead }, { "timerFdRead", "(I)V", (void *) netty_epoll_native_timerFdRead }, + { "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime }, { "epollCreate", "()I", (void *) netty_epoll_native_epollCreate }, - { "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, + { "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated! + { "epollWaitNoTimeout", "(IJIZ)I", (void *) netty_epoll_native_epollWaitNoTimeout }, { "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 }, { "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 }, { "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 }, diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 27bc1d1065..e6ac6c66e3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -35,7 +35,9 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import java.io.IOException; import java.util.Queue; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static java.lang.Math.min; @@ -44,8 +46,10 @@ import static java.lang.Math.min; */ class EpollEventLoop extends SingleThreadEventLoop { private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class); - private static final AtomicIntegerFieldUpdater WAKEN_UP_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp"); + /** + * The maximum deadline value before overlap occurs on the time source. + */ + private static final long MAXIMUM_DEADLINE = initialNanoTime() - 1; static { // Ensure JNI is initialized by the time this class is loaded by this time! @@ -53,8 +57,12 @@ class EpollEventLoop extends SingleThreadEventLoop { Epoll.ensureAvailability(); } - // Pick a number that no task could have previously used. - private long prevDeadlineNanos = nanoTime() - 1; + /** + * Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting + * with the time source (e.g. calling System.nanoTime()) which can be expensive. + */ + private final AtomicLong nextDeadlineNanos = new AtomicLong(MAXIMUM_DEADLINE); + private final AtomicInteger wakenUp = new AtomicInteger(); private final FileDescriptor epollFd; private final FileDescriptor eventFd; private final FileDescriptor timerFd; @@ -73,12 +81,6 @@ class EpollEventLoop extends SingleThreadEventLoop { return epollWaitNow(); } }; - @SuppressWarnings("unused") // AtomicIntegerFieldUpdater - private volatile int wakenUp; - private volatile int ioRatio = 50; - - // See http://man7.org/linux/man-pages/man2/timerfd_create.2.html. - private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999; EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, @@ -175,9 +177,112 @@ class EpollEventLoop extends SingleThreadEventLoop { return datagramPacketArray; } + @Override + protected void executeScheduledRunnable(Runnable runnable, boolean isAddition, long deadlineNanos) { + if (isAddition) { + try { + trySetTimerFd(deadlineNanos); + } catch (IOException cause) { + throw new RejectedExecutionException(cause); + } + } + // else this is a removal of scheduled task and we could attempt to detect if this task was responsible + // for the next delay, and find the next lowest delay in the queue to re-set the timer. However this + // is not practical for the following reasons: + // 1. The data structure is a PriorityQueue, and the scheduled task has not yet been removed. This means + // we would have to add/remove the head element to find the "next timeout". + // 2. We are not on the EventLoop thread, and the PriorityQueue is not thread safe. We could attempt + // to do (1) if we are on the EventLoop but when the EventLoop wakes up it checks if the timeout changes + // when it is woken up and before it calls epoll_wait again and adjusts the timer accordingly. + // The result is we wait until we are in the EventLoop and doing the actual removal, and also processing + // regular polling in the EventLoop too. + + execute(runnable); + } + + @Override + protected boolean wakesUpForScheduledRunnable() { + return false; + } + + @Override + protected boolean runAllTasks() { + // This method is overridden to ensure that all the expired scheduled tasks are executed during shutdown, and + // any other execute all scenarios in the base class. + return runScheduledAndExecutorTasks(4); + } + + private void trySetTimerFd(long candidateNextDeadline) throws IOException { + for (;;) { + long nextDeadline = nextDeadlineNanos.get(); + if (nextDeadline - candidateNextDeadline <= 0) { + break; + } + if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) { + setTimerFd(deadlineToDelayNanos(candidateNextDeadline)); + // We are setting the timerFd outside of the EventLoop so it is possible that we raced with another call + // to set the timer and temporarily increased the value, in which case we should set it back to the + // lower value. + nextDeadline = nextDeadlineNanos.get(); + if (nextDeadline - candidateNextDeadline < 0) { + setTimerFd(deadlineToDelayNanos(nextDeadline)); + } + break; + } + } + } + + private void setTimerFd(long candidateNextDelayNanos) throws IOException { + if (candidateNextDelayNanos > 0) { + final int delaySeconds = (int) min(candidateNextDelayNanos / 1000000000L, Integer.MAX_VALUE); + final int delayNanos = (int) min(candidateNextDelayNanos - delaySeconds * 1000000000L, Integer.MAX_VALUE); + Native.timerFdSetTime(timerFd.intValue(), delaySeconds, delayNanos); + } else { + // Setting the timer to 0, 0 will disarm it, so we have a few options: + // 1. Set the timer wakeup to 1ns (1 system call). + // 2. Use the eventFd to force a wakeup and disarm the timer (2 system calls). + // For now we are using option (1) because there are less system calls, and we will correctly reset the + // nextDeadlineNanos state when the EventLoop processes the timer wakeup. + Native.timerFdSetTime(timerFd.intValue(), 0, 1); + } + } + + private void checkScheduleTaskQueueForNewDelay() throws IOException { + final long deadlineNanos = nextScheduledTaskDeadlineNanos(); + if (deadlineNanos != -1) { + trySetTimerFd(deadlineNanos); + } + // Don't disarm the timerFd even if there are no more queued tasks. Since we are setting timerFd from outside + // the EventLoop it is possible that another thread has set the timer and we may miss a wakeup if we disarm + // the timer here. Instead we wait for the timer wakeup on the EventLoop and clear state for the next timer. + } + + @Override + protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task, + @SuppressWarnings("unused") long deadlineNanos) { + // It is OK to reset nextDeadlineNanos here because we are in the event loop thread, and the event loop is + // guaranteed to transfer all pending ScheduledFutureTasks from the executor queue to the scheduled + // PriorityQueue and we will set the next expiration time. If another thread races with this thread inserting a + // ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue insertion is + // visible to the event loop thread. + // + // Assume the current minimum timer is delayNanos = 10 + // Thread A -> execute(ScheduledFutureTask(delayNanos = 12)), + // add ScheduledFutureTask to the executor queue + // fail to set nextDeadlineNanos, so no call to setTimerFd is made + // EventLoop -> minimumDelayScheduledTaskRemoved(10), + // set nextDeadlineNanos to MAXIMUM_DEADLINE + // ... process more tasks ... + // drain all the tasks from the executor queue, see that 12 is the next delay, call setTimerFd + nextDeadlineNanos.set(MAXIMUM_DEADLINE); + + // Note that we don't actually call setTimerFd here, we don't want to interrupt the actual timerFd and let + // the end of the EventLoop determine what the timerFd value should be (after execute queue is drained). + } + @Override protected void wakeup(boolean inEventLoop) { - if (!inEventLoop && WAKEN_UP_UPDATER.getAndSet(this, 1) == 0) { + if (!inEventLoop && wakenUp.getAndSet(1) == 0) { // write to the evfd which will then wake-up epoll_wait(...) Native.eventFdWrite(eventFd.intValue(), 1L); } @@ -237,47 +342,21 @@ class EpollEventLoop extends SingleThreadEventLoop { : PlatformDependent.newMpscQueue(maxPendingTasks); } - /** - * Returns the percentage of the desired amount of time spent for I/O in the event loop. - */ - public int getIoRatio() { - return ioRatio; - } - - /** - * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is - * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. - */ - public void setIoRatio(int ioRatio) { - if (ioRatio <= 0 || ioRatio > 100) { - throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); - } - this.ioRatio = ioRatio; - } - @Override public int registeredChannels() { return channels.size(); } private int epollWait() throws IOException { - int delaySeconds; - int delayNanos; - long curDeadlineNanos = deadlineNanos(); - if (curDeadlineNanos == prevDeadlineNanos) { - delaySeconds = -1; - delayNanos = -1; - } else { - long totalDelay = delayNanos(System.nanoTime()); - prevDeadlineNanos = curDeadlineNanos; - delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE); - delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS); - } - return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos); + // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event. + // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended + // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed + // in pipeline. + return Native.epollWait(epollFd, events, hasTasks()); } private int epollWaitNow() throws IOException { - return Native.epollWait(epollFd, events, timerFd, 0, 0); + return Native.epollWait(epollFd, events, true); } private int epollBusyWait() throws IOException { @@ -298,8 +377,8 @@ class EpollEventLoop extends SingleThreadEventLoop { break; case SelectStrategy.SELECT: - if (wakenUp == 1) { - wakenUp = 0; + if (wakenUp.get() == 1) { + wakenUp.set(0); } if (!hasTasks()) { strategy = epollWait(); @@ -308,27 +387,21 @@ class EpollEventLoop extends SingleThreadEventLoop { default: } - final int ioRatio = this.ioRatio; - if (ioRatio == 100) { + try { + processReady(events, strategy); + } finally { try { - if (strategy > 0) { - processReady(events, strategy); - } - } finally { - // Ensure we always run tasks. + // Note the timerFd code depends upon running all the tasks on each event loop run. This is so + // we can get an accurate "next wakeup time" after the event loop run completes. runAllTasks(); - } - } else { - final long ioStartTime = System.nanoTime(); - - try { - if (strategy > 0) { - processReady(events, strategy); - } } finally { - // Ensure we always run tasks. - final long ioTime = System.nanoTime() - ioStartTime; - runAllTasks(ioTime * (100 - ioRatio) / ioRatio); + // No need to drainScheduledQueue() after the fact, because all in event loop scheduling results + // in direct addition to the scheduled priority queue. + + // When we are in the EventLoop we don't bother setting the timerFd for each scheduled task, but + // instead defer the processing until the end of the EventLoop to reduce the timerFd + // modifications. + checkScheduleTaskQueueForNewDelay(); } } if (allowGrowing && strategy == events.length()) { @@ -384,12 +457,34 @@ class EpollEventLoop extends SingleThreadEventLoop { } private void processReady(EpollEventArray events, int ready) { - for (int i = 0; i < ready; i ++) { + for (int i = 0; i < ready; ++i) { final int fd = events.fd(i); - if (fd == eventFd.intValue() || fd == timerFd.intValue()) { + if (fd == eventFd.intValue()) { // Just ignore as we use ET mode for the eventfd and timerfd. // // See also https://stackoverflow.com/a/12492308/1074097 + } else if (fd == timerFd.intValue()) { + // consume wakeup event, necessary because the timer is added with ET mode. + Native.timerFdRead(fd); + + // The timer is normalized, monotonically increasing, and the next value is always set to the minimum + // value of the pending timers. When the timer fires we can unset the timer value. + // Worst case another thread races with this thread and we set another timer event while processing + // this timer event and we get a duplicate wakeup some time in the future. + // + // This works because we always drain all ScheduledFutureTasks from the executor queue to the scheduled + // PriorityQueue and we will set the next expiration time. If another thread races with this thread + // inserting a ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue + // insertion is visible to the event loop thread. + // + // Assume the current minimum timer is nextDeadlineNanos = 10 + // Thread A -> execute(ScheduledFutureTask(delayNanos = 12)), + // add ScheduledFutureTask to the executor queue + // fail to set nextDeadlineNanos, so no call to setTimerFd is made + // EventLoop -> process timer wakeup here, set nextDeadlineNanos to MAXIMUM_DEADLINE + // ... process more tasks ... + // drain all the tasks from executor queue, see 12 is the next delay, call setTimerFd + nextDeadlineNanos.set(MAXIMUM_DEADLINE); } else { final long ev = events.events(i); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java index 85c5d5c158..4047fb7244 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoopGroup.java @@ -21,7 +21,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopTaskQueueFactory; import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategyFactory; -import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorChooserFactory; import io.netty.util.concurrent.RejectedExecutionHandler; import io.netty.util.concurrent.RejectedExecutionHandlers; @@ -128,12 +127,12 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup { } /** - * Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is - * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. + * @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts. */ + @Deprecated public void setIoRatio(int ioRatio) { - for (EventExecutor e: this) { - ((EpollEventLoop) e).setIoRatio(ioRatio); + if (ioRatio <= 0 || ioRatio > 100) { + throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); } } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java index fed3e81e5e..b8d17f011a 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/Native.java @@ -84,6 +84,7 @@ public final class Native { public static native void eventFdWrite(int fd, long value); public static native void eventFdRead(int fd); static native void timerFdRead(int fd); + static native void timerFdSetTime(int fd, int sec, int nsec) throws IOException; public static FileDescriptor newEpollCreate() { return new FileDescriptor(epollCreate()); @@ -91,6 +92,10 @@ public final class Native { private static native int epollCreate(); + /** + * @deprecated this method is no longer supported. This functionality is internal to this package. + */ + @Deprecated public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd, int timeoutSec, int timeoutNs) throws IOException { int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(), @@ -100,7 +105,14 @@ public final class Native { } return ready; } - private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs); + + static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException { + int ready = epollWaitNoTimeout(epollFd.intValue(), events.memoryAddress(), events.length(), immediatePoll); + if (ready < 0) { + throw newIOException("epoll_wait", ready); + } + return ready; + } /** * Non-blocking variant of @@ -115,6 +127,8 @@ public final class Native { return ready; } + private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs); + private static native int epollWaitNoTimeout(int efd, long address, int len, boolean immediatePoll); private static native int epollBusyWait0(int efd, long address, int len); public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java deleted file mode 100644 index 5a9cb19a1f..0000000000 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.channel.epoll; - -import io.netty.channel.unix.FileDescriptor; -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicReference; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -public class EpollTest { - - @Test - public void testIsAvailable() { - assertTrue(Epoll.isAvailable()); - } - - // Testcase for https://github.com/netty/netty/issues/8444 - @Test(timeout = 5000) - public void testEpollWaitWithTimeOutMinusOne() throws Exception { - final EpollEventArray eventArray = new EpollEventArray(8); - try { - final FileDescriptor epoll = Native.newEpollCreate(); - final FileDescriptor timerFd = Native.newTimerFd(); - final FileDescriptor eventfd = Native.newEventFd(); - Native.epollCtlAdd(epoll.intValue(), timerFd.intValue(), Native.EPOLLIN); - Native.epollCtlAdd(epoll.intValue(), eventfd.intValue(), Native.EPOLLIN); - - final AtomicReference ref = new AtomicReference(); - Thread t = new Thread(new Runnable() { - @Override - public void run() { - try { - assertEquals(1, Native.epollWait(epoll, eventArray, timerFd, -1, -1)); - // This should have been woken up because of eventfd_write. - assertEquals(eventfd.intValue(), eventArray.fd(0)); - } catch (Throwable cause) { - ref.set(cause); - } - } - }); - t.start(); - t.join(1000); - assertTrue(t.isAlive()); - Native.eventFdWrite(eventfd.intValue(), 1); - - t.join(); - assertNull(ref.get()); - epoll.close(); - timerFd.close(); - eventfd.close(); - } finally { - eventArray.free(); - } - } -} diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index 9abb39fc38..7eff9e85fa 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -102,6 +102,20 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return promise; } + @Override + protected void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) { + super.executeScheduledRunnable(wakesUpForScheduledRunnable() ? runnable : new NonWakeupRunnable() { + @Override + public void run() { + runnable.run(); + } + }, isAddition, deadlineNanos); + } + + protected boolean wakesUpForScheduledRunnable() { + return true; + } + /** * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration. *