diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java index 6cfccbd3ae..4799701611 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractEventExecutor.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.UnstableApi; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -165,4 +166,25 @@ public abstract class AbstractEventExecutor extends AbstractExecutorService impl logger.warn("A task raised an exception. Task: {}", task, t); } } + + /** + * Like {@link #execute(Runnable)} but does not guarantee the task will be run until either + * a non-lazy task is executed or the executor is shut down. + * + * This is equivalent to submitting a {@link EventExecutor.LazyRunnable} to + * {@link #execute(Runnable)} but for an arbitrary {@link Runnable}. + * + * The default implementation just delegates to {@link #execute(Runnable)}. + */ + @UnstableApi + public void lazyExecute(Runnable task) { + execute(task); + } + + /** + * Marker interface for {@link Runnable} to indicate that it should be queued for execution + * but does not need to run immediately. + */ + @UnstableApi + public interface LazyRunnable extends Runnable { } } diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 33c27f2d3c..c132aa5b20 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -38,6 +38,11 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut } }; + static final Runnable WAKEUP_TASK = new Runnable() { + @Override + public void run() { } // Do nothing + }; + PriorityQueue> scheduledTaskQueue; long nextTaskId; @@ -243,12 +248,22 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut if (inEventLoop()) { scheduledTaskQueue().add(task.setId(nextTaskId++)); } else { - executeScheduledRunnable(new Runnable() { + final long deadlineNanos = task.deadlineNanos(); + final Runnable addToQueue = new Runnable() { @Override public void run() { scheduledTaskQueue().add(task.setId(nextTaskId++)); } - }, true, task.deadlineNanos()); + }; + if (beforeScheduledTaskSubmitted(deadlineNanos)) { + execute(addToQueue); + } else { + lazyExecute(addToQueue); + // Second hook after scheduling to facilitate race-avoidance + if (afterScheduledTaskSubmitted(deadlineNanos)) { + execute(WAKEUP_TASK); + } + } } return task; @@ -258,27 +273,39 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut if (inEventLoop()) { scheduledTaskQueue().removeTyped(task); } else { - executeScheduledRunnable(new Runnable() { + lazyExecute(new Runnable() { @Override public void run() { scheduledTaskQueue().removeTyped(task); } - }, false, task.deadlineNanos()); + }); } } /** - * Execute a {@link Runnable} from outside the event loop thread that is responsible for adding or removing - * a scheduled action. Note that schedule events which occur on the event loop thread do not interact with this - * method. - * @param runnable The {@link Runnable} to execute which will add or remove a scheduled action - * @param isAddition {@code true} if the {@link Runnable} will add an action, {@code false} if it will remove an - * action - * @param deadlineNanos the deadline in nanos of the scheduled task that will be added or removed. + * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission. + * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to + * process the scheduled task (if not already awake). + *

+ * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with + * the same value after the scheduled task is enqueued, providing another opportunity + * to wake the {@link EventExecutor} thread if required. + * + * @param deadlineNanos deadline of the to-be-scheduled task + * relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise */ - void executeScheduledRunnable(Runnable runnable, - @SuppressWarnings("unused") boolean isAddition, - @SuppressWarnings("unused") long deadlineNanos) { - execute(runnable); + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return true; + } + + /** + * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false. + * + * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise + */ + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + return true; } } diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index d2b9707f39..0ffad8e12a 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -61,12 +61,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx private static final int ST_SHUTDOWN = 4; private static final int ST_TERMINATED = 5; - private static final Runnable WAKEUP_TASK = new Runnable() { - @Override - public void run() { - // Do nothing. - } - }; private static final Runnable NOOP_TASK = new Runnable() { @Override public void run() { @@ -178,33 +172,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } - /** - * Called from arbitrary non-{@link EventExecutor} threads prior to scheduled task submission. - * Returns {@code true} if the {@link EventExecutor} thread should be woken immediately to - * process the scheduled task (if not already awake). - *

- * If {@code false} is returned, {@link #afterScheduledTaskSubmitted(long)} will be called with - * the same value after the scheduled task is enqueued, providing another opportunity - * to wake the {@link EventExecutor} thread if required. - * - * @param deadlineNanos deadline of the to-be-scheduled task - * relative to {@link AbstractScheduledEventExecutor#nanoTime()} - * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise - */ - protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { - return true; - } - - /** - * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false. - * - * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()} - * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise - */ - protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { - return true; - } - /** * @deprecated Please use and override {@link #newTaskQueue(int)}. */ @@ -594,25 +561,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } - @Override - final void executeScheduledRunnable(final Runnable runnable, boolean isAddition, long deadlineNanos) { - // Don't wakeup if this is a removal task or if beforeScheduledTaskSubmitted returns false - if (isAddition && beforeScheduledTaskSubmitted(deadlineNanos)) { - super.executeScheduledRunnable(runnable, isAddition, deadlineNanos); - } else { - super.executeScheduledRunnable(new NonWakeupRunnable() { - @Override - public void run() { - runnable.run(); - } - }, isAddition, deadlineNanos); - // Second hook after scheduling to facilitate race-avoidance - if (isAddition && afterScheduledTaskSubmitted(deadlineNanos)) { - wakeup(false); - } - } - } - @Override public boolean inEventLoop(Thread thread) { return thread == this.thread; @@ -878,10 +826,16 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx @Override public void execute(Runnable task) { - if (task == null) { - throw new NullPointerException("task"); - } + ObjectUtil.checkNotNull(task, "task"); + execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task)); + } + @Override + public void lazyExecute(Runnable task) { + execute(ObjectUtil.checkNotNull(task, "task"), false); + } + + private void execute(Runnable task, boolean immediate) { boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { @@ -903,7 +857,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } - if (!addTaskWakesUp && wakesUpForTask(task)) { + if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } } @@ -967,20 +921,17 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } /** - * Marker interface for {@link Runnable} to indicate that it should be queued for execution - * but does not need to run immediately. The default implementation of - * {@link SingleThreadEventExecutor#wakesUpForTask(Runnable)} uses this to avoid waking up - * the {@link EventExecutor} thread when not necessary. + * @deprecated use {@link AbstractEventExecutor.LazyRunnable} */ - protected interface NonWakeupRunnable extends Runnable { } + @Deprecated + protected interface NonWakeupRunnable extends LazyRunnable { } /** * Can be overridden to control which tasks require waking the {@link EventExecutor} thread - * if it is waiting so that they can be run immediately. The default implementation - * decides based on whether the task implements {@link NonWakeupRunnable}. + * if it is waiting so that they can be run immediately. */ protected boolean wakesUpForTask(Runnable task) { - return !(task instanceof NonWakeupRunnable); + return true; } protected static void reject() { diff --git a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java index efb0eb015e..861669ab0e 100644 --- a/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/SingleThreadEventExecutorTest.java @@ -18,9 +18,16 @@ package io.netty.util.concurrent; import org.junit.Assert; import org.junit.Test; +import io.netty.util.concurrent.AbstractEventExecutor.LazyRunnable; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.util.Collections; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -122,7 +129,7 @@ public class SingleThreadEventExecutorTest { private static void testInvokeInEventLoop(final boolean any, final boolean timeout) { final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, - Executors.defaultThreadFactory(), false) { + Executors.defaultThreadFactory(), true) { @Override protected void run() { while (!confirmShutdown()) { @@ -170,4 +177,76 @@ public class SingleThreadEventExecutorTest { executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } } + + static class LatchTask extends CountDownLatch implements Runnable { + LatchTask() { + super(1); + } + + @Override + public void run() { + countDown(); + } + } + + static class LazyLatchTask extends LatchTask implements LazyRunnable { } + + @Test + public void testLazyExecution() throws Exception { + final SingleThreadEventExecutor executor = new SingleThreadEventExecutor(null, + Executors.defaultThreadFactory(), false) { + @Override + protected void run() { + while (!confirmShutdown()) { + try { + synchronized (this) { + if (!hasTasks()) { + wait(); + } + } + runAllTasks(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + } + } + + @Override + protected void wakeup(boolean inEventLoop) { + if (!inEventLoop) { + synchronized (this) { + notifyAll(); + } + } + } + }; + + // Ensure event loop is started + LatchTask latch0 = new LatchTask(); + executor.execute(latch0); + assertTrue(latch0.await(100, TimeUnit.MILLISECONDS)); + // Pause to ensure it enters waiting state + Thread.sleep(100L); + + // Submit task via lazyExecute + LatchTask latch1 = new LatchTask(); + executor.lazyExecute(latch1); + // Sumbit lazy task via regular execute + LatchTask latch2 = new LazyLatchTask(); + executor.execute(latch2); + + // Neither should run yet + assertFalse(latch1.await(100, TimeUnit.MILLISECONDS)); + assertFalse(latch2.await(100, TimeUnit.MILLISECONDS)); + + // Submit regular task via regular execute + LatchTask latch3 = new LatchTask(); + executor.execute(latch3); + + // Should flush latch1 and latch2 and then run latch3 immediately + assertTrue(latch3.await(100, TimeUnit.MILLISECONDS)); + assertEquals(0, latch1.getCount()); + assertEquals(0, latch2.getCount()); + } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 510dfd4f15..b38bacfbd2 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -20,6 +20,7 @@ import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; +import io.netty.util.concurrent.AbstractEventExecutor; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.OrderedEventExecutor; import io.netty.util.internal.ObjectPool; @@ -1105,7 +1106,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R } } - static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { + static final class WriteTask extends AbstractWriteTask implements AbstractEventExecutor.LazyRunnable { private static final ObjectPool RECYCLER = ObjectPool.newPool(new ObjectCreator() { @Override diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index d0bc086cce..07afe3091f 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -118,7 +118,7 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im reject(task); } - if (wakesUpForTask(task)) { + if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) { wakeup(inEventLoop()); } } @@ -159,9 +159,4 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im public int registeredChannels() { return -1; } - - /** - * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. - */ - interface NonWakeupRunnable extends SingleThreadEventExecutor.NonWakeupRunnable { } }