EPOLL - decouple schedule tasks from epoll_wait life cycle (#7834)

Motivation:
EPOLL supports decoupling the timed wakeup mechanism from the selector call. The EPOLL transport takes advantage of this in order to offer more fine grained timer resolution. However we are current calling timerfd_settime on each call to epoll_wait and this is expensive. We don't have to re-arm the timer on every call to epoll_wait and instead only have to arm the timer when a task is scheduled with an earlier expiration than any other existing scheduled task.

Modifications:
- Before scheduled tasks are added to the task queue, we determine if the new
  duration is the soonest to expire, and if so update with timerfd_settime. We
also drain all the tasks at the end of the event loop to make sure we service
any expired tasks and get an accurate next time delay.
- EpollEventLoop maintains a volatile variable which represents the next deadline to expire. This variable is modified inside the event loop thread (before calling epoll_wait) and out side the event loop thread (immediately to ensure proper wakeup time).
- Execute the task queue before the schedule task priority queue. This means we
  may delay the processing of scheduled tasks but it ensures we transfer all
pending tasks from the task queue to the scheduled priority queue to run the
soonest to expire scheduled task first.
- Deprecate IORatio on EpollEventLoop, and drain the executor and scheduled queue on each event loop wakeup. Coupling the amount of time we are allowed to drain the executor queue to a proportion of time we process inbound IO may lead to unbounded queue sizes and unpredictable latency.

Result:
Fixes https://github.com/netty/netty/issues/7829
- In most cases this results in less calls to timerfd_settime
- Less event loop wakeups just to check for scheduled tasks executed outside the event loop
- More predictable executor queue and scheduled task queue draining
- More accurate and responsive scheduled task execution
This commit is contained in:
Scott Mitchell 2019-08-14 01:11:04 -07:00 committed by Norman Maurer
parent ac939e22dc
commit 1fa7a5e697
9 changed files with 436 additions and 185 deletions

View File

@ -29,7 +29,6 @@ import java.util.concurrent.TimeUnit;
* Abstract base class for {@link EventExecutor}s that want to support scheduling.
*/
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
new Comparator<ScheduledFutureTask<?>>() {
@Override
@ -51,6 +50,24 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
return ScheduledFutureTask.nanoTime();
}
/**
* Given an arbitrary deadline {@code deadlineNanos}, calculate the number of nano seconds from now
* {@code deadlineNanos} would expire.
* @param deadlineNanos An arbitrary deadline in nano seconds.
* @return the number of nano seconds from now {@code deadlineNanos} would expire.
*/
protected static long deadlineToDelayNanos(long deadlineNanos) {
return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
}
/**
* The initial value used for delay and computations based upon a monatomic time source.
* @return initial value used for delay and computations based upon a monatomic time source.
*/
protected static long initialNanoTime() {
return ScheduledFutureTask.initialNanoTime();
}
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
@ -85,6 +102,10 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
}
scheduledTaskQueue.clearIgnoringIndexes();
// calling minimumDelayScheduledTaskRemoved was considered here to give a chance for EventLoop
// implementations the opportunity to cleanup any timerFds, but this is currently only called when the EventLoop
// is being shutdown, so the timerFd and associated polling mechanism will be destroyed anyways.
}
/**
@ -99,16 +120,21 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
* You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
*/
protected final Runnable pollScheduledTask(long nanoTime) {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
return scheduledTaskQueue != null ? pollScheduledTask(scheduledTaskQueue, nanoTime, true) : null;
}
final Runnable pollScheduledTask(Queue<ScheduledFutureTask<?>> scheduledTaskQueue, long nanoTime,
boolean notifyMinimumDeadlineRemoved) {
assert scheduledTaskQueue != null;
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue.peek();
if (scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.poll();
if (notifyMinimumDeadlineRemoved) {
minimumDelayScheduledTaskRemoved(scheduledTask, scheduledTask.deadlineNanos());
}
return scheduledTask;
}
return null;
@ -118,28 +144,29 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
* Return the nanoseconds when the next scheduled task is ready to be run or {@code -1} if no task is scheduled.
*/
protected final long nextScheduledTaskNano() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return -1;
}
return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null ? Math.max(0, scheduledTask.deadlineNanos() - nanoTime()) : -1;
}
/**
* Return the deadline (in nanoseconds) when the next scheduled task is ready to be run or {@code -1}
* if no task is scheduled.
*/
protected final long nextScheduledTaskDeadlineNanos() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
}
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (scheduledTaskQueue == null) {
return null;
}
return scheduledTaskQueue.peek();
return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
}
/**
* Returns {@code true} if a scheduled task is ready for processing.
*/
protected final boolean hasScheduledTasks() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
}
@ -225,16 +252,16 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
// NOOP
}
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
executeScheduledRunnable(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}, true, task.deadlineNanos());
}
return task;
@ -242,14 +269,48 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
final void removeScheduled(final ScheduledFutureTask<?> task) {
if (inEventLoop()) {
scheduledTaskQueue().removeTyped(task);
removedSchedule0(task);
} else {
execute(new Runnable() {
executeScheduledRunnable(new Runnable() {
@Override
public void run() {
removeScheduled(task);
removedSchedule0(task);
}
});
}, false, task.deadlineNanos());
}
}
private void removedSchedule0(final ScheduledFutureTask<?> task) {
if (scheduledTaskQueue == null || task == null) {
return;
}
if (scheduledTaskQueue.peek() == task) {
scheduledTaskQueue.poll();
minimumDelayScheduledTaskRemoved(task, task.deadlineNanos());
} else {
scheduledTaskQueue.removeTyped(task);
}
}
/**
* Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing
* a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this
* method.
* @param runnable The {@link Runnable} to execute which will add or remove a scheduled action
* @param isAddition {@code true} if the {@link Runnable} will add an action, {@code false} if it will remove an
* action
* @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed.
*/
protected void executeScheduledRunnable(Runnable runnable,
@SuppressWarnings("unused") boolean isAddition,
@SuppressWarnings("unused") long deadlineNanos) {
execute(runnable);
}
/**
* The next task to expire (e.g. minimum delay) has been removed from the scheduled priority queue.
*/
protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task,
@SuppressWarnings("unused") long deadlineNanos) {
}
}

