Fix #10614 by making UnorderedTPEExecutor.scheduleAtFixedRate run tasks more than once (#10659)

Motivation:
All scheduled executors should behave in accordance to their API.
The bug here is that scheduled tasks were not run more than once because we executed the runnables directly, instead of through the provided runnable future.

Modification:
We now run tasks through the provided future, so that when each run completes, the internal state of the task is reset and the ScheduledThreadPoolExecutor is informed of the completion.
This allows the executor to prepare the next run.

Result:
The UnorderedThreadPoolEventExecutor is now able to run scheduled tasks more than once.
Which is what one would expect from the API.
This commit is contained in:
Chris Vest 2020-10-14 11:09:16 +02:00 committed by GitHub
parent cd1581faad
commit fd8c1874b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 12 deletions

View File

@ -161,12 +161,12 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
@Override @Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return runnable instanceof NonNotifyRunnable ? return runnable instanceof NonNotifyRunnable ?
task : new RunnableScheduledFutureTask<V>(this, runnable, task); task : new RunnableScheduledFutureTask<V>(this, task);
} }
@Override @Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) { protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return new RunnableScheduledFutureTask<V>(this, callable, task); return new RunnableScheduledFutureTask<V>(this, task);
} }
@Override @Override
@ -213,15 +213,8 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
implements RunnableScheduledFuture<V>, ScheduledFuture<V> { implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
private final RunnableScheduledFuture<V> future; private final RunnableScheduledFuture<V> future;
RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture<V> future) {
RunnableScheduledFuture<V> future) { super(executor, future);
super(executor, runnable);
this.future = future;
}
RunnableScheduledFutureTask(EventExecutor executor, Callable<V> callable,
RunnableScheduledFuture<V> future) {
super(executor, callable);
this.future = future; this.future = future;
} }

View File

@ -19,6 +19,7 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UnorderedThreadPoolEventExecutorTest { public class UnorderedThreadPoolEventExecutorTest {
@ -54,4 +55,22 @@ public class UnorderedThreadPoolEventExecutorTest {
executor.shutdownGracefully(); executor.shutdownGracefully();
} }
} }
@Test(timeout = 10000)
public void scheduledAtFixedRateMustRunTaskRepeatedly() throws InterruptedException {
UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1);
final CountDownLatch latch = new CountDownLatch(3);
Future<?> future = executor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
latch.countDown();
}
}, 1, 1, TimeUnit.MILLISECONDS);
try {
latch.await();
} finally {
future.cancel(true);
executor.shutdownGracefully();
}
}
} }

View File

@ -152,7 +152,12 @@ public class EpollSocketChannelConfigTest {
ch.config().getSoLinger(); ch.config().getSoLinger();
fail(); fail();
} catch (ChannelException e) { } catch (ChannelException e) {
assertTrue(e.getCause() instanceof ClosedChannelException); if (!(e.getCause() instanceof ClosedChannelException)) {
AssertionError error = new AssertionError(
"Expected the suppressed exception to be an instance of ClosedChannelException.");
error.addSuppressed(e.getCause());
throw error;
}
} }
} }