Fix a bug where Recycler's capacity can increase beyond its maximum

Related: #3166

Motivation:

When the recyclable object created at one thread is returned at the
other thread, it is stored in a WeakOrderedQueue.

The objects stored in the WeakOrderedQueue is added back to the stack by
WeakOrderedQueue.transfer() when the owner thread ran out of recyclable
objects.

However, WeakOrderedQueue.transfer() does not have any mechanism that
prevents the stack from growing beyond its maximum capacity.

Modifications:

- Make WeakOrderedQueue.transfer() increase the capacity of the stack
  only up to its maximum
- Add tests for the cases where the recyclable object is returned at the
  non-owner thread
- Fix a bug where Stack.scavengeSome() does not scavenge the objects
  when it's the first time it ran out of objects and thus its cursor is
  null.
- Overall clean-up of scavengeSome() and transfer()

Result:

The capacity of Stack never increases beyond its maximum.
This commit is contained in:
Trustin Lee 2014-12-05 21:09:28 +09:00
parent a79466769f
commit 7f92771496
2 changed files with 150 additions and 39 deletions

View File

@ -100,6 +100,10 @@ public abstract class Recycler<T> {
return threadLocal.get().elements.length; return threadLocal.get().elements.length;
} }
final int threadLocalSize() {
return threadLocal.get().size;
}
protected abstract T newObject(Handle<T> handle); protected abstract T newObject(Handle<T> handle);
public interface Handle<T> { public interface Handle<T> {
@ -199,7 +203,7 @@ public abstract class Recycler<T> {
// transfer as many items as we can from this queue to the stack, returning true if any were transferred // transfer as many items as we can from this queue to the stack, returning true if any were transferred
@SuppressWarnings("rawtypes") @SuppressWarnings("rawtypes")
boolean transfer(Stack<?> to) { boolean transfer(Stack<?> dst) {
Link head = this.head; Link head = this.head;
if (head == null) { if (head == null) {
@ -213,39 +217,48 @@ public abstract class Recycler<T> {
this.head = head = head.next; this.head = head = head.next;
} }
int start = head.readIndex; final int srcStart = head.readIndex;
int end = head.get(); int srcEnd = head.get();
if (start == end) { final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false; return false;
} }
int count = end - start; final int dstSize = dst.size;
if (to.size + count > to.elements.length) { final int expectedCapacity = dstSize + srcSize;
to.elements = Arrays.copyOf(to.elements, (to.size + count) * 2);
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
} }
DefaultHandle[] src = head.elements; if (srcStart != srcEnd) {
DefaultHandle[] trg = to.elements; final DefaultHandle[] srcElems = head.elements;
int size = to.size; final DefaultHandle[] dstElems = dst.elements;
while (start < end) { int newDstSize = dstSize;
DefaultHandle element = src[start]; for (int i = srcStart; i < srcEnd; i++) {
if (element.recycleId == 0) { DefaultHandle element = srcElems[i];
element.recycleId = element.lastRecycledId; if (element.recycleId == 0) {
} else if (element.recycleId != element.lastRecycledId) { element.recycleId = element.lastRecycledId;
throw new IllegalStateException("recycled already"); } else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
element.stack = dst;
dstElems[newDstSize ++] = element;
srcElems[i] = null;
} }
element.stack = to; dst.size = newDstSize;
trg[size++] = element;
src[start++] = null;
}
to.size = size;
if (end == LINK_CAPACITY && head.next != null) { if (srcEnd == LINK_CAPACITY && head.next != null) {
this.head = head.next; this.head = head.next;
} }
head.readIndex = end; head.readIndex = srcEnd;
return true; return true;
} else {
// The destination stack is full already.
return false;
}
} }
} }
@ -268,7 +281,22 @@ public abstract class Recycler<T> {
this.parent = parent; this.parent = parent;
this.thread = thread; this.thread = thread;
this.maxCapacity = maxCapacity; this.maxCapacity = maxCapacity;
elements = new DefaultHandle[INITIAL_CAPACITY]; elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)];
}
int increaseCapacity(int expectedCapacity) {
int newCapacity = elements.length;
int maxCapacity = this.maxCapacity;
do {
newCapacity <<= 1;
} while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
newCapacity = Math.min(newCapacity, maxCapacity);
if (newCapacity != elements.length) {
elements = Arrays.copyOf(elements, newCapacity);
}
return newCapacity;
} }
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
@ -304,21 +332,32 @@ public abstract class Recycler<T> {
} }
boolean scavengeSome() { boolean scavengeSome() {
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
cursor = head;
if (cursor == null) {
return false;
}
}
boolean success = false; boolean success = false;
WeakOrderQueue cursor = this.cursor, prev = this.prev; WeakOrderQueue prev = this.prev;
while (cursor != null) { do {
if (cursor.transfer(this)) { if (cursor.transfer(this)) {
success = true; success = true;
break; break;
} }
WeakOrderQueue next = cursor.next; WeakOrderQueue next = cursor.next;
if (cursor.owner.get() == null) { if (cursor.owner.get() == null) {
// if the thread associated with the queue is gone, unlink it, after // If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect // performing a volatile read to confirm there is no data left to collect.
// we never unlink the first queue, as we don't want to synchronize on updating the head // We never unlink the first queue, as we don't want to synchronize on updating the head.
if (cursor.hasFinalData()) { if (cursor.hasFinalData()) {
for (;;) { for (;;) {
if (!cursor.transfer(this)) { if (cursor.transfer(this)) {
success = true;
} else {
break; break;
} }
} }
@ -329,8 +368,11 @@ public abstract class Recycler<T> {
} else { } else {
prev = cursor; prev = cursor;
} }
cursor = next; cursor = next;
}
} while (cursor != null && !success);
this.prev = prev; this.prev = prev;
this.cursor = cursor; this.cursor = cursor;
return success; return success;
@ -343,7 +385,7 @@ public abstract class Recycler<T> {
item.recycleId = item.lastRecycledId = OWN_THREAD_ID; item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size; int size = this.size;
if (size == maxCapacity) { if (size >= maxCapacity) {
// Hit the maximum capacity - drop the possibly youngest object. // Hit the maximum capacity - drop the possibly youngest object.
return; return;
} }

View File

@ -15,10 +15,12 @@
*/ */
package io.netty.util; package io.netty.util;
import org.junit.Test;
import java.util.Random; import java.util.Random;
import org.junit.Assert; import static org.hamcrest.CoreMatchers.*;
import org.junit.Test; import static org.junit.Assert.*;
public class RecyclerTest { public class RecyclerTest {
@ -34,7 +36,7 @@ public class RecyclerTest {
RecyclableObject object = RecyclableObject.newInstance(); RecyclableObject object = RecyclableObject.newInstance();
object.recycle(); object.recycle();
RecyclableObject object2 = RecyclableObject.newInstance(); RecyclableObject object2 = RecyclableObject.newInstance();
Assert.assertSame(object, object2); assertSame(object, object2);
object2.recycle(); object2.recycle();
} }
@ -94,7 +96,74 @@ public class RecyclerTest {
objects[i] = null; objects[i] = null;
} }
Assert.assertEquals(maxCapacity, recycler.threadLocalCapacity()); assertEquals(maxCapacity, recycler.threadLocalCapacity());
}
@Test
public void testRecycleAtDifferentThread() throws Exception {
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(256) {
@Override
protected HandledObject newObject(Recycler.Handle handle) {
return new HandledObject(handle);
}
};
final HandledObject o = recycler.get();
final Thread thread = new Thread() {
@Override
public void run() {
recycler.recycle(o, o.handle);
}
};
thread.start();
thread.join();
assertThat(recycler.get(), is(sameInstance(o)));
}
@Test
public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception {
final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(maxCapacity) {
@Override
protected HandledObject newObject(Recycler.Handle handle) {
return new HandledObject(handle);
}
};
// Borrow 2 * maxCapacity objects.
// Return the half from the same thread.
// Return the other half from the different thread.
final HandledObject[] array = new HandledObject[maxCapacity * 3];
for (int i = 0; i < array.length; i ++) {
array[i] = recycler.get();
}
for (int i = 0; i < maxCapacity; i ++) {
recycler.recycle(array[i], array[i].handle);
}
final Thread thread = new Thread() {
@Override
public void run() {
for (int i = maxCapacity; i < array.length; i ++) {
recycler.recycle(array[i], array[i].handle);
}
}
};
thread.start();
thread.join();
assertThat(recycler.threadLocalCapacity(), is(maxCapacity));
assertThat(recycler.threadLocalSize(), is(maxCapacity));
for (int i = 0; i < array.length; i ++) {
recycler.get();
}
assertThat(recycler.threadLocalCapacity(), is(maxCapacity));
assertThat(recycler.threadLocalSize(), is(0));
} }
static final class HandledObject { static final class HandledObject {