View File

@ -40,6 +40,10 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}
static long initialNanoTime() {
return START_TIME;
}
private final long id = nextTaskId.getAndIncrement();
private long deadlineNanos;
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
@ -85,7 +89,11 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
}
public long delayNanos() {
return Math.max(0, deadlineNanos() - nanoTime());
return deadlineToDelayNanos(deadlineNanos());
}
static long deadlineToDelayNanos(long deadlineNanos) {
return Math.max(0, deadlineNanos - nanoTime());
}
public long delayNanos(long currentTimeNanos) {
@ -111,9 +119,8 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
assert id != that.id;
return 1;
}
}
@ -132,11 +139,10 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
if (!isCancelled()) {
task.call();
if (!executor().isShutdown()) {
long p = periodNanos;
if (p > 0) {
deadlineNanos += p;
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
deadlineNanos = nanoTime() - p;
deadlineNanos = nanoTime() - periodNanos;
}
if (!isCancelled()) {
// scheduledTaskQueue can never be null as we lazy init it before submit the task!

View File

@ -73,6 +73,12 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
// Do nothing.
}
};
private static final Runnable BOOKEND_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
@ -283,19 +289,41 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}
private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, true);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false);
}
return true;
}
/**
* @return {@code true} if at least one scheduled task was executed.
*/
private boolean executeExpiredScheduledTasks(boolean notifyMinimumDeadlineRemoved) {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return false;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, notifyMinimumDeadlineRemoved);
if (scheduledTask != null) {
do {
safeExecute(scheduledTask);
scheduledTask = pollScheduledTask(scheduledTaskQueue, nanoTime, false);
} while (scheduledTask != null);
return true;
}
return false;
}
/**
* @see Queue#peek()
*/
@ -376,6 +404,36 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
return ranAtLeastOne;
}
/**
* Execute all expired scheduled tasks and all current tasks in the executor queue until both queues are empty,
* or {@code maxDrainAttempts} has been exceeded.
* @param maxDrainAttempts The maximum amount of times this method attempts to drain from queues. This is to prevent
* continuous task execution and scheduling from preventing the EventExecutor thread to
* make progress and return to the selector mechanism to process inbound I/O events.
* @return {@code true} if at least one task was run.
*/
protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) {
assert inEventLoop();
// We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
// here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
boolean ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue);
boolean ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(true);
int drainAttempt = 0;
while ((ranAtLeastOneExecutorTask || ranAtLeastOneScheduledTask) && ++drainAttempt < maxDrainAttempts) {
// We must run the taskQueue tasks first, because the scheduled tasks from outside the EventLoop are queued
// here because the taskQueue is thread safe and the scheduledTaskQueue is not thread safe.
ranAtLeastOneExecutorTask = runExistingTasksFrom(taskQueue);
ranAtLeastOneScheduledTask = executeExpiredScheduledTasks(false);
}
if (drainAttempt > 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return drainAttempt > 0;
}
/**
* Runs all tasks from the passed {@code taskQueue}.
*
@ -397,6 +455,55 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}
}
/**
* What ever tasks are present in {@code taskQueue} when this method is invoked will be {@link Runnable#run()}.
* @param taskQueue the task queue to drain.
* @return {@code true} if at least {@link Runnable#run()} was called.
*/
private boolean runExistingTasksFrom(Queue<Runnable> taskQueue) {
return taskQueue.offer(BOOKEND_TASK) ? runExistingTasksUntilBookend(taskQueue)
: runExistingTasksUntilMaxTasks(taskQueue);
}
private boolean runExistingTasksUntilBookend(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
// null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue, and
// null elements are not permitted to be inserted into the queue.
if (task == BOOKEND_TASK) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
// null is not expected because this method isn't called unless BOOKEND_TASK was inserted into the queue,
// and null elements are not permitted to be inserted into the queue.
if (task == BOOKEND_TASK) {
return true;
}
}
}
private boolean runExistingTasksUntilMaxTasks(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
// BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted into
// the queue, and if was previously inserted we always drain all the elements from queue including BOOKEND_TASK.
if (task == null) {
return false;
}
int i = 0;
do {
safeExecute(task);
task = pollTaskFrom(taskQueue);
// BOOKEND_TASK is not expected because this method isn't called unless BOOKEND_TASK fails to be inserted
// into the queue, and if was previously inserted we always drain all the elements from queue including
// BOOKEND_TASK.
if (task == null) {
return true;
}
} while (++i < maxPendingTasks);
return true;
}
/**
* Poll all tasks from the task queue and run them via {@link Runnable#run()} method. This method stops running
* the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
@ -443,6 +550,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
*/
@UnstableApi
protected void afterRunningAllTasks() { }
/**
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
*/
@ -846,8 +954,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
return threadProperties;
}
@SuppressWarnings("unused")
protected boolean wakesUpForTask(Runnable task) {
protected boolean wakesUpForTask(@SuppressWarnings("unused") Runnable task) {
return true;
}

View File

@ -186,6 +186,31 @@ static jint netty_epoll_native_epollCreate(JNIEnv* env, jclass clazz) {
return efd;
}
static void netty_epoll_native_timerFdSetTime(JNIEnv* env, jclass clazz, jint timerFd, jint tvSec, jint tvNsec) {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwIOExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
}
}
static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jboolean immediatePoll) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
const int timeout = immediatePoll ? 0 : -1;
int result, err;
do {
result = epoll_wait(efd, ev, len, timeout);
if (result >= 0) {
return result;
}
} while((err = errno) == EINTR);
return -err;
}
// This method is deprecated!
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;
@ -429,8 +454,10 @@ static const JNINativeMethod fixed_method_table[] = {
{ "eventFdWrite", "(IJ)V", (void *) netty_epoll_native_eventFdWrite },
{ "eventFdRead", "(I)V", (void *) netty_epoll_native_eventFdRead },
{ "timerFdRead", "(I)V", (void *) netty_epoll_native_timerFdRead },
{ "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime },
{ "epollCreate", "()I", (void *) netty_epoll_native_epollCreate },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated!
{ "epollWaitNoTimeout", "(IJIZ)I", (void *) netty_epoll_native_epollWaitNoTimeout },
{ "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 },
{ "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 },
{ "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 },

View File

@ -35,7 +35,9 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static java.lang.Math.min;
@ -44,8 +46,10 @@ import static java.lang.Math.min;
*/
class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");
/**
* The maximum deadline value before overlap occurs on the time source.
*/
private static final long MAXIMUM_DEADLINE = initialNanoTime() - 1;
static {
// Ensure JNI is initialized by the time this class is loaded by this time!
@ -53,8 +57,12 @@ class EpollEventLoop extends SingleThreadEventLoop {
Epoll.ensureAvailability();
}
// Pick a number that no task could have previously used.
private long prevDeadlineNanos = nanoTime() - 1;
/**
* Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting
* with the time source (e.g. calling System.nanoTime()) which can be expensive.
*/
private final AtomicLong nextDeadlineNanos = new AtomicLong(MAXIMUM_DEADLINE);
private final AtomicInteger wakenUp = new AtomicInteger();
private final FileDescriptor epollFd;
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
@ -73,12 +81,6 @@ class EpollEventLoop extends SingleThreadEventLoop {
return epollWaitNow();
}
};
@SuppressWarnings("unused") // AtomicIntegerFieldUpdater
private volatile int wakenUp;
private volatile int ioRatio = 50;
// See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
@ -175,9 +177,112 @@ class EpollEventLoop extends SingleThreadEventLoop {
return datagramPacketArray;
}
@Override
protected void executeScheduledRunnable(Runnable runnable, boolean isAddition, long deadlineNanos) {
if (isAddition) {
try {
trySetTimerFd(deadlineNanos);
} catch (IOException cause) {
throw new RejectedExecutionException(cause);
}
}
// else this is a removal of scheduled task and we could attempt to detect if this task was responsible
// for the next delay, and find the next lowest delay in the queue to re-set the timer. However this
// is not practical for the following reasons:
// 1. The data structure is a PriorityQueue, and the scheduled task has not yet been removed. This means
// we would have to add/remove the head element to find the "next timeout".
// 2. We are not on the EventLoop thread, and the PriorityQueue is not thread safe. We could attempt
// to do (1) if we are on the EventLoop but when the EventLoop wakes up it checks if the timeout changes
// when it is woken up and before it calls epoll_wait again and adjusts the timer accordingly.
// The result is we wait until we are in the EventLoop and doing the actual removal, and also processing
// regular polling in the EventLoop too.
execute(runnable);
}
@Override
protected boolean wakesUpForScheduledRunnable() {
return false;
}
@Override
protected boolean runAllTasks() {
// This method is overridden to ensure that all the expired scheduled tasks are executed during shutdown, and
// any other execute all scenarios in the base class.
return runScheduledAndExecutorTasks(4);
}
private void trySetTimerFd(long candidateNextDeadline) throws IOException {
for (;;) {
long nextDeadline = nextDeadlineNanos.get();
if (nextDeadline - candidateNextDeadline <= 0) {
break;
}
if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) {
setTimerFd(deadlineToDelayNanos(candidateNextDeadline));
// We are setting the timerFd outside of the EventLoop so it is possible that we raced with another call
// to set the timer and temporarily increased the value, in which case we should set it back to the
// lower value.
nextDeadline = nextDeadlineNanos.get();
if (nextDeadline - candidateNextDeadline < 0) {
setTimerFd(deadlineToDelayNanos(nextDeadline));
}
break;
}
}
}
private void setTimerFd(long candidateNextDelayNanos) throws IOException {
if (candidateNextDelayNanos > 0) {
final int delaySeconds = (int) min(candidateNextDelayNanos / 1000000000L, Integer.MAX_VALUE);
final int delayNanos = (int) min(candidateNextDelayNanos - delaySeconds * 1000000000L, Integer.MAX_VALUE);
Native.timerFdSetTime(timerFd.intValue(), delaySeconds, delayNanos);
} else {
// Setting the timer to 0, 0 will disarm it, so we have a few options:
// 1. Set the timer wakeup to 1ns (1 system call).
// 2. Use the eventFd to force a wakeup and disarm the timer (2 system calls).
// For now we are using option (1) because there are less system calls, and we will correctly reset the
// nextDeadlineNanos state when the EventLoop processes the timer wakeup.
Native.timerFdSetTime(timerFd.intValue(), 0, 1);
}
}
private void checkScheduleTaskQueueForNewDelay() throws IOException {
final long deadlineNanos = nextScheduledTaskDeadlineNanos();
if (deadlineNanos != -1) {
trySetTimerFd(deadlineNanos);
}
// Don't disarm the timerFd even if there are no more queued tasks. Since we are setting timerFd from outside
// the EventLoop it is possible that another thread has set the timer and we may miss a wakeup if we disarm
// the timer here. Instead we wait for the timer wakeup on the EventLoop and clear state for the next timer.
}
@Override
protected void minimumDelayScheduledTaskRemoved(@SuppressWarnings("unused") Runnable task,
@SuppressWarnings("unused") long deadlineNanos) {
// It is OK to reset nextDeadlineNanos here because we are in the event loop thread, and the event loop is
// guaranteed to transfer all pending ScheduledFutureTasks from the executor queue to the scheduled
// PriorityQueue and we will set the next expiration time. If another thread races with this thread inserting a
// ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue insertion is
// visible to the event loop thread.
//
// Assume the current minimum timer is delayNanos = 10
// Thread A -> execute(ScheduledFutureTask(delayNanos = 12)),
// add ScheduledFutureTask to the executor queue
// fail to set nextDeadlineNanos, so no call to setTimerFd is made
// EventLoop -> minimumDelayScheduledTaskRemoved(10),
// set nextDeadlineNanos to MAXIMUM_DEADLINE
// ... process more tasks ...
// drain all the tasks from the executor queue, see that 12 is the next delay, call setTimerFd
nextDeadlineNanos.set(MAXIMUM_DEADLINE);
// Note that we don't actually call setTimerFd here, we don't want to interrupt the actual timerFd and let
// the end of the EventLoop determine what the timerFd value should be (after execute queue is drained).
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.getAndSet(this, 1) == 0) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
// write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L);
}
@ -237,47 +342,21 @@ class EpollEventLoop extends SingleThreadEventLoop {
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
/**
* Returns the percentage of the desired amount of time spent for I/O in the event loop.
*/
public int getIoRatio() {
return ioRatio;
}
/**
* Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
this.ioRatio = ioRatio;
}
@Override
public int registeredChannels() {
return channels.size();
}
private int epollWait() throws IOException {
int delaySeconds;
int delayNanos;
long curDeadlineNanos = deadlineNanos();
if (curDeadlineNanos == prevDeadlineNanos) {
delaySeconds = -1;
delayNanos = -1;
} else {
long totalDelay = delayNanos(System.nanoTime());
prevDeadlineNanos = curDeadlineNanos;
delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
}
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
return Native.epollWait(epollFd, events, hasTasks());
}
private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, timerFd, 0, 0);
return Native.epollWait(epollFd, events, true);
}
private int epollBusyWait() throws IOException {
@ -298,8 +377,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
break;
case SelectStrategy.SELECT:
if (wakenUp == 1) {
wakenUp = 0;
if (wakenUp.get() == 1) {
wakenUp.set(0);
}
if (!hasTasks()) {
strategy = epollWait();
@ -308,27 +387,21 @@ class EpollEventLoop extends SingleThreadEventLoop {
default:
}
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processReady(events, strategy);
} finally {
try {
if (strategy > 0) {
processReady(events, strategy);
}
} finally {
// Ensure we always run tasks.
// Note the timerFd code depends upon running all the tasks on each event loop run. This is so
// we can get an accurate "next wakeup time" after the event loop run completes.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
if (strategy > 0) {
processReady(events, strategy);
}
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
// No need to drainScheduledQueue() after the fact, because all in event loop scheduling results
// in direct addition to the scheduled priority queue.
// When we are in the EventLoop we don't bother setting the timerFd for each scheduled task, but
// instead defer the processing until the end of the EventLoop to reduce the timerFd
// modifications.
checkScheduleTaskQueueForNewDelay();
}
}
if (allowGrowing && strategy == events.length()) {
@ -384,12 +457,34 @@ class EpollEventLoop extends SingleThreadEventLoop {
}
private void processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; i ++) {
for (int i = 0; i < ready; ++i) {
final int fd = events.fd(i);
if (fd == eventFd.intValue() || fd == timerFd.intValue()) {
if (fd == eventFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
} else if (fd == timerFd.intValue()) {
// consume wakeup event, necessary because the timer is added with ET mode.
Native.timerFdRead(fd);
// The timer is normalized, monotonically increasing, and the next value is always set to the minimum
// value of the pending timers. When the timer fires we can unset the timer value.
// Worst case another thread races with this thread and we set another timer event while processing
// this timer event and we get a duplicate wakeup some time in the future.
//
// This works because we always drain all ScheduledFutureTasks from the executor queue to the scheduled
// PriorityQueue and we will set the next expiration time. If another thread races with this thread
// inserting a ScheduledFutureTasks into the executor queue it should be OK, assuming the executor queue
// insertion is visible to the event loop thread.
//
// Assume the current minimum timer is nextDeadlineNanos = 10
// Thread A -> execute(ScheduledFutureTask(delayNanos = 12)),
// add ScheduledFutureTask to the executor queue
// fail to set nextDeadlineNanos, so no call to setTimerFd is made
// EventLoop -> process timer wakeup here, set nextDeadlineNanos to MAXIMUM_DEADLINE
// ... process more tasks ...
// drain all the tasks from executor queue, see 12 is the next delay, call setTimerFd
nextDeadlineNanos.set(MAXIMUM_DEADLINE);
} else {
final long ev = events.events(i);

View File

@ -21,7 +21,6 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.EventLoopTaskQueueFactory;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.SelectStrategyFactory;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorChooserFactory;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers;
@ -128,12 +127,12 @@ public final class EpollEventLoopGroup extends MultithreadEventLoopGroup {
}
/**
* Sets the percentage of the desired amount of time spent for I/O in the child event loops. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
* @deprecated This method will be removed in future releases, and is not guaranteed to have any impacts.
*/
@Deprecated
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((EpollEventLoop) e).setIoRatio(ioRatio);
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
}

View File

@ -84,6 +84,7 @@ public final class Native {
public static native void eventFdWrite(int fd, long value);
public static native void eventFdRead(int fd);
static native void timerFdRead(int fd);
static native void timerFdSetTime(int fd, int sec, int nsec) throws IOException;
public static FileDescriptor newEpollCreate() {
return new FileDescriptor(epollCreate());
@ -91,6 +92,10 @@ public final class Native {
private static native int epollCreate();
/**
* @deprecated this method is no longer supported. This functionality is internal to this package.
*/
@Deprecated
public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd,
int timeoutSec, int timeoutNs) throws IOException {
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
@ -100,7 +105,14 @@ public final class Native {
}
return ready;
}
private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);
static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException {
int ready = epollWaitNoTimeout(epollFd.intValue(), events.memoryAddress(), events.length(), immediatePoll);
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
return ready;
}
/**
* Non-blocking variant of
@ -115,6 +127,8 @@ public final class Native {
return ready;
}
private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);
private static native int epollWaitNoTimeout(int efd, long address, int len, boolean immediatePoll);
private static native int epollBusyWait0(int efd, long address, int len);
public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException {

View File

@ -1,72 +0,0 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.unix.FileDescriptor;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class EpollTest {
@Test
public void testIsAvailable() {
assertTrue(Epoll.isAvailable());
}
// Testcase for https://github.com/netty/netty/issues/8444
@Test(timeout = 5000)
public void testEpollWaitWithTimeOutMinusOne() throws Exception {
final EpollEventArray eventArray = new EpollEventArray(8);
try {
final FileDescriptor epoll = Native.newEpollCreate();
final FileDescriptor timerFd = Native.newTimerFd();
final FileDescriptor eventfd = Native.newEventFd();
Native.epollCtlAdd(epoll.intValue(), timerFd.intValue(), Native.EPOLLIN);
Native.epollCtlAdd(epoll.intValue(), eventfd.intValue(), Native.EPOLLIN);
final AtomicReference<Throwable> ref = new AtomicReference<Throwable>();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
assertEquals(1, Native.epollWait(epoll, eventArray, timerFd, -1, -1));
// This should have been woken up because of eventfd_write.
assertEquals(eventfd.intValue(), eventArray.fd(0));
} catch (Throwable cause) {
ref.set(cause);
}
}
});
t.start();
t.join(1000);
assertTrue(t.isAlive());
Native.eventFdWrite(eventfd.intValue(), 1);
t.join();
assertNull(ref.get());
epoll.close();
timerFd.close();
eventfd.close();
} finally {
eventArray.free();
}
}
}

View File

@ -102,6 +102,20 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return promise;
}
@Override
protected void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) {
super.executeScheduledRunnable(wakesUpForScheduledRunnable() ? runnable : new NonWakeupRunnable() {
@Override
public void run() {
runnable.run();
}
}, isAddition, deadlineNanos);
}
protected boolean wakesUpForScheduledRunnable() {
return true;
}
/**
* Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
*