retained[Slice|Duplicate] buffer reference count bug
Motivation: Currently the ByteBuf created as a result of retained[Slice|Duplicate] maintains its own reference count, and when this reference count is depleated it will release the ByteBuf returned from unwrap(). The unwrap() buffer is designed to be the 'root parent' and will skip all intermediate layers of buffers. If the intermediate layers of buffers contain a retained[Slice|Duplicate] then these reference counts will be ignored during deallocation. This may lead to deallocating the 'root parent' before all derived pooled buffers are actually released. This same issue holds if a retained[Slice|Duplicate] is in the heirachy and a 'regular' slice() or duplicate() buffer is created. Modifications: - AbstractPooledDerivedByteBuf must maintain a reference to the direct parent (the buffer which retained[Slice|Duplicate] was called on) and release on this buffer instead of the 'root parent' returned by unwrap() - slice() and duplicate() buffers created from AbstractPooledDerivedByteBuf must also delegate reference count operations to their immediate parent (or first ancestor which maintains an independent reference count). Result: Fixes https://github.com/netty/netty/issues/5999
This commit is contained in:
parent
e7631867d3
commit
4bba7526e2
@ -33,35 +33,59 @@ public abstract class AbstractDerivedByteBuf extends AbstractByteBuf {
|
||||
|
||||
@Override
|
||||
public final int refCnt() {
|
||||
return refCnt0();
|
||||
}
|
||||
|
||||
int refCnt0() {
|
||||
return unwrap().refCnt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ByteBuf retain() {
|
||||
return retain0();
|
||||
}
|
||||
|
||||
ByteBuf retain0() {
|
||||
unwrap().retain();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ByteBuf retain(int increment) {
|
||||
return retain0(increment);
|
||||
}
|
||||
|
||||
ByteBuf retain0(int increment) {
|
||||
unwrap().retain(increment);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ByteBuf touch() {
|
||||
return touch0();
|
||||
}
|
||||
|
||||
ByteBuf touch0() {
|
||||
unwrap().touch();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final ByteBuf touch(Object hint) {
|
||||
return touch0(hint);
|
||||
}
|
||||
|
||||
ByteBuf touch0(Object hint) {
|
||||
unwrap().touch(hint);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean release() {
|
||||
return release0();
|
||||
}
|
||||
|
||||
boolean release0() {
|
||||
return unwrap().release();
|
||||
}
|
||||
|
||||
@ -70,6 +94,10 @@ public abstract class AbstractDerivedByteBuf extends AbstractByteBuf {
|
||||
return unwrap().release(decrement);
|
||||
}
|
||||
|
||||
boolean release0(int decrement) {
|
||||
return unwrap().release(decrement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReadOnly() {
|
||||
return unwrap().isReadOnly();
|
||||
|
@ -17,6 +17,7 @@
|
||||
package io.netty.buffer;
|
||||
|
||||
import io.netty.util.Recycler.Handle;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
@ -27,7 +28,14 @@ import java.nio.ByteOrder;
|
||||
abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByteBuf {
|
||||
|
||||
private final Handle<AbstractPooledDerivedByteBuf> recyclerHandle;
|
||||
private AbstractByteBuf buffer;
|
||||
private AbstractByteBuf rootParent;
|
||||
/**
|
||||
* Deallocations of a pooled derived buffer should always propagate through the entire chain of derived buffers.
|
||||
* This is because each pooled derived buffer maintains its own reference count and we should respect each one.
|
||||
* If deallocations cause a release of the "root parent" then then we may prematurely release the underlying
|
||||
* content before all the derived buffers have been released.
|
||||
*/
|
||||
private ByteBuf parent;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
AbstractPooledDerivedByteBuf(Handle<? extends AbstractPooledDerivedByteBuf> recyclerHandle) {
|
||||
@ -37,14 +45,14 @@ abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByte
|
||||
|
||||
@Override
|
||||
public final AbstractByteBuf unwrap() {
|
||||
return buffer;
|
||||
return rootParent;
|
||||
}
|
||||
|
||||
final <U extends AbstractPooledDerivedByteBuf> U init(
|
||||
AbstractByteBuf unwrapped, ByteBuf wrapped, int readerIndex, int writerIndex, int maxCapacity) {
|
||||
|
||||
wrapped.retain(); // Retain up front to ensure the wrapped buffer is accessible before doing more work.
|
||||
this.buffer = unwrapped;
|
||||
wrapped.retain(); // Retain up front to ensure the parent is accessible before doing more work.
|
||||
parent = wrapped;
|
||||
rootParent = unwrapped;
|
||||
|
||||
try {
|
||||
maxCapacity(maxCapacity);
|
||||
@ -57,7 +65,7 @@ abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByte
|
||||
return castThis;
|
||||
} finally {
|
||||
if (wrapped != null) {
|
||||
this.buffer = null;
|
||||
parent = rootParent = null;
|
||||
wrapped.release();
|
||||
}
|
||||
}
|
||||
@ -65,12 +73,12 @@ abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByte
|
||||
|
||||
@Override
|
||||
protected final void deallocate() {
|
||||
// We need to first store a reference to the wrapped buffer before recycle this instance. This is needed as
|
||||
// We need to first store a reference to the parent before recycle this instance. This is needed as
|
||||
// otherwise it is possible that the same AbstractPooledDerivedByteBuf is again obtained and init(...) is
|
||||
// called before we actually have a chance to call release(). This leads to call release() on the wrong buffer.
|
||||
ByteBuf wrapped = unwrap();
|
||||
// called before we actually have a chance to call release(). This leads to call release() on the wrong parent.
|
||||
ByteBuf parent = this.parent;
|
||||
recyclerHandle.recycle(this);
|
||||
wrapped.release();
|
||||
parent.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -124,4 +132,169 @@ abstract class AbstractPooledDerivedByteBuf extends AbstractReferenceCountedByte
|
||||
final int index = readerIndex();
|
||||
return retainedSlice(index, writerIndex() - index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf slice(int index, int length) {
|
||||
// All reference count methods should be inherited from this object (this is the "parent").
|
||||
return new PooledNonRetainedSlicedByteBuf(this, unwrap(), index, length);
|
||||
}
|
||||
|
||||
final ByteBuf duplicate0() {
|
||||
// All reference count methods should be inherited from this object (this is the "parent").
|
||||
return new PooledNonRetainedDuplicateByteBuf(this, unwrap());
|
||||
}
|
||||
|
||||
private static final class PooledNonRetainedDuplicateByteBuf extends UnpooledDuplicatedByteBuf {
|
||||
private final ReferenceCounted referenceCountDelegate;
|
||||
|
||||
PooledNonRetainedDuplicateByteBuf(ReferenceCounted referenceCountDelegate, AbstractByteBuf buffer) {
|
||||
super(buffer);
|
||||
this.referenceCountDelegate = referenceCountDelegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
int refCnt0() {
|
||||
return referenceCountDelegate.refCnt();
|
||||
}
|
||||
|
||||
@Override
|
||||
ByteBuf retain0() {
|
||||
referenceCountDelegate.retain();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
ByteBuf retain0(int increment) {
|
||||
referenceCountDelegate.retain(increment);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
ByteBuf touch0() {
|
||||
referenceCountDelegate.touch();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
ByteBuf touch0(Object hint) {
|
||||
referenceCountDelegate.touch(hint);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean release0() {
|
||||
return referenceCountDelegate.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean release0(int decrement) {
|
||||
return referenceCountDelegate.release(decrement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf duplicate() {
|
||||
return new PooledNonRetainedDuplicateByteBuf(referenceCountDelegate, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf retainedDuplicate() {
|
||||
return PooledDuplicatedByteBuf.newInstance(unwrap(), this, readerIndex(), writerIndex());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf slice(int index, int length) {
|
||||
checkIndex0(index, length);
|
||||
return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf retainedSlice() {
|
||||
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
|
||||
return retainedSlice(readerIndex(), capacity());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf retainedSlice(int index, int length) {
|
||||
return PooledSlicedByteBuf.newInstance(unwrap(), this, index, length);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class PooledNonRetainedSlicedByteBuf extends UnpooledSlicedByteBuf {
|
||||
private final ReferenceCounted referenceCountDelegate;
|
||||
|
||||
PooledNonRetainedSlicedByteBuf(ReferenceCounted referenceCountDelegate,
|
||||
AbstractByteBuf buffer, int index, int length) {
|
||||
super(buffer, index, length);
|
||||
this.referenceCountDelegate = referenceCountDelegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
int refCnt0() {
|
||||
return referenceCountDelegate.refCnt();
|
||||
}
|
||||
|
||||
@Override
|
||||
ByteBuf retain0() {
|
||||
referenceCountDelegate.retain();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
ByteBuf retain0(int increment) {
|
||||
referenceCountDelegate.retain(increment);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
ByteBuf touch0() {
|
||||
referenceCountDelegate.touch();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
ByteBuf touch0(Object hint) {
|
||||
referenceCountDelegate.touch(hint);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean release0() {
|
||||
return referenceCountDelegate.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean release0(int decrement) {
|
||||
return referenceCountDelegate.release(decrement);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf duplicate() {
|
||||
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
|
||||
final ByteBuf duplicate = slice(0, capacity());
|
||||
duplicate.setIndex(readerIndex(), writerIndex());
|
||||
return duplicate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf retainedDuplicate() {
|
||||
return PooledDuplicatedByteBuf.newInstance(unwrap(), this, readerIndex(), writerIndex());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf slice(int index, int length) {
|
||||
checkIndex0(index, length);
|
||||
return new PooledNonRetainedSlicedByteBuf(referenceCountDelegate, unwrap(), idx(index), length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf retainedSlice() {
|
||||
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
|
||||
return retainedSlice(0, capacity());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf retainedSlice(int index, int length) {
|
||||
return PooledSlicedByteBuf.newInstance(unwrap(), this, idx(index), length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -40,7 +40,7 @@ final class PooledDuplicatedByteBuf extends AbstractPooledDerivedByteBuf {
|
||||
static PooledDuplicatedByteBuf newInstance(AbstractByteBuf unwrapped, ByteBuf wrapped,
|
||||
int readerIndex, int writerIndex) {
|
||||
final PooledDuplicatedByteBuf duplicate = RECYCLER.get();
|
||||
duplicate.init(unwrapped, wrapped, readerIndex, writerIndex, wrapped.maxCapacity());
|
||||
duplicate.init(unwrapped, wrapped, readerIndex, writerIndex, unwrapped.maxCapacity());
|
||||
duplicate.markReaderIndex();
|
||||
duplicate.markWriterIndex();
|
||||
|
||||
@ -88,13 +88,15 @@ final class PooledDuplicatedByteBuf extends AbstractPooledDerivedByteBuf {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf slice(int index, int length) {
|
||||
return unwrap().slice(index, length);
|
||||
public ByteBuf retainedSlice(int index, int length) {
|
||||
return PooledSlicedByteBuf.newInstance(unwrap(), this, index, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf retainedSlice(int index, int length) {
|
||||
return PooledSlicedByteBuf.newInstance(unwrap(), this, index, length);
|
||||
public ByteBuf duplicate() {
|
||||
ByteBuf duplicate = duplicate0();
|
||||
duplicate.setIndex(readerIndex(), writerIndex());
|
||||
return duplicate;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -102,7 +102,7 @@ final class PooledSlicedByteBuf extends AbstractPooledDerivedByteBuf {
|
||||
@Override
|
||||
public ByteBuf slice(int index, int length) {
|
||||
checkIndex0(index, length);
|
||||
return unwrap().slice(idx(index), length);
|
||||
return super.slice(idx(index), length);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -113,18 +113,15 @@ final class PooledSlicedByteBuf extends AbstractPooledDerivedByteBuf {
|
||||
|
||||
@Override
|
||||
public ByteBuf duplicate() {
|
||||
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
|
||||
final ByteBuf duplicate = unwrap().slice(adjustment, capacity());
|
||||
duplicate.setIndex(readerIndex(), writerIndex());
|
||||
ByteBuf duplicate = duplicate0();
|
||||
duplicate.setIndex(idx(readerIndex()), adjustment + capacity());
|
||||
return duplicate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf retainedDuplicate() {
|
||||
// Capacity is not allowed to change for a sliced ByteBuf, so length == capacity()
|
||||
final ByteBuf duplicate = retainedSlice(0, capacity());
|
||||
duplicate.setIndex(readerIndex(), writerIndex());
|
||||
return duplicate;
|
||||
return PooledDuplicatedByteBuf.newInstance(unwrap(), this, idx(readerIndex()), adjustment + capacity());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -19,7 +19,7 @@ package io.netty.buffer;
|
||||
* {@link DuplicatedByteBuf} implementation that can do optimizations because it knows the duplicated buffer
|
||||
* is of type {@link AbstractByteBuf}.
|
||||
*/
|
||||
final class UnpooledDuplicatedByteBuf extends DuplicatedByteBuf {
|
||||
class UnpooledDuplicatedByteBuf extends DuplicatedByteBuf {
|
||||
UnpooledDuplicatedByteBuf(AbstractByteBuf buffer) {
|
||||
super(buffer);
|
||||
}
|
||||
|
@ -19,8 +19,7 @@ package io.netty.buffer;
|
||||
* A special {@link AbstractUnpooledSlicedByteBuf} that can make optimizations because it knows the sliced buffer is of
|
||||
* type {@link AbstractByteBuf}.
|
||||
*/
|
||||
final class UnpooledSlicedByteBuf extends AbstractUnpooledSlicedByteBuf {
|
||||
|
||||
class UnpooledSlicedByteBuf extends AbstractUnpooledSlicedByteBuf {
|
||||
UnpooledSlicedByteBuf(AbstractByteBuf buffer, int index, int length) {
|
||||
super(buffer, index, length);
|
||||
}
|
||||
|
@ -19,7 +19,6 @@ import io.netty.util.ByteProcessor;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import io.netty.util.IllegalReferenceCountException;
|
||||
import io.netty.util.internal.ThreadLocalRandom;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
@ -48,11 +47,22 @@ import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static io.netty.buffer.Unpooled.*;
|
||||
import static io.netty.util.ReferenceCountUtil.*;
|
||||
import static io.netty.util.internal.EmptyArrays.*;
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
import static io.netty.buffer.Unpooled.LITTLE_ENDIAN;
|
||||
import static io.netty.buffer.Unpooled.buffer;
|
||||
import static io.netty.buffer.Unpooled.copiedBuffer;
|
||||
import static io.netty.buffer.Unpooled.directBuffer;
|
||||
import static io.netty.buffer.Unpooled.wrappedBuffer;
|
||||
import static io.netty.util.ReferenceCountUtil.releaseLater;
|
||||
import static io.netty.util.internal.EmptyArrays.EMPTY_BYTES;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* An abstract test class for channel buffers
|
||||
@ -2921,11 +2931,234 @@ public abstract class AbstractByteBufTest {
|
||||
testSliceOutOfBounds(false, false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedSliceAndRetainedDuplicateContentIsExpected() {
|
||||
ByteBuf buf = newBuffer(8).resetWriterIndex();
|
||||
ByteBuf expected1 = newBuffer(6).resetWriterIndex();
|
||||
ByteBuf expected2 = newBuffer(5).resetWriterIndex();
|
||||
ByteBuf expected3 = newBuffer(4).resetWriterIndex();
|
||||
ByteBuf expected4 = newBuffer(3).resetWriterIndex();
|
||||
buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
|
||||
expected1.writeBytes(new byte[] {2, 3, 4, 5, 6, 7});
|
||||
expected2.writeBytes(new byte[] {3, 4, 5, 6, 7});
|
||||
expected3.writeBytes(new byte[] {4, 5, 6, 7});
|
||||
expected4.writeBytes(new byte[] {5, 6, 7});
|
||||
|
||||
ByteBuf slice1 = buf.retainedSlice(buf.readerIndex() + 1, 6);
|
||||
assertEquals(0, slice1.compareTo(expected1));
|
||||
assertEquals(0, slice1.compareTo(buf.slice(buf.readerIndex() + 1, 6)));
|
||||
// Simulate a handler that releases the original buffer, and propagates a slice.
|
||||
buf.release();
|
||||
|
||||
// Advance the reader index on the slice.
|
||||
slice1.readByte();
|
||||
|
||||
ByteBuf dup1 = slice1.retainedDuplicate();
|
||||
assertEquals(0, dup1.compareTo(expected2));
|
||||
assertEquals(0, dup1.compareTo(slice1.duplicate()));
|
||||
|
||||
// Advance the reader index on dup1.
|
||||
dup1.readByte();
|
||||
|
||||
ByteBuf dup2 = dup1.duplicate();
|
||||
assertEquals(0, dup2.compareTo(expected3));
|
||||
|
||||
// Advance the reader index on dup2.
|
||||
dup2.readByte();
|
||||
|
||||
ByteBuf slice2 = dup2.retainedSlice(dup2.readerIndex(), 3);
|
||||
assertEquals(0, slice2.compareTo(expected4));
|
||||
assertEquals(0, slice2.compareTo(dup2.slice(dup2.readerIndex(), 3)));
|
||||
|
||||
// Cleanup the expected buffers used for testing.
|
||||
assertTrue(expected1.release());
|
||||
assertTrue(expected2.release());
|
||||
assertTrue(expected3.release());
|
||||
assertTrue(expected4.release());
|
||||
|
||||
slice2.release();
|
||||
dup2.release();
|
||||
|
||||
assertEquals(slice2.refCnt(), dup2.refCnt());
|
||||
assertEquals(dup2.refCnt(), dup1.refCnt());
|
||||
|
||||
// The handler is now done with the original slice
|
||||
assertTrue(slice1.release());
|
||||
|
||||
// Reference counting may be shared, or may be independently tracked, but at this point all buffers should
|
||||
// be deallocated and have a reference count of 0.
|
||||
assertEquals(0, buf.refCnt());
|
||||
assertEquals(0, slice1.refCnt());
|
||||
assertEquals(0, slice2.refCnt());
|
||||
assertEquals(0, dup1.refCnt());
|
||||
assertEquals(0, dup2.refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedDuplicateAndRetainedSliceContentIsExpected() {
|
||||
ByteBuf buf = newBuffer(8).resetWriterIndex();
|
||||
ByteBuf expected1 = newBuffer(6).resetWriterIndex();
|
||||
ByteBuf expected2 = newBuffer(5).resetWriterIndex();
|
||||
ByteBuf expected3 = newBuffer(4).resetWriterIndex();
|
||||
buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
|
||||
expected1.writeBytes(new byte[] {2, 3, 4, 5, 6, 7});
|
||||
expected2.writeBytes(new byte[] {3, 4, 5, 6, 7});
|
||||
expected3.writeBytes(new byte[] {5, 6, 7});
|
||||
|
||||
ByteBuf dup1 = buf.retainedDuplicate();
|
||||
assertEquals(0, dup1.compareTo(buf));
|
||||
assertEquals(0, dup1.compareTo(buf.slice()));
|
||||
// Simulate a handler that releases the original buffer, and propagates a slice.
|
||||
buf.release();
|
||||
|
||||
// Advance the reader index on the dup.
|
||||
dup1.readByte();
|
||||
|
||||
ByteBuf slice1 = dup1.retainedSlice(dup1.readerIndex(), 6);
|
||||
assertEquals(0, slice1.compareTo(expected1));
|
||||
assertEquals(0, slice1.compareTo(slice1.duplicate()));
|
||||
|
||||
// Advance the reader index on slice1.
|
||||
slice1.readByte();
|
||||
|
||||
ByteBuf dup2 = slice1.duplicate();
|
||||
assertEquals(0, dup2.compareTo(slice1));
|
||||
|
||||
// Advance the reader index on dup2.
|
||||
dup2.readByte();
|
||||
|
||||
ByteBuf slice2 = dup2.retainedSlice(dup2.readerIndex() + 1, 3);
|
||||
assertEquals(0, slice2.compareTo(expected3));
|
||||
assertEquals(0, slice2.compareTo(dup2.slice(dup2.readerIndex() + 1, 3)));
|
||||
|
||||
// Cleanup the expected buffers used for testing.
|
||||
assertTrue(expected1.release());
|
||||
assertTrue(expected2.release());
|
||||
assertTrue(expected3.release());
|
||||
|
||||
slice2.release();
|
||||
slice1.release();
|
||||
|
||||
assertEquals(slice2.refCnt(), dup2.refCnt());
|
||||
assertEquals(dup2.refCnt(), slice1.refCnt());
|
||||
|
||||
// The handler is now done with the original slice
|
||||
assertTrue(dup1.release());
|
||||
|
||||
// Reference counting may be shared, or may be independently tracked, but at this point all buffers should
|
||||
// be deallocated and have a reference count of 0.
|
||||
assertEquals(0, buf.refCnt());
|
||||
assertEquals(0, slice1.refCnt());
|
||||
assertEquals(0, slice2.refCnt());
|
||||
assertEquals(0, dup1.refCnt());
|
||||
assertEquals(0, dup2.refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedSliceContents() {
|
||||
testSliceContents(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleLevelRetainedSlice1() {
|
||||
testMultipleLevelRetainedSliceWithNonRetained(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleLevelRetainedSlice2() {
|
||||
testMultipleLevelRetainedSliceWithNonRetained(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleLevelRetainedSlice3() {
|
||||
testMultipleLevelRetainedSliceWithNonRetained(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleLevelRetainedSlice4() {
|
||||
testMultipleLevelRetainedSliceWithNonRetained(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedSliceReleaseOriginal1() {
|
||||
testSliceReleaseOriginal(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedSliceReleaseOriginal2() {
|
||||
testSliceReleaseOriginal(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedSliceReleaseOriginal3() {
|
||||
testSliceReleaseOriginal(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedSliceReleaseOriginal4() {
|
||||
testSliceReleaseOriginal(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedDuplicateReleaseOriginal1() {
|
||||
testDuplicateReleaseOriginal(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedDuplicateReleaseOriginal2() {
|
||||
testDuplicateReleaseOriginal(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedDuplicateReleaseOriginal3() {
|
||||
testDuplicateReleaseOriginal(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRetainedDuplicateReleaseOriginal4() {
|
||||
testDuplicateReleaseOriginal(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRetainedSliceReleaseOriginal1() {
|
||||
testMultipleRetainedSliceReleaseOriginal(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRetainedSliceReleaseOriginal2() {
|
||||
testMultipleRetainedSliceReleaseOriginal(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRetainedSliceReleaseOriginal3() {
|
||||
testMultipleRetainedSliceReleaseOriginal(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRetainedSliceReleaseOriginal4() {
|
||||
testMultipleRetainedSliceReleaseOriginal(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRetainedDuplicateReleaseOriginal1() {
|
||||
testMultipleRetainedDuplicateReleaseOriginal(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRetainedDuplicateReleaseOriginal2() {
|
||||
testMultipleRetainedDuplicateReleaseOriginal(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRetainedDuplicateReleaseOriginal3() {
|
||||
testMultipleRetainedDuplicateReleaseOriginal(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRetainedDuplicateReleaseOriginal4() {
|
||||
testMultipleRetainedDuplicateReleaseOriginal(false, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSliceContents() {
|
||||
testSliceContents(false);
|
||||
@ -3030,6 +3263,214 @@ public abstract class AbstractByteBufTest {
|
||||
}
|
||||
}
|
||||
|
||||
private void testSliceReleaseOriginal(boolean retainedSlice1, boolean retainedSlice2) {
|
||||
ByteBuf buf = newBuffer(8).resetWriterIndex();
|
||||
ByteBuf expected1 = newBuffer(3).resetWriterIndex();
|
||||
ByteBuf expected2 = newBuffer(2).resetWriterIndex();
|
||||
buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
|
||||
expected1.writeBytes(new byte[] {6, 7, 8});
|
||||
expected2.writeBytes(new byte[] {7, 8});
|
||||
ByteBuf slice1 = retainedSlice1 ? buf.retainedSlice(buf.readerIndex() + 5, 3)
|
||||
: buf.slice(buf.readerIndex() + 5, 3).retain();
|
||||
assertEquals(0, slice1.compareTo(expected1));
|
||||
// Simulate a handler that releases the original buffer, and propagates a slice.
|
||||
buf.release();
|
||||
|
||||
ByteBuf slice2 = retainedSlice2 ? slice1.retainedSlice(slice1.readerIndex() + 1, 2)
|
||||
: slice1.slice(slice1.readerIndex() + 1, 2).retain();
|
||||
assertEquals(0, slice2.compareTo(expected2));
|
||||
|
||||
// Cleanup the expected buffers used for testing.
|
||||
assertTrue(expected1.release());
|
||||
assertTrue(expected2.release());
|
||||
|
||||
// The handler created a slice of the slice and is now done with it.
|
||||
slice2.release();
|
||||
|
||||
// The handler is now done with the original slice
|
||||
assertTrue(slice1.release());
|
||||
|
||||
// Reference counting may be shared, or may be independently tracked, but at this point all buffers should
|
||||
// be deallocated and have a reference count of 0.
|
||||
assertEquals(0, buf.refCnt());
|
||||
assertEquals(0, slice1.refCnt());
|
||||
assertEquals(0, slice2.refCnt());
|
||||
}
|
||||
|
||||
private void testMultipleLevelRetainedSliceWithNonRetained(boolean doSlice1, boolean doSlice2) {
|
||||
ByteBuf buf = newBuffer(8).resetWriterIndex();
|
||||
ByteBuf expected1 = newBuffer(6).resetWriterIndex();
|
||||
ByteBuf expected2 = newBuffer(4).resetWriterIndex();
|
||||
ByteBuf expected3 = newBuffer(2).resetWriterIndex();
|
||||
ByteBuf expected4SliceSlice = newBuffer(1).resetWriterIndex();
|
||||
ByteBuf expected4DupSlice = newBuffer(1).resetWriterIndex();
|
||||
buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
|
||||
expected1.writeBytes(new byte[] {2, 3, 4, 5, 6, 7});
|
||||
expected2.writeBytes(new byte[] {3, 4, 5, 6});
|
||||
expected3.writeBytes(new byte[] {4, 5});
|
||||
expected4SliceSlice.writeBytes(new byte[] {5});
|
||||
expected4DupSlice.writeBytes(new byte[] {4});
|
||||
|
||||
ByteBuf slice1 = buf.retainedSlice(buf.readerIndex() + 1, 6);
|
||||
assertEquals(0, slice1.compareTo(expected1));
|
||||
// Simulate a handler that releases the original buffer, and propagates a slice.
|
||||
buf.release();
|
||||
|
||||
ByteBuf slice2 = slice1.retainedSlice(slice1.readerIndex() + 1, 4);
|
||||
assertEquals(0, slice2.compareTo(expected2));
|
||||
|
||||
ByteBuf slice3 = doSlice1 ? slice2.slice(slice2.readerIndex() + 1, 2) : slice2.duplicate();
|
||||
if (doSlice1) {
|
||||
assertEquals(0, slice3.compareTo(expected3));
|
||||
} else {
|
||||
assertEquals(0, slice3.compareTo(expected2));
|
||||
}
|
||||
|
||||
ByteBuf slice4 = doSlice2 ? slice3.slice(slice3.readerIndex() + 1, 1) : slice3.duplicate();
|
||||
if (doSlice1 && doSlice2) {
|
||||
assertEquals(0, slice4.compareTo(expected4SliceSlice));
|
||||
} else if (doSlice2) {
|
||||
assertEquals(0, slice4.compareTo(expected4DupSlice));
|
||||
} else {
|
||||
assertEquals(0, slice3.compareTo(slice4));
|
||||
}
|
||||
|
||||
// Cleanup the expected buffers used for testing.
|
||||
assertTrue(expected1.release());
|
||||
assertTrue(expected2.release());
|
||||
assertTrue(expected3.release());
|
||||
assertTrue(expected4SliceSlice.release());
|
||||
assertTrue(expected4DupSlice.release());
|
||||
|
||||
// Slice 4, 3, and 2 should effectively "share" a reference count.
|
||||
slice4.release();
|
||||
assertEquals(slice3.refCnt(), slice2.refCnt());
|
||||
assertEquals(slice3.refCnt(), slice4.refCnt());
|
||||
|
||||
// Slice 1 should also release the original underlying buffer without throwing exceptions
|
||||
assertTrue(slice1.release());
|
||||
|
||||
// Reference counting may be shared, or may be independently tracked, but at this point all buffers should
|
||||
// be deallocated and have a reference count of 0.
|
||||
assertEquals(0, buf.refCnt());
|
||||
assertEquals(0, slice1.refCnt());
|
||||
assertEquals(0, slice2.refCnt());
|
||||
assertEquals(0, slice3.refCnt());
|
||||
}
|
||||
|
||||
private void testDuplicateReleaseOriginal(boolean retainedDuplicate1, boolean retainedDuplicate2) {
|
||||
ByteBuf buf = newBuffer(8).resetWriterIndex();
|
||||
ByteBuf expected = newBuffer(8).resetWriterIndex();
|
||||
buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
|
||||
expected.writeBytes(buf, buf.readerIndex(), buf.readableBytes());
|
||||
ByteBuf dup1 = retainedDuplicate1 ? buf.retainedDuplicate()
|
||||
: buf.duplicate().retain();
|
||||
assertEquals(0, dup1.compareTo(expected));
|
||||
// Simulate a handler that releases the original buffer, and propagates a slice.
|
||||
buf.release();
|
||||
|
||||
ByteBuf dup2 = retainedDuplicate2 ? dup1.retainedDuplicate()
|
||||
: dup1.duplicate().retain();
|
||||
assertEquals(0, dup2.compareTo(expected));
|
||||
|
||||
// Cleanup the expected buffers used for testing.
|
||||
assertTrue(expected.release());
|
||||
|
||||
// The handler created a slice of the slice and is now done with it.
|
||||
dup2.release();
|
||||
|
||||
// The handler is now done with the original slice
|
||||
assertTrue(dup1.release());
|
||||
|
||||
// Reference counting may be shared, or may be independently tracked, but at this point all buffers should
|
||||
// be deallocated and have a reference count of 0.
|
||||
assertEquals(0, buf.refCnt());
|
||||
assertEquals(0, dup1.refCnt());
|
||||
assertEquals(0, dup2.refCnt());
|
||||
}
|
||||
|
||||
private void testMultipleRetainedSliceReleaseOriginal(boolean retainedSlice1, boolean retainedSlice2) {
|
||||
ByteBuf buf = newBuffer(8).resetWriterIndex();
|
||||
ByteBuf expected1 = newBuffer(3).resetWriterIndex();
|
||||
ByteBuf expected2 = newBuffer(2).resetWriterIndex();
|
||||
ByteBuf expected3 = newBuffer(2).resetWriterIndex();
|
||||
buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
|
||||
expected1.writeBytes(new byte[] {6, 7, 8});
|
||||
expected2.writeBytes(new byte[] {7, 8});
|
||||
expected3.writeBytes(new byte[] {6, 7});
|
||||
ByteBuf slice1 = retainedSlice1 ? buf.retainedSlice(buf.readerIndex() + 5, 3)
|
||||
: buf.slice(buf.readerIndex() + 5, 3).retain();
|
||||
assertEquals(0, slice1.compareTo(expected1));
|
||||
// Simulate a handler that releases the original buffer, and propagates a slice.
|
||||
buf.release();
|
||||
|
||||
ByteBuf slice2 = retainedSlice2 ? slice1.retainedSlice(slice1.readerIndex() + 1, 2)
|
||||
: slice1.slice(slice1.readerIndex() + 1, 2).retain();
|
||||
assertEquals(0, slice2.compareTo(expected2));
|
||||
|
||||
// The handler created a slice of the slice and is now done with it.
|
||||
slice2.release();
|
||||
|
||||
ByteBuf slice3 = slice1.retainedSlice(slice1.readerIndex(), 2);
|
||||
assertEquals(0, slice3.compareTo(expected3));
|
||||
|
||||
// The handler created another slice of the slice and is now done with it.
|
||||
slice3.release();
|
||||
|
||||
// The handler is now done with the original slice
|
||||
assertTrue(slice1.release());
|
||||
|
||||
// Cleanup the expected buffers used for testing.
|
||||
assertTrue(expected1.release());
|
||||
assertTrue(expected2.release());
|
||||
assertTrue(expected3.release());
|
||||
|
||||
// Reference counting may be shared, or may be independently tracked, but at this point all buffers should
|
||||
// be deallocated and have a reference count of 0.
|
||||
assertEquals(0, buf.refCnt());
|
||||
assertEquals(0, slice1.refCnt());
|
||||
assertEquals(0, slice2.refCnt());
|
||||
assertEquals(0, slice3.refCnt());
|
||||
}
|
||||
|
||||
private void testMultipleRetainedDuplicateReleaseOriginal(boolean retainedDuplicate1, boolean retainedDuplicate2) {
|
||||
ByteBuf buf = newBuffer(8).resetWriterIndex();
|
||||
ByteBuf expected = newBuffer(8).resetWriterIndex();
|
||||
buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
|
||||
expected.writeBytes(buf, buf.readerIndex(), buf.readableBytes());
|
||||
ByteBuf dup1 = retainedDuplicate1 ? buf.retainedDuplicate()
|
||||
: buf.duplicate().retain();
|
||||
assertEquals(0, dup1.compareTo(expected));
|
||||
// Simulate a handler that releases the original buffer, and propagates a slice.
|
||||
buf.release();
|
||||
|
||||
ByteBuf dup2 = retainedDuplicate2 ? dup1.retainedDuplicate()
|
||||
: dup1.duplicate().retain();
|
||||
assertEquals(0, dup2.compareTo(expected));
|
||||
|
||||
// The handler created a slice of the slice and is now done with it.
|
||||
dup2.release();
|
||||
|
||||
ByteBuf dup3 = dup1.retainedDuplicate();
|
||||
assertEquals(0, dup3.compareTo(expected));
|
||||
|
||||
// The handler created another slice of the slice and is now done with it.
|
||||
dup3.release();
|
||||
|
||||
// The handler is now done with the original slice
|
||||
assertTrue(dup1.release());
|
||||
|
||||
// Cleanup the expected buffers used for testing.
|
||||
assertTrue(expected.release());
|
||||
|
||||
// Reference counting may be shared, or may be independently tracked, but at this point all buffers should
|
||||
// be deallocated and have a reference count of 0.
|
||||
assertEquals(0, buf.refCnt());
|
||||
assertEquals(0, dup1.refCnt());
|
||||
assertEquals(0, dup2.refCnt());
|
||||
assertEquals(0, dup3.refCnt());
|
||||
}
|
||||
|
||||
private void testDuplicateContents(boolean retainedDuplicate) {
|
||||
ByteBuf buf = releaseLater(newBuffer(8)).resetWriterIndex();
|
||||
buf.writeBytes(new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
|
||||
|
@ -24,7 +24,8 @@ public class RetainedSlicedByteBufTest extends SlicedByteBufTest {
|
||||
@Override
|
||||
protected ByteBuf newBuffer(int length) {
|
||||
ByteBuf wrapped = Unpooled.wrappedBuffer(new byte[length * 2]);
|
||||
ByteBuf buffer = wrapped.retainedSlice(ThreadLocalRandom.current().nextInt(length - 1) + 1, length);
|
||||
ByteBuf buffer = wrapped.retainedSlice(length > 1 ? ThreadLocalRandom.current().nextInt(length - 1) + 1 : 0,
|
||||
length);
|
||||
wrapped.release();
|
||||
|
||||
assertEquals(0, buffer.readerIndex());
|
||||
|
@ -31,7 +31,7 @@ public class SlicedByteBufTest extends AbstractByteBufTest {
|
||||
@Override
|
||||
protected ByteBuf newBuffer(int length) {
|
||||
ByteBuf buffer = Unpooled.wrappedBuffer(
|
||||
new byte[length * 2], ThreadLocalRandom.current().nextInt(length - 1) + 1, length);
|
||||
new byte[length * 2], length > 1 ? ThreadLocalRandom.current().nextInt(length - 1) + 1 : 0, length);
|
||||
assertEquals(0, buffer.readerIndex());
|
||||
assertEquals(length, buffer.writerIndex());
|
||||
return buffer;
|
||||
|
Loading…
Reference in New Issue
Block a user