Use Netty's DefaultPriorityQueue instead of JDK's PriorityQueue for scheduled tasks

Motivation:

`AbstractScheduledEventExecutor` uses a standard `java.util.PriorityQueue` to keep track of task deadlines. `ScheduledFuture.cancel` removes tasks from this `PriorityQueue`. Unfortunately, `PriorityQueue.remove` has `O(n)` performance since it must search for the item in the entire queue before removing it. This is fast when the future is at the front of the queue (e.g., already triggered) but not when it's randomly located in the queue.

Many servers will use `ScheduledFuture.cancel` on all requests, e.g., to manage a request timeout. As these cancellations will be happen in arbitrary order, when there are many scheduled futures, `PriorityQueue.remove` is a bottleneck and greatly hurts performance with many concurrent requests (>10K).

Modification:

Use netty's `DefaultPriorityQueue` for scheduling futures instead of the JDK. `DefaultPriorityQueue` is almost identical to the JDK version except it is able to remove futures without searching for them in the queue. This means `DefaultPriorityQueue.remove` has `O(log n)` performance.

Result:

Before - cancelling futures has varying performance, capped at `O(n)`
After - cancelling futures has stable performance, capped at `O(log n)`

Benchmark results

After - cancelling in order and in reverse order have similar performance within `O(log n)` bounds
```
Benchmark                                           (num)   Mode  Cnt       Score      Error  Units
ScheduledFutureTaskBenchmark.cancelInOrder            100  thrpt   20  137779.616 ± 7709.751  ops/s
ScheduledFutureTaskBenchmark.cancelInOrder           1000  thrpt   20   11049.448 ±  385.832  ops/s
ScheduledFutureTaskBenchmark.cancelInOrder          10000  thrpt   20     943.294 ±   12.391  ops/s
ScheduledFutureTaskBenchmark.cancelInOrder         100000  thrpt   20      64.210 ±    1.824  ops/s
ScheduledFutureTaskBenchmark.cancelInReverseOrder     100  thrpt   20  167531.096 ± 9187.865  ops/s
ScheduledFutureTaskBenchmark.cancelInReverseOrder    1000  thrpt   20   33019.786 ± 4737.770  ops/s
ScheduledFutureTaskBenchmark.cancelInReverseOrder   10000  thrpt   20    2976.955 ±  248.555  ops/s
ScheduledFutureTaskBenchmark.cancelInReverseOrder  100000  thrpt   20     362.654 ±   45.716  ops/s
```

Before - cancelling in order and in reverse order have significantly different performance at higher queue size, orders of magnitude worse than the new implementation.
```
Benchmark                                           (num)   Mode  Cnt       Score       Error  Units
ScheduledFutureTaskBenchmark.cancelInOrder            100  thrpt   20  139968.586 ± 12951.333  ops/s
ScheduledFutureTaskBenchmark.cancelInOrder           1000  thrpt   20   12274.420 ±   337.800  ops/s
ScheduledFutureTaskBenchmark.cancelInOrder          10000  thrpt   20     958.168 ±    15.350  ops/s
ScheduledFutureTaskBenchmark.cancelInOrder         100000  thrpt   20      53.381 ±    13.981  ops/s
ScheduledFutureTaskBenchmark.cancelInReverseOrder     100  thrpt   20  123918.829 ±  3642.517  ops/s
ScheduledFutureTaskBenchmark.cancelInReverseOrder    1000  thrpt   20    5099.810 ±   206.992  ops/s
ScheduledFutureTaskBenchmark.cancelInReverseOrder   10000  thrpt   20      72.335 ±     0.443  ops/s
ScheduledFutureTaskBenchmark.cancelInReverseOrder  100000  thrpt   20       0.743 ±     0.003  ops/s
```
This commit is contained in:
Anuraag Agrawal 2017-11-11 16:09:32 +09:00 committed by Scott Mitchell
parent 2adb8bd80f
commit 1f1a60ae7d
7 changed files with 186 additions and 9 deletions

View File

