From fd8c1874b4e24a18c562c7013efabcb155395459 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 14 Oct 2020 11:09:16 +0200 Subject: [PATCH] Fix #10614 by making UnorderedTPEExecutor.scheduleAtFixedRate run tasks more than once (#10659) Motivation: All scheduled executors should behave in accordance to their API. The bug here is that scheduled tasks were not run more than once because we executed the runnables directly, instead of through the provided runnable future. Modification: We now run tasks through the provided future, so that when each run completes, the internal state of the task is reset and the ScheduledThreadPoolExecutor is informed of the completion. This allows the executor to prepare the next run. Result: The UnorderedThreadPoolEventExecutor is now able to run scheduled tasks more than once. Which is what one would expect from the API. --- .../UnorderedThreadPoolEventExecutor.java | 15 ++++----------- .../UnorderedThreadPoolEventExecutorTest.java | 19 +++++++++++++++++++ .../epoll/EpollSocketChannelConfigTest.java | 7 ++++++- 3 files changed, 29 insertions(+), 12 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java index 277c90322a..dbf3f25fda 100644 --- a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java @@ -161,12 +161,12 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE @Override protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { return runnable instanceof NonNotifyRunnable ? - task : new RunnableScheduledFutureTask(this, runnable, task); + task : new RunnableScheduledFutureTask(this, task); } @Override protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { - return new RunnableScheduledFutureTask(this, callable, task); + return new RunnableScheduledFutureTask(this, task); } @Override @@ -213,15 +213,8 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE implements RunnableScheduledFuture, ScheduledFuture { private final RunnableScheduledFuture future; - RunnableScheduledFutureTask(EventExecutor executor, Runnable runnable, - RunnableScheduledFuture future) { - super(executor, runnable); - this.future = future; - } - - RunnableScheduledFutureTask(EventExecutor executor, Callable callable, - RunnableScheduledFuture future) { - super(executor, callable); + RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture future) { + super(executor, future); this.future = future; } diff --git a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java index d96db3fcb0..2f4e02dc0b 100644 --- a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java @@ -19,6 +19,7 @@ import org.junit.Assert; import org.junit.Test; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; public class UnorderedThreadPoolEventExecutorTest { @@ -54,4 +55,22 @@ public class UnorderedThreadPoolEventExecutorTest { executor.shutdownGracefully(); } } + + @Test(timeout = 10000) + public void scheduledAtFixedRateMustRunTaskRepeatedly() throws InterruptedException { + UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1); + final CountDownLatch latch = new CountDownLatch(3); + Future future = executor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }, 1, 1, TimeUnit.MILLISECONDS); + try { + latch.await(); + } finally { + future.cancel(true); + executor.shutdownGracefully(); + } + } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelConfigTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelConfigTest.java index 58cf5af646..c2f200a771 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelConfigTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketChannelConfigTest.java @@ -152,7 +152,12 @@ public class EpollSocketChannelConfigTest { ch.config().getSoLinger(); fail(); } catch (ChannelException e) { - assertTrue(e.getCause() instanceof ClosedChannelException); + if (!(e.getCause() instanceof ClosedChannelException)) { + AssertionError error = new AssertionError( + "Expected the suppressed exception to be an instance of ClosedChannelException."); + error.addSuppressed(e.getCause()); + throw error; + } } }