Correctly return from selector loop one a scheduled task is ready for processing

Motivation:

We use the nanoTime of the scheduledTasks to calculate the milli-seconds to wait for a select operation to select something. Once these elapsed we check if there was something selected or some task is ready for processing. Unfortunally we not take into account scheduled tasks here so the selection loop will continue if only scheduled tasks are ready for processing. This will delay the execution of these tasks.

Modification:

- Check if a scheduled task is ready after selecting
- also make a tiny change in NioEventLoop to not trigger a rebuild if nothing was selected because the timeout was reached a few times in a row.

Result:

Execute scheduled tasks on time.
This commit is contained in:
Norman Maurer 2014-07-02 07:07:38 +02:00
parent 23840b76d7
commit 36b80c25f7
3 changed files with 35 additions and 20 deletions

View File

@ -253,6 +253,16 @@ public abstract class SingleThreadEventExecutor extends AbstractEventExecutor {
return !taskQueue.isEmpty();
}
/**
* Returns {@code true} if a scheduled task is ready for processing by {@link #runAllTasks()} or
* {@link #runAllTasks(long)}.
*/
protected boolean hasScheduledTasks() {
assert inEventLoop();
ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
return delayedTask != null && delayedTask.deadlineNanos() <= ScheduledFutureTask.nanoTime();
}
/**
* Return the number of tasks that are pending for processing.
*

View File

@ -54,7 +54,6 @@ final class EpollEventLoop extends SingleThreadEventLoop {
private final long[] events;
private int id;
private int oldWakenUp;
private boolean overflown;
@SuppressWarnings("unused")
@ -176,7 +175,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
this.ioRatio = ioRatio;
}
private int epollWait() {
private int epollWait(boolean oldWakenUp) {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
@ -195,10 +194,11 @@ final class EpollEventLoop extends SingleThreadEventLoop {
int selectedKeys = Native.epollWait(epollFd, events, (int) timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp == 1 || wakenUp == 1 || hasTasks()) {
// Selected something,
// waken up by user, or
// the task queue has a pending task.
if (selectedKeys != 0 || oldWakenUp || wakenUp == 1 || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
return selectedKeys;
}
currentTimeNanos = System.nanoTime();
@ -209,14 +209,14 @@ final class EpollEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0);
boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1;
try {
int ready;
if (hasTasks()) {
// Non blocking just return what is ready directly without block
ready = Native.epollWait(epollFd, events, 0);
} else {
ready = epollWait();
ready = epollWait(oldWakenUp);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up

View File

@ -20,7 +20,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
@ -40,6 +39,7 @@ import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -105,7 +105,6 @@ public final class NioEventLoop extends SingleThreadEventLoop {
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private boolean oldWakenUp;
private volatile int ioRatio = 50;
private int cancelledKeys;
@ -302,12 +301,12 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
oldWakenUp = wakenUp.getAndSet(false);
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select();
select(oldWakenUp);
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
@ -496,7 +495,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
@ -603,7 +602,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
}
private void select() throws IOException {
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
@ -622,10 +621,11 @@ public final class NioEventLoop extends SingleThreadEventLoop {
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
// Selected something,
// waken up by user, or
// the task queue has a pending task.
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
@ -642,7 +642,12 @@ public final class NioEventLoop extends SingleThreadEventLoop {
selectCnt = 1;
break;
}
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
long time = System.nanoTime();
if ((time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis)) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
@ -659,7 +664,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
break;
}
currentTimeNanos = System.nanoTime();
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {