Avoid unnecessary epoll event loop wake-ups (#9605)

Motivation

The recently-introduced event loop scheduling hooks can be exploited by
the epoll transport to avoid waking the event loop when scheduling
future tasks if there is a timer already set to wake up sooner.

There is also a "default" timeout which will wake the event
loop after 1 second if there are no pending future tasks. The
performance impact of these wakeups themselves is likely negligible but
there's significant overhead in having to re-arm the timer every time
the event loop goes to sleep (see #7816). It's not 100% clear why this
timeout was there originally but we're sure it's no longer needed.

Modification

Combine the existing volatile wakenUp and non-volatile prevDeadlineNanos
fields into a single AtomicLong that stores the next scheduled wakeup
time while the event loop is in epoll_wait, and is -1 while it is awake.

Use this as a guard to debounce wakeups from both immediate scheduled
tasks and future scheduled tasks, the latter using the new
before/afterScheduledTaskSubmitted overrides and based on whether the
new deadline occurs prior to an already-scheduled timer.

A similar optimization was already added to NioEventLoop, but it still
uses two separate volatiles. We should consider similar streamlining of
that in a future update.

Result

Fewer event loop wakeups when scheduling future tasks, greatly reduced
overhead when no future tasks are scheduled.
This commit is contained in:
Nick Hill 2019-10-13 02:16:16 +08:00 committed by Norman Maurer
parent ec8d43c294
commit 166caf96ef
3 changed files with 63 additions and 32 deletions

View File

@ -212,10 +212,6 @@ static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jl
// This method is deprecated! // This method is deprecated!
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) { static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
if (tvSec == 0 && tvNsec == 0) {
// Zeros = poll (aka return immediately).
return netty_epoll_native_epollWait(env, clazz, efd, address, len, 0);
}
// only reschedule the timer if there is a newer event. // only reschedule the timer if there is a newer event.
// -1 is a special value used by EpollEventLoop. // -1 is a special value used by EpollEventLoop.
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) { if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {

View File

@ -36,7 +36,7 @@ import java.io.IOException;
import java.util.BitSet; import java.util.BitSet;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong;
import static java.lang.Math.min; import static java.lang.Math.min;
@ -52,8 +52,6 @@ class EpollEventLoop extends SingleThreadEventLoop {
Epoll.ensureAvailability(); Epoll.ensureAvailability();
} }
// Pick a number that no task could have previously used.
private long prevDeadlineNanos = nanoTime() - 1;
private final FileDescriptor epollFd; private final FileDescriptor epollFd;
private final FileDescriptor eventFd; private final FileDescriptor eventFd;
private final FileDescriptor timerFd; private final FileDescriptor timerFd;
@ -74,7 +72,12 @@ class EpollEventLoop extends SingleThreadEventLoop {
return epollWaitNow(); return epollWaitNow();
} }
}; };
private final AtomicInteger wakenUp = new AtomicInteger(1);
// nextWakeupNanos is:
// -1 when EL is awake
// Long.MAX_VALUE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(-1L);
private boolean pendingWakeup; private boolean pendingWakeup;
private volatile int ioRatio = 50; private volatile int ioRatio = 50;
@ -178,12 +181,24 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void wakeup(boolean inEventLoop) { protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) { if (!inEventLoop && nextWakeupNanos.getAndSet(-1L) != -1L) {
// write to the evfd which will then wake-up epoll_wait(...) // write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L); Native.eventFdWrite(eventFd.intValue(), 1L);
} }
} }
@Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
// Note this is also correct for the nextWakeupNanos == -1 case
return deadlineNanos < nextWakeupNanos.get();
}
@Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
// Note this is also correct for the nextWakeupNanos == -1 case
return deadlineNanos < nextWakeupNanos.get();
}
/** /**
* Register the given epoll with this {@link EventLoop}. * Register the given epoll with this {@link EventLoop}.
*/ */
@ -288,22 +303,20 @@ class EpollEventLoop extends SingleThreadEventLoop {
return channels.size(); return channels.size();
} }
private int epollWait() throws IOException { private int epollWait(long deadlineNanos) throws IOException {
int delaySeconds; if (deadlineNanos == Long.MAX_VALUE) {
int delayNanos; return Native.epollWait(epollFd, events, timerFd, Integer.MAX_VALUE, 0); // disarm timer
long curDeadlineNanos = deadlineNanos();
if (curDeadlineNanos == prevDeadlineNanos) {
delaySeconds = -1;
delayNanos = -1;
} else {
long totalDelay = delayNanos(System.nanoTime());
prevDeadlineNanos = curDeadlineNanos;
delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
} }
long totalDelay = deadlineToDelayNanos(deadlineNanos);
int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos); return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
} }
private int epollWaitNoTimerChange() throws IOException {
return Native.epollWait(epollFd, events, false);
}
private int epollWaitNow() throws IOException { private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, true); return Native.epollWait(epollFd, events, true);
} }
@ -319,6 +332,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void run() { protected void run() {
long prevDeadlineNanos = Long.MAX_VALUE;
for (;;) { for (;;) {
try { try {
processPendingChannelFlags(); processPendingChannelFlags();
@ -349,15 +363,26 @@ class EpollEventLoop extends SingleThreadEventLoop {
// fall-through // fall-through
} }
wakenUp.set(0); long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = Long.MAX_VALUE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try { try {
if (!hasTasks()) { if (!hasTasks()) {
strategy = epollWait(); if (curDeadlineNanos == prevDeadlineNanos) {
// No timer activity needed
strategy = epollWaitNoTimerChange();
} else {
// Timerfd needs to be re-armed or disarmed
prevDeadlineNanos = curDeadlineNanos;
strategy = epollWait(curDeadlineNanos);
}
} }
} finally { } finally {
// Try get() first to avoid much more expensive CAS in the case we // Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task) // were woken via the wakeup() method (submitted task)
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) { if (nextWakeupNanos.get() == -1L || nextWakeupNanos.getAndSet(-1L) == -1L) {
pendingWakeup = true; pendingWakeup = true;
} }
} }
@ -368,8 +393,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
final int ioRatio = this.ioRatio; final int ioRatio = this.ioRatio;
if (ioRatio == 100) { if (ioRatio == 100) {
try { try {
if (strategy > 0) { if (strategy > 0 && processReady(events, strategy)) {
processReady(events, strategy); prevDeadlineNanos = Long.MAX_VALUE;
} }
} finally { } finally {
// Ensure we always run tasks. // Ensure we always run tasks.
@ -379,8 +404,8 @@ class EpollEventLoop extends SingleThreadEventLoop {
final long ioStartTime = System.nanoTime(); final long ioStartTime = System.nanoTime();
try { try {
if (strategy > 0) { if (strategy > 0 && processReady(events, strategy)) {
processReady(events, strategy); prevDeadlineNanos = Long.MAX_VALUE;
} }
} finally { } finally {
// Ensure we always run tasks. // Ensure we always run tasks.
@ -434,15 +459,15 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
} }
private void processReady(EpollEventArray events, int ready) { // Returns true if a timerFd event was encountered
private boolean processReady(EpollEventArray events, int ready) {
boolean timerFired = false;
for (int i = 0; i < ready; i ++) { for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i); final int fd = events.fd(i);
if (fd == eventFd.intValue()) { if (fd == eventFd.intValue()) {
pendingWakeup = false; pendingWakeup = false;
} else if (fd == timerFd.intValue()) { } else if (fd == timerFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd. timerFired = true;
//
// See also https://stackoverflow.com/a/12492308/1074097
} else { } else {
final long ev = events.events(i); final long ev = events.events(i);
@ -496,6 +521,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
} }
} }
return timerFired;
} }
@Override @Override

View File

@ -98,6 +98,15 @@ public final class Native {
@Deprecated @Deprecated
public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd, public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd,
int timeoutSec, int timeoutNs) throws IOException { int timeoutSec, int timeoutNs) throws IOException {
if (timeoutSec == 0 && timeoutNs == 0) {
// Zero timeout => poll (aka return immediately)
return epollWait(epollFd, events, 0);
}
if (timeoutSec == Integer.MAX_VALUE) {
// Max timeout => wait indefinitely: disarm timerfd first
timeoutSec = 0;
timeoutNs = 0;
}
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(), int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs); timeoutSec, timeoutNs);
if (ready < 0) { if (ready < 0) {