UnorderedThreadPoolEventExecutor consumes 100% CPU when idle

Motivation:

When UnorderedThreadPoolEventExecutor.execute / submit etc is called it will consume up to 100 % CPU even after the task was executed.

Modifications:

Add a special wrapper which we will be used in execute(...) to wrap the submitted Runnable. This is needed as  ScheduledThreadPoolExecutor.execute(...) will delegate to submit(...) which will then use decorateTask(...). The problem with this is that decorateTask(...) needs to ensure we only do our own decoration if we not call from execute(...) as otherwise we may end up creating an endless loop because DefaultPromise will call  EventExecutor.execute(...) when notify the listeners of the promise.

Result:

Fixes [#6507].
This commit is contained in:
Norman Maurer 2017-03-08 21:04:45 +01:00
parent cfebaa36c0
commit c6a3cae269
2 changed files with 87 additions and 1 deletions

View File

@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
/**
* {@link EventExecutor} implementation which makes no guarantees about the ordering of task execution that
* are submitted because there may be multiple threads executing these tasks.
@ -158,7 +160,8 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
return new RunnableScheduledFutureTask<V>(this, runnable, task);
return runnable instanceof NonNotifyRunnable ?
task : new RunnableScheduledFutureTask<V>(this, runnable, task);
}
@Override
@ -201,6 +204,11 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
return (Future<T>) super.submit(task);
}
@Override
public void execute(Runnable command) {
super.schedule(new NonNotifyRunnable(command), 0, NANOSECONDS);
}
private static final class RunnableScheduledFutureTask<V> extends PromiseTask<V>
implements RunnableScheduledFuture<V>, ScheduledFuture<V> {
private final RunnableScheduledFuture<V> future;
@ -248,4 +256,25 @@ public final class UnorderedThreadPoolEventExecutor extends ScheduledThreadPoolE
return future.compareTo(o);
}
}
// This is a special wrapper which we will be used in execute(...) to wrap the submitted Runnable. This is needed as
// ScheduledThreadPoolExecutor.execute(...) will delegate to submit(...) which will then use decorateTask(...).
// The problem with this is that decorateTask(...) needs to ensure we only do our own decoration if we not call
// from execute(...) as otherwise we may end up creating an endless loop because DefaultPromise will call
// EventExecutor.execute(...) when notify the listeners of the promise.
//
// See https://github.com/netty/netty/issues/6507
private static final class NonNotifyRunnable implements Runnable {
private final Runnable task;
NonNotifyRunnable(Runnable task) {
this.task = task;
}
@Override
public void run() {
task.run();
}
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.concurrent;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
public class UnorderedThreadPoolEventExecutorTest {
// See https://github.com/netty/netty/issues/6507
@Test
public void testNotEndlessExecute() throws Exception {
UnorderedThreadPoolEventExecutor executor = new UnorderedThreadPoolEventExecutor(1);
try {
final CountDownLatch latch = new CountDownLatch(3);
Runnable task = new Runnable() {
@Override
public void run() {
latch.countDown();
}
};
executor.execute(task);
Future<?> future = executor.submit(task).addListener(new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
latch.countDown();
}
});
latch.await();
future.syncUninterruptibly();
// Now just check if the queue stays empty multiple times. This is needed as the submit to execute(...)
// by DefaultPromise may happen in an async fashion
for (int i = 0; i < 10000; i++) {
Assert.assertTrue(executor.getQueue().isEmpty());
}
} finally {
executor.shutdownGracefully();
}
}
}