Ensure Thread can be collected in a timely manner if Recycler.Stack holds a reference to it.
Motivation: In our Recycler implementation we store a reference to the current Thread in the Stack that is stored in a FastThreadLocal. The Stack itself is referenced in the DefaultHandle itself. A problem can arise if a user stores a Reference to an Object that holds a reference to the DefaultHandle somewhere and either not remove the reference at all or remove it very late. In this case the Thread itself can not be collected as its still referenced in the Stack that is referenced by the DefaultHandle. Modifications: - Use a WeakReference to store the reference to the Thread in the Stack - Add a test case Result: Ensure a Thread can be collected in a timely manner in all cases even if it used the Recycler.
This commit is contained in:
parent
1988cd041d
commit
0276b6e0f6
@ -416,7 +416,14 @@ public abstract class Recycler<T> {
|
||||
// to scavenge those that can be reused. this permits us to incur minimal thread synchronisation whilst
|
||||
// still recycling all items.
|
||||
final Recycler<T> parent;
|
||||
final Thread thread;
|
||||
|
||||
// We store the Thread in a WeakReference as otherwise we may be the only ones that still hold a strong
|
||||
// Reference to the Thread itself after it died because DefaultHandle will hold a reference to the Stack.
|
||||
//
|
||||
// The biggest issue is if we do not use a WeakReference the Thread may not be able to be collected at all if
|
||||
// the user will store a reference to the DefaultHandle somewhere and never clear this reference (or not clear
|
||||
// it in a timely manner).
|
||||
final WeakReference<Thread> threadRef;
|
||||
final AtomicInteger availableSharedCapacity;
|
||||
final int maxDelayedQueues;
|
||||
|
||||
@ -431,7 +438,7 @@ public abstract class Recycler<T> {
|
||||
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
|
||||
int ratioMask, int maxDelayedQueues) {
|
||||
this.parent = parent;
|
||||
this.thread = thread;
|
||||
threadRef = new WeakReference<Thread>(thread);
|
||||
this.maxCapacity = maxCapacity;
|
||||
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
|
||||
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
|
||||
@ -545,11 +552,12 @@ public abstract class Recycler<T> {
|
||||
|
||||
void push(DefaultHandle<?> item) {
|
||||
Thread currentThread = Thread.currentThread();
|
||||
if (thread == currentThread) {
|
||||
if (threadRef.get() == currentThread) {
|
||||
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
|
||||
pushNow(item);
|
||||
} else {
|
||||
// The current Thread is not the one that belongs to the Stack, we need to signal that the push
|
||||
// The current Thread is not the one that belongs to the Stack
|
||||
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
|
||||
// happens later.
|
||||
pushLater(item, currentThread);
|
||||
}
|
||||
|
@ -18,7 +18,9 @@ package io.netty.util;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
@ -34,6 +36,43 @@ public class RecyclerTest {
|
||||
};
|
||||
}
|
||||
|
||||
@Test(timeout = 5000L)
|
||||
public void testThreadCanBeCollectedEvenIfHandledObjectIsReferenced() throws Exception {
|
||||
final Recycler<HandledObject> recycler = newRecycler(1024);
|
||||
final AtomicBoolean collected = new AtomicBoolean();
|
||||
final AtomicReference<HandledObject> reference = new AtomicReference<HandledObject>();
|
||||
Thread thread = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
HandledObject object = recycler.get();
|
||||
// Store a reference to the HandledObject to ensure it is not collected when the run method finish.
|
||||
reference.set(object);
|
||||
}
|
||||
}) {
|
||||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
super.finalize();
|
||||
collected.set(true);
|
||||
}
|
||||
};
|
||||
assertFalse(collected.get());
|
||||
thread.start();
|
||||
thread.join();
|
||||
|
||||
// Null out so it can be collected.
|
||||
thread = null;
|
||||
|
||||
// Loop until the Thread was collected. If we can not collect it the Test will fail due of a timeout.
|
||||
while (!collected.get()) {
|
||||
System.gc();
|
||||
System.runFinalization();
|
||||
Thread.sleep(50);
|
||||
}
|
||||
|
||||
// Now call recycle after the Thread was collected to ensure this still works...
|
||||
reference.getAndSet(null).recycle();
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testMultipleRecycle() {
|
||||
Recycler<HandledObject> recycler = newRecycler(1024);
|
||||
|
Loading…
x
Reference in New Issue
Block a user