Do not unwrap a CompositeByteBuf when it is added as a component of another CompositeByteBuf
.. because Reference counting introduces life cycle issues to the CompositeByteBuf being added. - Fixes #1266
This commit is contained in:
parent
94aad58627
commit
32fa4c07f3
@ -40,8 +40,7 @@ import java.util.Queue;
|
||||
* is recommended to use {@link Unpooled#wrappedBuffer(ByteBuf...)}
|
||||
* instead of calling the constructor explicitly.
|
||||
*/
|
||||
public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf
|
||||
implements CompositeByteBuf {
|
||||
public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf implements CompositeByteBuf {
|
||||
|
||||
private static final ByteBuffer[] EMPTY_NIOBUFFERS = new ByteBuffer[0];
|
||||
|
||||
@ -142,12 +141,6 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf
|
||||
throw new NullPointerException("buffer");
|
||||
}
|
||||
|
||||
if (buffer instanceof Iterable) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Iterable<ByteBuf> composite = (Iterable<ByteBuf>) buffer;
|
||||
return addComponents0(cIndex, composite);
|
||||
}
|
||||
|
||||
int readableBytes = buffer.readableBytes();
|
||||
if (readableBytes == 0) {
|
||||
return cIndex;
|
||||
@ -226,52 +219,21 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf
|
||||
throw new NullPointerException("buffers");
|
||||
}
|
||||
|
||||
if (buffers instanceof DefaultCompositeByteBuf) {
|
||||
DefaultCompositeByteBuf compositeBuf = (DefaultCompositeByteBuf) buffers;
|
||||
List<Component> list = compositeBuf.components;
|
||||
ByteBuf[] array = new ByteBuf[list.size()];
|
||||
for (int i = 0; i < array.length; i ++) {
|
||||
array[i] = list.get(i).buf.retain();
|
||||
}
|
||||
compositeBuf.release();
|
||||
return addComponents0(cIndex, array);
|
||||
}
|
||||
|
||||
if (buffers instanceof CompositeByteBuf) {
|
||||
CompositeByteBuf compositeBuf = (CompositeByteBuf) buffers;
|
||||
final int nComponents = compositeBuf.numComponents();
|
||||
ByteBuf[] array = new ByteBuf[nComponents];
|
||||
for (int i = 0; i < nComponents; i ++) {
|
||||
array[i] = compositeBuf.component(i).retain();
|
||||
}
|
||||
compositeBuf.release();
|
||||
return addComponents0(cIndex, array);
|
||||
}
|
||||
|
||||
if (buffers instanceof List) {
|
||||
List<ByteBuf> list = (List<ByteBuf>) buffers;
|
||||
ByteBuf[] array = new ByteBuf[list.size()];
|
||||
for (int i = 0; i < array.length; i ++) {
|
||||
array[i] = list.get(i);
|
||||
}
|
||||
return addComponents0(cIndex, array);
|
||||
}
|
||||
|
||||
if (buffers instanceof Collection) {
|
||||
Collection<ByteBuf> col = (Collection<ByteBuf>) buffers;
|
||||
ByteBuf[] array = new ByteBuf[col.size()];
|
||||
int i = 0;
|
||||
for (ByteBuf b: col) {
|
||||
array[i ++] = b;
|
||||
}
|
||||
return addComponents0(cIndex, array);
|
||||
if (buffers instanceof ByteBuf) {
|
||||
// If buffers also implements ByteBuf (e.g. CompositeByteBuf), it has to go to addComponent(ByteBuf).
|
||||
return addComponent0(cIndex, (ByteBuf) buffers);
|
||||
}
|
||||
|
||||
if (!(buffers instanceof Collection)) {
|
||||
List<ByteBuf> list = new ArrayList<ByteBuf>();
|
||||
for (ByteBuf b: buffers) {
|
||||
list.add(b);
|
||||
}
|
||||
return addComponents0(cIndex, list.toArray(new ByteBuf[list.size()]));
|
||||
buffers = list;
|
||||
}
|
||||
|
||||
Collection<ByteBuf> col = (Collection<ByteBuf>) buffers;
|
||||
return addComponents0(cIndex, col.toArray(new ByteBuf[col.size()]));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -31,8 +31,7 @@ import static org.junit.Assert.*;
|
||||
* An abstract test class for composite channel buffers
|
||||
*/
|
||||
@SuppressWarnings("ZeroLengthArrayAllocation")
|
||||
public abstract class AbstractCompositeByteBufTest extends
|
||||
AbstractByteBufTest {
|
||||
public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
|
||||
|
||||
private final ByteOrder order;
|
||||
|
||||
@ -473,22 +472,34 @@ public abstract class AbstractCompositeByteBufTest extends
|
||||
ByteBuf c2 = buffer().writeByte(2).retain();
|
||||
ByteBuf c3 = buffer().writeByte(3).retain(2);
|
||||
|
||||
CompositeByteBuf bufA = freeLater(compositeBuffer());
|
||||
bufA.addComponents(c1, c2, c3);
|
||||
CompositeByteBuf bufA = compositeBuffer();
|
||||
bufA.addComponents(c1, c2, c3).writerIndex(3);
|
||||
|
||||
CompositeByteBuf bufB = freeLater(compositeBuffer());
|
||||
bufB.addComponent(bufA);
|
||||
CompositeByteBuf bufB = compositeBuffer();
|
||||
bufB.addComponents(bufA);
|
||||
|
||||
// Ensure that bufA has been released.
|
||||
assertThat(bufA.refCnt(), is(0));
|
||||
// Ensure that bufA.refCnt() did not change.
|
||||
assertThat(bufA.refCnt(), is(1));
|
||||
|
||||
// Ensure that c[123]'s refCnt did not change.
|
||||
// Internally, it's:
|
||||
// 1) increased by 1 by addComponent(), and then
|
||||
// 2) decreased by 1 by bufA.release() which is called by addComponent().
|
||||
assertThat(c1.refCnt(), is(1));
|
||||
assertThat(c2.refCnt(), is(2));
|
||||
assertThat(c3.refCnt(), is(3));
|
||||
|
||||
// This should decrease bufA.refCnt().
|
||||
bufB.release();
|
||||
assertThat(bufB.refCnt(), is(0));
|
||||
|
||||
// Ensure bufA.refCnt() changed.
|
||||
assertThat(bufA.refCnt(), is(0));
|
||||
|
||||
// Ensure that c[123]'s refCnt also changed due to the deallocation of bufA.
|
||||
assertThat(c1.refCnt(), is(0));
|
||||
assertThat(c2.refCnt(), is(1));
|
||||
assertThat(c3.refCnt(), is(2));
|
||||
|
||||
c3.release(2);
|
||||
c2.release();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user