Fix bug in Recycler with racing calls to recycle (#11037)
Motivation: It is possible for two separate threads to race on recycling an object. If this happens, the object might be added to a WeakOrderQueue when it shouldn't be. The end result of this is that an object could be acquired multiple times, without a recycle in between. Effectively, it ends up in circulation twice. Modification: We fix this by making the update to the lastRecycledId field of the handle, an atomic state transition. Only the thread that "wins" the race and succeeds in their state transition will be allowed to recycle the object. The others will bail out on their recycling. We use weakCompareAndSet because we only need the atomicity guarantee, and the program order within each thread is sufficient. Also, spurious failures just means we won't recycle that particular object, which is fine. Result: Objects no longer risk circulating twice due to a recycle race. This fixes #10986
This commit is contained in:
parent
a60825c3b4
commit
dde82f62f0
@ -27,6 +27,7 @@ import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.WeakHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
|
||||
|
||||
import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo;
|
||||
import static java.lang.Math.max;
|
||||
@ -209,8 +210,16 @@ public abstract class Recycler<T> {
|
||||
|
||||
public interface Handle<T> extends ObjectPool.Handle<T> { }
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final class DefaultHandle<T> implements Handle<T> {
|
||||
int lastRecycledId;
|
||||
private static final AtomicIntegerFieldUpdater<DefaultHandle<?>> LAST_RECYCLED_ID_UPDATER;
|
||||
static {
|
||||
AtomicIntegerFieldUpdater<?> updater = AtomicIntegerFieldUpdater.newUpdater(
|
||||
DefaultHandle.class, "lastRecycledId");
|
||||
LAST_RECYCLED_ID_UPDATER = (AtomicIntegerFieldUpdater<DefaultHandle<?>>) updater;
|
||||
}
|
||||
|
||||
volatile int lastRecycledId;
|
||||
int recycleId;
|
||||
|
||||
boolean hasBeenRecycled;
|
||||
@ -235,6 +244,12 @@ public abstract class Recycler<T> {
|
||||
|
||||
stack.push(this);
|
||||
}
|
||||
|
||||
public boolean compareAndSetLastRecycledId(int expectLastRecycledId, int updateLastRecycledId) {
|
||||
// Use "weak…" because we do not need synchronize-with ordering, only atomicity.
|
||||
// Also, spurious failures are fine, since no code should rely on recycling for correctness.
|
||||
return LAST_RECYCLED_ID_UPDATER.weakCompareAndSet(this, expectLastRecycledId, updateLastRecycledId);
|
||||
}
|
||||
}
|
||||
|
||||
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
|
||||
@ -371,11 +386,15 @@ public abstract class Recycler<T> {
|
||||
|
||||
void reclaimAllSpaceAndUnlink() {
|
||||
head.reclaimAllSpaceAndUnlink();
|
||||
this.next = null;
|
||||
next = null;
|
||||
}
|
||||
|
||||
void add(DefaultHandle<?> handle) {
|
||||
handle.lastRecycledId = id;
|
||||
if (!handle.compareAndSetLastRecycledId(0, id)) {
|
||||
// Separate threads could be racing to add the handle to each their own WeakOrderQueue.
|
||||
// We only add the handle to the queue if we win the race and observe that lastRecycledId is zero.
|
||||
return;
|
||||
}
|
||||
|
||||
// While we also enforce the recycling ratio when we transfer objects from the WeakOrderQueue to the Stack
|
||||
// we better should enforce it as well early. Missing to do so may let the WeakOrderQueue grow very fast
|
||||
@ -649,10 +668,10 @@ public abstract class Recycler<T> {
|
||||
}
|
||||
|
||||
private void pushNow(DefaultHandle<?> item) {
|
||||
if ((item.recycleId | item.lastRecycledId) != 0) {
|
||||
if (item.recycleId != 0 || !item.compareAndSetLastRecycledId(0, OWN_THREAD_ID)) {
|
||||
throw new IllegalStateException("recycled already");
|
||||
}
|
||||
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
|
||||
item.recycleId = OWN_THREAD_ID;
|
||||
|
||||
int size = this.size;
|
||||
if (size >= maxCapacity || dropHandle(item)) {
|
||||
|
@ -18,6 +18,7 @@ package io.netty.util;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
@ -114,12 +115,119 @@ public class RecyclerTest {
|
||||
});
|
||||
thread2.start();
|
||||
thread2.join();
|
||||
HandledObject a = recycler.get();
|
||||
HandledObject b = recycler.get();
|
||||
assertNotSame(a, b);
|
||||
IllegalStateException exception = exceptionStore.get();
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRecycleAtDifferentThreadRacing() throws InterruptedException {
|
||||
Recycler<HandledObject> recycler = newRecycler(1024);
|
||||
final HandledObject object = recycler.get();
|
||||
final AtomicReference<IllegalStateException> exceptionStore = new AtomicReference<IllegalStateException>();
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(2);
|
||||
final Thread thread1 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
object.recycle();
|
||||
} catch (IllegalStateException e) {
|
||||
Exception x = exceptionStore.getAndSet(e);
|
||||
if (x != null) {
|
||||
e.addSuppressed(x);
|
||||
}
|
||||
} finally {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
thread1.start();
|
||||
|
||||
final Thread thread2 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
object.recycle();
|
||||
} catch (IllegalStateException e) {
|
||||
Exception x = exceptionStore.getAndSet(e);
|
||||
if (x != null) {
|
||||
e.addSuppressed(x);
|
||||
}
|
||||
} finally {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
thread2.start();
|
||||
|
||||
try {
|
||||
countDownLatch.await();
|
||||
HandledObject a = recycler.get();
|
||||
HandledObject b = recycler.get();
|
||||
assertNotSame(a, b);
|
||||
IllegalStateException exception = exceptionStore.get();
|
||||
if (exception != null) {
|
||||
assertEquals("recycled already", exception.getMessage());
|
||||
assertEquals(0, exception.getSuppressed().length);
|
||||
}
|
||||
} finally {
|
||||
thread1.join(1000);
|
||||
thread2.join(1000);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRecycleRacing() throws InterruptedException {
|
||||
Recycler<HandledObject> recycler = newRecycler(1024);
|
||||
final HandledObject object = recycler.get();
|
||||
final AtomicReference<IllegalStateException> exceptionStore = new AtomicReference<IllegalStateException>();
|
||||
|
||||
final CountDownLatch countDownLatch = new CountDownLatch(1);
|
||||
final Thread thread1 = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
object.recycle();
|
||||
} catch (IllegalStateException e) {
|
||||
Exception x = exceptionStore.getAndSet(e);
|
||||
if (x != null) {
|
||||
e.addSuppressed(x);
|
||||
}
|
||||
} finally {
|
||||
countDownLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
thread1.start();
|
||||
|
||||
try {
|
||||
object.recycle();
|
||||
} catch (IllegalStateException e) {
|
||||
Exception x = exceptionStore.getAndSet(e);
|
||||
if (x != null) {
|
||||
e.addSuppressed(x);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
countDownLatch.await();
|
||||
HandledObject a = recycler.get();
|
||||
HandledObject b = recycler.get();
|
||||
assertNotSame(a, b);
|
||||
IllegalStateException exception = exceptionStore.get();
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
} finally {
|
||||
thread1.join(1000);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecycle() {
|
||||
Recycler<HandledObject> recycler = newRecycler(1024);
|
||||
|
Loading…
Reference in New Issue
Block a user