Externalize lazy execution semantic for EventExecutors (#9587)

Motivation

This is already done internally for various reasons but it would make
sense i.m.o. as a top level concept: submitting a task to be run on the
event loop which doesn't need to run immediately but must still be
executed in FIFO order relative all other submitted tasks (be those
"lazy" or otherwise).

It's nice to separate this abstract "relaxed" semantic from concrete
implementations - the simplest is to just delegate to existing execute,
but for the main EL impls translates to whether a wakeup is required
after enqueuing.

Having a "global" abstraction also allows for simplification of our
internal use - for example encapsulating more of the common scheduled
future logic within AbstractScheduledEventExecutor.

Modifications

- Introduce public LazyRunnable interface and
AbstractEventExecutor#lazyExecute method (would be nice for this to be
added to EventExecutor interface in netty 5)
- Tweak existing SingleThreadEventExecutor mechanics to support these
- Replace internal use of NonWakeupRunnable (such as for pre-flush
channel writes)
- Uplift scheduling-related hooks into AbstractScheduledEventExecutor,
eliminating intermediate executeScheduledRunnable method

Result

Simpler code, cleaner and more useful/flexible abstractions - cleaner in
that they fully communicate the intent in a more general way, without
implying/exposing/restricting implementation details
This commit is contained in:
Nick Hill 2019-10-31 02:01:53 -07:00 committed by Norman Maurer
parent 1d57241565
commit a18c57ea87
6 changed files with 162 additions and 87 deletions

View File

@ -15,6 +15,7 @@
*/
package io.netty.util.concurrent;
import io.netty.util.internal.UnstableApi;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -165,4 +166,25 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
/**
* Like {@link #execute(Runnable)} but does not guarantee the task will be run until either
* a non-lazy task is executed or the executor is shut down.
*
* This is equivalent to submitting a {@link EventExecutor.LazyRunnable} to
* {@link #execute(Runnable)} but for an arbitrary {@link Runnable}.
*
* The default implementation just delegates to {@link #execute(Runnable)}.
*/
@UnstableApi
public void lazyExecute(Runnable task) {
execute(task);
}
/**
* Marker interface for {@link Runnable} to indicate that it should be queued for execution
* but does not need to run immediately.
*/
@UnstableApi
public interface LazyRunnable extends Runnable { }
}

View File

