From 2791f0fefac82663ca09dc8aa5dda024152c92f8 Mon Sep 17 00:00:00 2001 From: Nick Hill Date: Tue, 24 Sep 2019 22:34:25 -0700 Subject: [PATCH] Avoid use of global AtomicLong for ScheduledFutureTask ids (#9599) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation Currently a static AtomicLong is used to allocate a unique id whenever a task is scheduled to any event loop. This could be a source of contention if delayed tasks are scheduled at a high frequency and can be easily avoided by having a non-volatile id counter per queue. Modifications - Replace static AtomicLong ScheduledFutureTask#nextTaskId with a long field in AbstractScheduledExecutorService - Set ScheduledFutureTask#id based on this when adding the task to the queue (in event loop) instead of at construction time - Add simple benchmark Result Less contention / cache-miss possibility when scheduling future tasks Before: Benchmark (num) Mode Cnt Score Error Units scheduleLots 100000 thrpt 20 346.008 ± 21.931 ops/s Benchmark (num) Mode Cnt Score Error Units scheduleLots 100000 thrpt 20 654.824 ± 22.064 ops/s --- .../AbstractScheduledEventExecutor.java | 6 +- .../util/concurrent/ScheduledFutureTask.java | 15 ++-- .../ScheduleFutureTaskBenchmark.java | 88 +++++++++++++++++++ .../netty/util/concurrent/package-info.java | 19 ++++ 4 files changed, 120 insertions(+), 8 deletions(-) create mode 100644 microbench/src/main/java/io/netty/util/concurrent/ScheduleFutureTaskBenchmark.java create mode 100644 microbench/src/main/java/io/netty/util/concurrent/package-info.java diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 656c57847b..a44ae521dd 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -39,6 +39,8 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut PriorityQueue> scheduledTaskQueue; + long nextTaskId; + protected AbstractScheduledEventExecutor() { } @@ -241,12 +243,12 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut private ScheduledFuture schedule(final ScheduledFutureTask task) { if (inEventLoop()) { - scheduledTaskQueue().add(task); + scheduledTaskQueue().add(task.setId(nextTaskId++)); } else { executeScheduledRunnable(new Runnable() { @Override public void run() { - scheduledTaskQueue().add(task); + scheduledTaskQueue().add(task.setId(nextTaskId++)); } }, true, task.deadlineNanos()); } diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index 1d1403e3f0..ac77dc5d84 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -23,11 +23,9 @@ import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture, PriorityQueueNode { - private static final AtomicLong nextTaskId = new AtomicLong(); private static final long START_TIME = System.nanoTime(); static long nanoTime() { @@ -44,7 +42,9 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu return START_TIME; } - private final long id = nextTaskId.getAndIncrement(); + // set once when added to priority queue + private long id; + private long deadlineNanos; /* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ private final long periodNanos; @@ -79,6 +79,11 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu periodNanos = 0; } + ScheduledFutureTask setId(long id) { + this.id = id; + return this; + } + @Override protected EventExecutor executor() { return super.executor(); @@ -182,9 +187,7 @@ final class ScheduledFutureTask extends PromiseTask implements ScheduledFu StringBuilder buf = super.toStringBuilder(); buf.setCharAt(buf.length() - 1, ','); - return buf.append(" id: ") - .append(id) - .append(", deadline: ") + return buf.append(" deadline: ") .append(deadlineNanos) .append(", period: ") .append(periodNanos) diff --git a/microbench/src/main/java/io/netty/util/concurrent/ScheduleFutureTaskBenchmark.java b/microbench/src/main/java/io/netty/util/concurrent/ScheduleFutureTaskBenchmark.java new file mode 100644 index 0000000000..72bc71ea22 --- /dev/null +++ b/microbench/src/main/java/io/netty/util/concurrent/ScheduleFutureTaskBenchmark.java @@ -0,0 +1,88 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import io.netty.channel.DefaultEventLoop; +import io.netty.microbench.util.AbstractMicrobenchmark; + +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@State(Scope.Benchmark) +public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark { + + static final Callable NO_OP = new Callable() { + @Override + public Void call() throws Exception { + return null; + } + }; + + @State(Scope.Thread) + public static class ThreadState { + + @Param({ "100000" }) + int num; + + AbstractScheduledEventExecutor eventLoop; + + @Setup(Level.Trial) + public void reset() { + eventLoop = new DefaultEventLoop(); + } + + @Setup(Level.Invocation) + public void clear() { + eventLoop.submit(new Runnable() { + @Override + public void run() { + eventLoop.cancelScheduledTasks(); + } + }).awaitUninterruptibly(); + } + + @TearDown(Level.Trial) + public void shutdown() { + eventLoop.shutdownGracefully().awaitUninterruptibly(); + } + } + + @Benchmark + @Threads(3) + public Future scheduleLots(final ThreadState threadState) { + return threadState.eventLoop.submit(new Runnable() { + @Override + public void run() { + for (int i = 1; i <= threadState.num; i++) { + threadState.eventLoop.schedule(NO_OP, i, TimeUnit.HOURS); + } + } + }).syncUninterruptibly(); + } +} diff --git a/microbench/src/main/java/io/netty/util/concurrent/package-info.java b/microbench/src/main/java/io/netty/util/concurrent/package-info.java new file mode 100644 index 0000000000..5059d12273 --- /dev/null +++ b/microbench/src/main/java/io/netty/util/concurrent/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/** + * Benchmarks for {@link io.netty.util.concurrent}. + */ +package io.netty.util.concurrent;