Return correct result for Futures that are returned from UnorderedThreadPoolExecutor (#11074)

Motivation:

Due a regression in fd8c1874b4e24a18c562c7013efabcb155395459 we did not correctly set the result for the returned Future if it was build for a Callable.

Modifications:

- Adjust code to call get() to retrive the correct result for notification of the future.
- Add unit test

Result:

Fixes https://github.com/netty/netty/issues/11072
This commit is contained in:
Norman Maurer 2021-03-11 08:31:32 +01:00 committed by GitHub
parent e83132fcf2
commit 4949d4a0ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 4 deletions

View File

@ -90,7 +90,7 @@ class PromiseTask<V> extends DefaultPromise<V> implements RunnableFuture<V> {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final V runTask() throws Exception { V runTask() throws Throwable {
final Object task = this.task; final Object task = this.task;
if (task instanceof Callable) { if (task instanceof Callable) {
return ((Callable<V>) task).call(); return ((Callable<V>) task).call();

View File

@ -24,6 +24,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Delayed; import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableScheduledFuture; import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
@ -161,12 +162,12 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
@Override @Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) { protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return runnable instanceof NonNotifyRunnable ? return runnable instanceof NonNotifyRunnable ?
task : new RunnableScheduledFutureTask<V>(this, task); task : new RunnableScheduledFutureTask<V>(this, task, false);
} }
@Override @Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) { protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
return new RunnableScheduledFutureTask<V>(this, task); return new RunnableScheduledFutureTask<V>(this, task, true);
} }
@Override @Override
@ -212,10 +213,31 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V> private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
implements RunnableScheduledFuture<V>, ScheduledFuture<V> { implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
private final RunnableScheduledFuture<V> future; private final RunnableScheduledFuture<V> future;
private final boolean wasCallable;
RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture<V> future) { RunnableScheduledFutureTask(EventExecutor executor, RunnableScheduledFuture<V> future, boolean wasCallable) {
super(executor, future); super(executor, future);
this.future = future; this.future = future;
this.wasCallable = wasCallable;
}
@Override
V runTask() throws Throwable {
V result = super.runTask();
if (result == null && wasCallable) {
// If this RunnableScheduledFutureTask wraps a RunnableScheduledFuture that wraps a Callable we need
// to ensure that we return the correct result by calling future.get().
//
// See https://github.com/netty/netty/issues/11072
assert future.isDone();
try {
return future.get();
} catch (ExecutionException e) {
// unwrap exception.
throw e.getCause();
}
}
return result;
} }
@Override @Override

View File

@ -18,6 +18,7 @@ package io.netty.util.concurrent;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -73,4 +74,40 @@ public class UnorderedThreadPoolEventExecutorTest {
executor.shutdownGracefully(); executor.shutdownGracefully();
} }
} }
@Test
public void testGetReturnsCorrectValueOnSuccess() throws Exception {
UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1);
try {
final String expected = "expected";
Future<String> f = executor.submit(new Callable<String>() {
@Override
public String call() {
return expected;
}
});
Assert.assertEquals(expected, f.get());
} finally {
executor.shutdownGracefully();
}
}
@Test
public void testGetReturnsCorrectValueOnFailure() throws Exception {
UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1);
try {
final RuntimeException cause = new RuntimeException();
Future<String> f = executor.submit(new Callable<String>() {
@Override
public String call() {
throw cause;
}
});
Assert.assertSame(cause, f.await().cause());
} finally {
executor.shutdownGracefully();
}
}
} }