From a18c57ea8779ceee0e4dccad7e981b36077fc557 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Thu, 31 Oct 2019 02:01:53 -0700 Subject: [PATCH] Externalize lazy execution semantic for EventExecutors (#9587) Motivation This is already done internally for various reasons but it would make sense i.m.o. as a top level concept: submitting a task to be run on the event loop which doesn't need to run immediately but must still be executed in FIFO order relative all other submitted tasks (be those "lazy" or otherwise). It's nice to separate this abstract "relaxed" semantic from concrete implementations - the simplest is to just delegate to existing execute, but for the main EL impls translates to whether a wakeup is required after enqueuing. Having a "global" abstraction also allows for simplification of our internal use - for example encapsulating more of the common scheduled future logic within AbstractScheduledEventExecutor. Modifications - Introduce public LazyRunnable interface and AbstractEventExecutor#lazyExecute method (would be nice for this to be added to EventExecutor interface in netty 5) - Tweak existing SingleThreadEventExecutor mechanics to support these - Replace internal use of NonWakeupRunnable (such as for pre-flush channel writes) - Uplift scheduling-related hooks into AbstractScheduledEventExecutor, eliminating intermediate executeScheduledRunnable method Result Simpler code, cleaner and more useful/flexible abstractions - cleaner in that they fully communicate the intent in a more general way, without implying/exposing/restricting implementation details --- .../concurrent/AbstractEventExecutor.java | 22 +++++ .../AbstractScheduledEventExecutor.java | 57 +++++++++---- .../concurrent/SingleThreadEventExecutor.java | 79 ++++-------------- .../SingleThreadEventExecutorTest.java | 81 ++++++++++++++++++- .../AbstractChannelHandlerContext.java | 3 +- .../netty/channel/SingleThreadEventLoop.java | 7 +- 6 files changed, 162 insertions(+), 87 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 6cfccbd3ae..4799701611 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -165,4 +166,25 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl logger.warn("A task raised an exception. Task: {}", task, t); } } + + /** + * Like {@link #execute(Runnable)} but does not guarantee the task will be run until either + * a non-lazy task is executed or the executor is shut down. + * + * This is equivalent to submitting a {@link EventExecutor.LazyRunnable} to + * {@link #execute(Runnable)} but for an arbitrary {@link Runnable}. + * + * The default implementation just delegates to {@link #execute(Runnable)}. + */ + @UnstableApi + public void lazyExecute(Runnable task) { + execute(task); + } + + /** + * Marker interface for {@link Runnable} to indicate that it should be queued for execution + * but does not need to run immediately. + */ + @UnstableApi + public interface LazyRunnable extends Runnable { } } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 33c27f2d3c..c132aa5b20 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -38,6 +38,11 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } }; + static final Runnable WAKEUP_TASK = new Runnable() { + @Override + public void run() { } // Do nothing + }; + PriorityQueue> scheduledTaskQueue; long nextTaskId; @@ -243,12 +248,22 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut if (inEventLoop()) { scheduledTaskQueue().add(task.setId(nextTaskId++)); } else { - executeScheduledRunnable(new Runnable() { + final long deadlineNanos = task.deadlineNanos(); + final Runnable addToQueue = new Runnable() { @Override public void run() { scheduledTaskQueue().add(task.setId(nextTaskId++)); } - }, true, task.deadlineNanos()); + }; + if (beforeScheduledTaskSubmitted(deadlineNanos)) { + execute(addToQueue); + } else { + lazyExecute(addToQueue); + // Second hook after scheduling to facilitate race-avoidance + if (afterScheduledTaskSubmitted(deadlineNanos)) { + execute(WAKEUP_TASK); + } + } } return task; @@ -258,27 +273,39 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut if (inEventLoop()) { scheduledTaskQueue().removeTyped(task); } else { - executeScheduledRunnable(new Runnable() { + lazyExecute(new Runnable() { @Override public void run() { scheduledTaskQueue().removeTyped(task); } - }, false, task.deadlineNanos()); + }); } } /** - * Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing - * a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this - * method. - * @param runnable The {@link Runnable} to execute which will add or remove a scheduled action - * @param isAddition {@code true} if the {@link Runnable} will add an action, {@code false} if it will remove an - * action - * @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed. + * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission. + * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to + * process the scheduled task (if not already awake). + *

+ * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with + * the same value after the scheduled task is enqueued, providing another opportunity + * to wake the {@link EventExecutor} thread if required. + * + * @param deadlineNanos deadline of the to-be-scheduled task + * relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise */ - void executeScheduledRunnable(Runnable runnable, - @SuppressWarnings("unused") boolean isAddition, - @SuppressWarnings("unused") long deadlineNanos) { - execute(runnable); + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return true; + } + + /** + * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false. + * + * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise + */ + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + return true; } } diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index d2b9707f39..0ffad8e12a 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -61,12 +61,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx private static final int ST_SHUTDOWN = 4; private static final int ST_TERMINATED = 5; - private static final Runnable WAKEUP_TASK = new Runnable() { - @Override - public void run() { - // Do nothing. - } - }; private static final Runnable NOOP_TASK = new Runnable() { @Override public void run() { @@ -178,33 +172,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } - /** - * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission. - * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to - * process the scheduled task (if not already awake). - *

- * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with - * the same value after the scheduled task is enqueued, providing another opportunity - * to wake the {@link EventExecutor} thread if required. - * - * @param deadlineNanos deadline of the to-be-scheduled task - * relative to {@link AbstractScheduledEventExecutor#nanoTime()} - * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise - */ - protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { - return true; - } - - /** - * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false. - * - * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()} - * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise - */ - protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { - return true; - } - /** * @deprecated Please use and override {@link #newTaskQueue(int)}. */ @@ -594,25 +561,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } - @Override - final void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) { - // Don't wakeup if this is a removal task or if beforeScheduledTaskSubmitted returns false - if (isAddition && beforeScheduledTaskSubmitted(deadlineNanos)) { - super.executeScheduledRunnable(runnable, isAddition, deadlineNanos); - } else { - super.executeScheduledRunnable(new NonWakeupRunnable() { - @Override - public void run() { - runnable.run(); - } - }, isAddition, deadlineNanos); - // Second hook after scheduling to facilitate race-avoidance - if (isAddition && afterScheduledTaskSubmitted(deadlineNanos)) { - wakeup(false); - } - } - } - @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; @@ -878,10 +826,16 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx @Override public void execute(Runnable task) { - if (task == null) { - throw new NullPointerException("task"); - } + ObjectUtil.checkNotNull(task, "task"); + execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); + } + @Override + public void lazyExecute(Runnable task) { + execute(ObjectUtil.checkNotNull(task, "task"), false); + } + + private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { @@ -903,7 +857,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } - if (!addTaskWakesUp && wakesUpForTask(task)) { + if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } } @@ -967,20 +921,17 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } /** - * Marker interface for {@link Runnable} to indicate that it should be queued for execution - * but does not need to run immediately. The default implementation of - * {@link SingleThreadEventExecutor#wakesUpForTask(Runnable)} uses this to avoid waking up - * the {@link EventExecutor} thread when not necessary. + * @deprecated use {@link AbstractEventExecutor.LazyRunnable} */ - protected interface NonWakeupRunnable extends Runnable { } + @Deprecated + protected interface NonWakeupRunnable extends LazyRunnable { } /** * Can be overridden to control which tasks require waking the {@link EventExecutor} thread - * if it is waiting so that they can be run immediately. The default implementation - * decides based on whether the task implements {@link NonWakeupRunnable}. + * if it is waiting so that they can be run immediately. */ protected boolean wakesUpForTask(Runnable task) { - return !(task instanceof NonWakeupRunnable); + return true; } protected static void reject() { diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index efb0eb015e..861669ab0e 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -18,9 +18,16 @@ package io.netty.util.concurrent; import org.junit.Assert; import org.junit.Test; +import io.netty.util.concurrent.AbstractEventExecutor.LazyRunnable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.util.Collections; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -122,7 +129,7 @@ public class SingleThreadEventExecutorTest { private static void testInvokeInEventLoop(final boolean any, final boolean timeout) { final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, - Executors.defaultThreadFactory(), false) { + Executors.defaultThreadFactory(), true) { @Override protected void run() { while (!confirmShutdown()) { @@ -170,4 +177,76 @@ public class SingleThreadEventExecutorTest { executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } } + + static class LatchTask extends CountDownLatch implements Runnable { + LatchTask() { + super(1); + } + + @Override + public void run() { + countDown(); + } + } + + static class LazyLatchTask extends LatchTask implements LazyRunnable { } + + @Test + public void testLazyExecution() throws Exception { + final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, + Executors.defaultThreadFactory(), false) { + @Override + protected void run() { + while (!confirmShutdown()) { + try { + synchronized (this) { + if (!hasTasks()) { + wait(); + } + } + runAllTasks(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + } + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop) { + synchronized (this) { + notifyAll(); + } + } + } + }; + + // Ensure event loop is started + LatchTask latch0 = new LatchTask(); + executor.execute(latch0); + assertTrue(latch0.await(100, TimeUnit.MILLISECONDS)); + // Pause to ensure it enters waiting state + Thread.sleep(100L); + + // Submit task via lazyExecute + LatchTask latch1 = new LatchTask(); + executor.lazyExecute(latch1); + // Sumbit lazy task via regular execute + LatchTask latch2 = new LazyLatchTask(); + executor.execute(latch2); + + // Neither should run yet + assertFalse(latch1.await(100, TimeUnit.MILLISECONDS)); + assertFalse(latch2.await(100, TimeUnit.MILLISECONDS)); + + // Submit regular task via regular execute + LatchTask latch3 = new LatchTask(); + executor.execute(latch3); + + // Should flush latch1 and latch2 and then run latch3 immediately + assertTrue(latch3.await(100, TimeUnit.MILLISECONDS)); + assertEquals(0, latch1.getCount()); + assertEquals(0, latch2.getCount()); + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 510dfd4f15..b38bacfbd2 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -20,6 +20,7 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; +import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.OrderedEventExecutor; import io.netty.util.internal.ObjectPool; @@ -1105,7 +1106,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R } } - static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { + static final class WriteTask extends AbstractWriteTask implements AbstractEventExecutor.LazyRunnable { private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() { @Override diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index d0bc086cce..07afe3091f 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -118,7 +118,7 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im reject(task); } - if (wakesUpForTask(task)) { + if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) { wakeup(inEventLoop()); } } @@ -159,9 +159,4 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im public int registeredChannels() { return -1; } - - /** - * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. - */ - interface NonWakeupRunnable extends SingleThreadEventExecutor.NonWakeupRunnable { } }