Add ReferenceCountUtil.releaseLater() to make writing tests easy with ReferenceCounteds

This commit is contained in:
Trustin Lee 2013-12-06 15:12:46 +09:00
parent 128c4b96b5
commit e506581eb1
3 changed files with 179 additions and 85 deletions

View File

@ -30,10 +30,8 @@ import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -43,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.buffer.Unpooled.*;
import static io.netty.util.ReferenceCountUtil.*;
import static io.netty.util.internal.EmptyArrays.*;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
@ -55,13 +54,6 @@ public abstract class AbstractByteBufTest {
private static final int CAPACITY = 4096; // Must be even
private static final int BLOCK_SIZE = 128;
private static final Queue<ByteBuf> freeLaterQueue = new ArrayDeque<ByteBuf>();
protected static <T extends ByteBuf> T freeLater(T buf) {
freeLaterQueue.add(buf);
return buf;
}
private long seed;
private Random random;
private ByteBuf buffer;
@ -93,17 +85,6 @@ public abstract class AbstractByteBufTest {
}
buffer = null;
}
for (;;) {
ByteBuf buf = freeLaterQueue.poll();
if (buf == null) {
break;
}
if (buf.refCnt() > 0) {
buf.release(buf.refCnt());
}
}
}
@Test
@ -888,7 +869,7 @@ public abstract class AbstractByteBufTest {
@Test
public void testRandomDirectBufferTransfer() {
byte[] tmp = new byte[BLOCK_SIZE * 2];
ByteBuf value = freeLater(directBuffer(BLOCK_SIZE * 2));
ByteBuf value = releaseLater(directBuffer(BLOCK_SIZE * 2));
for (int i = 0; i < buffer.capacity() - BLOCK_SIZE + 1; i += BLOCK_SIZE) {
random.nextBytes(tmp);
value.setBytes(0, tmp, 0, value.capacity());
@ -896,7 +877,7 @@ public abstract class AbstractByteBufTest {
}
random.setSeed(seed);
ByteBuf expectedValue = freeLater(directBuffer(BLOCK_SIZE * 2));
ByteBuf expectedValue = releaseLater(directBuffer(BLOCK_SIZE * 2));
for (int i = 0; i < buffer.capacity() - BLOCK_SIZE + 1; i += BLOCK_SIZE) {
random.nextBytes(tmp);
expectedValue.setBytes(0, tmp, 0, expectedValue.capacity());
@ -1052,7 +1033,7 @@ public abstract class AbstractByteBufTest {
@Test
public void testSequentialDirectBufferTransfer1() {
byte[] valueContent = new byte[BLOCK_SIZE * 2];
ByteBuf value = freeLater(directBuffer(BLOCK_SIZE * 2));
ByteBuf value = releaseLater(directBuffer(BLOCK_SIZE * 2));
buffer.writerIndex(0);
for (int i = 0; i < buffer.capacity() - BLOCK_SIZE + 1; i += BLOCK_SIZE) {
random.nextBytes(valueContent);
@ -1066,7 +1047,7 @@ public abstract class AbstractByteBufTest {
random.setSeed(seed);
byte[] expectedValueContent = new byte[BLOCK_SIZE * 2];
ByteBuf expectedValue = freeLater(wrappedBuffer(expectedValueContent));
ByteBuf expectedValue = releaseLater(wrappedBuffer(expectedValueContent));
for (int i = 0; i < buffer.capacity() - BLOCK_SIZE + 1; i += BLOCK_SIZE) {
random.nextBytes(expectedValueContent);
int valueOffset = random.nextInt(BLOCK_SIZE);
@ -1085,7 +1066,7 @@ public abstract class AbstractByteBufTest {
@Test
public void testSequentialDirectBufferTransfer2() {
byte[] valueContent = new byte[BLOCK_SIZE * 2];
ByteBuf value = freeLater(directBuffer(BLOCK_SIZE * 2));
ByteBuf value = releaseLater(directBuffer(BLOCK_SIZE * 2));
buffer.writerIndex(0);
for (int i = 0; i < buffer.capacity() - BLOCK_SIZE + 1; i += BLOCK_SIZE) {
random.nextBytes(valueContent);
@ -1103,7 +1084,7 @@ public abstract class AbstractByteBufTest {
random.setSeed(seed);
byte[] expectedValueContent = new byte[BLOCK_SIZE * 2];
ByteBuf expectedValue = freeLater(wrappedBuffer(expectedValueContent));
ByteBuf expectedValue = releaseLater(wrappedBuffer(expectedValueContent));
for (int i = 0; i < buffer.capacity() - BLOCK_SIZE + 1; i += BLOCK_SIZE) {
random.nextBytes(expectedValueContent);
value.setBytes(0, valueContent);
@ -1308,7 +1289,7 @@ public abstract class AbstractByteBufTest {
for (int i = 0; i < buffer.capacity(); i += 4) {
buffer.writeInt(i);
}
ByteBuf copy = freeLater(copiedBuffer(buffer));
ByteBuf copy = releaseLater(copiedBuffer(buffer));
// Make sure there's no effect if called when readerIndex is 0.
buffer.readerIndex(CAPACITY / 4);
@ -1360,7 +1341,7 @@ public abstract class AbstractByteBufTest {
for (int i = 0; i < buffer.capacity(); i ++) {
buffer.writeByte((byte) i);
}
ByteBuf copy = freeLater(copiedBuffer(buffer));
ByteBuf copy = releaseLater(copiedBuffer(buffer));
// Discard the first (CAPACITY / 2 - 1) bytes.
buffer.setIndex(CAPACITY / 2 - 1, CAPACITY - 1);
@ -1426,7 +1407,7 @@ public abstract class AbstractByteBufTest {
buffer.setIndex(readerIndex, writerIndex);
// Make sure all properties are copied.
ByteBuf copy = freeLater(buffer.copy());
ByteBuf copy = releaseLater(buffer.copy());
assertEquals(0, copy.readerIndex());
assertEquals(buffer.readableBytes(), copy.writerIndex());
assertEquals(buffer.readableBytes(), copy.capacity());
@ -1635,8 +1616,8 @@ public abstract class AbstractByteBufTest {
@Test
public void testHashCode() {
ByteBuf elemA = freeLater(buffer(15));
ByteBuf elemB = freeLater(directBuffer(15));
ByteBuf elemA = releaseLater(buffer(15));
ByteBuf elemB = releaseLater(directBuffer(15));
elemA.writeBytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5 });
elemB.writeBytes(new byte[] { 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9 });
@ -1645,9 +1626,9 @@ public abstract class AbstractByteBufTest {
set.add(elemB);
assertEquals(2, set.size());
assertTrue(set.contains(freeLater(elemA.copy())));
assertTrue(set.contains(releaseLater(elemA.copy())));
ByteBuf elemBCopy = freeLater(elemB.copy());
ByteBuf elemBCopy = releaseLater(elemB.copy());
assertTrue(set.contains(elemBCopy));
buffer.clear();
@ -1714,7 +1695,7 @@ public abstract class AbstractByteBufTest {
return false;
}
i ++;
i++;
return true;
}
}), is(stop));
@ -1755,7 +1736,7 @@ public abstract class AbstractByteBufTest {
}
private void testInternalNioBuffer(int a) {
ByteBuf buffer = freeLater(newBuffer(2));
ByteBuf buffer = releaseLater(newBuffer(2));
ByteBuffer buf = buffer.internalNioBuffer(0, 1);
assertEquals(1, buf.remaining());
@ -1786,7 +1767,7 @@ public abstract class AbstractByteBufTest {
final byte[] bytes = new byte[8];
random.nextBytes(bytes);
final ByteBuf buffer = freeLater(newBuffer(8));
final ByteBuf buffer = releaseLater(newBuffer(8));
buffer.writeBytes(bytes);
final CountDownLatch latch = new CountDownLatch(60000);
final CyclicBarrier barrier = new CyclicBarrier(11);
@ -1840,7 +1821,7 @@ public abstract class AbstractByteBufTest {
final byte[] bytes = new byte[8];
random.nextBytes(bytes);
final ByteBuf buffer = freeLater(newBuffer(8));
final ByteBuf buffer = releaseLater(newBuffer(8));
buffer.writeBytes(bytes);
final CountDownLatch latch = new CountDownLatch(60000);
final CyclicBarrier barrier = new CyclicBarrier(11);
@ -1894,7 +1875,7 @@ public abstract class AbstractByteBufTest {
final byte[] bytes = new byte[8];
random.nextBytes(bytes);
final ByteBuf buffer = freeLater(newBuffer(8));
final ByteBuf buffer = releaseLater(newBuffer(8));
buffer.writeBytes(bytes);
final AtomicReference<Throwable> cause = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(60000);
@ -1937,15 +1918,15 @@ public abstract class AbstractByteBufTest {
@Test(expected = IndexOutOfBoundsException.class)
public void readByteThrowsIndexOutOfBoundsException() {
final ByteBuf buffer = freeLater(newBuffer(8));
final ByteBuf buffer = releaseLater(newBuffer(8));
buffer.writeByte(0);
assertEquals((byte) 0 , buffer.readByte());
assertEquals((byte) 0, buffer.readByte());
buffer.readByte();
}
@Test
public void testNioBufferExposeOnlyRegion() {
final ByteBuf buffer = freeLater(newBuffer(8));
final ByteBuf buffer = releaseLater(newBuffer(8));
byte[] data = new byte[8];
random.nextBytes(data);
buffer.writeBytes(data);

View File

@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import static io.netty.buffer.Unpooled.*;
import static io.netty.util.ReferenceCountUtil.*;
import static io.netty.util.internal.EmptyArrays.*;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
@ -123,7 +124,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
public void testDiscardReadBytes3() {
ByteBuf a, b;
a = wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order);
b = freeLater(wrappedBuffer(
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, 0, 5).order(order),
wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, 5, 5).order(order)));
a.skipBytes(6);
@ -199,7 +200,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
@Test
public void testFullConsolidation() {
CompositeByteBuf buf = freeLater(compositeBuffer(Integer.MAX_VALUE));
CompositeByteBuf buf = releaseLater(compositeBuffer(Integer.MAX_VALUE));
buf.addComponent(wrappedBuffer(new byte[] { 1 }));
buf.addComponent(wrappedBuffer(new byte[] { 2, 3 }));
buf.addComponent(wrappedBuffer(new byte[] { 4, 5, 6 }));
@ -213,7 +214,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
@Test
public void testRangedConsolidation() {
CompositeByteBuf buf = freeLater(compositeBuffer(Integer.MAX_VALUE));
CompositeByteBuf buf = releaseLater(compositeBuffer(Integer.MAX_VALUE));
buf.addComponent(wrappedBuffer(new byte[] { 1 }));
buf.addComponent(wrappedBuffer(new byte[] { 2, 3 }));
buf.addComponent(wrappedBuffer(new byte[] { 4, 5, 6 }));
@ -248,66 +249,66 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
ByteBuf a, b;
// XXX Same tests with several buffers in wrappedCheckedBuffer
// Different length.
a = freeLater(wrappedBuffer(new byte[] { 1 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 1 }).order(order),
wrappedBuffer(new byte[] { 2 }).order(order)));
assertFalse(ByteBufUtil.equals(a, b));
// Same content, same firstIndex, short length.
a = freeLater(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[]{1}).order(order),
wrappedBuffer(new byte[]{2}).order(order),
wrappedBuffer(new byte[]{3}).order(order)));
assertTrue(ByteBufUtil.equals(a, b));
// Same content, different firstIndex, short length.
a = freeLater(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 0, 1, 2, 3, 4 }, 1, 2).order(order),
wrappedBuffer(new byte[] { 0, 1, 2, 3, 4 }, 3, 1).order(order)));
assertTrue(ByteBufUtil.equals(a, b));
// Different content, same firstIndex, short length.
a = freeLater(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 1, 2 }).order(order),
wrappedBuffer(new byte[] { 4 }).order(order)));
assertFalse(ByteBufUtil.equals(a, b));
// Different content, different firstIndex, short length.
a = freeLater(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 0, 1, 2, 4, 5 }, 1, 2).order(order),
wrappedBuffer(new byte[] { 0, 1, 2, 4, 5 }, 3, 1).order(order)));
assertFalse(ByteBufUtil.equals(a, b));
// Same content, same firstIndex, long length.
a = freeLater(wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 1, 2, 3 }).order(order),
wrappedBuffer(new byte[] { 4, 5, 6 }).order(order),
wrappedBuffer(new byte[] { 7, 8, 9, 10 }).order(order)));
assertTrue(ByteBufUtil.equals(a, b));
// Same content, different firstIndex, long length.
a = freeLater(wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 1, 5).order(order),
wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 6, 5).order(order)));
assertTrue(ByteBufUtil.equals(a, b));
// Different content, same firstIndex, long length.
a = freeLater(wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 1, 2, 3, 4, 6 }).order(order),
wrappedBuffer(new byte[] { 7, 8, 5, 9, 10 }).order(order)));
assertFalse(ByteBufUtil.equals(a, b));
// Different content, different firstIndex, long length.
a = freeLater(wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order));
b = freeLater(wrappedBuffer(
a = releaseLater(wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order));
b = releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 6, 7, 8, 5, 9, 10, 11 }, 1, 5).order(order),
wrappedBuffer(new byte[] { 0, 1, 2, 3, 4, 6, 7, 8, 5, 9, 10, 11 }, 6, 5).order(order)));
assertFalse(ByteBufUtil.equals(a, b));
@ -324,7 +325,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
assertEquals(
wrappedBuffer(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order)),
freeLater(wrappedBuffer(wrappedBuffer(
releaseLater(wrappedBuffer(wrappedBuffer(
new byte[] { 1 },
new byte[] { 2 },
new byte[] { 3 }).order(order))));
@ -337,7 +338,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
assertEquals(
wrappedBuffer(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order)),
freeLater(wrappedBuffer(
releaseLater(wrappedBuffer(
wrappedBuffer(new byte[] { 1 }).order(order),
wrappedBuffer(new byte[] { 2 }).order(order),
wrappedBuffer(new byte[] { 3 }).order(order))));
@ -350,7 +351,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
assertEquals(
wrappedBuffer(wrappedBuffer(new byte[] { 1, 2, 3 }).order(order)),
freeLater(wrappedBuffer(wrappedBuffer(
releaseLater(wrappedBuffer(wrappedBuffer(
ByteBuffer.wrap(new byte[] { 1 }),
ByteBuffer.wrap(new byte[] { 2 }),
ByteBuffer.wrap(new byte[] { 3 })))));
@ -387,7 +388,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
// Different content, same firstIndex, short length.
a = wrappedBuffer(new byte[] { 1, 2, 3 }).order(order);
b = freeLater(wrappedBuffer(wrappedBuffer(new byte[] { 1, 2 }, new byte[1]).order(order)));
b = releaseLater(wrappedBuffer(wrappedBuffer(new byte[] { 1, 2 }, new byte[1]).order(order)));
// to enable writeBytes
b.writerIndex(b.writerIndex() - 1);
b.writeBytes(wrappedBuffer(new byte[] { 4 }).order(order));
@ -403,7 +404,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
// Same content, same firstIndex, long length.
a = wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order);
b = freeLater(wrappedBuffer(wrappedBuffer(new byte[] { 1, 2, 3 }, new byte[7])).order(order));
b = releaseLater(wrappedBuffer(wrappedBuffer(new byte[] { 1, 2, 3 }, new byte[7])).order(order));
// to enable writeBytes
b.writerIndex(b.writerIndex() - 7);
b.writeBytes(wrappedBuffer(new byte[] { 4, 5, 6 }).order(order));
@ -420,7 +421,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
// Different content, same firstIndex, long length.
a = wrappedBuffer(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }).order(order);
b = freeLater(wrappedBuffer(wrappedBuffer(new byte[] { 1, 2, 3, 4, 6 }, new byte[5])).order(order));
b = releaseLater(wrappedBuffer(wrappedBuffer(new byte[] { 1, 2, 3, 4, 6 }, new byte[5])).order(order));
// to enable writeBytes
b.writerIndex(b.writerIndex() - 5);
b.writeBytes(wrappedBuffer(new byte[] { 7, 8, 5, 9, 10 }).order(order));
@ -437,7 +438,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
@Test
public void testEmptyBuffer() {
ByteBuf b = freeLater(wrappedBuffer(new byte[]{1, 2}, new byte[]{3, 4}));
ByteBuf b = releaseLater(wrappedBuffer(new byte[]{1, 2}, new byte[]{3, 4}));
b.readBytes(new byte[4]);
b.readBytes(EMPTY_BYTES);
}
@ -445,7 +446,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
// Test for https://github.com/netty/netty/issues/1060
@Test
public void testReadWithEmptyCompositeBuffer() {
ByteBuf buf = freeLater(compositeBuffer());
ByteBuf buf = releaseLater(compositeBuffer());
int n = 65;
for (int i = 0; i < n; i ++) {
buf.writeByte(1);
@ -455,7 +456,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
@Test
public void testComponentMustBeSlice() {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
buf.addComponent(buffer(4).setIndex(1, 3));
assertThat(buf.component(0), is(instanceOf(SlicedByteBuf.class)));
assertThat(buf.component(0).capacity(), is(2));
@ -468,7 +469,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
ByteBuf c2 = buffer().writeByte(2).retain();
ByteBuf c3 = buffer().writeByte(3).retain(2);
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
assertThat(buf.refCnt(), is(1));
buf.addComponents(c1, c2, c3);
@ -529,7 +530,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
ByteBuf c2 = buffer().writeByte(2).retain();
ByteBuf c3 = buffer().writeByte(3).retain(2);
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
assertThat(buf.refCnt(), is(1));
List<ByteBuf> components = new ArrayList<ByteBuf>();
@ -551,7 +552,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
@Test
public void testNestedLayout() {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
buf.addComponent(
compositeBuffer()
.addComponent(wrappedBuffer(new byte[]{1, 2}))
@ -567,7 +568,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
@Test
public void testRemoveLastComponent() {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
buf.addComponent(wrappedBuffer(new byte[]{1, 2}));
assertEquals(1, buf.numComponents());
buf.removeComponent(0);
@ -576,21 +577,21 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
@Test
public void testCopyEmpty() {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
assertEquals(0, buf.numComponents());
assertEquals(0, freeLater(buf.copy()).readableBytes());
assertEquals(0, releaseLater(buf.copy()).readableBytes());
}
@Test
public void testDuplicateEmpty() {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
assertEquals(0, buf.numComponents());
assertEquals(0, freeLater(buf.duplicate()).readableBytes());
assertEquals(0, releaseLater(buf.duplicate()).readableBytes());
}
@Test
public void testRemoveLastComponentWithOthersLeft() {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
buf.addComponent(wrappedBuffer(new byte[]{1, 2}));
buf.addComponent(wrappedBuffer(new byte[]{1, 2}));
assertEquals(2, buf.numComponents());
@ -632,7 +633,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
}
private static void testGatheringWrites(ByteBuf buf1, ByteBuf buf2) throws Exception {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
buf.addComponent(buf1.writeBytes(new byte[]{1, 2}));
buf.addComponent(buf2.writeBytes(new byte[]{1, 2}));
buf.writerIndex(3);
@ -714,7 +715,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
}
private static void testGatheringWritesPartial(ByteBuf buf1, ByteBuf buf2, boolean slice) throws Exception {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
buf1.writeBytes(new byte[]{1, 2, 3, 4});
buf2.writeBytes(new byte[]{1, 2, 3, 4});
if (slice) {
@ -757,7 +758,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
}
private static void testGatheringWritesSingleBuf(ByteBuf buf1) throws Exception {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
buf.addComponent(buf1.writeBytes(new byte[]{1, 2, 3, 4}));
buf.writerIndex(3);
buf.readerIndex(1);
@ -778,7 +779,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
@Test
public void testisDirectMultipleBufs() {
CompositeByteBuf buf = freeLater(compositeBuffer());
CompositeByteBuf buf = releaseLater(compositeBuffer());
assertFalse(buf.isDirect());
buf.addComponent(directBuffer().writeByte(1));
@ -794,7 +795,7 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
// See https://github.com/netty/netty/issues/1976
@Test
public void testDiscardSomeReadBytes() {
CompositeByteBuf cbuf = freeLater(compositeBuffer());
CompositeByteBuf cbuf = releaseLater(compositeBuffer());
int len = 8 * 4;
for (int i = 0; i < len; i += 4) {
ByteBuf buf = Unpooled.buffer().writeInt(i);

View File

@ -15,11 +15,26 @@
*/
package io.netty.util;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Collection of method to handle objects that may implement {@link ReferenceCounted}.
*/
public final class ReferenceCountUtil {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ReferenceCountUtil.class);
/**
* Try to call {@link ReferenceCounted#retain()} if the specified message implements {@link ReferenceCounted}.
* If the specified message doesn't implement {@link ReferenceCounted}, this method does nothing.
@ -66,5 +81,102 @@ public final class ReferenceCountUtil {
return false;
}
private static final Map<Thread, List<Entry>> pendingReleases = new IdentityHashMap<Thread, List<Entry>>();
/**
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the
* intended use case.
*/
public static <T> T releaseLater(T msg) {
return releaseLater(msg, 1);
}
/**
* Schedules the specified object to be released when the caller thread terminates. Note that this operation is
* intended to simplify reference counting of ephemeral objects during unit tests. Do not use it beyond the
* intended use case.
*/
public static <T> T releaseLater(T msg, int decrement) {
if (msg instanceof ReferenceCounted) {
synchronized (pendingReleases) {
Thread thread = Thread.currentThread();
List<Entry> entries = pendingReleases.get(thread);
if (entries == null) {
// Start the periodic releasing task (if not started yet.)
if (pendingReleases.isEmpty()) {
ReleasingTask task = new ReleasingTask();
task.future = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(task, 1, 1, TimeUnit.SECONDS);
}
// Create a new entry.
entries = new ArrayList<Entry>();
pendingReleases.put(thread, entries);
}
entries.add(new Entry((ReferenceCounted) msg, decrement));
}
}
return msg;
}
private static final class Entry {
final ReferenceCounted obj;
final int decrement;
Entry(ReferenceCounted obj, int decrement) {
this.obj = obj;
this.decrement = decrement;
}
public String toString() {
return StringUtil.simpleClassName(obj) + ".release(" + decrement + ") refCnt: " + obj.refCnt();
}
}
/**
* Releases the objects when the thread that called {@link #releaseLater(Object)} has been terminated.
*/
private static final class ReleasingTask implements Runnable {
volatile ScheduledFuture<?> future;
@Override
public void run() {
synchronized (pendingReleases) {
for (Iterator<Map.Entry<Thread, List<Entry>>> i = pendingReleases.entrySet().iterator();
i.hasNext();) {
Map.Entry<Thread, List<Entry>> e = i.next();
if (e.getKey().isAlive()) {
continue;
}
releaseAll(e.getValue());
// Remove from the map since the thread is not alive anymore.
i.remove();
}
if (pendingReleases.isEmpty()) {
future.cancel(false);
}
}
}
private static void releaseAll(Iterable<Entry> entries) {
for (Entry e: entries) {
try {
if (!e.obj.release(e.decrement)) {
logger.warn("Non-zero refCnt: {}", e);
} else {
logger.warn("Released: {}", e);
}
} catch (Exception ex) {
logger.warn("Failed to release an object: {}", e.obj, ex);
}
}
}
}
private ReferenceCountUtil() { }
}