@ -15,9 +15,11 @@
*/
package io.netty.util.concurrent;
import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PriorityQueue;
import java.util.PriorityQueue;
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
@ -28,7 +30,15 @@ import java.util.concurrent.TimeUnit;
*/
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
new Comparator<ScheduledFutureTask<?>>() {
@Override
public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
return o1.compareTo(o2);
}
};
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
protected AbstractScheduledEventExecutor() {
}
@ -41,14 +51,17 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
return ScheduledFutureTask.nanoTime();
}
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
// Use same initial capacity as java.util.PriorityQueue
11);
}
return scheduledTaskQueue;
}
private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
private static boolean isNullOrEmpty(Queue<ScheduledFutureTask<?>> queue) {
return queue == null || queue.isEmpty();
}
@ -59,7 +72,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
*/
protected void cancelScheduledTasks() {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
if (isNullOrEmpty(scheduledTaskQueue)) {
return;
}
@ -71,7 +84,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
task.cancelWithoutRemove(false);
}
scheduledTaskQueue.clear();
scheduledTaskQueue.clearIgnoringIndexes();
}
/**
@ -205,7 +218,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
final void removeScheduled(final ScheduledFutureTask<?> task) {
if (inEventLoop()) {
scheduledTaskQueue().remove(task);
scheduledTaskQueue().removeTyped(task);
} else {
execute(new Runnable() {
@Override

View File

@ -16,6 +16,9 @@
package io.netty.util.concurrent;
import io.netty.util.internal.DefaultPriorityQueue;
import io.netty.util.internal.PriorityQueueNode;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
@ -23,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
private static final AtomicLong nextTaskId = new AtomicLong();
private static final long START_TIME = System.nanoTime();
@ -40,6 +43,8 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
private int queueIndex = INDEX_NOT_IN_QUEUE;
ScheduledFutureTask(
AbstractScheduledEventExecutor executor,
Runnable runnable, V result, long nanoTime) {
@ -172,4 +177,14 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
.append(periodNanos)
.append(')');
}
@Override
public int priorityQueueIndex(DefaultPriorityQueue<?> queue) {
return queueIndex;
}
@Override
public void priorityQueueIndex(DefaultPriorityQueue<?> queue, int i) {
queueIndex = i;
}
}

View File

@ -77,6 +77,11 @@ public final class DefaultPriorityQueue<T extends PriorityQueueNode> extends Abs
size = 0;
}
@Override
public void clearIgnoringIndexes() {
size = 0;
}
@Override
public boolean offer(T e) {
if (e.priorityQueueIndex(this) != INDEX_NOT_IN_QUEUE) {

View File

@ -115,6 +115,10 @@ public final class EmptyPriorityQueue<T> implements PriorityQueue<T> {
public void clear() {
}
@Override
public void clearIgnoringIndexes() {
}
@Override
public boolean equals(Object o) {
return o instanceof PriorityQueue && ((PriorityQueue) o).isEmpty();

View File

@ -34,4 +34,13 @@ public interface PriorityQueue<T> extends Queue<T> {
* @param node An object which is in this queue and the priority may have changed.
*/
void priorityChanged(T node);
/**
* Removes all of the elements from this {@link PriorityQueue} without calling
* {@link PriorityQueueNode#priorityQueueIndex(DefaultPriorityQueue)} or explicitly removing references to them to
* allow them to be garbage collected. This should only be used when it is certain that the nodes will not be
* re-inserted into this or any other {@link PriorityQueue} and it is known that the {@link PriorityQueue} itself
* will be garbage collected after this call.
*/
void clearIgnoringIndexes();
}

View File

@ -15,7 +15,9 @@
*/
package io.netty.util.internal;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.Serializable;
import java.util.ArrayList;
@ -107,6 +109,37 @@ public class DefaultPriorityQueueTest {
assertSame(c, queue.peek());
}
@Test
public void testClearIgnoringIndexes() {
PriorityQueue<TestElement> queue = new DefaultPriorityQueue<TestElement>(TestElementComparator.INSTANCE, 0);
assertEmptyQueue(queue);
TestElement a = new TestElement(5);
TestElement b = new TestElement(10);
TestElement c = new TestElement(2);
TestElement d = new TestElement(6);
TestElement e = new TestElement(11);
assertOffer(queue, a);
assertOffer(queue, b);
assertOffer(queue, c);
assertOffer(queue, d);
queue.clearIgnoringIndexes();
assertEmptyQueue(queue);
// Elements cannot be re-inserted but new ones can.
try {
queue.offer(a);
fail();
} catch (IllegalArgumentException t) {
// expected
}
assertOffer(queue, e);
assertSame(e, queue.peek());
}
@Test
public void testRemoval() {
testRemoval(false);

View File

@ -0,0 +1,98 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.concurrent;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
public class ScheduledFutureTaskBenchmark extends AbstractMicrobenchmark {
static final EventLoop executor = new DefaultEventLoop();
@State(Scope.Thread)
public static class FuturesHolder {
private static final Callable<Void> NO_OP = new Callable<Void>() {
@Override
public Void call() throws Exception {
return null;
}
};
@Param({ "100", "1000", "10000", "100000" })
int num;
final List<ScheduledFuture<Void>> futures = new ArrayList<ScheduledFuture<Void>>();
@Setup(Level.Invocation)
public void reset() {
futures.clear();
executor.submit(new Runnable() {
@Override
public void run() {
for (int i = 1; i <= num; i++) {
futures.add(executor.schedule(NO_OP, i, TimeUnit.HOURS));
}
}
}).syncUninterruptibly();
}
}
@TearDown(Level.Trial)
public void stop() throws Exception {
executor.shutdownGracefully().syncUninterruptibly();
}
@Benchmark
public Future<?> cancelInOrder(final FuturesHolder futuresHolder) {
return executor.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < futuresHolder.num; i++) {
futuresHolder.futures.get(i).cancel(false);
}
}
}).syncUninterruptibly();
}
@Benchmark
public Future<?> cancelInReverseOrder(final FuturesHolder futuresHolder) {
return executor.submit(new Runnable() {
@Override
public void run() {
for (int i = futuresHolder.num - 1; i >= 0; i--) {
futuresHolder.futures.get(i).cancel(false);
}
}
}).syncUninterruptibly();
}
}