From 4949d4a0ad271d5d48e4f67f1b691c53c2bafe8c Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 11 Mar 2021 08:31:32 +0100 Subject: [PATCH] Return correct result for Futures that are returned from UnorderedThreadPoolExecutor (#11074) Motivation: Due a regression in fd8c1874b4e24a18c562c7013efabcb155395459 we did not correctly set the result for the returned Future if it was build for a Callable. Modifications: - Adjust code to call get() to retrive the correct result for notification of the future. - Add unit test Result: Fixes https://github.com/netty/netty/issues/11072 --- .../io/netty/util/concurrent/PromiseTask.java | 2 +- .../UnorderedThreadPoolEventExecutor.java | 28 ++++++++++++-- .../UnorderedThreadPoolEventExecutorTest.java | 37 +++++++++++++++++++ 3 files changed, 63 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java index b1b106bdc6..1a14219b8c 100644 --- a/common/src/main/java/io/netty/util/concurrent/PromiseTask.java +++ b/common/src/main/java/io/netty/util/concurrent/PromiseTask.java @@ -90,7 +90,7 @@ class PromiseTask extends DefaultPromise implements RunnableFuture { } @SuppressWarnings("unchecked") - final V runTask() throws Exception { + V runTask() throws Throwable { final Object task = this.task; if (task instanceof Callable) { return ((Callable) task).call(); diff --git a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java index 9ebae11c54..3709a7f961 100644 --- a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RunnableScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -161,12 +162,12 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE @Override protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { return runnable instanceof NonNotifyRunnable ? - task : new RunnableScheduledFutureTask(this, task); + task : new RunnableScheduledFutureTask(this, task, false); } @Override protected RunnableScheduledFuture decorateTask(Callable callable, RunnableScheduledFuture task) { - return new RunnableScheduledFutureTask(this, task); + return new RunnableScheduledFutureTask(this, task, true); } @Override @@ -212,10 +213,31 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE private static final class RunnableScheduledFutureTask extends PromiseTask implements RunnableScheduledFuture, ScheduledFuture { private final RunnableScheduledFuture future; + private final boolean wasCallable; - RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture future) { + RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture future, boolean wasCallable) { super(executor, future); this.future = future; + this.wasCallable = wasCallable; + } + + @Override + V runTask() throws Throwable { + V result = super.runTask(); + if (result == null && wasCallable) { + // If this RunnableScheduledFutureTask wraps a RunnableScheduledFuture that wraps a Callable we need + // to ensure that we return the correct result by calling future.get(). + // + // See https://github.com/netty/netty/issues/11072 + assert future.isDone(); + try { + return future.get(); + } catch (ExecutionException e) { + // unwrap exception. + throw e.getCause(); + } + } + return result; } @Override diff --git a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java index 59120f01ef..6c7f274aa4 100644 --- a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java @@ -18,6 +18,7 @@ package io.netty.util.concurrent; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -73,4 +74,40 @@ public class UnorderedThreadPoolEventExecutorTest { executor.shutdownGracefully(); } } + + @Test + public void testGetReturnsCorrectValueOnSuccess() throws Exception { + UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1); + try { + final String expected = "expected"; + Future f = executor.submit(new Callable() { + @Override + public String call() { + return expected; + } + }); + + Assert.assertEquals(expected, f.get()); + } finally { + executor.shutdownGracefully(); + } + } + + @Test + public void testGetReturnsCorrectValueOnFailure() throws Exception { + UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1); + try { + final RuntimeException cause = new RuntimeException(); + Future f = executor.submit(new Callable() { + @Override + public String call() { + throw cause; + } + }); + + Assert.assertSame(cause, f.await().cause()); + } finally { + executor.shutdownGracefully(); + } + } }