Fix a bug where Recycler's capacity can increase beyond its maximum
Related: #3166 Motivation: When the recyclable object created at one thread is returned at the other thread, it is stored in a WeakOrderedQueue. The objects stored in the WeakOrderedQueue is added back to the stack by WeakOrderedQueue.transfer() when the owner thread ran out of recyclable objects. However, WeakOrderedQueue.transfer() does not have any mechanism that prevents the stack from growing beyond its maximum capacity. Modifications: - Make WeakOrderedQueue.transfer() increase the capacity of the stack only up to its maximum - Add tests for the cases where the recyclable object is returned at the non-owner thread - Fix a bug where Stack.scavengeSome() does not scavenge the objects when it's the first time it ran out of objects and thus its cursor is null. - Overall clean-up of scavengeSome() and transfer() Result: The capacity of Stack never increases beyond its maximum.
This commit is contained in:
parent
47319d3cfc
commit
13ad58aed7
@ -100,6 +100,10 @@ public abstract class Recycler<T> {
|
||||
return threadLocal.get().elements.length;
|
||||
}
|
||||
|
||||
final int threadLocalSize() {
|
||||
return threadLocal.get().size;
|
||||
}
|
||||
|
||||
protected abstract T newObject(Handle<T> handle);
|
||||
|
||||
public interface Handle<T> {
|
||||
@ -199,7 +203,7 @@ public abstract class Recycler<T> {
|
||||
|
||||
// transfer as many items as we can from this queue to the stack, returning true if any were transferred
|
||||
@SuppressWarnings("rawtypes")
|
||||
boolean transfer(Stack<?> to) {
|
||||
boolean transfer(Stack<?> dst) {
|
||||
|
||||
Link head = this.head;
|
||||
if (head == null) {
|
||||
@ -213,39 +217,48 @@ public abstract class Recycler<T> {
|
||||
this.head = head = head.next;
|
||||
}
|
||||
|
||||
int start = head.readIndex;
|
||||
int end = head.get();
|
||||
if (start == end) {
|
||||
final int srcStart = head.readIndex;
|
||||
int srcEnd = head.get();
|
||||
final int srcSize = srcEnd - srcStart;
|
||||
if (srcSize == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int count = end - start;
|
||||
if (to.size + count > to.elements.length) {
|
||||
to.elements = Arrays.copyOf(to.elements, (to.size + count) * 2);
|
||||
final int dstSize = dst.size;
|
||||
final int expectedCapacity = dstSize + srcSize;
|
||||
|
||||
if (expectedCapacity > dst.elements.length) {
|
||||
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
|
||||
srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
|
||||
}
|
||||
|
||||
DefaultHandle[] src = head.elements;
|
||||
DefaultHandle[] trg = to.elements;
|
||||
int size = to.size;
|
||||
while (start < end) {
|
||||
DefaultHandle element = src[start];
|
||||
if (srcStart != srcEnd) {
|
||||
final DefaultHandle[] srcElems = head.elements;
|
||||
final DefaultHandle[] dstElems = dst.elements;
|
||||
int newDstSize = dstSize;
|
||||
for (int i = srcStart; i < srcEnd; i++) {
|
||||
DefaultHandle element = srcElems[i];
|
||||
if (element.recycleId == 0) {
|
||||
element.recycleId = element.lastRecycledId;
|
||||
} else if (element.recycleId != element.lastRecycledId) {
|
||||
throw new IllegalStateException("recycled already");
|
||||
}
|
||||
element.stack = to;
|
||||
trg[size++] = element;
|
||||
src[start++] = null;
|
||||
element.stack = dst;
|
||||
dstElems[newDstSize ++] = element;
|
||||
srcElems[i] = null;
|
||||
}
|
||||
to.size = size;
|
||||
dst.size = newDstSize;
|
||||
|
||||
if (end == LINK_CAPACITY && head.next != null) {
|
||||
if (srcEnd == LINK_CAPACITY && head.next != null) {
|
||||
this.head = head.next;
|
||||
}
|
||||
|
||||
head.readIndex = end;
|
||||
head.readIndex = srcEnd;
|
||||
return true;
|
||||
} else {
|
||||
// The destination stack is full already.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -268,7 +281,22 @@ public abstract class Recycler<T> {
|
||||
this.parent = parent;
|
||||
this.thread = thread;
|
||||
this.maxCapacity = maxCapacity;
|
||||
elements = new DefaultHandle[INITIAL_CAPACITY];
|
||||
elements = new DefaultHandle[Math.min(INITIAL_CAPACITY, maxCapacity)];
|
||||
}
|
||||
|
||||
int increaseCapacity(int expectedCapacity) {
|
||||
int newCapacity = elements.length;
|
||||
int maxCapacity = this.maxCapacity;
|
||||
do {
|
||||
newCapacity <<= 1;
|
||||
} while (newCapacity < expectedCapacity && newCapacity < maxCapacity);
|
||||
|
||||
newCapacity = Math.min(newCapacity, maxCapacity);
|
||||
if (newCapacity != elements.length) {
|
||||
elements = Arrays.copyOf(elements, newCapacity);
|
||||
}
|
||||
|
||||
return newCapacity;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@ -304,21 +332,32 @@ public abstract class Recycler<T> {
|
||||
}
|
||||
|
||||
boolean scavengeSome() {
|
||||
WeakOrderQueue cursor = this.cursor;
|
||||
if (cursor == null) {
|
||||
cursor = head;
|
||||
if (cursor == null) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
boolean success = false;
|
||||
WeakOrderQueue cursor = this.cursor, prev = this.prev;
|
||||
while (cursor != null) {
|
||||
WeakOrderQueue prev = this.prev;
|
||||
do {
|
||||
if (cursor.transfer(this)) {
|
||||
success = true;
|
||||
break;
|
||||
}
|
||||
|
||||
WeakOrderQueue next = cursor.next;
|
||||
if (cursor.owner.get() == null) {
|
||||
// if the thread associated with the queue is gone, unlink it, after
|
||||
// performing a volatile read to confirm there is no data left to collect
|
||||
// we never unlink the first queue, as we don't want to synchronize on updating the head
|
||||
// If the thread associated with the queue is gone, unlink it, after
|
||||
// performing a volatile read to confirm there is no data left to collect.
|
||||
// We never unlink the first queue, as we don't want to synchronize on updating the head.
|
||||
if (cursor.hasFinalData()) {
|
||||
for (;;) {
|
||||
if (!cursor.transfer(this)) {
|
||||
if (cursor.transfer(this)) {
|
||||
success = true;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -329,8 +368,11 @@ public abstract class Recycler<T> {
|
||||
} else {
|
||||
prev = cursor;
|
||||
}
|
||||
|
||||
cursor = next;
|
||||
}
|
||||
|
||||
} while (cursor != null && !success);
|
||||
|
||||
this.prev = prev;
|
||||
this.cursor = cursor;
|
||||
return success;
|
||||
@ -343,7 +385,7 @@ public abstract class Recycler<T> {
|
||||
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
|
||||
|
||||
int size = this.size;
|
||||
if (size == maxCapacity) {
|
||||
if (size >= maxCapacity) {
|
||||
// Hit the maximum capacity - drop the possibly youngest object.
|
||||
return;
|
||||
}
|
||||
|
@ -15,10 +15,12 @@
|
||||
*/
|
||||
package io.netty.util;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class RecyclerTest {
|
||||
|
||||
@ -34,7 +36,7 @@ public class RecyclerTest {
|
||||
RecyclableObject object = RecyclableObject.newInstance();
|
||||
object.recycle();
|
||||
RecyclableObject object2 = RecyclableObject.newInstance();
|
||||
Assert.assertSame(object, object2);
|
||||
assertSame(object, object2);
|
||||
object2.recycle();
|
||||
}
|
||||
|
||||
@ -94,7 +96,74 @@ public class RecyclerTest {
|
||||
objects[i] = null;
|
||||
}
|
||||
|
||||
Assert.assertEquals(maxCapacity, recycler.threadLocalCapacity());
|
||||
assertEquals(maxCapacity, recycler.threadLocalCapacity());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecycleAtDifferentThread() throws Exception {
|
||||
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(256) {
|
||||
@Override
|
||||
protected HandledObject newObject(Recycler.Handle handle) {
|
||||
return new HandledObject(handle);
|
||||
}
|
||||
};
|
||||
|
||||
final HandledObject o = recycler.get();
|
||||
final Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
recycler.recycle(o, o.handle);
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
thread.join();
|
||||
|
||||
assertThat(recycler.get(), is(sameInstance(o)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxCapacityWithRecycleAtDifferentThread() throws Exception {
|
||||
final int maxCapacity = 4; // Choose the number smaller than WeakOrderQueue.LINK_CAPACITY
|
||||
final Recycler<HandledObject> recycler = new Recycler<HandledObject>(maxCapacity) {
|
||||
@Override
|
||||
protected HandledObject newObject(Recycler.Handle handle) {
|
||||
return new HandledObject(handle);
|
||||
}
|
||||
};
|
||||
|
||||
// Borrow 2 * maxCapacity objects.
|
||||
// Return the half from the same thread.
|
||||
// Return the other half from the different thread.
|
||||
|
||||
final HandledObject[] array = new HandledObject[maxCapacity * 3];
|
||||
for (int i = 0; i < array.length; i ++) {
|
||||
array[i] = recycler.get();
|
||||
}
|
||||
|
||||
for (int i = 0; i < maxCapacity; i ++) {
|
||||
recycler.recycle(array[i], array[i].handle);
|
||||
}
|
||||
|
||||
final Thread thread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (int i = maxCapacity; i < array.length; i ++) {
|
||||
recycler.recycle(array[i], array[i].handle);
|
||||
}
|
||||
}
|
||||
};
|
||||
thread.start();
|
||||
thread.join();
|
||||
|
||||
assertThat(recycler.threadLocalCapacity(), is(maxCapacity));
|
||||
assertThat(recycler.threadLocalSize(), is(maxCapacity));
|
||||
|
||||
for (int i = 0; i < array.length; i ++) {
|
||||
recycler.get();
|
||||
}
|
||||
|
||||
assertThat(recycler.threadLocalCapacity(), is(maxCapacity));
|
||||
assertThat(recycler.threadLocalSize(), is(0));
|
||||
}
|
||||
|
||||
static final class HandledObject {
|
||||
|
Loading…
Reference in New Issue
Block a user