Revert "Ability to run a task at the end of an eventloop iteration." (#8637)
Motivation:
executeAfterEventLoopIteration is an Unstable API and isnt used in Netty. We should remove it to reduce complexity.
Changes:
This reverts commit 77770374fb
.
Result:
Simplify implementation / cleanup.
This commit is contained in:
parent
563793688f
commit
7f22743d92
@ -201,10 +201,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
|||||||
*/
|
*/
|
||||||
protected Runnable pollTask() {
|
protected Runnable pollTask() {
|
||||||
assert inEventLoop();
|
assert inEventLoop();
|
||||||
return pollTaskFrom(taskQueue);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) {
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
Runnable task = taskQueue.poll();
|
Runnable task = taskQueue.poll();
|
||||||
if (task == WAKEUP_TASK) {
|
if (task == WAKEUP_TASK) {
|
||||||
@ -319,7 +316,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
|||||||
throw new NullPointerException("task");
|
throw new NullPointerException("task");
|
||||||
}
|
}
|
||||||
if (!offerTask(task)) {
|
if (!offerTask(task)) {
|
||||||
reject(task);
|
rejectedExecutionHandler.rejected(task, this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -346,43 +343,30 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
|||||||
* @return {@code true} if and only if at least one task was run
|
* @return {@code true} if and only if at least one task was run
|
||||||
*/
|
*/
|
||||||
protected boolean runAllTasks() {
|
protected boolean runAllTasks() {
|
||||||
assert inEventLoop();
|
|
||||||
boolean fetchedAll;
|
boolean fetchedAll;
|
||||||
boolean ranAtLeastOne = false;
|
|
||||||
|
|
||||||
do {
|
do {
|
||||||
fetchedAll = fetchFromScheduledTaskQueue();
|
fetchedAll = fetchFromScheduledTaskQueue();
|
||||||
if (runAllTasksFrom(taskQueue)) {
|
Runnable task = pollTask();
|
||||||
ranAtLeastOne = true;
|
if (task == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("A task raised an exception.", t);
|
||||||
|
}
|
||||||
|
|
||||||
|
task = pollTask();
|
||||||
|
if (task == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
|
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
|
||||||
|
|
||||||
if (ranAtLeastOne) {
|
lastExecutionTime = ScheduledFutureTask.nanoTime();
|
||||||
lastExecutionTime = ScheduledFutureTask.nanoTime();
|
return true;
|
||||||
}
|
|
||||||
afterRunningAllTasks();
|
|
||||||
return ranAtLeastOne;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Runs all tasks from the passed {@code taskQueue}.
|
|
||||||
*
|
|
||||||
* @param taskQueue To poll and execute all tasks.
|
|
||||||
*
|
|
||||||
* @return {@code true} if at least one task was executed.
|
|
||||||
*/
|
|
||||||
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
|
|
||||||
Runnable task = pollTaskFrom(taskQueue);
|
|
||||||
if (task == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
for (;;) {
|
|
||||||
safeExecute(task);
|
|
||||||
task = pollTaskFrom(taskQueue);
|
|
||||||
if (task == null) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -393,7 +377,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
|||||||
fetchFromScheduledTaskQueue();
|
fetchFromScheduledTaskQueue();
|
||||||
Runnable task = pollTask();
|
Runnable task = pollTask();
|
||||||
if (task == null) {
|
if (task == null) {
|
||||||
afterRunningAllTasks();
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -401,7 +384,11 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
|||||||
long runTasks = 0;
|
long runTasks = 0;
|
||||||
long lastExecutionTime;
|
long lastExecutionTime;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
safeExecute(task);
|
try {
|
||||||
|
task.run();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("A task raised an exception.", t);
|
||||||
|
}
|
||||||
|
|
||||||
runTasks ++;
|
runTasks ++;
|
||||||
|
|
||||||
@ -421,16 +408,10 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
afterRunningAllTasks();
|
|
||||||
this.lastExecutionTime = lastExecutionTime;
|
this.lastExecutionTime = lastExecutionTime;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Invoked before returning from {@link #runAllTasks()} and {@link #runAllTasks(long)}.
|
|
||||||
*/
|
|
||||||
@UnstableApi
|
|
||||||
protected void afterRunningAllTasks() { }
|
|
||||||
/**
|
/**
|
||||||
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
|
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
|
||||||
*/
|
*/
|
||||||
@ -867,15 +848,6 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
|
|||||||
throw new RejectedExecutionException("event executor terminated");
|
throw new RejectedExecutionException("event executor terminated");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Offers the task to the associated {@link RejectedExecutionHandler}.
|
|
||||||
*
|
|
||||||
* @param task to reject.
|
|
||||||
*/
|
|
||||||
protected final void reject(Runnable task) {
|
|
||||||
rejectedExecutionHandler.rejected(task, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ScheduledExecutorService implementation
|
// ScheduledExecutorService implementation
|
||||||
|
|
||||||
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||||
|
@ -20,9 +20,7 @@ import io.netty.util.concurrent.RejectedExecutionHandlers;
|
|||||||
import io.netty.util.concurrent.SingleThreadEventExecutor;
|
import io.netty.util.concurrent.SingleThreadEventExecutor;
|
||||||
import io.netty.util.internal.ObjectUtil;
|
import io.netty.util.internal.ObjectUtil;
|
||||||
import io.netty.util.internal.SystemPropertyUtil;
|
import io.netty.util.internal.SystemPropertyUtil;
|
||||||
import io.netty.util.internal.UnstableApi;
|
|
||||||
|
|
||||||
import java.util.Queue;
|
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
|
||||||
@ -35,8 +33,6 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
|
|||||||
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
|
protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
|
||||||
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
|
SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
|
||||||
|
|
||||||
private final Queue<Runnable> tailTasks;
|
|
||||||
|
|
||||||
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
|
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
|
||||||
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
|
this(parent, threadFactory, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
|
||||||
}
|
}
|
||||||
@ -49,14 +45,12 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
|
|||||||
boolean addTaskWakesUp, int maxPendingTasks,
|
boolean addTaskWakesUp, int maxPendingTasks,
|
||||||
RejectedExecutionHandler rejectedExecutionHandler) {
|
RejectedExecutionHandler rejectedExecutionHandler) {
|
||||||
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
|
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
|
||||||
tailTasks = newTaskQueue(maxPendingTasks);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
|
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
|
||||||
boolean addTaskWakesUp, int maxPendingTasks,
|
boolean addTaskWakesUp, int maxPendingTasks,
|
||||||
RejectedExecutionHandler rejectedExecutionHandler) {
|
RejectedExecutionHandler rejectedExecutionHandler) {
|
||||||
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
|
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
|
||||||
tailTasks = newTaskQueue(maxPendingTasks);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -95,59 +89,11 @@ public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor im
|
|||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds a task to be run once at the end of next (or current) {@code eventloop} iteration.
|
|
||||||
*
|
|
||||||
* @param task to be added.
|
|
||||||
*/
|
|
||||||
@UnstableApi
|
|
||||||
public final void executeAfterEventLoopIteration(Runnable task) {
|
|
||||||
ObjectUtil.checkNotNull(task, "task");
|
|
||||||
if (isShutdown()) {
|
|
||||||
reject();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!tailTasks.offer(task)) {
|
|
||||||
reject(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (wakesUpForTask(task)) {
|
|
||||||
wakeup(inEventLoop());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes a task that was added previously via {@link #executeAfterEventLoopIteration(Runnable)}.
|
|
||||||
*
|
|
||||||
* @param task to be removed.
|
|
||||||
*
|
|
||||||
* @return {@code true} if the task was removed as a result of this call.
|
|
||||||
*/
|
|
||||||
@UnstableApi
|
|
||||||
final boolean removeAfterEventLoopIterationTask(Runnable task) {
|
|
||||||
return tailTasks.remove(ObjectUtil.checkNotNull(task, "task"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected boolean wakesUpForTask(Runnable task) {
|
protected boolean wakesUpForTask(Runnable task) {
|
||||||
return !(task instanceof NonWakeupRunnable);
|
return !(task instanceof NonWakeupRunnable);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void afterRunningAllTasks() {
|
|
||||||
runAllTasksFrom(tailTasks);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean hasTasks() {
|
|
||||||
return super.hasTasks() || !tailTasks.isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int pendingTasks() {
|
|
||||||
return super.pendingTasks() + tailTasks.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
|
* Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
|
||||||
*/
|
*/
|
||||||
|
@ -20,7 +20,6 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
|
|||||||
import ch.qos.logback.core.Appender;
|
import ch.qos.logback.core.Appender;
|
||||||
import io.netty.channel.local.LocalChannel;
|
import io.netty.channel.local.LocalChannel;
|
||||||
import io.netty.util.concurrent.EventExecutor;
|
import io.netty.util.concurrent.EventExecutor;
|
||||||
import org.hamcrest.MatcherAssert;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -52,13 +51,11 @@ public class SingleThreadEventLoopTest {
|
|||||||
|
|
||||||
private SingleThreadEventLoopA loopA;
|
private SingleThreadEventLoopA loopA;
|
||||||
private SingleThreadEventLoopB loopB;
|
private SingleThreadEventLoopB loopB;
|
||||||
private SingleThreadEventLoopC loopC;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void newEventLoop() {
|
public void newEventLoop() {
|
||||||
loopA = new SingleThreadEventLoopA();
|
loopA = new SingleThreadEventLoopA();
|
||||||
loopB = new SingleThreadEventLoopB();
|
loopB = new SingleThreadEventLoopB();
|
||||||
loopC = new SingleThreadEventLoopC();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@ -69,9 +66,6 @@ public class SingleThreadEventLoopTest {
|
|||||||
if (!loopB.isShuttingDown()) {
|
if (!loopB.isShuttingDown()) {
|
||||||
loopB.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
|
loopB.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
if (!loopC.isShuttingDown()) {
|
|
||||||
loopC.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (!loopA.isTerminated()) {
|
while (!loopA.isTerminated()) {
|
||||||
try {
|
try {
|
||||||
@ -89,14 +83,6 @@ public class SingleThreadEventLoopTest {
|
|||||||
// Ignore
|
// Ignore
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
while (!loopC.isTerminated()) {
|
|
||||||
try {
|
|
||||||
loopC.awaitTermination(1, TimeUnit.DAYS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// Ignore
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -151,11 +137,6 @@ public class SingleThreadEventLoopTest {
|
|||||||
testScheduleTask(loopB);
|
testScheduleTask(loopB);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void scheduleTaskC() throws Exception {
|
|
||||||
testScheduleTask(loopC);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException {
|
private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException {
|
||||||
long startTime = System.nanoTime();
|
long startTime = System.nanoTime();
|
||||||
final AtomicLong endTime = new AtomicLong();
|
final AtomicLong endTime = new AtomicLong();
|
||||||
@ -473,39 +454,7 @@ public class SingleThreadEventLoopTest {
|
|||||||
assertThat(loopA.isShutdown(), is(true));
|
assertThat(loopA.isShutdown(), is(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
private static class SingleThreadEventLoopA extends SingleThreadEventLoop {
|
||||||
public void testOnEventLoopIteration() throws Exception {
|
|
||||||
CountingRunnable onIteration = new CountingRunnable();
|
|
||||||
loopC.executeAfterEventLoopIteration(onIteration);
|
|
||||||
CountingRunnable noopTask = new CountingRunnable();
|
|
||||||
loopC.submit(noopTask).sync();
|
|
||||||
loopC.iterationEndSignal.take();
|
|
||||||
MatcherAssert.assertThat("Unexpected invocation count for regular task.",
|
|
||||||
noopTask.getInvocationCount(), is(1));
|
|
||||||
MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.",
|
|
||||||
onIteration.getInvocationCount(), is(1));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 10000)
|
|
||||||
public void testRemoveOnEventLoopIteration() throws Exception {
|
|
||||||
CountingRunnable onIteration1 = new CountingRunnable();
|
|
||||||
loopC.executeAfterEventLoopIteration(onIteration1);
|
|
||||||
CountingRunnable onIteration2 = new CountingRunnable();
|
|
||||||
loopC.executeAfterEventLoopIteration(onIteration2);
|
|
||||||
loopC.removeAfterEventLoopIterationTask(onIteration1);
|
|
||||||
CountingRunnable noopTask = new CountingRunnable();
|
|
||||||
loopC.submit(noopTask).sync();
|
|
||||||
|
|
||||||
loopC.iterationEndSignal.take();
|
|
||||||
MatcherAssert.assertThat("Unexpected invocation count for regular task.",
|
|
||||||
noopTask.getInvocationCount(), is(1));
|
|
||||||
MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.",
|
|
||||||
onIteration2.getInvocationCount(), is(1));
|
|
||||||
MatcherAssert.assertThat("Unexpected invocation count for on every eventloop iteration task.",
|
|
||||||
onIteration1.getInvocationCount(), is(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final class SingleThreadEventLoopA extends SingleThreadEventLoop {
|
|
||||||
|
|
||||||
final AtomicInteger cleanedUp = new AtomicInteger();
|
final AtomicInteger cleanedUp = new AtomicInteger();
|
||||||
|
|
||||||
@ -549,7 +498,7 @@ public class SingleThreadEventLoopTest {
|
|||||||
// Waken up by interruptThread()
|
// Waken up by interruptThread()
|
||||||
}
|
}
|
||||||
|
|
||||||
runTasks0();
|
runAllTasks();
|
||||||
|
|
||||||
if (confirmShutdown()) {
|
if (confirmShutdown()) {
|
||||||
break;
|
break;
|
||||||
@ -557,47 +506,9 @@ public class SingleThreadEventLoopTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void runTasks0() {
|
|
||||||
runAllTasks();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void wakeup(boolean inEventLoop) {
|
protected void wakeup(boolean inEventLoop) {
|
||||||
interruptThread();
|
interruptThread();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class SingleThreadEventLoopC extends SingleThreadEventLoopB {
|
|
||||||
|
|
||||||
final LinkedBlockingQueue<Boolean> iterationEndSignal = new LinkedBlockingQueue<Boolean>(1);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void afterRunningAllTasks() {
|
|
||||||
super.afterRunningAllTasks();
|
|
||||||
iterationEndSignal.offer(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void runTasks0() {
|
|
||||||
runAllTasks(TimeUnit.MINUTES.toNanos(1));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class CountingRunnable implements Runnable {
|
|
||||||
|
|
||||||
private final AtomicInteger invocationCount = new AtomicInteger();
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
invocationCount.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getInvocationCount() {
|
|
||||||
return invocationCount.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void resetInvocationCount() {
|
|
||||||
invocationCount.set(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user