Properly debounce wakeups (#9191)

Motivation:
The wakeup logic in EpollEventLoop is overly complex

Modification:
* Simplify the race to wakeup the loop
* Dont let the event loop wake up itself (it's already awake!)
* Make event loop check if there are any more tasks after preparing to
sleep.  There is small window where the non-eventloop writers can issue
eventfd writes here, but that is okay.

Result:
Cleaner wakeup logic.

Benchmarks:

```
BEFORE
Benchmark                                   Mode  Cnt       Score      Error  Units
EpollSocketChannelBenchmark.executeMulti   thrpt   20  408381.411 ± 2857.498  ops/s
EpollSocketChannelBenchmark.executeSingle  thrpt   20  157022.360 ± 1240.573  ops/s
EpollSocketChannelBenchmark.pingPong       thrpt   20   60571.704 ±  331.125  ops/s

Benchmark                                   Mode  Cnt       Score      Error  Units
EpollSocketChannelBenchmark.executeMulti   thrpt   20  440546.953 ± 1652.823  ops/s
EpollSocketChannelBenchmark.executeSingle  thrpt   20  168114.751 ± 1176.609  ops/s
EpollSocketChannelBenchmark.pingPong       thrpt   20   61231.878 ±  520.108  ops/s
```
This commit is contained in:
Carl Mastrangelo 2019-06-04 05:17:23 -07:00 committed by Norman Maurer
parent 5dacc5f3da
commit f01278616a
2 changed files with 23 additions and 40 deletions

View File

@ -32,10 +32,15 @@ import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.concurrent.ScheduledFuture; import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.TearDown;
public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark { public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark {
private static final Runnable runnable = new Runnable() {
@Override
public void run() { }
};
private EventLoopGroup group; private EventLoopGroup group;
private Channel serverChan; private Channel serverChan;
@ -135,4 +140,15 @@ public class EpollSocketChannelBenchmark extends AbstractMicrobenchmark {
public Object pingPong() throws Exception { public Object pingPong() throws Exception {
return chan.pipeline().writeAndFlush(abyte.retainedSlice()).sync(); return chan.pipeline().writeAndFlush(abyte.retainedSlice()).sync();
} }
@Benchmark
public Object executeSingle() throws Exception {
return chan.eventLoop().submit(runnable).get();
}
@Benchmark
@GroupThreads(3)
public Object executeMulti() throws Exception {
return chan.eventLoop().submit(runnable).get();
}
} }

View File

@ -217,7 +217,7 @@ public class EpollHandler implements IoHandler {
@Override @Override
public final void wakeup(boolean inEventLoop) { public final void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) { if (!inEventLoop && WAKEN_UP_UPDATER.getAndSet(this, 1) == 0) {
// write to the evfd which will then wake-up epoll_wait(...) // write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L); Native.eventFdWrite(eventFd.intValue(), 1L);
} }
@ -263,15 +263,7 @@ public class EpollHandler implements IoHandler {
} }
} }
private int epollWait(IoExecutionContext context, boolean oldWakeup) throws IOException { private int epollWait(IoExecutionContext context) throws IOException {
// If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
// So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
// until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
// in pipeline.
if (oldWakeup && !context.canBlock()) {
return epollWaitNow();
}
int delaySeconds; int delaySeconds;
int delayNanos; int delayNanos;
long curDeadlineNanos = context.deadlineNanos(); long curDeadlineNanos = context.deadlineNanos();
@ -309,39 +301,14 @@ public class EpollHandler implements IoHandler {
break; break;
case SelectStrategy.SELECT: case SelectStrategy.SELECT:
strategy = epollWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp == 1) { if (wakenUp == 1) {
Native.eventFdWrite(eventFd.intValue(), 1L); Native.eventFdWrite(eventFd.intValue(), 1L);
wakenUp = 0;
} }
if (context.canBlock()) {
strategy = epollWait(context);
}
// fallthrough // fallthrough
default: default:
} }