diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index dab7e9500d..994b961729 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -201,10 +201,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx */ protected Runnable pollTask() { assert inEventLoop(); - return pollTaskFrom(taskQueue); - } - protected static Runnable pollTaskFrom(Queue taskQueue) { for (;;) { Runnable task = taskQueue.poll(); if (task == WAKEUP_TASK) { @@ -319,7 +316,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx throw new NullPointerException("task"); } if (!offerTask(task)) { - reject(task); + rejectedExecutionHandler.rejected(task, this); } } @@ -346,43 +343,30 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx * @return {@code true} if and only if at least one task was run */ protected boolean runAllTasks() { - assert inEventLoop(); boolean fetchedAll; - boolean ranAtLeastOne = false; - do { fetchedAll = fetchFromScheduledTaskQueue(); - if (runAllTasksFrom(taskQueue)) { - ranAtLeastOne = true; + Runnable task = pollTask(); + if (task == null) { + return false; + } + + for (;;) { + try { + task.run(); + } catch (Throwable t) { + logger.warn("A task raised an exception.", t); + } + + task = pollTask(); + if (task == null) { + break; + } } } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. - if (ranAtLeastOne) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); - } - afterRunningAllTasks(); - return ranAtLeastOne; - } - - /** - * Runs all tasks from the passed {@code taskQueue}. - * - * @param taskQueue To poll and execute all tasks. - * - * @return {@code true} if at least one task was executed. - */ - protected final boolean runAllTasksFrom(Queue taskQueue) { - Runnable task = pollTaskFrom(taskQueue); - if (task == null) { - return false; - } - for (;;) { - safeExecute(task); - task = pollTaskFrom(taskQueue); - if (task == null) { - return true; - } - } + lastExecutionTime = ScheduledFutureTask.nanoTime(); + return true; } /** @@ -393,7 +377,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { - afterRunningAllTasks(); return false; } @@ -401,7 +384,11 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx long runTasks = 0; long lastExecutionTime; for (;;) { - safeExecute(task); + try { + task.run(); + } catch (Throwable t) { + logger.warn("A task raised an exception.", t); + } runTasks ++; @@ -421,16 +408,10 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } - afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; } - /** - * Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}. - */ - @UnstableApi - protected void afterRunningAllTasks() { } /** * Returns the amount of time left until the scheduled task with the closest dead line is executed. */ @@ -867,15 +848,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx throw new RejectedExecutionException("event executor terminated"); } - /** - * Offers the task to the associated {@link RejectedExecutionHandler}. - * - * @param task to reject. - */ - protected final void reject(Runnable task) { - rejectedExecutionHandler.rejected(task, this); - } - // ScheduledExecutorService implementation private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index c547b341f6..a24ea43f47 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -20,9 +20,7 @@ import io.netty.util.concurrent.RejectedExecutionHandlers; import io.netty.util.concurrent.SingleThreadEventExecutor; import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.SystemPropertyUtil; -import io.netty.util.internal.UnstableApi; -import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory; @@ -35,8 +33,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)); - private final Queue tailTasks; - protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject()); } @@ -49,14 +45,12 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); - tailTasks = newTaskQueue(maxPendingTasks); } protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); - tailTasks = newTaskQueue(maxPendingTasks); } @Override @@ -95,59 +89,11 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im return promise; } - /** - * Adds a task to be run once at the end of next (or current) {@code eventloop} iteration. - * - * @param task to be added. - */ - @UnstableApi - public final void executeAfterEventLoopIteration(Runnable task) { - ObjectUtil.checkNotNull(task, "task"); - if (isShutdown()) { - reject(); - } - - if (!tailTasks.offer(task)) { - reject(task); - } - - if (wakesUpForTask(task)) { - wakeup(inEventLoop()); - } - } - - /** - * Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}. - * - * @param task to be removed. - * - * @return {@code true} if the task was removed as a result of this call. - */ - @UnstableApi - final boolean removeAfterEventLoopIterationTask(Runnable task) { - return tailTasks.remove(ObjectUtil.checkNotNull(task, "task")); - } - @Override protected boolean wakesUpForTask(Runnable task) { return !(task instanceof NonWakeupRunnable); } - @Override - protected void afterRunningAllTasks() { - runAllTasksFrom(tailTasks); - } - - @Override - protected boolean hasTasks() { - return super.hasTasks() || !tailTasks.isEmpty(); - } - - @Override - public int pendingTasks() { - return super.pendingTasks() + tailTasks.size(); - } - /** * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases. */ diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 78e7d798bd..1b94191522 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -20,7 +20,6 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Appender; import io.netty.channel.local.LocalChannel; import io.netty.util.concurrent.EventExecutor; -import org.hamcrest.MatcherAssert; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -52,13 +51,11 @@ public class SingleThreadEventLoopTest { private SingleThreadEventLoopA loopA; private SingleThreadEventLoopB loopB; - private SingleThreadEventLoopC loopC; @Before public void newEventLoop() { loopA = new SingleThreadEventLoopA(); loopB = new SingleThreadEventLoopB(); - loopC = new SingleThreadEventLoopC(); } @After @@ -69,9 +66,6 @@ public class SingleThreadEventLoopTest { if (!loopB.isShuttingDown()) { loopB.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); } - if (!loopC.isShuttingDown()) { - loopC.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS); - } while (!loopA.isTerminated()) { try { @@ -89,14 +83,6 @@ public class SingleThreadEventLoopTest { // Ignore } } - - while (!loopC.isTerminated()) { - try { - loopC.awaitTermination(1, TimeUnit.DAYS); - } catch (InterruptedException e) { - // Ignore - } - } } @Test @@ -151,11 +137,6 @@ public class SingleThreadEventLoopTest { testScheduleTask(loopB); } - @Test - public void scheduleTaskC() throws Exception { - testScheduleTask(loopC); - } - private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException { long startTime = System.nanoTime(); final AtomicLong endTime = new AtomicLong(); @@ -473,39 +454,7 @@ public class SingleThreadEventLoopTest { assertThat(loopA.isShutdown(), is(true)); } - @Test(timeout = 10000) - public void testOnEventLoopIteration() throws Exception { - CountingRunnable onIteration = new CountingRunnable(); - loopC.executeAfterEventLoopIteration(onIteration); - CountingRunnable noopTask = new CountingRunnable(); - loopC.submit(noopTask).sync(); - loopC.iterationEndSignal.take(); - MatcherAssert.assertThat("Unexpected invocation count for regular task.", - noopTask.getInvocationCount(), is(1)); - MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.", - onIteration.getInvocationCount(), is(1)); - } - - @Test(timeout = 10000) - public void testRemoveOnEventLoopIteration() throws Exception { - CountingRunnable onIteration1 = new CountingRunnable(); - loopC.executeAfterEventLoopIteration(onIteration1); - CountingRunnable onIteration2 = new CountingRunnable(); - loopC.executeAfterEventLoopIteration(onIteration2); - loopC.removeAfterEventLoopIterationTask(onIteration1); - CountingRunnable noopTask = new CountingRunnable(); - loopC.submit(noopTask).sync(); - - loopC.iterationEndSignal.take(); - MatcherAssert.assertThat("Unexpected invocation count for regular task.", - noopTask.getInvocationCount(), is(1)); - MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.", - onIteration2.getInvocationCount(), is(1)); - MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.", - onIteration1.getInvocationCount(), is(0)); - } - - private static final class SingleThreadEventLoopA extends SingleThreadEventLoop { + private static class SingleThreadEventLoopA extends SingleThreadEventLoop { final AtomicInteger cleanedUp = new AtomicInteger(); @@ -549,7 +498,7 @@ public class SingleThreadEventLoopTest { // Waken up by interruptThread() } - runTasks0(); + runAllTasks(); if (confirmShutdown()) { break; @@ -557,47 +506,9 @@ public class SingleThreadEventLoopTest { } } - protected void runTasks0() { - runAllTasks(); - } - @Override protected void wakeup(boolean inEventLoop) { interruptThread(); } } - - private static final class SingleThreadEventLoopC extends SingleThreadEventLoopB { - - final LinkedBlockingQueue iterationEndSignal = new LinkedBlockingQueue(1); - - @Override - protected void afterRunningAllTasks() { - super.afterRunningAllTasks(); - iterationEndSignal.offer(true); - } - - @Override - protected void runTasks0() { - runAllTasks(TimeUnit.MINUTES.toNanos(1)); - } - } - - private static class CountingRunnable implements Runnable { - - private final AtomicInteger invocationCount = new AtomicInteger(); - - @Override - public void run() { - invocationCount.incrementAndGet(); - } - - public int getInvocationCount() { - return invocationCount.get(); - } - - public void resetInvocationCount() { - invocationCount.set(0); - } - } }