Ability to run a task at the end of an eventloop iteration.

Motivation:

This change is part of the change done in PR #5395 to provide an `AUTO_FLUSH` capability.
Splitting this change will enable to try other ways of implementing `AUTO_FLUSH`.

Modifications:



Two methods:

```java
void executeAfterEventLoopIteration(Runnable task);


boolean removeAfterEventLoopIterationTask(Runnable task);
```
are added to `SingleThreadEventLoop` class for adding/removing a task to be executed at the end of current/next iteration of this `eventloop`.

In order to support the above, a few methods are added to `SingleThreadEventExecutor`

```java
protected void afterRunningAllTasks() { }
```

This is invoked after all tasks are run for this executor OR if the passed timeout value for `runAllTasks(long timeoutNanos)` is expired.

Added a queue of `tailTasks` to `SingleThreadEventLoop` to hold all tasks to be executed at the end of every iteration.


Result:



`SingleThreadEventLoop` now has the ability to execute tasks at the end of an eventloop iteration.
This commit is contained in:
Nitesh Kant 2016-06-14 17:28:28 -07:00 committed by Norman Maurer
parent cfd6db7915
commit 77770374fb
3 changed files with 209 additions and 27 deletions

View File

@ -18,6 +18,7 @@ package io.netty.util.concurrent;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -175,7 +176,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
this.addTaskWakesUp = addTaskWakesUp; this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks); this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor"); this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(); taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
} }
@ -214,6 +215,10 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
*/ */
protected Runnable pollTask() { protected Runnable pollTask() {
assert inEventLoop(); assert inEventLoop();
return pollTaskFrom(taskQueue);
}
protected final Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
for (;;) { for (;;) {
Runnable task = taskQueue.poll(); Runnable task = taskQueue.poll();
if (task == WAKEUP_TASK) { if (task == WAKEUP_TASK) {
@ -328,7 +333,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
throw new NullPointerException("task"); throw new NullPointerException("task");
} }
if (!offerTask(task)) { if (!offerTask(task)) {
rejectedExecutionHandler.rejected(task, this); reject(task);
} }
} }
@ -355,30 +360,43 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
* @return {@code true} if and only if at least one task was run * @return {@code true} if and only if at least one task was run
*/ */
protected boolean runAllTasks() { protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll; boolean fetchedAll;
boolean ranAtLeastOne = false;
do { do {
fetchedAll = fetchFromScheduledTaskQueue(); fetchedAll = fetchFromScheduledTaskQueue();
Runnable task = pollTask(); if (runAllTasksFrom(taskQueue)) {
if (task == null) { ranAtLeastOne = true;
return false;
}
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
task = pollTask();
if (task == null) {
break;
}
} }
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
lastExecutionTime = ScheduledFutureTask.nanoTime(); if (ranAtLeastOne) {
return true; lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
/**
* Runs all tasks from the passed {@code taskQueue}.
*
* @param taskQueue To poll and execute all tasks.
*
* @return {@code true} if atleast one task was executed.
*/
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
} }
/** /**
@ -389,6 +407,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
fetchFromScheduledTaskQueue(); fetchFromScheduledTaskQueue();
Runnable task = pollTask(); Runnable task = pollTask();
if (task == null) { if (task == null) {
afterRunningAllTasks();
return false; return false;
} }
@ -396,11 +415,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
long runTasks = 0; long runTasks = 0;
long lastExecutionTime; long lastExecutionTime;
for (;;) { for (;;) {
try { safeExecute(task);
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++; runTasks ++;
@ -420,10 +435,25 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
} }
} }
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime; this.lastExecutionTime = lastExecutionTime;
return true; return true;
} }
/**
* Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
*/
@UnstableApi
protected void afterRunningAllTasks() { }
private static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
/** /**
* Returns the amount of time left until the scheduled task with the closest dead line is executed. * Returns the amount of time left until the scheduled task with the closest dead line is executed.
*/ */
@ -810,6 +840,15 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
throw new RejectedExecutionException("event executor terminated"); throw new RejectedExecutionException("event executor terminated");
} }
/**
* Offers the task to the associated {@link RejectedExecutionHandler}.
*
* @param task to reject.
*/
protected final void reject(Runnable task) {
rejectedExecutionHandler.rejected(task, this);
}
// ScheduledExecutorService implementation // ScheduledExecutorService implementation
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);

View File

@ -20,7 +20,9 @@ import io.netty.util.concurrent.RejectedExecutionHandlers;
import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.UnstableApi;
import java.util.Queue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
@ -33,6 +35,8 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
private final Queue<Runnable> tailTasks;
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
} }
@ -45,12 +49,14 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
boolean addTaskWakesUp, int maxPendingTasks, boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) { RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
} }
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks, boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) { RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
} }
@Override @Override
@ -89,11 +95,59 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
return promise; return promise;
} }
/**
* Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
*
* @param task to be added.
*/
@UnstableApi
public final void executeAfterEventLoopIteration(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (isShutdown()) {
reject();
}
if (!tailTasks.offer(task)) {
reject(task);
}
if (wakesUpForTask(task)) {
wakeup(inEventLoop());
}
}
/**
* Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
*
* @param task to be removed.
*
* @return {@code true} if the task was removed as a result of this call.
*/
@UnstableApi
final boolean removeAfterEventLoopIterationTask(Runnable task) {
return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
}
@Override @Override
protected boolean wakesUpForTask(Runnable task) { protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable); return !(task instanceof NonWakeupRunnable);
} }
@Override
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}
@Override
protected boolean hasTasks() {
return super.hasTasks() || !tailTasks.isEmpty();
}
@Override
public int pendingTasks() {
return super.pendingTasks() + tailTasks.size();
}
/** /**
* Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
*/ */

