From 1f1a60ae7dfe458d94d64886d01216b75b0884e1 Mon Sep 17 00:00:00 2001 From: Anuraag Agrawal Date: Sat, 11 Nov 2017 16:09:32 +0900 Subject: [PATCH] Use Netty's DefaultPriorityQueue instead of JDK's PriorityQueue for scheduled tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: `AbstractScheduledEventExecutor` uses a standard `java.util.PriorityQueue` to keep track of task deadlines. `ScheduledFuture.cancel` removes tasks from this `PriorityQueue`. Unfortunately, `PriorityQueue.remove` has `O(n)` performance since it must search for the item in the entire queue before removing it. This is fast when the future is at the front of the queue (e.g., already triggered) but not when it's randomly located in the queue. Many servers will use `ScheduledFuture.cancel` on all requests, e.g., to manage a request timeout. As these cancellations will be happen in arbitrary order, when there are many scheduled futures, `PriorityQueue.remove` is a bottleneck and greatly hurts performance with many concurrent requests (>10K). Modification: Use netty's `DefaultPriorityQueue` for scheduling futures instead of the JDK. `DefaultPriorityQueue` is almost identical to the JDK version except it is able to remove futures without searching for them in the queue. This means `DefaultPriorityQueue.remove` has `O(log n)` performance. Result: Before - cancelling futures has varying performance, capped at `O(n)` After - cancelling futures has stable performance, capped at `O(log n)` Benchmark results After - cancelling in order and in reverse order have similar performance within `O(log n)` bounds ``` Benchmark (num) Mode Cnt Score Error Units ScheduledFutureTaskBenchmark.cancelInOrder 100 thrpt 20 137779.616 ± 7709.751 ops/s ScheduledFutureTaskBenchmark.cancelInOrder 1000 thrpt 20 11049.448 ± 385.832 ops/s ScheduledFutureTaskBenchmark.cancelInOrder 10000 thrpt 20 943.294 ± 12.391 ops/s ScheduledFutureTaskBenchmark.cancelInOrder 100000 thrpt 20 64.210 ± 1.824 ops/s ScheduledFutureTaskBenchmark.cancelInReverseOrder 100 thrpt 20 167531.096 ± 9187.865 ops/s ScheduledFutureTaskBenchmark.cancelInReverseOrder 1000 thrpt 20 33019.786 ± 4737.770 ops/s ScheduledFutureTaskBenchmark.cancelInReverseOrder 10000 thrpt 20 2976.955 ± 248.555 ops/s ScheduledFutureTaskBenchmark.cancelInReverseOrder 100000 thrpt 20 362.654 ± 45.716 ops/s ``` Before - cancelling in order and in reverse order have significantly different performance at higher queue size, orders of magnitude worse than the new implementation. ``` Benchmark (num) Mode Cnt Score Error Units ScheduledFutureTaskBenchmark.cancelInOrder 100 thrpt 20 139968.586 ± 12951.333 ops/s ScheduledFutureTaskBenchmark.cancelInOrder 1000 thrpt 20 12274.420 ± 337.800 ops/s ScheduledFutureTaskBenchmark.cancelInOrder 10000 thrpt 20 958.168 ± 15.350 ops/s ScheduledFutureTaskBenchmark.cancelInOrder 100000 thrpt 20 53.381 ± 13.981 ops/s ScheduledFutureTaskBenchmark.cancelInReverseOrder 100 thrpt 20 123918.829 ± 3642.517 ops/s ScheduledFutureTaskBenchmark.cancelInReverseOrder 1000 thrpt 20 5099.810 ± 206.992 ops/s ScheduledFutureTaskBenchmark.cancelInReverseOrder 10000 thrpt 20 72.335 ± 0.443 ops/s ScheduledFutureTaskBenchmark.cancelInReverseOrder 100000 thrpt 20 0.743 ± 0.003 ops/s ``` --- .../AbstractScheduledEventExecutor.java | 29 ++++-- .../util/concurrent/ScheduledFutureTask.java | 17 +++- .../util/internal/DefaultPriorityQueue.java | 5 + .../util/internal/EmptyPriorityQueue.java | 4 + .../io/netty/util/internal/PriorityQueue.java | 9 ++ .../internal/DefaultPriorityQueueTest.java | 33 +++++++ .../ScheduledFutureTaskBenchmark.java | 98 +++++++++++++++++++ 7 files changed, 186 insertions(+), 9 deletions(-) create mode 100644 microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index e22db961ea..0fbcc28a05 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -15,9 +15,11 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.PriorityQueue; -import java.util.PriorityQueue; +import java.util.Comparator; import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -28,7 +30,15 @@ import java.util.concurrent.TimeUnit; */ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { - Queue> scheduledTaskQueue; + private static final Comparator> SCHEDULED_FUTURE_TASK_COMPARATOR = + new Comparator>() { + @Override + public int compare(ScheduledFutureTask o1, ScheduledFutureTask o2) { + return o1.compareTo(o2); + } + }; + + PriorityQueue> scheduledTaskQueue; protected AbstractScheduledEventExecutor() { } @@ -41,14 +51,17 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut return ScheduledFutureTask.nanoTime(); } - Queue> scheduledTaskQueue() { + PriorityQueue> scheduledTaskQueue() { if (scheduledTaskQueue == null) { - scheduledTaskQueue = new PriorityQueue>(); + scheduledTaskQueue = new DefaultPriorityQueue>( + SCHEDULED_FUTURE_TASK_COMPARATOR, + // Use same initial capacity as java.util.PriorityQueue + 11); } return scheduledTaskQueue; } - private static boolean isNullOrEmpty(Queue> queue) { + private static boolean isNullOrEmpty(Queue> queue) { return queue == null || queue.isEmpty(); } @@ -59,7 +72,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut */ protected void cancelScheduledTasks() { assert inEventLoop(); - Queue> scheduledTaskQueue = this.scheduledTaskQueue; + PriorityQueue> scheduledTaskQueue = this.scheduledTaskQueue; if (isNullOrEmpty(scheduledTaskQueue)) { return; } @@ -71,7 +84,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut task.cancelWithoutRemove(false); } - scheduledTaskQueue.clear(); + scheduledTaskQueue.clearIgnoringIndexes(); } /** @@ -205,7 +218,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut final void removeScheduled(final ScheduledFutureTask task) { if (inEventLoop()) { - scheduledTaskQueue().remove(task); + scheduledTaskQueue().removeTyped(task); } else { execute(new Runnable() { @Override diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index a7a3533d34..05cba6b83a 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -16,6 +16,9 @@ package io.netty.util.concurrent; +import io.netty.util.internal.DefaultPriorityQueue; +import io.netty.util.internal.PriorityQueueNode; + import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; @@ -23,7 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") -final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture { +final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture, PriorityQueueNode { private static final AtomicLong nextTaskId = new AtomicLong(); private static final long START_TIME = System.nanoTime(); @@ -40,6 +43,8 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ private final long periodNanos; + private int queueIndex = INDEX_NOT_IN_QUEUE; + ScheduledFutureTask( AbstractScheduledEventExecutor executor, Runnable runnable, V result, long nanoTime) { @@ -172,4 +177,14 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu .append(periodNanos) .append(')'); } + + @Override + public int priorityQueueIndex(DefaultPriorityQueue queue) { + return queueIndex; + } + + @Override + public void priorityQueueIndex(DefaultPriorityQueue queue, int i) { + queueIndex = i; + } } diff --git a/common/src/main/java/io/netty/util/internal/DefaultPriorityQueue.java b/common/src/main/java/io/netty/util/internal/DefaultPriorityQueue.java index 0a961bb1b9..461b944f1b 100644 --- a/common/src/main/java/io/netty/util/internal/DefaultPriorityQueue.java +++ b/common/src/main/java/io/netty/util/internal/DefaultPriorityQueue.java @@ -77,6 +77,11 @@ public final class DefaultPriorityQueue extends Abs size = 0; } + @Override + public void clearIgnoringIndexes() { + size = 0; + } + @Override public boolean offer(T e) { if (e.priorityQueueIndex(this) != INDEX_NOT_IN_QUEUE) { diff --git a/common/src/main/java/io/netty/util/internal/EmptyPriorityQueue.java b/common/src/main/java/io/netty/util/internal/EmptyPriorityQueue.java index 789dd2d3d3..983cb1fcda 100644 --- a/common/src/main/java/io/netty/util/internal/EmptyPriorityQueue.java +++ b/common/src/main/java/io/netty/util/internal/EmptyPriorityQueue.java @@ -115,6 +115,10 @@ public final class EmptyPriorityQueue implements PriorityQueue { public void clear() { } + @Override + public void clearIgnoringIndexes() { + } + @Override public boolean equals(Object o) { return o instanceof PriorityQueue && ((PriorityQueue) o).isEmpty(); diff --git a/common/src/main/java/io/netty/util/internal/PriorityQueue.java b/common/src/main/java/io/netty/util/internal/PriorityQueue.java index ae052fe2dc..086dd5ac75 100644 --- a/common/src/main/java/io/netty/util/internal/PriorityQueue.java +++ b/common/src/main/java/io/netty/util/internal/PriorityQueue.java @@ -34,4 +34,13 @@ public interface PriorityQueue extends Queue { * @param node An object which is in this queue and the priority may have changed. */ void priorityChanged(T node); + + /** + * Removes all of the elements from this {@link PriorityQueue} without calling + * {@link PriorityQueueNode#priorityQueueIndex(DefaultPriorityQueue)} or explicitly removing references to them to + * allow them to be garbage collected. This should only be used when it is certain that the nodes will not be + * re-inserted into this or any other {@link PriorityQueue} and it is known that the {@link PriorityQueue} itself + * will be garbage collected after this call. + */ + void clearIgnoringIndexes(); } diff --git a/common/src/test/java/io/netty/util/internal/DefaultPriorityQueueTest.java b/common/src/test/java/io/netty/util/internal/DefaultPriorityQueueTest.java index a02752769e..b4adc8e99b 100644 --- a/common/src/test/java/io/netty/util/internal/DefaultPriorityQueueTest.java +++ b/common/src/test/java/io/netty/util/internal/DefaultPriorityQueueTest.java @@ -15,7 +15,9 @@ */ package io.netty.util.internal; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.Serializable; import java.util.ArrayList; @@ -107,6 +109,37 @@ public class DefaultPriorityQueueTest { assertSame(c, queue.peek()); } + @Test + public void testClearIgnoringIndexes() { + PriorityQueue queue = new DefaultPriorityQueue(TestElementComparator.INSTANCE, 0); + assertEmptyQueue(queue); + + TestElement a = new TestElement(5); + TestElement b = new TestElement(10); + TestElement c = new TestElement(2); + TestElement d = new TestElement(6); + TestElement e = new TestElement(11); + + assertOffer(queue, a); + assertOffer(queue, b); + assertOffer(queue, c); + assertOffer(queue, d); + + queue.clearIgnoringIndexes(); + assertEmptyQueue(queue); + + // Elements cannot be re-inserted but new ones can. + try { + queue.offer(a); + fail(); + } catch (IllegalArgumentException t) { + // expected + } + + assertOffer(queue, e); + assertSame(e, queue.peek()); + } + @Test public void testRemoval() { testRemoval(false); diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java new file mode 100644 index 0000000000..3b8d2bb1a2 --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java @@ -0,0 +1,98 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.concurrent; + +import io.netty.channel.DefaultEventLoop; +import io.netty.channel.EventLoop; +import io.netty.microbench.util.AbstractMicrobenchmark; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ScheduledFuture; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +public class ScheduledFutureTaskBenchmark extends AbstractMicrobenchmark { + + static final EventLoop executor = new DefaultEventLoop(); + + @State(Scope.Thread) + public static class FuturesHolder { + + private static final Callable NO_OP = new Callable() { + @Override + public Void call() throws Exception { + return null; + } + }; + + @Param({ "100", "1000", "10000", "100000" }) + int num; + + final List> futures = new ArrayList>(); + + @Setup(Level.Invocation) + public void reset() { + futures.clear(); + executor.submit(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= num; i++) { + futures.add(executor.schedule(NO_OP, i, TimeUnit.HOURS)); + } + } + }).syncUninterruptibly(); + } + } + + @TearDown(Level.Trial) + public void stop() throws Exception { + executor.shutdownGracefully().syncUninterruptibly(); + } + + @Benchmark + public Future cancelInOrder(final FuturesHolder futuresHolder) { + return executor.submit(new Runnable() { + @Override + public void run() { + for (int i = 0; i < futuresHolder.num; i++) { + futuresHolder.futures.get(i).cancel(false); + } + } + }).syncUninterruptibly(); + } + + @Benchmark + public Future cancelInReverseOrder(final FuturesHolder futuresHolder) { + return executor.submit(new Runnable() { + @Override + public void run() { + for (int i = futuresHolder.num - 1; i >= 0; i--) { + futuresHolder.futures.get(i).cancel(false); + } + } + }).syncUninterruptibly(); + } +}