From dde82f62f07de5e794dda1c3324d8331ca522114 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 26 Feb 2021 10:02:49 +0100 Subject: [PATCH] Fix bug in Recycler with racing calls to recycle (#11037) Motivation: It is possible for two separate threads to race on recycling an object. If this happens, the object might be added to a WeakOrderQueue when it shouldn't be. The end result of this is that an object could be acquired multiple times, without a recycle in between. Effectively, it ends up in circulation twice. Modification: We fix this by making the update to the lastRecycledId field of the handle, an atomic state transition. Only the thread that "wins" the race and succeeds in their state transition will be allowed to recycle the object. The others will bail out on their recycling. We use weakCompareAndSet because we only need the atomicity guarantee, and the program order within each thread is sufficient. Also, spurious failures just means we won't recycle that particular object, which is fine. Result: Objects no longer risk circulating twice due to a recycle race. This fixes #10986 --- .../src/main/java/io/netty/util/Recycler.java | 29 ++++- .../test/java/io/netty/util/RecyclerTest.java | 108 ++++++++++++++++++ 2 files changed, 132 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/io/netty/util/Recycler.java b/common/src/main/java/io/netty/util/Recycler.java index 9e81aa527d..d64889488e 100644 --- a/common/src/main/java/io/netty/util/Recycler.java +++ b/common/src/main/java/io/netty/util/Recycler.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static io.netty.util.internal.MathUtil.safeFindNextPositivePowerOfTwo; import static java.lang.Math.max; @@ -209,8 +210,16 @@ public abstract class Recycler { public interface Handle extends ObjectPool.Handle { } + @SuppressWarnings("unchecked") private static final class DefaultHandle implements Handle { - int lastRecycledId; + private static final AtomicIntegerFieldUpdater> LAST_RECYCLED_ID_UPDATER; + static { + AtomicIntegerFieldUpdater updater = AtomicIntegerFieldUpdater.newUpdater( + DefaultHandle.class, "lastRecycledId"); + LAST_RECYCLED_ID_UPDATER = (AtomicIntegerFieldUpdater>) updater; + } + + volatile int lastRecycledId; int recycleId; boolean hasBeenRecycled; @@ -235,6 +244,12 @@ public abstract class Recycler { stack.push(this); } + + public boolean compareAndSetLastRecycledId(int expectLastRecycledId, int updateLastRecycledId) { + // Use "weak…" because we do not need synchronize-with ordering, only atomicity. + // Also, spurious failures are fine, since no code should rely on recycling for correctness. + return LAST_RECYCLED_ID_UPDATER.weakCompareAndSet(this, expectLastRecycledId, updateLastRecycledId); + } } private static final FastThreadLocal, WeakOrderQueue>> DELAYED_RECYCLED = @@ -371,11 +386,15 @@ public abstract class Recycler { void reclaimAllSpaceAndUnlink() { head.reclaimAllSpaceAndUnlink(); - this.next = null; + next = null; } void add(DefaultHandle handle) { - handle.lastRecycledId = id; + if (!handle.compareAndSetLastRecycledId(0, id)) { + // Separate threads could be racing to add the handle to each their own WeakOrderQueue. + // We only add the handle to the queue if we win the race and observe that lastRecycledId is zero. + return; + } // While we also enforce the recycling ratio when we transfer objects from the WeakOrderQueue to the Stack // we better should enforce it as well early. Missing to do so may let the WeakOrderQueue grow very fast @@ -649,10 +668,10 @@ public abstract class Recycler { } private void pushNow(DefaultHandle item) { - if ((item.recycleId | item.lastRecycledId) != 0) { + if (item.recycleId != 0 || !item.compareAndSetLastRecycledId(0, OWN_THREAD_ID)) { throw new IllegalStateException("recycled already"); } - item.recycleId = item.lastRecycledId = OWN_THREAD_ID; + item.recycleId = OWN_THREAD_ID; int size = this.size; if (size >= maxCapacity || dropHandle(item)) { diff --git a/common/src/test/java/io/netty/util/RecyclerTest.java b/common/src/test/java/io/netty/util/RecyclerTest.java index 1d711f5e4a..0c2d070560 100644 --- a/common/src/test/java/io/netty/util/RecyclerTest.java +++ b/common/src/test/java/io/netty/util/RecyclerTest.java @@ -18,6 +18,7 @@ package io.netty.util; import org.junit.Test; import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -114,12 +115,119 @@ public class RecyclerTest { }); thread2.start(); thread2.join(); + HandledObject a = recycler.get(); + HandledObject b = recycler.get(); + assertNotSame(a, b); IllegalStateException exception = exceptionStore.get(); if (exception != null) { throw exception; } } + @Test + public void testMultipleRecycleAtDifferentThreadRacing() throws InterruptedException { + Recycler recycler = newRecycler(1024); + final HandledObject object = recycler.get(); + final AtomicReference exceptionStore = new AtomicReference(); + + final CountDownLatch countDownLatch = new CountDownLatch(2); + final Thread thread1 = new Thread(new Runnable() { + @Override + public void run() { + try { + object.recycle(); + } catch (IllegalStateException e) { + Exception x = exceptionStore.getAndSet(e); + if (x != null) { + e.addSuppressed(x); + } + } finally { + countDownLatch.countDown(); + } + } + }); + thread1.start(); + + final Thread thread2 = new Thread(new Runnable() { + @Override + public void run() { + try { + object.recycle(); + } catch (IllegalStateException e) { + Exception x = exceptionStore.getAndSet(e); + if (x != null) { + e.addSuppressed(x); + } + } finally { + countDownLatch.countDown(); + } + } + }); + thread2.start(); + + try { + countDownLatch.await(); + HandledObject a = recycler.get(); + HandledObject b = recycler.get(); + assertNotSame(a, b); + IllegalStateException exception = exceptionStore.get(); + if (exception != null) { + assertEquals("recycled already", exception.getMessage()); + assertEquals(0, exception.getSuppressed().length); + } + } finally { + thread1.join(1000); + thread2.join(1000); + } + } + + @Test + public void testMultipleRecycleRacing() throws InterruptedException { + Recycler recycler = newRecycler(1024); + final HandledObject object = recycler.get(); + final AtomicReference exceptionStore = new AtomicReference(); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final Thread thread1 = new Thread(new Runnable() { + @Override + public void run() { + try { + object.recycle(); + } catch (IllegalStateException e) { + Exception x = exceptionStore.getAndSet(e); + if (x != null) { + e.addSuppressed(x); + } + } finally { + countDownLatch.countDown(); + } + } + }); + thread1.start(); + + try { + object.recycle(); + } catch (IllegalStateException e) { + Exception x = exceptionStore.getAndSet(e); + if (x != null) { + e.addSuppressed(x); + } + } + + try { + countDownLatch.await(); + HandledObject a = recycler.get(); + HandledObject b = recycler.get(); + assertNotSame(a, b); + IllegalStateException exception = exceptionStore.get(); + if (exception != null) { + throw exception; + } + } finally { + thread1.join(1000); + } + } + @Test public void testRecycle() { Recycler recycler = newRecycler(1024);