diff --git a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java index 5d073debec..4ed94da537 100644 --- a/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutor.java @@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + /** * {@link EventExecutor} implementation which makes no guarantees about the ordering of task execution that * are submitted because there may be multiple threads executing these tasks. @@ -158,7 +160,8 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE @Override protected RunnableScheduledFuture decorateTask(Runnable runnable, RunnableScheduledFuture task) { - return new RunnableScheduledFutureTask(this, runnable, task); + return runnable instanceof NonNotifyRunnable ? + task : new RunnableScheduledFutureTask(this, runnable, task); } @Override @@ -201,6 +204,11 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE return (Future) super.submit(task); } + @Override + public void execute(Runnable command) { + super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS); + } + private static final class RunnableScheduledFutureTask extends PromiseTask implements RunnableScheduledFuture, ScheduledFuture { private final RunnableScheduledFuture future; @@ -248,4 +256,25 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE return future.compareTo(o); } } + + // This is a special wrapper which we will be used in execute(...) to wrap the submitted Runnable. This is needed as + // ScheduledThreadPoolExecutor.execute(...) will delegate to submit(...) which will then use decorateTask(...). + // The problem with this is that decorateTask(...) needs to ensure we only do our own decoration if we not call + // from execute(...) as otherwise we may end up creating an endless loop because DefaultPromise will call + // EventExecutor.execute(...) when notify the listeners of the promise. + // + // See https://github.com/netty/netty/issues/6507 + private static final class NonNotifyRunnable implements Runnable { + + private final Runnable task; + + NonNotifyRunnable(Runnable task) { + this.task = task; + } + + @Override + public void run() { + task.run(); + } + } } diff --git a/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java new file mode 100644 index 0000000000..d96db3fcb0 --- /dev/null +++ b/common/src/test/java/io/netty/util/concurrent/UnorderedThreadPoolEventExecutorTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; + +public class UnorderedThreadPoolEventExecutorTest { + + // See https://github.com/netty/netty/issues/6507 + @Test + public void testNotEndlessExecute() throws Exception { + UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1); + + try { + final CountDownLatch latch = new CountDownLatch(3); + Runnable task = new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }; + executor.execute(task); + Future future = executor.submit(task).addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + latch.countDown(); + } + }); + latch.await(); + future.syncUninterruptibly(); + + // Now just check if the queue stays empty multiple times. This is needed as the submit to execute(...) + // by DefaultPromise may happen in an async fashion + for (int i = 0; i < 10000; i++) { + Assert.assertTrue(executor.getQueue().isEmpty()); + } + } finally { + executor.shutdownGracefully(); + } + } +}