Clean up NioEventLoop (#9799)

Motivation

The event loop implementations had become somewhat tangled over time and
work was done recently to streamline EpollEventLoop. NioEventLoop would
benefit from the same treatment and it is more straightforward now that
we can follow the same structure as was done for epoll.

Modifications

Untangle NioEventLoop logic and mirror what's now done in EpollEventLoop
w.r.t. the volatile selector wake-up guard and scheduled task deadline
handling.

Some common refinements to EpollEventLoop have also been included - to
use constants for the "special" deadline/wakeup volatile values and to
avoid some unnecessary calls to System.nanoTime() on task-only
iterations.

Result

Hopefully cleaner, more efficient and less fragile NIO transport
implementation.
This commit is contained in:
Nick Hill 2019-11-25 23:25:59 -08:00 committed by Norman Maurer
parent ad67d1d751
commit e208e96f12
4 changed files with 112 additions and 184 deletions

View File

@ -470,7 +470,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
return false; return false;
} }
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0; long runTasks = 0;
long lastExecutionTime; long lastExecutionTime;
for (;;) { for (;;) {

View File

@ -73,11 +73,14 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
}; };
private static final long AWAKE = -1L;
private static final long NONE = Long.MAX_VALUE;
// nextWakeupNanos is: // nextWakeupNanos is:
// -1 when EL is awake // AWAKE when EL is awake
// Long.MAX_VALUE when EL is waiting with no wakeup scheduled // NONE when EL is waiting with no wakeup scheduled
// other value T when EL is waiting with wakeup scheduled at time T // other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(-1L); private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
private boolean pendingWakeup; private boolean pendingWakeup;
private volatile int ioRatio = 50; private volatile int ioRatio = 50;
@ -181,7 +184,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void wakeup(boolean inEventLoop) { protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(-1L) != -1L) { if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
// write to the evfd which will then wake-up epoll_wait(...) // write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L); Native.eventFdWrite(eventFd.intValue(), 1L);
} }
@ -189,13 +192,13 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
// Note this is also correct for the nextWakeupNanos == -1 case // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
return deadlineNanos < nextWakeupNanos.get(); return deadlineNanos < nextWakeupNanos.get();
} }
@Override @Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
// Note this is also correct for the nextWakeupNanos == -1 case // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
return deadlineNanos < nextWakeupNanos.get(); return deadlineNanos < nextWakeupNanos.get();
} }
@ -304,7 +307,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
} }
private int epollWait(long deadlineNanos) throws IOException { private int epollWait(long deadlineNanos) throws IOException {
if (deadlineNanos == Long.MAX_VALUE) { if (deadlineNanos == NONE) {
return Native.epollWait(epollFd, events, timerFd, Integer.MAX_VALUE, 0); // disarm timer return Native.epollWait(epollFd, events, timerFd, Integer.MAX_VALUE, 0); // disarm timer
} }
long totalDelay = deadlineToDelayNanos(deadlineNanos); long totalDelay = deadlineToDelayNanos(deadlineNanos);
@ -332,7 +335,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void run() { protected void run() {
long prevDeadlineNanos = Long.MAX_VALUE; long prevDeadlineNanos = NONE;
for (;;) { for (;;) {
try { try {
processPendingChannelFlags(); processPendingChannelFlags();
@ -365,7 +368,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) { if (curDeadlineNanos == -1L) {
curDeadlineNanos = Long.MAX_VALUE; // nothing on the calendar curDeadlineNanos = NONE; // nothing on the calendar
} }
nextWakeupNanos.set(curDeadlineNanos); nextWakeupNanos.set(curDeadlineNanos);
try { try {
@ -382,7 +385,7 @@ class EpollEventLoop extends SingleThreadEventLoop {
} finally { } finally {
// Try get() first to avoid much more expensive CAS in the case we // Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task) // were woken via the wakeup() method (submitted task)
if (nextWakeupNanos.get() == -1L || nextWakeupNanos.getAndSet(-1L) == -1L) { if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
pendingWakeup = true; pendingWakeup = true;
} }
} }
@ -394,24 +397,25 @@ class EpollEventLoop extends SingleThreadEventLoop {
if (ioRatio == 100) { if (ioRatio == 100) {
try { try {
if (strategy > 0 && processReady(events, strategy)) { if (strategy > 0 && processReady(events, strategy)) {
prevDeadlineNanos = Long.MAX_VALUE; prevDeadlineNanos = NONE;
} }
} finally { } finally {
// Ensure we always run tasks. // Ensure we always run tasks.
runAllTasks(); runAllTasks();
} }
} else { } else if (strategy > 0) {
final long ioStartTime = System.nanoTime(); final long ioStartTime = System.nanoTime();
try { try {
if (strategy > 0 && processReady(events, strategy)) { if (processReady(events, strategy)) {
prevDeadlineNanos = Long.MAX_VALUE; prevDeadlineNanos = NONE;
} }
} finally { } finally {
// Ensure we always run tasks. // Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime; final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} }
} else {
runAllTasks(0); // This will run the minimum number of tasks
} }
if (allowGrowing && strategy == events.length()) { if (allowGrowing && strategy == events.length()) {
//increase the size of the array as we needed the whole space for the events //increase the size of the array as we needed the whole space for the events

View File

@ -46,8 +46,7 @@ import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a * {@link SingleThreadEventLoop} implementation which register the {@link Channel}'s to a
@ -117,14 +116,14 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private final SelectorProvider provider; private final SelectorProvider provider;
/** private static final long AWAKE = -1L;
* Boolean that controls determines if a blocked Selector.select should private static final long NONE = Long.MAX_VALUE;
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless // nextWakeupNanos is:
* waken up. // AWAKE when EL is awake
*/ // NONE when EL is waiting with no wakeup scheduled
private final AtomicBoolean wakenUp = new AtomicBoolean(); // other value T when EL is waiting with wakeup scheduled at time T
private volatile long nextWakeupTime = Long.MAX_VALUE; private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
private final SelectStrategy selectStrategy; private final SelectStrategy selectStrategy;
@ -443,10 +442,13 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void run() { protected void run() {
int selectCnt = 0;
for (;;) { for (;;) {
try { try {
int strategy;
try { try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE: case SelectStrategy.CONTINUE:
continue; continue;
@ -454,38 +456,19 @@ public final class NioEventLoop extends SingleThreadEventLoop {
// fall-through to SELECT since the busy-wait is not supported with NIO // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
// 'wakenUp.compareAndSet(false, true)' is always evaluated curDeadlineNanos = NONE; // nothing on the calendar
// before calling 'selector.wakeup()' to reduce the wake-up }
// overhead. (Selector.wakeup() is an expensive operation.) nextWakeupNanos.set(curDeadlineNanos);
// try {
// However, there is a race condition in this approach. if (!hasTasks()) {
// The race condition is triggered when 'wakenUp' is set to strategy = select(curDeadlineNanos);
// true too early. }
// } finally {
// 'wakenUp' is set to true too early if: // This update is just to help block unnecessary selector wakeups
// 1) Selector is waken up between 'wakenUp.set(false)' and // so use of lazySet is ok (no race condition)
// 'selector.select(...)'. (BAD) nextWakeupNanos.lazySet(AWAKE);
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
} }
// fall through // fall through
default: default:
@ -494,29 +477,52 @@ public final class NioEventLoop extends SingleThreadEventLoop {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild // If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566 // the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0(); rebuildSelector0();
selectCnt = 0;
handleLoopException(e); handleLoopException(e);
continue; continue;
} }
selectCnt++;
cancelledKeys = 0; cancelledKeys = 0;
needsToSelectAgain = false; needsToSelectAgain = false;
final int ioRatio = this.ioRatio; final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) { if (ioRatio == 100) {
try { try {
if (strategy > 0) {
processSelectedKeys(); processSelectedKeys();
}
} finally { } finally {
// Ensure we always run tasks. // Ensure we always run tasks.
runAllTasks(); ranTasks = runAllTasks();
} }
} else { } else if (strategy > 0) {
final long ioStartTime = System.nanoTime(); final long ioStartTime = System.nanoTime();
try { try {
processSelectedKeys(); processSelectedKeys();
} finally { } finally {
// Ensure we always run tasks. // Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime; final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} }
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
} }
} catch (Throwable t) { } catch (Throwable t) {
handleLoopException(t); handleLoopException(t);
@ -535,6 +541,33 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
} }
// Handles a loop iteration in which nothing was selected and no tasks were run
// (the caller only invokes this when neither of those happened).
// Checks the two known causes in order: a thread interrupt, then a run of
// premature select() returns long enough to warrant rebuilding the selector.
// Returns true if selectCnt should be reset by the caller, false otherwise.
private boolean unexpectedSelectorWakeup(int selectCnt) {
// Thread.interrupted() also clears the interrupt flag, so a single interrupt is reported once.
if (Thread.interrupted()) {
// The thread was interrupted, so tell the caller to reset its counter so that we do not
// run into a busy loop. As this is most likely a bug in the user's handler or its client
// library, we also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
return true;
}
// A threshold that is not positive disables the automatic rebuild.
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
return true;
}
// Neither cause applies: the caller keeps counting consecutive premature returns.
return false;
}
private static void handleLoopException(Throwable t) { private static void handleLoopException(Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t); logger.warn("Unexpected exception in the selector loop.", t);
@ -573,15 +606,6 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
} }
@Override
protected Runnable pollTask() {
Runnable task = super.pollTask();
if (needsToSelectAgain) {
selectAgain();
}
return task;
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by // check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process. // creating a new Iterator every time even if there is nothing to process.
@ -758,19 +782,21 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override @Override
protected void wakeup(boolean inEventLoop) { protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) { if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
selector.wakeup(); selector.wakeup();
} }
} }
@Override @Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
return deadlineNanos < nextWakeupTime; // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
return deadlineNanos < nextWakeupNanos.get();
} }
@Override @Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
return deadlineNanos < nextWakeupTime; // Note this is also correct for the nextWakeupNanos == -1 (AWAKE) case
return deadlineNanos < nextWakeupNanos.get();
} }
Selector unwrappedSelector() { Selector unwrappedSelector() {
@ -778,117 +804,16 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
int selectNow() throws IOException { int selectNow() throws IOException {
try {
return selector.selectNow(); return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
} }
private void select(boolean oldWakenUp) throws IOException { private int select(long deadlineNanos) throws IOException {
Selector selector = this.selector; if (deadlineNanos == NONE) {
try { return selector.select();
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
} }
// Timeout will only be 0 if deadline is within 5 microsecs
for (;;) { long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
private Selector selectRebuildSelector(int selectCnt) throws IOException {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
Selector selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
return selector;
} }
private void selectAgain() { private void selectAgain() {

View File

@ -269,7 +269,6 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
} }
} }
@Ignore
@Test @Test
public void testChannelsRegistered() throws Exception { public void testChannelsRegistered() throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup(1); NioEventLoopGroup group = new NioEventLoopGroup(1);