Avoid use of global AtomicLong for ScheduledFutureTask ids (#9599)
Motivation Currently a static AtomicLong is used to allocate a unique id whenever a task is scheduled to any event loop. This could be a source of contention if delayed tasks are scheduled at a high frequency and can be easily avoided by having a non-volatile id counter per queue. Modifications - Replace static AtomicLong ScheduledFutureTask#nextTaskId with a long field in AbstractScheduledExecutorService - Set ScheduledFutureTask#id based on this when adding the task to the queue (in event loop) instead of at construction time - Add simple benchmark Result Less contention / cache-miss possibility when scheduling future tasks Before: Benchmark (num) Mode Cnt Score Error Units scheduleLots 100000 thrpt 20 346.008 ± 21.931 ops/s Benchmark (num) Mode Cnt Score Error Units scheduleLots 100000 thrpt 20 654.824 ± 22.064 ops/s
This commit is contained in:
parent
86ff76a4f7
commit
2791f0fefa
@ -39,6 +39,8 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
|
||||
|
||||
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
|
||||
|
||||
long nextTaskId;
|
||||
|
||||
protected AbstractScheduledEventExecutor() {
|
||||
}
|
||||
|
||||
@ -241,12 +243,12 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut
|
||||
|
||||
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
|
||||
if (inEventLoop()) {
|
||||
scheduledTaskQueue().add(task);
|
||||
scheduledTaskQueue().add(task.setId(nextTaskId++));
|
||||
} else {
|
||||
executeScheduledRunnable(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
scheduledTaskQueue().add(task);
|
||||
scheduledTaskQueue().add(task.setId(nextTaskId++));
|
||||
}
|
||||
}, true, task.deadlineNanos());
|
||||
}
|
||||
|
@ -23,11 +23,9 @@ import java.util.Queue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
|
||||
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
|
||||
private static final AtomicLong nextTaskId = new AtomicLong();
|
||||
private static final long START_TIME = System.nanoTime();
|
||||
|
||||
static long nanoTime() {
|
||||
@ -44,7 +42,9 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
|
||||
return START_TIME;
|
||||
}
|
||||
|
||||
private final long id = nextTaskId.getAndIncrement();
|
||||
// set once when added to priority queue
|
||||
private long id;
|
||||
|
||||
private long deadlineNanos;
|
||||
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
||||
private final long periodNanos;
|
||||
@ -79,6 +79,11 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
|
||||
periodNanos = 0;
|
||||
}
|
||||
|
||||
ScheduledFutureTask<V> setId(long id) {
|
||||
this.id = id;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EventExecutor executor() {
|
||||
return super.executor();
|
||||
@ -182,9 +187,7 @@ final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFu
|
||||
StringBuilder buf = super.toStringBuilder();
|
||||
buf.setCharAt(buf.length() - 1, ',');
|
||||
|
||||
return buf.append(" id: ")
|
||||
.append(id)
|
||||
.append(", deadline: ")
|
||||
return buf.append(" deadline: ")
|
||||
.append(deadlineNanos)
|
||||
.append(", period: ")
|
||||
.append(periodNanos)
|
||||
|
@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.Level;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.TearDown;
|
||||
import org.openjdk.jmh.annotations.Threads;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
|
||||
import io.netty.channel.DefaultEventLoop;
|
||||
import io.netty.microbench.util.AbstractMicrobenchmark;
|
||||
|
||||
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
|
||||
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
|
||||
@State(Scope.Benchmark)
|
||||
public class ScheduleFutureTaskBenchmark extends AbstractMicrobenchmark {
|
||||
|
||||
static final Callable<Void> NO_OP = new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
@State(Scope.Thread)
|
||||
public static class ThreadState {
|
||||
|
||||
@Param({ "100000" })
|
||||
int num;
|
||||
|
||||
AbstractScheduledEventExecutor eventLoop;
|
||||
|
||||
@Setup(Level.Trial)
|
||||
public void reset() {
|
||||
eventLoop = new DefaultEventLoop();
|
||||
}
|
||||
|
||||
@Setup(Level.Invocation)
|
||||
public void clear() {
|
||||
eventLoop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
eventLoop.cancelScheduledTasks();
|
||||
}
|
||||
}).awaitUninterruptibly();
|
||||
}
|
||||
|
||||
@TearDown(Level.Trial)
|
||||
public void shutdown() {
|
||||
eventLoop.shutdownGracefully().awaitUninterruptibly();
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(3)
|
||||
public Future<?> scheduleLots(final ThreadState threadState) {
|
||||
return threadState.eventLoop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = 1; i <= threadState.num; i++) {
|
||||
threadState.eventLoop.schedule(NO_OP, i, TimeUnit.HOURS);
|
||||
}
|
||||
}
|
||||
}).syncUninterruptibly();
|
||||
}
|
||||
}
|
@ -0,0 +1,19 @@
|
||||
/*
|
||||
* Copyright 2019 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
/**
|
||||
* Benchmarks for {@link io.netty.util.concurrent}.
|
||||
*/
|
||||
package io.netty.util.concurrent;
|
Loading…
Reference in New Issue
Block a user