Add CompositeByteBuf.addComponent(boolean ...) method to simplify usage

Motivation:

At the moment the user is responsible to increase the writer index of the composite buffer when a new component is added. We should add some methods that handle this for the user as this is the most popular usage of the composite buffer.

Modifications:

Add new methods that autoamtically increase the writerIndex when buffers are added.

Result:

Easier usage of CompositeByteBuf.
This commit is contained in:
Norman Maurer 2016-05-17 14:19:33 +02:00
parent a5006c1969
commit 7b25402e80
22 changed files with 176 additions and 100 deletions

View File

@ -916,18 +916,42 @@ final class AdvancedLeakAwareCompositeByteBuf extends WrappedCompositeByteBuf {
return super.addComponents(cIndex, buffers); return super.addComponents(cIndex, buffers);
} }
@Override
public CompositeByteBuf removeComponent(int cIndex) {
recordLeakNonRefCountingOperation(leak);
return super.removeComponent(cIndex);
}
@Override @Override
public CompositeByteBuf addComponents(int cIndex, Iterable<ByteBuf> buffers) { public CompositeByteBuf addComponents(int cIndex, Iterable<ByteBuf> buffers) {
recordLeakNonRefCountingOperation(leak); recordLeakNonRefCountingOperation(leak);
return super.addComponents(cIndex, buffers); return super.addComponents(cIndex, buffers);
} }
@Override
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
recordLeakNonRefCountingOperation(leak);
return super.addComponent(increaseWriterIndex, buffer);
}
@Override
public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers) {
recordLeakNonRefCountingOperation(leak);
return super.addComponents(increaseWriterIndex, buffers);
}
@Override
public CompositeByteBuf addComponents(boolean increaseWriterIndex, Iterable<ByteBuf> buffers) {
recordLeakNonRefCountingOperation(leak);
return super.addComponents(increaseWriterIndex, buffers);
}
@Override
public CompositeByteBuf addComponent(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
recordLeakNonRefCountingOperation(leak);
return super.addComponent(increaseWriterIndex, cIndex, buffer);
}
@Override
public CompositeByteBuf removeComponent(int cIndex) {
recordLeakNonRefCountingOperation(leak);
return super.removeComponent(cIndex);
}
@Override @Override
public CompositeByteBuf removeComponents(int cIndex, int numComponents) { public CompositeByteBuf removeComponents(int cIndex, int numComponents) {
recordLeakNonRefCountingOperation(leak); recordLeakNonRefCountingOperation(leak);

View File

@ -79,7 +79,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
this.maxNumComponents = maxNumComponents; this.maxNumComponents = maxNumComponents;
components = newList(maxNumComponents); components = newList(maxNumComponents);
addComponents0(0, buffers); addComponents0(false, 0, buffers);
consolidateIfNeeded(); consolidateIfNeeded();
setIndex(0, capacity()); setIndex(0, capacity());
} }
@ -100,7 +100,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
this.maxNumComponents = maxNumComponents; this.maxNumComponents = maxNumComponents;
components = newList(maxNumComponents); components = newList(maxNumComponents);
addComponents0(0, buffers); addComponents0(false, 0, buffers);
consolidateIfNeeded(); consolidateIfNeeded();
setIndex(0, capacity()); setIndex(0, capacity());
} }
@ -122,24 +122,21 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
* Add the given {@link ByteBuf}. * Add the given {@link ByteBuf}.
* <p> * <p>
* Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}. * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}.
* If you need to have it increased you need to handle it by your own. * If you need to have it increased use {@link #addComponent(boolean, ByteBuf)}.
* <p> * <p>
* {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}. * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}.
* @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this * @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this
* {@link CompositeByteBuf}. * {@link CompositeByteBuf}.
*/ */
public CompositeByteBuf addComponent(ByteBuf buffer) { public CompositeByteBuf addComponent(ByteBuf buffer) {
checkNotNull(buffer, "buffer"); return addComponent(false, buffer);
addComponent0(components.size(), buffer);
consolidateIfNeeded();
return this;
} }
/** /**
* Add the given {@link ByteBuf}s. * Add the given {@link ByteBuf}s.
* <p> * <p>
* Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}. * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}.
* If you need to have it increased you need to handle it by your own. * If you need to have it increased use {@link #addComponents(boolean, ByteBuf[])}.
* <p> * <p>
* {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this
* {@link CompositeByteBuf}. * {@link CompositeByteBuf}.
@ -147,16 +144,14 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
* ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}. * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
*/ */
public CompositeByteBuf addComponents(ByteBuf... buffers) { public CompositeByteBuf addComponents(ByteBuf... buffers) {
addComponents0(components.size(), buffers); return addComponents(false, buffers);
consolidateIfNeeded();
return this;
} }
/** /**
* Add the given {@link ByteBuf}s. * Add the given {@link ByteBuf}s.
* <p> * <p>
* Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}. * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}.
* If you need to have it increased you need to handle it by your own. * If you need to have it increased use {@link #addComponents(boolean, Iterable)}.
* <p> * <p>
* {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this * {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this
* {@link CompositeByteBuf}. * {@link CompositeByteBuf}.
@ -164,16 +159,14 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
* ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}. * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
*/ */
public CompositeByteBuf addComponents(Iterable<ByteBuf> buffers) { public CompositeByteBuf addComponents(Iterable<ByteBuf> buffers) {
addComponents0(components.size(), buffers); return addComponents(false, buffers);
consolidateIfNeeded();
return this;
} }
/** /**
* Add the given {@link ByteBuf} on the specific index. * Add the given {@link ByteBuf} on the specific index.
* <p> * <p>
* Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}. * Be aware that this method does not increase the {@code writerIndex} of the {@link CompositeByteBuf}.
* If you need to have it increased you need to handle it by your own. * If you need to have it increased use {@link #addComponent(boolean, int, ByteBuf)}.
* <p> * <p>
* {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}. * {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}.
* @param cIndex the index on which the {@link ByteBuf} will be added. * @param cIndex the index on which the {@link ByteBuf} will be added.
@ -181,8 +174,66 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
* {@link CompositeByteBuf}. * {@link CompositeByteBuf}.
*/ */
public CompositeByteBuf addComponent(int cIndex, ByteBuf buffer) { public CompositeByteBuf addComponent(int cIndex, ByteBuf buffer) {
return addComponent(false, cIndex, buffer);
}
/**
* Add the given {@link ByteBuf} and increase the {@code writerIndex} if {@code increaseWriterIndex} is
* {@code true}.
*
* {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}.
* @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this
* {@link CompositeByteBuf}.
*/
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
checkNotNull(buffer, "buffer"); checkNotNull(buffer, "buffer");
addComponent0(cIndex, buffer); addComponent0(increaseWriterIndex, components.size(), buffer);
consolidateIfNeeded();
return this;
}
/**
* Add the given {@link ByteBuf}s and increase the {@code writerIndex} if {@code increaseWriterIndex} is
* {@code true}.
*
* {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this
* {@link CompositeByteBuf}.
* @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()}
* ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
*/
public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers) {
addComponents0(increaseWriterIndex, components.size(), buffers);
consolidateIfNeeded();
return this;
}
/**
* Add the given {@link ByteBuf}s and increase the {@code writerIndex} if {@code increaseWriterIndex} is
* {@code true}.
*
* {@link ByteBuf#release()} ownership of all {@link ByteBuf} objects in {@code buffers} is transfered to this
* {@link CompositeByteBuf}.
* @param buffers the {@link ByteBuf}s to add. {@link ByteBuf#release()} ownership of all {@link ByteBuf#release()}
* ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
*/
public CompositeByteBuf addComponents(boolean increaseWriterIndex, Iterable<ByteBuf> buffers) {
addComponents0(increaseWriterIndex, components.size(), buffers);
consolidateIfNeeded();
return this;
}
/**
* Add the given {@link ByteBuf} on the specific index and increase the {@code writerIndex}
* if {@code increaseWriterIndex} is {@code true}.
*
* {@link ByteBuf#release()} ownership of {@code buffer} is transfered to this {@link CompositeByteBuf}.
* @param cIndex the index on which the {@link ByteBuf} will be added.
* @param buffer the {@link ByteBuf} to add. {@link ByteBuf#release()} ownership is transfered to this
* {@link CompositeByteBuf}.
*/
public CompositeByteBuf addComponent(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
checkNotNull(buffer, "buffer");
addComponent0(increaseWriterIndex, cIndex, buffer);
consolidateIfNeeded(); consolidateIfNeeded();
return this; return this;
} }
@ -190,7 +241,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
/** /**
* Precondition is that {@code buffer != null}. * Precondition is that {@code buffer != null}.
*/ */
private int addComponent0(int cIndex, ByteBuf buffer) { private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
assert buffer != null; assert buffer != null;
boolean wasAdded = false; boolean wasAdded = false;
try { try {
@ -217,6 +268,9 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
updateComponentOffsets(cIndex); updateComponentOffsets(cIndex);
} }
} }
if (increaseWriterIndex) {
writerIndex(writerIndex() + buffer.readableBytes());
}
return cIndex; return cIndex;
} finally { } finally {
if (!wasAdded) { if (!wasAdded) {
@ -240,12 +294,12 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
* ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}. * ownership of all {@link ByteBuf} objects is transfered to this {@link CompositeByteBuf}.
*/ */
public CompositeByteBuf addComponents(int cIndex, ByteBuf... buffers) { public CompositeByteBuf addComponents(int cIndex, ByteBuf... buffers) {
addComponents0(cIndex, buffers); addComponents0(false, cIndex, buffers);
consolidateIfNeeded(); consolidateIfNeeded();
return this; return this;
} }
private int addComponents0(int cIndex, ByteBuf... buffers) { private int addComponents0(boolean increaseWriterIndex, int cIndex, ByteBuf... buffers) {
checkNotNull(buffers, "buffers"); checkNotNull(buffers, "buffers");
int i = 0; int i = 0;
try { try {
@ -259,7 +313,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
if (b == null) { if (b == null) {
break; break;
} }
cIndex = addComponent0(cIndex, b) + 1; cIndex = addComponent0(increaseWriterIndex, cIndex, b) + 1;
int size = components.size(); int size = components.size();
if (cIndex > size) { if (cIndex > size) {
cIndex = size; cIndex = size;
@ -294,15 +348,15 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
* {@link CompositeByteBuf}. * {@link CompositeByteBuf}.
*/ */
public CompositeByteBuf addComponents(int cIndex, Iterable<ByteBuf> buffers) { public CompositeByteBuf addComponents(int cIndex, Iterable<ByteBuf> buffers) {
addComponents0(cIndex, buffers); addComponents0(false, cIndex, buffers);
consolidateIfNeeded(); consolidateIfNeeded();
return this; return this;
} }
private int addComponents0(int cIndex, Iterable<ByteBuf> buffers) { private int addComponents0(boolean increaseIndex, int cIndex, Iterable<ByteBuf> buffers) {
if (buffers instanceof ByteBuf) { if (buffers instanceof ByteBuf) {
// If buffers also implements ByteBuf (e.g. CompositeByteBuf), it has to go to addComponent(ByteBuf). // If buffers also implements ByteBuf (e.g. CompositeByteBuf), it has to go to addComponent(ByteBuf).
return addComponent0(cIndex, (ByteBuf) buffers); return addComponent0(increaseIndex, cIndex, (ByteBuf) buffers);
} }
checkNotNull(buffers, "buffers"); checkNotNull(buffers, "buffers");
@ -329,7 +383,7 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
} }
Collection<ByteBuf> col = (Collection<ByteBuf>) buffers; Collection<ByteBuf> col = (Collection<ByteBuf>) buffers;
return addComponents0(cIndex, col.toArray(new ByteBuf[col.size()])); return addComponents0(increaseIndex, cIndex, col.toArray(new ByteBuf[col.size()]));
} }
/** /**
@ -596,13 +650,13 @@ public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements
if (nComponents < maxNumComponents) { if (nComponents < maxNumComponents) {
padding = allocBuffer(paddingLength); padding = allocBuffer(paddingLength);
padding.setIndex(0, paddingLength); padding.setIndex(0, paddingLength);
addComponent0(components.size(), padding); addComponent0(false, components.size(), padding);
} else { } else {
padding = allocBuffer(paddingLength); padding = allocBuffer(paddingLength);
padding.setIndex(0, paddingLength); padding.setIndex(0, paddingLength);
// FIXME: No need to create a padding buffer and consolidate. // FIXME: No need to create a padding buffer and consolidate.
// Just create a big single buffer and put the current content there. // Just create a big single buffer and put the current content there.
addComponent0(components.size(), padding); addComponent0(false, components.size(), padding);
consolidateIfNeeded(); consolidateIfNeeded();
} }
} else if (newCapacity < oldCapacity) { } else if (newCapacity < oldCapacity) {

View File

@ -519,6 +519,30 @@ class WrappedCompositeByteBuf extends CompositeByteBuf {
return this; return this;
} }
@Override
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
wrapped.addComponent(increaseWriterIndex, buffer);
return this;
}
@Override
public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers) {
wrapped.addComponents(increaseWriterIndex, buffers);
return this;
}
@Override
public CompositeByteBuf addComponents(boolean increaseWriterIndex, Iterable<ByteBuf> buffers) {
wrapped.addComponents(increaseWriterIndex, buffers);
return this;
}
@Override
public CompositeByteBuf addComponent(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
wrapped.addComponent(increaseWriterIndex, cIndex, buffer);
return this;
}
@Override @Override
public CompositeByteBuf removeComponent(int cIndex) { public CompositeByteBuf removeComponent(int cIndex) {
wrapped.removeComponent(cIndex); wrapped.removeComponent(cIndex);

View File

@ -846,11 +846,11 @@ public abstract class AbstractCompositeByteBufTest extends AbstractByteBufTest {
public void testAddEmptyBufferInMiddle() { public void testAddEmptyBufferInMiddle() {
CompositeByteBuf cbuf = compositeBuffer(); CompositeByteBuf cbuf = compositeBuffer();
ByteBuf buf1 = buffer().writeByte((byte) 1); ByteBuf buf1 = buffer().writeByte((byte) 1);
cbuf.addComponent(buf1).writerIndex(cbuf.writerIndex() + buf1.readableBytes()); cbuf.addComponent(true, buf1);
ByteBuf buf2 = EMPTY_BUFFER; ByteBuf buf2 = EMPTY_BUFFER;
cbuf.addComponent(buf2).writerIndex(cbuf.writerIndex() + buf2.readableBytes()); cbuf.addComponent(true, buf2);
ByteBuf buf3 = buffer().writeByte((byte) 2); ByteBuf buf3 = buffer().writeByte((byte) 2);
cbuf.addComponent(buf3).writerIndex(cbuf.writerIndex() + buf3.readableBytes()); cbuf.addComponent(true, buf3);
assertEquals(2, cbuf.readableBytes()); assertEquals(2, cbuf.readableBytes());
assertEquals((byte) 1, cbuf.readByte()); assertEquals((byte) 1, cbuf.readByte());

View File

@ -105,12 +105,10 @@ public abstract class AbstractMemoryHttpData extends AbstractHttpData {
byteBuf = buffer; byteBuf = buffer;
} else if (byteBuf instanceof CompositeByteBuf) { } else if (byteBuf instanceof CompositeByteBuf) {
CompositeByteBuf cbb = (CompositeByteBuf) byteBuf; CompositeByteBuf cbb = (CompositeByteBuf) byteBuf;
cbb.addComponent(buffer); cbb.addComponent(true, buffer);
cbb.writerIndex(cbb.writerIndex() + buffer.readableBytes());
} else { } else {
CompositeByteBuf cbb = compositeBuffer(Integer.MAX_VALUE); CompositeByteBuf cbb = compositeBuffer(Integer.MAX_VALUE);
cbb.addComponents(byteBuf, buffer); cbb.addComponents(true, byteBuf, buffer);
cbb.writerIndex(byteBuf.readableBytes() + buffer.readableBytes());
byteBuf = cbb; byteBuf = cbb;
} }
} }

View File

@ -80,9 +80,7 @@ abstract class DeflateDecoder extends WebSocketExtensionDecoder {
partUncompressedContent.release(); partUncompressedContent.release();
continue; continue;
} }
compositeUncompressedContent.addComponent(partUncompressedContent); compositeUncompressedContent.addComponent(true, partUncompressedContent);
compositeUncompressedContent.writerIndex(compositeUncompressedContent.writerIndex() +
partUncompressedContent.readableBytes());
} }
// Correctly handle empty frames // Correctly handle empty frames
// See https://github.com/netty/netty/issues/4348 // See https://github.com/netty/netty/issues/4348

View File

@ -87,9 +87,7 @@ abstract class DeflateEncoder extends WebSocketExtensionEncoder {
partCompressedContent.release(); partCompressedContent.release();
continue; continue;
} }
fullCompressedContent.addComponent(partCompressedContent); fullCompressedContent.addComponent(true, partCompressedContent);
fullCompressedContent.writerIndex(fullCompressedContent.writerIndex() +
partCompressedContent.readableBytes());
} }
if (fullCompressedContent.numComponents() <= 0) { if (fullCompressedContent.numComponents() <= 0) {
fullCompressedContent.release(); fullCompressedContent.release();

View File

@ -103,15 +103,13 @@ public abstract class WebSocketClientHandshakerTest {
byte[] bytes = "HTTP/1.1 101 Switching Protocols\r\nContent-Length: 0\r\n\r\n".getBytes(CharsetUtil.US_ASCII); byte[] bytes = "HTTP/1.1 101 Switching Protocols\r\nContent-Length: 0\r\n\r\n".getBytes(CharsetUtil.US_ASCII);
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponent(Unpooled.wrappedBuffer(bytes)); compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(bytes));
compositeByteBuf.writerIndex(compositeByteBuf.writerIndex() + bytes.length);
for (;;) { for (;;) {
ByteBuf frameBytes = websocketChannel.readOutbound(); ByteBuf frameBytes = websocketChannel.readOutbound();
if (frameBytes == null) { if (frameBytes == null) {
break; break;
} }
compositeByteBuf.addComponent(frameBytes); compositeByteBuf.addComponent(true, frameBytes);
compositeByteBuf.writerIndex(compositeByteBuf.writerIndex() + frameBytes.readableBytes());
} }
EmbeddedChannel ch = new EmbeddedChannel(new HttpObjectAggregator(Integer.MAX_VALUE), EmbeddedChannel ch = new EmbeddedChannel(new HttpObjectAggregator(Integer.MAX_VALUE),

View File

@ -446,8 +446,7 @@ public class Http2FrameRoundtripTest {
CompositeByteBuf composite = releaseLater(Unpooled.compositeBuffer()); CompositeByteBuf composite = releaseLater(Unpooled.compositeBuffer());
for (ByteBuf buf : captor.getAllValues()) { for (ByteBuf buf : captor.getAllValues()) {
buf = releaseLater(buf.retain()); buf = releaseLater(buf.retain());
composite.addComponent(buf); composite.addComponent(true, buf);
composite.writerIndex(composite.writerIndex() + buf.readableBytes());
} }
return composite; return composite;
} }

View File

@ -119,11 +119,10 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
if (cumulation instanceof CompositeByteBuf) { if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation; composite = (CompositeByteBuf) cumulation;
} else { } else {
int readable = cumulation.readableBytes();
composite = alloc.compositeBuffer(Integer.MAX_VALUE); composite = alloc.compositeBuffer(Integer.MAX_VALUE);
composite.addComponent(cumulation).writerIndex(readable); composite.addComponent(true, cumulation);
} }
composite.addComponent(in).writerIndex(composite.writerIndex() + in.readableBytes()); composite.addComponent(true, in);
buffer = composite; buffer = composite;
} }
return buffer; return buffer;

View File

@ -320,8 +320,7 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) { private static void appendPartialContent(CompositeByteBuf content, ByteBuf partialContent) {
if (partialContent.isReadable()) { if (partialContent.isReadable()) {
partialContent.retain(); partialContent.retain();
content.addComponent(partialContent); content.addComponent(true, partialContent);
content.writerIndex(content.writerIndex() + partialContent.readableBytes());
} }
} }

View File

@ -126,8 +126,7 @@ public abstract class AbstractDecoderTest extends AbstractCompressionTest {
CompositeByteBuf decompressed = Unpooled.compositeBuffer(); CompositeByteBuf decompressed = Unpooled.compositeBuffer();
ByteBuf msg; ByteBuf msg;
while ((msg = channel.readInbound()) != null) { while ((msg = channel.readInbound()) != null) {
decompressed.addComponent(msg); decompressed.addComponent(true, msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
} }
return decompressed; return decompressed;
} }

View File

@ -109,8 +109,7 @@ public abstract class AbstractEncoderTest extends AbstractCompressionTest {
CompositeByteBuf compressed = Unpooled.compositeBuffer(); CompositeByteBuf compressed = Unpooled.compositeBuffer();
ByteBuf msg; ByteBuf msg;
while ((msg = channel.readOutbound()) != null) { while ((msg = channel.readOutbound()) != null) {
compressed.addComponent(msg); compressed.addComponent(true, msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
} }
ByteBuf decompressed = decompress(compressed, dataLength); ByteBuf decompressed = decompress(compressed, dataLength);
compressed.release(); compressed.release();

View File

@ -150,8 +150,7 @@ public abstract class AbstractIntegrationTest {
final CompositeByteBuf compressed = Unpooled.compositeBuffer(); final CompositeByteBuf compressed = Unpooled.compositeBuffer();
ByteBuf msg; ByteBuf msg;
while ((msg = encoder.readOutbound()) != null) { while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg); compressed.addComponent(true, msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
} }
assertThat(compressed, is(notNullValue())); assertThat(compressed, is(notNullValue()));
@ -159,8 +158,7 @@ public abstract class AbstractIntegrationTest {
assertFalse(compressed.isReadable()); assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) { while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg); decompressed.addComponent(true, msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
} }
assertEquals(in.resetReaderIndex(), decompressed); assertEquals(in.resetReaderIndex(), decompressed);

View File

@ -79,8 +79,7 @@ public class FastLzIntegrationTest extends AbstractIntegrationTest {
ByteBuf msg; ByteBuf msg;
final CompositeByteBuf compressed = Unpooled.compositeBuffer(); final CompositeByteBuf compressed = Unpooled.compositeBuffer();
while ((msg = encoder.readOutbound()) != null) { while ((msg = encoder.readOutbound()) != null) {
compressed.addComponent(msg); compressed.addComponent(true, msg);
compressed.writerIndex(compressed.writerIndex() + msg.readableBytes());
} }
assertThat(compressed, is(notNullValue())); assertThat(compressed, is(notNullValue()));
@ -100,8 +99,7 @@ public class FastLzIntegrationTest extends AbstractIntegrationTest {
assertFalse(compressed.isReadable()); assertFalse(compressed.isReadable());
final CompositeByteBuf decompressed = Unpooled.compositeBuffer(); final CompositeByteBuf decompressed = Unpooled.compositeBuffer();
while ((msg = decoder.readInbound()) != null) { while ((msg = decoder.readInbound()) != null) {
decompressed.addComponent(msg); decompressed.addComponent(true, msg);
decompressed.writerIndex(decompressed.writerIndex() + msg.readableBytes());
} }
assertEquals(original, decompressed); assertEquals(original, decompressed);

View File

@ -68,8 +68,7 @@ public class LzmaFrameEncoderTest extends AbstractEncoderTest {
int i = 0; int i = 0;
while ((msg = channel.readOutbound()) != null) { while ((msg = channel.readOutbound()) != null) {
ByteBuf decompressedMsg = decompress(msg, originalLengths.get(i++)); ByteBuf decompressedMsg = decompress(msg, originalLengths.get(i++));
decompressed.addComponent(decompressedMsg); decompressed.addComponent(true, decompressedMsg);
decompressed.writerIndex(decompressed.writerIndex() + decompressedMsg.readableBytes());
msg.release(); msg.release();
} }
assertEquals(originalLengths.size(), i); assertEquals(originalLengths.size(), i);

View File

@ -94,8 +94,7 @@ public class SnappyFrameEncoderTest {
if (m == null) { if (m == null) {
break; break;
} }
actual.addComponent(m); actual.addComponent(true, m);
actual.writerIndex(actual.writerIndex() + m.readableBytes());
} }
assertEquals(releaseLater(expected), releaseLater(actual)); assertEquals(releaseLater(expected), releaseLater(actual));
} }

View File

@ -63,15 +63,13 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleSendCompositeDirectByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
CompositeByteBuf buf = Unpooled.compositeBuffer(); CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.directBuffer().writeBytes(BYTES, 0, 2)); buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
buf.addComponent(Unpooled.directBuffer().writeBytes(BYTES, 2, 2)); buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
buf.writerIndex(4);
testSimpleSend0(sb, cb, buf, true, BYTES, 1); testSimpleSend0(sb, cb, buf, true, BYTES, 1);
CompositeByteBuf buf2 = Unpooled.compositeBuffer(); CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(Unpooled.directBuffer().writeBytes(BYTES, 0, 2)); buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
buf2.addComponent(Unpooled.directBuffer().writeBytes(BYTES, 2, 2)); buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 2, 2));
buf2.writerIndex(4);
testSimpleSend0(sb, cb, buf2, true, BYTES, 4); testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
} }
@ -82,15 +80,13 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleSendCompositeHeapByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
CompositeByteBuf buf = Unpooled.compositeBuffer(); CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.buffer().writeBytes(BYTES, 0, 2)); buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
buf.addComponent(Unpooled.buffer().writeBytes(BYTES, 2, 2)); buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
buf.writerIndex(4);
testSimpleSend0(sb, cb, buf, true, BYTES, 1); testSimpleSend0(sb, cb, buf, true, BYTES, 1);
CompositeByteBuf buf2 = Unpooled.compositeBuffer(); CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(Unpooled.buffer().writeBytes(BYTES, 0, 2)); buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 0, 2));
buf2.addComponent(Unpooled.buffer().writeBytes(BYTES, 2, 2)); buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
buf2.writerIndex(4);
testSimpleSend0(sb, cb, buf2, true, BYTES, 4); testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
} }
@ -101,15 +97,13 @@ public class DatagramUnicastTest extends AbstractDatagramTest {
public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable { public void testSimpleSendCompositeMixedByteBuf(Bootstrap sb, Bootstrap cb) throws Throwable {
CompositeByteBuf buf = Unpooled.compositeBuffer(); CompositeByteBuf buf = Unpooled.compositeBuffer();
buf.addComponent(Unpooled.directBuffer().writeBytes(BYTES, 0, 2)); buf.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
buf.addComponent(Unpooled.buffer().writeBytes(BYTES, 2, 2)); buf.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
buf.writerIndex(4);
testSimpleSend0(sb, cb, buf, true, BYTES, 1); testSimpleSend0(sb, cb, buf, true, BYTES, 1);
CompositeByteBuf buf2 = Unpooled.compositeBuffer(); CompositeByteBuf buf2 = Unpooled.compositeBuffer();
buf2.addComponent(Unpooled.directBuffer().writeBytes(BYTES, 0, 2)); buf2.addComponent(true, Unpooled.directBuffer().writeBytes(BYTES, 0, 2));
buf2.addComponent(Unpooled.buffer().writeBytes(BYTES, 2, 2)); buf2.addComponent(true, Unpooled.buffer().writeBytes(BYTES, 2, 2));
buf2.writerIndex(4);
testSimpleSend0(sb, cb, buf2, true, BYTES, 4); testSimpleSend0(sb, cb, buf2, true, BYTES, 4);
} }

View File

@ -127,7 +127,7 @@ public class SocketGatheringWriteTest extends AbstractSocketTest {
buf.writerIndex(split); buf.writerIndex(split);
ByteBuf buf2 = Unpooled.buffer(size).writeBytes(buf, split, oldIndex - split); ByteBuf buf2 = Unpooled.buffer(size).writeBytes(buf, split, oldIndex - split);
CompositeByteBuf comp = Unpooled.compositeBuffer(); CompositeByteBuf comp = Unpooled.compositeBuffer();
comp.addComponent(buf).addComponent(buf2).writerIndex(length); comp.addComponent(true, buf).addComponent(true, buf2);
cc.write(comp); cc.write(comp);
} else { } else {
cc.write(buf); cc.write(buf);

View File

@ -287,7 +287,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
int length = Math.min(random.nextInt(1024 * 64), data.length - clientSendCounterVal); int length = Math.min(random.nextInt(1024 * 64), data.length - clientSendCounterVal);
ByteBuf buf = Unpooled.wrappedBuffer(data, clientSendCounterVal, length); ByteBuf buf = Unpooled.wrappedBuffer(data, clientSendCounterVal, length);
if (useCompositeByteBuf) { if (useCompositeByteBuf) {
buf = Unpooled.compositeBuffer().addComponent(buf).writerIndex(buf.writerIndex()); buf = Unpooled.compositeBuffer().addComponent(true, buf);
} }
ChannelFuture future = clientChannel.writeAndFlush(buf); ChannelFuture future = clientChannel.writeAndFlush(buf);
@ -520,7 +520,7 @@ public class SocketSslEchoTest extends AbstractSocketTest {
ByteBuf buf = Unpooled.wrappedBuffer(actual); ByteBuf buf = Unpooled.wrappedBuffer(actual);
if (useCompositeByteBuf) { if (useCompositeByteBuf) {
buf = Unpooled.compositeBuffer().addComponent(buf).writerIndex(buf.writerIndex()); buf = Unpooled.compositeBuffer().addComponent(true, buf);
} }
ctx.write(buf); ctx.write(buf);

View File

@ -152,16 +152,15 @@ public final class CoalescingBufferQueue {
} }
if (current instanceof CompositeByteBuf) { if (current instanceof CompositeByteBuf) {
CompositeByteBuf composite = (CompositeByteBuf) current; CompositeByteBuf composite = (CompositeByteBuf) current;
composite.addComponent(next); composite.addComponent(true, next);
composite.writerIndex(composite.writerIndex() + next.readableBytes());
return composite; return composite;
} }
// Create a composite buffer to accumulate this pair and potentially all the buffers // Create a composite buffer to accumulate this pair and potentially all the buffers
// in the queue. Using +2 as we have already dequeued current and next. // in the queue. Using +2 as we have already dequeued current and next.
CompositeByteBuf composite = channel.alloc().compositeBuffer(bufAndListenerPairs.size() + 2); CompositeByteBuf composite = channel.alloc().compositeBuffer(bufAndListenerPairs.size() + 2);
composite.addComponent(current); composite.addComponent(true, current);
composite.addComponent(next); composite.addComponent(true, next);
return composite.writerIndex(current.readableBytes() + next.readableBytes()); return composite;
} }
/** /**

View File

@ -99,7 +99,7 @@ public class ChannelOutboundBufferTest {
CompositeByteBuf comp = compositeBuffer(256); CompositeByteBuf comp = compositeBuffer(256);
ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII)); ByteBuf buf = directBuffer().writeBytes("buf1".getBytes(CharsetUtil.US_ASCII));
for (int i = 0; i < 65; i++) { for (int i = 0; i < 65; i++) {
comp.addComponent(buf.copy()).writerIndex(comp.writerIndex() + buf.readableBytes()); comp.addComponent(true, buf.copy());
} }
buffer.addMessage(comp, comp.readableBytes(), channel.voidPromise()); buffer.addMessage(comp, comp.readableBytes(), channel.voidPromise());