View File

@ -20,6 +20,7 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender; import ch.qos.logback.core.Appender;
import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalChannel;
import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutor;
import org.hamcrest.MatcherAssert;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -51,11 +52,13 @@ public class SingleThreadEventLoopTest {
private SingleThreadEventLoopA loopA; private SingleThreadEventLoopA loopA;
private SingleThreadEventLoopB loopB; private SingleThreadEventLoopB loopB;
private SingleThreadEventLoopC loopC;
@Before @Before
public void newEventLoop() { public void newEventLoop() {
loopA = new SingleThreadEventLoopA(); loopA = new SingleThreadEventLoopA();
loopB = new SingleThreadEventLoopB(); loopB = new SingleThreadEventLoopB();
loopC = new SingleThreadEventLoopC();
} }
@After @After
@ -66,6 +69,9 @@ public class SingleThreadEventLoopTest {
if (!loopB.isShuttingDown()) { if (!loopB.isShuttingDown()) {
loopB.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); loopB.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
} }
if (!loopC.isShuttingDown()) {
loopC.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
while (!loopA.isTerminated()) { while (!loopA.isTerminated()) {
try { try {
@ -83,6 +89,14 @@ public class SingleThreadEventLoopTest {
// Ignore // Ignore
} }
} }
while (!loopC.isTerminated()) {
try {
loopC.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
// Ignore
}
}
} }
@Test @Test
@ -137,6 +151,11 @@ public class SingleThreadEventLoopTest {
testScheduleTask(loopB); testScheduleTask(loopB);
} }
@Test
public void scheduleTaskC() throws Exception {
testScheduleTask(loopC);
}
private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException { private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException {
long startTime = System.nanoTime(); long startTime = System.nanoTime();
final AtomicLong endTime = new AtomicLong(); final AtomicLong endTime = new AtomicLong();
@ -442,7 +461,39 @@ public class SingleThreadEventLoopTest {
assertThat(loopA.isShutdown(), is(true)); assertThat(loopA.isShutdown(), is(true));
} }
private static class SingleThreadEventLoopA extends SingleThreadEventLoop { @Test(timeout = 10000)
public void testOnEventLoopIteration() throws Exception {
CountingRunnable onIteration = new CountingRunnable();
loopC.executeAfterEventLoopIteration(onIteration);
CountingRunnable noopTask = new CountingRunnable();
loopC.submit(noopTask).sync();
loopC.iterationEndSignal.take();
MatcherAssert.assertThat("Unexpected invocation count for regular task.",
noopTask.getInvocationCount(), is(1));
MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.",
onIteration.getInvocationCount(), is(1));
}
@Test(timeout = 10000)
public void testRemoveOnEventLoopIteration() throws Exception {
CountingRunnable onIteration1 = new CountingRunnable();
loopC.executeAfterEventLoopIteration(onIteration1);
CountingRunnable onIteration2 = new CountingRunnable();
loopC.executeAfterEventLoopIteration(onIteration2);
loopC.removeAfterEventLoopIterationTask(onIteration1);
CountingRunnable noopTask = new CountingRunnable();
loopC.submit(noopTask).sync();
loopC.iterationEndSignal.take();
MatcherAssert.assertThat("Unexpected invocation count for regular task.",
noopTask.getInvocationCount(), is(1));
MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.",
onIteration2.getInvocationCount(), is(1));
MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.",
onIteration1.getInvocationCount(), is(0));
}
private static final class SingleThreadEventLoopA extends SingleThreadEventLoop {
final AtomicInteger cleanedUp = new AtomicInteger(); final AtomicInteger cleanedUp = new AtomicInteger();
@ -486,7 +537,7 @@ public class SingleThreadEventLoopTest {
// Waken up by interruptThread() // Waken up by interruptThread()
} }
runAllTasks(); runTasks0();
if (confirmShutdown()) { if (confirmShutdown()) {
break; break;
@ -494,9 +545,47 @@ public class SingleThreadEventLoopTest {
} }
} }
protected void runTasks0() {
runAllTasks();
}
@Override @Override
protected void wakeup(boolean inEventLoop) { protected void wakeup(boolean inEventLoop) {
interruptThread(); interruptThread();
} }
} }
private static final class SingleThreadEventLoopC extends SingleThreadEventLoopB {
final LinkedBlockingQueue<Boolean> iterationEndSignal = new LinkedBlockingQueue<Boolean>(1);
@Override
protected void afterRunningAllTasks() {
super.afterRunningAllTasks();
iterationEndSignal.offer(true);
}
@Override
protected void runTasks0() {
runAllTasks(TimeUnit.MINUTES.toNanos(1));
}
}
private static class CountingRunnable implements Runnable {
private final AtomicInteger invocationCount = new AtomicInteger();
@Override
public void run() {
invocationCount.incrementAndGet();
}
public int getInvocationCount() {
return invocationCount.get();
}
public void resetInvocationCount() {
invocationCount.set(0);
}
}
} }