Ensure CompositeByteBuf.addComponent* handles buffer in consistent way and not causes leaks

Motivation:

At the moment we have two problems:
 - CompositeByteBuf.addComponent(...) will not add the supplied buffer to the CompositeByteBuf if its empty, which means it will not be released on CompositeByteBuf.release() call. This is a problem as a user will expect everything added will be released (the user not know we not added it).
 - CompositeByteBuf.addComponents(...) will either add no buffers if none is readable and so has the same problem as addComponent(...) or directly release the ByteBuf if at least one ByteBuf is readable. Again this gives inconsistent handling and may lead to memory leaks.

Modifications:

 - Always add the buffer to the CompositeByteBuf and so release it on release call.

Result:

Consistent handling and no buffer leaks.
This commit is contained in:
Norman Maurer 2014-12-11 21:13:03 +01:00
parent daba2b3313
commit 41fd857a7c
2 changed files with 78 additions and 27 deletions

View File

@ -165,9 +165,6 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf {
} }
int readableBytes = buffer.readableBytes(); int readableBytes = buffer.readableBytes();
if (readableBytes == 0) {
return cIndex;
}
// No need to consolidate - just add a component to the list. // No need to consolidate - just add a component to the list.
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice()); Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
@ -182,7 +179,9 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf {
} }
} else { } else {
components.add(cIndex, c); components.add(cIndex, c);
updateComponentOffsets(cIndex); if (readableBytes != 0) {
updateComponentOffsets(cIndex);
}
} }
return cIndex; return cIndex;
} }
@ -209,31 +208,15 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf {
throw new NullPointerException("buffers"); throw new NullPointerException("buffers");
} }
int readableBytes = 0;
for (ByteBuf b: buffers) {
if (b == null) {
break;
}
readableBytes += b.readableBytes();
}
if (readableBytes == 0) {
return cIndex;
}
// No need for consolidation // No need for consolidation
for (ByteBuf b: buffers) { for (ByteBuf b: buffers) {
if (b == null) { if (b == null) {
break; break;
} }
if (b.isReadable()) { cIndex = addComponent0(cIndex, b) + 1;
cIndex = addComponent0(cIndex, b) + 1; int size = components.size();
int size = components.size(); if (cIndex > size) {
if (cIndex > size) { cIndex = size;
cIndex = size;
}
} else {
b.release();
} }
} }
return cIndex; return cIndex;
@ -350,8 +333,12 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf {
*/ */
public CompositeByteBuf removeComponent(int cIndex) { public CompositeByteBuf removeComponent(int cIndex) {
checkComponentIndex(cIndex); checkComponentIndex(cIndex);
components.remove(cIndex).freeIfNecessary(); Component comp = components.remove(cIndex);
updateComponentOffsets(cIndex); comp.freeIfNecessary();
if (comp.length > 0) {
// Only need to call updateComponentOffsets if the length was > 0
updateComponentOffsets(cIndex);
}
return this; return this;
} }
@ -364,13 +351,23 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf {
public CompositeByteBuf removeComponents(int cIndex, int numComponents) { public CompositeByteBuf removeComponents(int cIndex, int numComponents) {
checkComponentIndex(cIndex, numComponents); checkComponentIndex(cIndex, numComponents);
if (numComponents == 0) {
return this;
}
List<Component> toRemove = components.subList(cIndex, cIndex + numComponents); List<Component> toRemove = components.subList(cIndex, cIndex + numComponents);
boolean needsUpdate = false;
for (Component c: toRemove) { for (Component c: toRemove) {
if (c.length > 0) {
needsUpdate = true;
}
c.freeIfNecessary(); c.freeIfNecessary();
} }
toRemove.clear(); toRemove.clear();
updateComponentOffsets(cIndex); if (needsUpdate) {
// Only need to call updateComponentOffsets if the length was > 0
updateComponentOffsets(cIndex);
}
return this; return this;
} }
@ -1105,6 +1102,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf {
} else if (offset < c.offset) { } else if (offset < c.offset) {
high = mid - 1; high = mid - 1;
} else { } else {
assert c.length != 0;
return c; return c;
} }
} }

View File

@ -812,4 +812,57 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
cbuf.discardSomeReadBytes(); cbuf.discardSomeReadBytes();
} }
@Test
public void testAddEmptyBufferRelease() {
CompositeByteBuf cbuf = compositeBuffer();
ByteBuf buf = buffer();
assertEquals(1, buf.refCnt());
cbuf.addComponent(buf);
assertEquals(1, buf.refCnt());
cbuf.release();
assertEquals(0, buf.refCnt());
}
@Test
public void testAddEmptyBuffersRelease() {
CompositeByteBuf cbuf = compositeBuffer();
ByteBuf buf = buffer();
ByteBuf buf2 = buffer().writeInt(1);
ByteBuf buf3 = buffer();
assertEquals(1, buf.refCnt());
assertEquals(1, buf2.refCnt());
assertEquals(1, buf3.refCnt());
cbuf.addComponents(buf, buf2, buf3);
assertEquals(1, buf.refCnt());
assertEquals(1, buf2.refCnt());
assertEquals(1, buf3.refCnt());
cbuf.release();
assertEquals(0, buf.refCnt());
assertEquals(0, buf2.refCnt());
assertEquals(0, buf3.refCnt());
}
@Test
public void testAddEmptyBufferInMiddle() {
CompositeByteBuf cbuf = compositeBuffer();
ByteBuf buf1 = buffer().writeByte((byte) 1);
cbuf.addComponent(buf1).writerIndex(cbuf.writerIndex() + buf1.readableBytes());
ByteBuf buf2 = EMPTY_BUFFER;
cbuf.addComponent(buf2).writerIndex(cbuf.writerIndex() + buf2.readableBytes());
ByteBuf buf3 = buffer().writeByte((byte) 2);
cbuf.addComponent(buf3).writerIndex(cbuf.writerIndex() + buf3.readableBytes());
assertEquals(2, cbuf.readableBytes());
assertEquals((byte) 1, cbuf.readByte());
assertEquals((byte) 2, cbuf.readByte());
assertSame(EMPTY_BUFFER, cbuf.internalComponent(1));
assertNotSame(EMPTY_BUFFER, cbuf.internalComponentAtOffset(1));
cbuf.release();
}
} }