@ -38,6 +38,11 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
}
};
static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() { } // Do nothing
};
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
long nextTaskId;
@ -243,12 +248,22 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
if (inEventLoop()) {
scheduledTaskQueue().add(task.setId(nextTaskId++));
} else {
executeScheduledRunnable(new Runnable() {
final long deadlineNanos = task.deadlineNanos();
final Runnable addToQueue = new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task.setId(nextTaskId++));
}
}, true, task.deadlineNanos());
};
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(addToQueue);
} else {
lazyExecute(addToQueue);
// Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK);
}
}
}
return task;
@ -258,27 +273,39 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
if (inEventLoop()) {
scheduledTaskQueue().removeTyped(task);
} else {
executeScheduledRunnable(new Runnable() {
lazyExecute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().removeTyped(task);
}
}, false, task.deadlineNanos());
});
}
}
/**
* Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing
* a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this
* method.
* @param runnable The {@link Runnable} to execute which will add or remove a scheduled action
* @param isAddition {@code true} if the {@link Runnable} will add an action, {@code false} if it will remove an
* action
* @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed.
* Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission.
* Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to
* process the scheduled task (if not already awake).
* <p>
* If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with
* the same value <i>after</i> the scheduled task is enqueued, providing another opportunity
* to wake the {@link EventExecutor} thread if required.
*
* @param deadlineNanos deadline of the to-be-scheduled task
* relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
void executeScheduledRunnable(Runnable runnable,
@SuppressWarnings("unused") boolean isAddition,
@SuppressWarnings("unused") long deadlineNanos) {
execute(runnable);
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
return true;
}
/**
* See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
*
* @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
return true;
}
}

View File

@ -61,12 +61,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
private static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() {
// Do nothing.
}
};
private static final Runnable NOOP_TASK = new Runnable() {
@Override
public void run() {
@ -178,33 +172,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
/**
* Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission.
* Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to
* process the scheduled task (if not already awake).
* <p>
* If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with
* the same value <i>after</i> the scheduled task is enqueued, providing another opportunity
* to wake the {@link EventExecutor} thread if required.
*
* @param deadlineNanos deadline of the to-be-scheduled task
* relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
return true;
}
/**
* See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
*
* @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
return true;
}
/**
* @deprecated Please use and override {@link #newTaskQueue(int)}.
*/
@ -594,25 +561,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}
}
@Override
final void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) {
// Don't wakeup if this is a removal task or if beforeScheduledTaskSubmitted returns false
if (isAddition && beforeScheduledTaskSubmitted(deadlineNanos)) {
super.executeScheduledRunnable(runnable, isAddition, deadlineNanos);
} else {
super.executeScheduledRunnable(new NonWakeupRunnable() {
@Override
public void run() {
runnable.run();
}
}, isAddition, deadlineNanos);
// Second hook after scheduling to facilitate race-avoidance
if (isAddition && afterScheduledTaskSubmitted(deadlineNanos)) {
wakeup(false);
}
}
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
@ -878,10 +826,16 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
@Override
public void lazyExecute(Runnable task) {
execute(ObjectUtil.checkNotNull(task, "task"), false);
}
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
@ -903,7 +857,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
@ -967,20 +921,17 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
}
/**
* Marker interface for {@link Runnable} to indicate that it should be queued for execution
* but does not need to run immediately. The default implementation of
* {@link SingleThreadEventExecutor#wakesUpForTask(Runnable)} uses this to avoid waking up
* the {@link EventExecutor} thread when not necessary.
* @deprecated use {@link AbstractEventExecutor.LazyRunnable}
*/
protected interface NonWakeupRunnable extends Runnable { }
@Deprecated
protected interface NonWakeupRunnable extends LazyRunnable { }
/**
* Can be overridden to control which tasks require waking the {@link EventExecutor} thread
* if it is waiting so that they can be run immediately. The default implementation
* decides based on whether the task implements {@link NonWakeupRunnable}.
* if it is waiting so that they can be run immediately.
*/
protected boolean wakesUpForTask(Runnable task) {
return !(task instanceof NonWakeupRunnable);
return true;
}
protected static void reject() {

View File

@ -18,9 +18,16 @@ package io.netty.util.concurrent;
import org.junit.Assert;
import org.junit.Test;
import io.netty.util.concurrent.AbstractEventExecutor.LazyRunnable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -122,7 +129,7 @@ public class SingleThreadEventExecutorTest {
private static void testInvokeInEventLoop(final boolean any, final boolean timeout) {
final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null,
Executors.defaultThreadFactory(), false) {
Executors.defaultThreadFactory(), true) {
@Override
protected void run() {
while (!confirmShutdown()) {
@ -170,4 +177,76 @@ public class SingleThreadEventExecutorTest {
executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
}
}
static class LatchTask extends CountDownLatch implements Runnable {
LatchTask() {
super(1);
}
@Override
public void run() {
countDown();
}
}
static class LazyLatchTask extends LatchTask implements LazyRunnable { }
@Test
public void testLazyExecution() throws Exception {
final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null,
Executors.defaultThreadFactory(), false) {
@Override
protected void run() {
while (!confirmShutdown()) {
try {
synchronized (this) {
if (!hasTasks()) {
wait();
}
}
runAllTasks();
} catch (Exception e) {
e.printStackTrace();
Assert.fail(e.toString());
}
}
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop) {
synchronized (this) {
notifyAll();
}
}
}
};
// Ensure event loop is started
LatchTask latch0 = new LatchTask();
executor.execute(latch0);
assertTrue(latch0.await(100, TimeUnit.MILLISECONDS));
// Pause to ensure it enters waiting state
Thread.sleep(100L);
// Submit task via lazyExecute
LatchTask latch1 = new LatchTask();
executor.lazyExecute(latch1);
// Sumbit lazy task via regular execute
LatchTask latch2 = new LazyLatchTask();
executor.execute(latch2);
// Neither should run yet
assertFalse(latch1.await(100, TimeUnit.MILLISECONDS));
assertFalse(latch2.await(100, TimeUnit.MILLISECONDS));
// Submit regular task via regular execute
LatchTask latch3 = new LatchTask();
executor.execute(latch3);
// Should flush latch1 and latch2 and then run latch3 immediately
assertTrue(latch3.await(100, TimeUnit.MILLISECONDS));
assertEquals(0, latch1.getCount());
assertEquals(0, latch2.getCount());
}
}

View File

@ -20,6 +20,7 @@ import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.OrderedEventExecutor;
import io.netty.util.internal.ObjectPool;
@ -1105,7 +1106,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R
}
}
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
static final class WriteTask extends AbstractWriteTask implements AbstractEventExecutor.LazyRunnable {
private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
@Override

View File

@ -118,7 +118,7 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
reject(task);
}
if (wakesUpForTask(task)) {
if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) {
wakeup(inEventLoop());
}
}
@ -159,9 +159,4 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
public int registeredChannels() {
return -1;
}
/**
* Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
*/
interface NonWakeupRunnable extends SingleThreadEventExecutor.NonWakeupRunnable { }
}