Optimistically update ref counts

Motivation:
Highly retained and released objects have contention on their ref
count.  Currently, the ref count is updated using compareAndSet
with care to make sure the count doesn't overflow, double free, or
revive the object.

Profiling has shown that a non trivial (~1%) of CPU time on gRPC
latency benchmarks is from the ref count updating.

Modification:
Rather than pessimistically assuming the ref count will be invalid,
optimistically update it assuming it will be.  If the update was
wrong, then use the slow path to revert the change and throw an
execption.  Most of the time, the ref counts are correct.

This changes from using compareAndSet to getAndAdd, which emits a
different CPU instruction on x86 (CMPXCHG to XADD).  Because the
CPU knows it will modifiy the memory, it can avoid contention.

On a highly contended machine, this can be about 2x faster.

There is a downside to the new approach.  The ref counters can
temporarily enter invalid states if over retained or over released.
The code does handle these overflow and underflow scenarios, but it
is possible that another concurrent access may push the failure to
a different location.  For example:

Time 1 Thread 1: obj.retain(INT_MAX - 1)
Time 2 Thread 1: obj.retain(2)
Time 2 Thread 2: obj.retain(1)

Previously Thread 2 would always succeed and Thread 1 would always
fail on the second access.  Now, thread 2 could fail while thread 1
is rolling back its change.

====

There are a few reasons why I think this is okay:

1. Buggy code is going to have bugs.  An exception _is_ going to be
   thrown.  This just causes the other threads to notice the state
   is messed up and stop early.
2. If high retention counts are a use case, then ref count should
   be a long rather than an int.
3. The critical section is greatly reduced compared to the previous
   version, so the likelihood of this happening is lower
4. On error, the code always rollsback the change atomically, so
   there is no possibility of corruption.

Result:
Faster refcounting

```
BEFORE:

Benchmark                                                                                             (delay)    Mode      Cnt         Score    Error  Units
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                            1  sample  2901361       804.579 ±  1.835  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                           10  sample  3038729       785.376 ± 16.471  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                          100  sample  2899401       817.392 ±  6.668  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                         1000  sample  3650566      2077.700 ±  0.600  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                        10000  sample  3005467     19949.334 ±  4.243  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                          1  sample   456091        48.610 ±  1.162  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                         10  sample   732051        62.599 ±  0.815  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                        100  sample   778925       228.629 ±  1.205  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                       1000  sample   633682      2002.987 ±  2.856  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                      10000  sample   506442     19735.345 ± 12.312  ns/op

AFTER:
Benchmark                                                                                             (delay)    Mode      Cnt         Score    Error  Units
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                            1  sample  3761980       383.436 ±  1.315  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                           10  sample  3667304       474.429 ±  1.101  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                          100  sample  3039374       479.267 ±  0.435  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                         1000  sample  3709210      2044.603 ±  0.989  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_contended                                        10000  sample  3011591     19904.227 ± 18.025  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                          1  sample   494975        52.269 ±  8.345  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                         10  sample   771094        62.290 ±  0.795  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                        100  sample   763230       235.044 ±  1.552  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                       1000  sample   634037      2006.578 ±  3.574  ns/op
AbstractReferenceCountedByteBufBenchmark.retainRelease_uncontended                                      10000  sample   506284     19742.605 ± 13.729  ns/op

```
This commit is contained in:
Carl Mastrangelo 2017-09-25 15:55:23 -07:00 committed by Norman Maurer
parent 940f15f0d2
commit 83a19d5650
3 changed files with 98 additions and 49 deletions

View File

@ -59,18 +59,12 @@ public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
return retain0(checkPositive(increment, "increment"));
}
private ByteBuf retain0(int increment) {
for (;;) {
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;
// Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
break;
}
private ByteBuf retain0(final int increment) {
int oldRef = refCntUpdater.getAndAdd(this, increment);
if (oldRef <= 0 || oldRef + increment < oldRef) {
// Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
}
return this;
}
@ -96,20 +90,16 @@ public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
}
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
if (oldRef == decrement) {
deallocate();
return true;
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, decrement);
}
return false;
}
/**
* Called once {@link #refCnt()} is equals 0.

View File

@ -52,17 +52,11 @@ public abstract class AbstractReferenceCounted implements ReferenceCounted {
}
private ReferenceCounted retain0(int increment) {
for (;;) {
int refCnt = this.refCnt;
final int nextCnt = refCnt + increment;
// Ensure we not resurrect (which means the refCnt was 0) and also that we encountered an overflow.
if (nextCnt <= increment) {
throw new IllegalReferenceCountException(refCnt, increment);
}
if (refCntUpdater.compareAndSet(this, refCnt, nextCnt)) {
break;
}
int oldRef = refCntUpdater.getAndAdd(this, increment);
if (oldRef <= 0 || oldRef + increment < oldRef) {
// Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
}
return this;
}
@ -83,20 +77,16 @@ public abstract class AbstractReferenceCounted implements ReferenceCounted {
}
private boolean release0(int decrement) {
for (;;) {
int refCnt = this.refCnt;
if (refCnt < decrement) {
throw new IllegalReferenceCountException(refCnt, -decrement);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
if (refCnt == decrement) {
deallocate();
return true;
}
return false;
}
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
if (oldRef == decrement) {
deallocate();
return true;
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, decrement);
}
return false;
}
/**

View File

@ -0,0 +1,69 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
public class AbstractReferenceCountedByteBufBenchmark extends AbstractMicrobenchmark {
@Param({ "1", "10", "100", "1000", "10000" })
public int delay;
AbstractReferenceCountedByteBuf buf;
@Setup
public void setUp() {
buf = (AbstractReferenceCountedByteBuf) Unpooled.buffer(1);
}
@TearDown
public void tearDown() {
buf.release();
}
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public boolean retainReleaseUncontended() {
buf.retain();
Blackhole.consumeCPU(delay);
return buf.release();
}
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@GroupThreads(6)
public boolean retainReleaseContended() {
buf.retain();
Blackhole.consumeCPU(delay);
return buf.release();
}
}