diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index e22db961ea..0fbcc28a05 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -15,9 +15,11 @@ */ package io.netty.util.concurrent; +import io.netty.util.internal.DefaultPriorityQueue; import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.PriorityQueue; -import java.util.PriorityQueue; +import java.util.Comparator; import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -28,7 +30,15 @@ import java.util.concurrent.TimeUnit; */ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { - Queue> scheduledTaskQueue; + private static final Comparator> SCHEDULED_FUTURE_TASK_COMPARATOR = + new Comparator>() { + @Override + public int compare(ScheduledFutureTask o1, ScheduledFutureTask o2) { + return o1.compareTo(o2); + } + }; + + PriorityQueue> scheduledTaskQueue; protected AbstractScheduledEventExecutor() { } @@ -41,14 +51,17 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut return ScheduledFutureTask.nanoTime(); } - Queue> scheduledTaskQueue() { + PriorityQueue> scheduledTaskQueue() { if (scheduledTaskQueue == null) { - scheduledTaskQueue = new PriorityQueue>(); + scheduledTaskQueue = new DefaultPriorityQueue>( + SCHEDULED_FUTURE_TASK_COMPARATOR, + // Use same initial capacity as java.util.PriorityQueue + 11); } return scheduledTaskQueue; } - private static boolean isNullOrEmpty(Queue> queue) { + private static boolean isNullOrEmpty(Queue> queue) { return queue == null || queue.isEmpty(); } @@ -59,7 +72,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut */ protected void cancelScheduledTasks() { assert inEventLoop(); - Queue> scheduledTaskQueue = this.scheduledTaskQueue; + PriorityQueue> scheduledTaskQueue = this.scheduledTaskQueue; if (isNullOrEmpty(scheduledTaskQueue)) { return; } @@ -71,7 +84,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut task.cancelWithoutRemove(false); } - scheduledTaskQueue.clear(); + scheduledTaskQueue.clearIgnoringIndexes(); } /** @@ -205,7 +218,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut final void removeScheduled(final ScheduledFutureTask task) { if (inEventLoop()) { - scheduledTaskQueue().remove(task); + scheduledTaskQueue().removeTyped(task); } else { execute(new Runnable() { @Override diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index a7a3533d34..05cba6b83a 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -16,6 +16,9 @@ package io.netty.util.concurrent; +import io.netty.util.internal.DefaultPriorityQueue; +import io.netty.util.internal.PriorityQueueNode; + import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; @@ -23,7 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") -final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture { +final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture, PriorityQueueNode { private static final AtomicLong nextTaskId = new AtomicLong(); private static final long START_TIME = System.nanoTime(); @@ -40,6 +43,8 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ private final long periodNanos; + private int queueIndex = INDEX_NOT_IN_QUEUE; + ScheduledFutureTask( AbstractScheduledEventExecutor executor, Runnable runnable, V result, long nanoTime) { @@ -172,4 +177,14 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu .append(periodNanos) .append(')'); } + + @Override + public int priorityQueueIndex(DefaultPriorityQueue queue) { + return queueIndex; + } + + @Override + public void priorityQueueIndex(DefaultPriorityQueue queue, int i) { + queueIndex = i; + } } diff --git a/common/src/main/java/io/netty/util/internal/DefaultPriorityQueue.java b/common/src/main/java/io/netty/util/internal/DefaultPriorityQueue.java index 0a961bb1b9..461b944f1b 100644 --- a/common/src/main/java/io/netty/util/internal/DefaultPriorityQueue.java +++ b/common/src/main/java/io/netty/util/internal/DefaultPriorityQueue.java @@ -77,6 +77,11 @@ public final class DefaultPriorityQueue extends Abs size = 0; } + @Override + public void clearIgnoringIndexes() { + size = 0; + } + @Override public boolean offer(T e) { if (e.priorityQueueIndex(this) != INDEX_NOT_IN_QUEUE) { diff --git a/common/src/main/java/io/netty/util/internal/EmptyPriorityQueue.java b/common/src/main/java/io/netty/util/internal/EmptyPriorityQueue.java index 789dd2d3d3..983cb1fcda 100644 --- a/common/src/main/java/io/netty/util/internal/EmptyPriorityQueue.java +++ b/common/src/main/java/io/netty/util/internal/EmptyPriorityQueue.java @@ -115,6 +115,10 @@ public final class EmptyPriorityQueue implements PriorityQueue { public void clear() { } + @Override + public void clearIgnoringIndexes() { + } + @Override public boolean equals(Object o) { return o instanceof PriorityQueue && ((PriorityQueue) o).isEmpty(); diff --git a/common/src/main/java/io/netty/util/internal/PriorityQueue.java b/common/src/main/java/io/netty/util/internal/PriorityQueue.java index ae052fe2dc..086dd5ac75 100644 --- a/common/src/main/java/io/netty/util/internal/PriorityQueue.java +++ b/common/src/main/java/io/netty/util/internal/PriorityQueue.java @@ -34,4 +34,13 @@ public interface PriorityQueue extends Queue { * @param node An object which is in this queue and the priority may have changed. */ void priorityChanged(T node); + + /** + * Removes all of the elements from this {@link PriorityQueue} without calling + * {@link PriorityQueueNode#priorityQueueIndex(DefaultPriorityQueue)} or explicitly removing references to them to + * allow them to be garbage collected. This should only be used when it is certain that the nodes will not be + * re-inserted into this or any other {@link PriorityQueue} and it is known that the {@link PriorityQueue} itself + * will be garbage collected after this call. + */ + void clearIgnoringIndexes(); } diff --git a/common/src/test/java/io/netty/util/internal/DefaultPriorityQueueTest.java b/common/src/test/java/io/netty/util/internal/DefaultPriorityQueueTest.java index a02752769e..b4adc8e99b 100644 --- a/common/src/test/java/io/netty/util/internal/DefaultPriorityQueueTest.java +++ b/common/src/test/java/io/netty/util/internal/DefaultPriorityQueueTest.java @@ -15,7 +15,9 @@ */ package io.netty.util.internal; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.io.Serializable; import java.util.ArrayList; @@ -107,6 +109,37 @@ public class DefaultPriorityQueueTest { assertSame(c, queue.peek()); } + @Test + public void testClearIgnoringIndexes() { + PriorityQueue queue = new DefaultPriorityQueue(TestElementComparator.INSTANCE, 0); + assertEmptyQueue(queue); + + TestElement a = new TestElement(5); + TestElement b = new TestElement(10); + TestElement c = new TestElement(2); + TestElement d = new TestElement(6); + TestElement e = new TestElement(11); + + assertOffer(queue, a); + assertOffer(queue, b); + assertOffer(queue, c); + assertOffer(queue, d); + + queue.clearIgnoringIndexes(); + assertEmptyQueue(queue); + + // Elements cannot be re-inserted but new ones can. + try { + queue.offer(a); + fail(); + } catch (IllegalArgumentException t) { + // expected + } + + assertOffer(queue, e); + assertSame(e, queue.peek()); + } + @Test public void testRemoval() { testRemoval(false); diff --git a/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java b/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java new file mode 100644 index 0000000000..3b8d2bb1a2 --- /dev/null +++ b/microbench/src/main/java/io/netty/microbench/concurrent/ScheduledFutureTaskBenchmark.java @@ -0,0 +1,98 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.microbench.concurrent; + +import io.netty.channel.DefaultEventLoop; +import io.netty.channel.EventLoop; +import io.netty.microbench.util.AbstractMicrobenchmark; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.ScheduledFuture; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +public class ScheduledFutureTaskBenchmark extends AbstractMicrobenchmark { + + static final EventLoop executor = new DefaultEventLoop(); + + @State(Scope.Thread) + public static class FuturesHolder { + + private static final Callable NO_OP = new Callable() { + @Override + public Void call() throws Exception { + return null; + } + }; + + @Param({ "100", "1000", "10000", "100000" }) + int num; + + final List> futures = new ArrayList>(); + + @Setup(Level.Invocation) + public void reset() { + futures.clear(); + executor.submit(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= num; i++) { + futures.add(executor.schedule(NO_OP, i, TimeUnit.HOURS)); + } + } + }).syncUninterruptibly(); + } + } + + @TearDown(Level.Trial) + public void stop() throws Exception { + executor.shutdownGracefully().syncUninterruptibly(); + } + + @Benchmark + public Future cancelInOrder(final FuturesHolder futuresHolder) { + return executor.submit(new Runnable() { + @Override + public void run() { + for (int i = 0; i < futuresHolder.num; i++) { + futuresHolder.futures.get(i).cancel(false); + } + } + }).syncUninterruptibly(); + } + + @Benchmark + public Future cancelInReverseOrder(final FuturesHolder futuresHolder) { + return executor.submit(new Runnable() { + @Override + public void run() { + for (int i = futuresHolder.num - 1; i >= 0; i--) { + futuresHolder.futures.get(i).cancel(false); + } + } + }).syncUninterruptibly(); + } +}