Fix #10614 by making UnorderedTPEExecutor.scheduleAtFixedRate run tasks more than once (#10659)

Motivation:
All scheduled executors should behave in accordance to their API.
The bug here is that scheduled tasks were not run more than once because we executed the runnables directly, instead of through the provided runnable future.

Modification:
We now run tasks through the provided future, so that when each run completes, the internal state of the task is reset and the ScheduledThreadPoolExecutor is informed of the completion.
This allows the executor to prepare the next run.

Result:
The UnorderedThreadPoolEventExecutor is now able to run scheduled tasks more than once.
Which is what one would expect from the API.
This commit is contained in:
Chris Vest 2020-10-14 11:09:16 +02:00 committed by Norman Maurer
parent 3cbbef687e
commit 0ca76c42a5
3 changed files with 27 additions and 3 deletions

View File

@ -44,7 +44,7 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
UnorderedThreadPoolEventExecutor.class);
private final Promise<?> terminationFuture = GlobalEventExecutor.INSTANCE.newPromise();
private final Set<EventExecutor> executorSet = Collections.singleton((EventExecutor) this);
private final Set<EventExecutor> executorSet = Collections.singleton(this);
/**
* Calls {@link UnorderedThreadPoolEventExecutor#UnorderedThreadPoolEventExecutor(int, ThreadFactory)}
@ -227,7 +227,7 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
} else if (!isDone()) {
try {
// Its a periodic task so we need to ignore the return value
task.call();
future.run();
} catch (Throwable cause) {
if (!tryFailureInternal(cause)) {
logger.warn("Failure during execution of task", cause);

View File

@ -19,6 +19,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UnorderedThreadPoolEventExecutorTest {
@ -44,4 +45,22 @@ public class UnorderedThreadPoolEventExecutorTest {
executor.shutdownGracefully();
}
}
@Test(timeout = 10000)
public void scheduledAtFixedRateMustRunTaskRepeatedly() throws InterruptedException {
UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1);
final CountDownLatch latch = new CountDownLatch(3);
Future<?> future = executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
latch.countDown();
}
}, 1, 1, TimeUnit.MILLISECONDS);
try {
latch.await();
} finally {
future.cancel(true);
executor.shutdownGracefully();
}
}
}

View File

@ -153,7 +153,12 @@ public class EpollSocketChannelConfigTest {
ch.config().getSoLinger();
fail();
} catch (ChannelException e) {
assertTrue(e.getCause() instanceof ClosedChannelException);
if (!(e.getCause() instanceof ClosedChannelException)) {
AssertionError error = new AssertionError(
"Expected the suppressed exception to be an instance of ClosedChannelException.");
error.addSuppressed(e.getCause());
throw error;
}
}
}