[#707] Hopefully now the correct fix for it which also takes the index into account when consolidate

This commit is contained in:
Norman Maurer 2012-11-04 21:21:26 +01:00
parent 36ac52a4bd
commit d293e6c389

View File

@ -61,7 +61,8 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
this.maxNumComponents = maxNumComponents; this.maxNumComponents = maxNumComponents;
addComponents(0, buffers); addComponents0(0, buffers);
consolidateIfNeeded();
setIndex(0, capacity()); setIndex(0, capacity());
} }
@ -74,30 +75,41 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
} }
this.maxNumComponents = maxNumComponents; this.maxNumComponents = maxNumComponents;
addComponents(0, buffers); addComponents0(0, buffers);
consolidateIfNeeded();
setIndex(0, capacity()); setIndex(0, capacity());
} }
@Override @Override
public CompositeByteBuf addComponent(ByteBuf buffer) { public CompositeByteBuf addComponent(ByteBuf buffer) {
addComponent(components.size(), buffer); addComponent0(components.size(), buffer);
consolidateIfNeeded();
return this; return this;
} }
@Override @Override
public CompositeByteBuf addComponents(ByteBuf... buffers) { public CompositeByteBuf addComponents(ByteBuf... buffers) {
addComponents(components.size(), buffers); addComponents0(components.size(), buffers);
consolidateIfNeeded();
return this; return this;
} }
@Override @Override
public CompositeByteBuf addComponents(Iterable<ByteBuf> buffers) { public CompositeByteBuf addComponents(Iterable<ByteBuf> buffers) {
addComponents(components.size(), buffers); addComponents0(components.size(), buffers);
consolidateIfNeeded();
return this; return this;
} }
@Override @Override
public CompositeByteBuf addComponent(int cIndex, ByteBuf buffer) { public CompositeByteBuf addComponent(int cIndex, ByteBuf buffer) {
addComponent0(cIndex, buffer);
consolidateIfNeeded();
return this;
}
private int addComponent0(int cIndex, ByteBuf buffer) {
checkComponentIndex(cIndex); checkComponentIndex(cIndex);
if (buffer == null) { if (buffer == null) {
@ -107,34 +119,12 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
if (buffer instanceof Iterable) { if (buffer instanceof Iterable) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Iterable<ByteBuf> composite = (Iterable<ByteBuf>) buffer; Iterable<ByteBuf> composite = (Iterable<ByteBuf>) buffer;
addComponents(cIndex, composite); return addComponents0(cIndex, composite);
return this;
} }
int readableBytes = buffer.readableBytes(); int readableBytes = buffer.readableBytes();
if (readableBytes == 0) { if (readableBytes == 0) {
return this; return cIndex;
}
// Consolidate if the number of components will exceed the allowed maximum by the current
// operation.
final int numComponents = components.size();
if (numComponents >= maxNumComponents) {
final int capacity = components.get(numComponents - 1).endOffset + readableBytes;
ByteBuf consolidated = buffer.unsafe().newBuffer(capacity);
for (int i = 0; i < numComponents; i ++) {
ByteBuf b = components.get(i).buf;
consolidated.writeBytes(b);
b.unsafe().release();
}
consolidated.writeBytes(buffer, buffer.readerIndex(), readableBytes);
Component c = new Component(consolidated);
c.endOffset = c.length;
components.clear();
components.add(c);
return this;
} }
// No need to consolidate - just add a component to the list. // No need to consolidate - just add a component to the list.
@ -152,74 +142,33 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
components.add(cIndex, c); components.add(cIndex, c);
updateComponentOffsets(cIndex); updateComponentOffsets(cIndex);
} }
return cIndex;
}
@Override
public CompositeByteBuf addComponents(int cIndex, ByteBuf... buffers) {
addComponents0(cIndex, buffers);
consolidateIfNeeded();
return this; return this;
} }
@Override private int addComponents0(int cIndex, ByteBuf... buffers) {
public CompositeByteBuf addComponents(int cIndex, ByteBuf... buffers) {
checkComponentIndex(cIndex); checkComponentIndex(cIndex);
if (buffers == null) { if (buffers == null) {
throw new NullPointerException("buffers"); throw new NullPointerException("buffers");
} }
ByteBuf lastBuf = null;
int cnt = 0;
int readableBytes = 0; int readableBytes = 0;
for (ByteBuf b: buffers) { for (ByteBuf b: buffers) {
if (b == null) { if (b == null) {
break; break;
} }
lastBuf = b;
cnt ++;
readableBytes += b.readableBytes(); readableBytes += b.readableBytes();
} }
if (readableBytes == 0) { if (readableBytes == 0) {
return this; return cIndex;
}
// Consolidate if the number of components will exceed the maximum by this operation.
final int numComponents = components.size();
if (numComponents + cnt >= maxNumComponents) {
final ByteBuf consolidated;
if (numComponents != 0) {
final int capacity = components.get(numComponents - 1).endOffset + readableBytes;
consolidated = lastBuf.unsafe().newBuffer(capacity);
for (int i = 0; i < cIndex; i ++) {
ByteBuf b = components.get(i).buf;
consolidated.writeBytes(b);
b.unsafe().release();
}
for (ByteBuf b: buffers) {
if (b == null) {
break;
}
consolidated.writeBytes(b, b.readerIndex(), b.readableBytes());
}
for (int i = cIndex; i < numComponents; i ++) {
ByteBuf b = components.get(i).buf;
consolidated.writeBytes(b);
b.unsafe().release();
}
} else {
consolidated = lastBuf.unsafe().newBuffer(readableBytes);
for (ByteBuf b: buffers) {
if (b == null) {
break;
}
consolidated.writeBytes(b, b.readerIndex(), b.readableBytes());
}
}
Component c = new Component(consolidated);
c.endOffset = c.length;
components.clear();
components.add(c);
updateComponentOffsets(0);
return this;
} }
// No need for consolidation // No need for consolidation
@ -228,15 +177,24 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
break; break;
} }
if (b.readable()) { if (b.readable()) {
// Always add b to the end of the component list because we are appending buffers cIndex = addComponent0(cIndex, b) + 1;
addComponent(components.size(), b); int size = components.size();
if (cIndex > size) {
cIndex = size;
}
} }
} }
return this; return cIndex;
} }
@Override @Override
public CompositeByteBuf addComponents(int cIndex, Iterable<ByteBuf> buffers) { public CompositeByteBuf addComponents(int cIndex, Iterable<ByteBuf> buffers) {
addComponents0(cIndex, buffers);
consolidateIfNeeded();
return this;
}
private int addComponents0(int cIndex, Iterable<ByteBuf> buffers) {
if (buffers == null) { if (buffers == null) {
throw new NullPointerException("buffers"); throw new NullPointerException("buffers");
} }
@ -247,8 +205,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
for (int i = 0; i < array.length; i ++) { for (int i = 0; i < array.length; i ++) {
array[i] = list.get(i).buf; array[i] = list.get(i).buf;
} }
addComponents(cIndex, array); return addComponents0(cIndex, array);
return this;
} }
if (buffers instanceof List) { if (buffers instanceof List) {
@ -257,8 +214,7 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
for (int i = 0; i < array.length; i ++) { for (int i = 0; i < array.length; i ++) {
array[i] = list.get(i); array[i] = list.get(i);
} }
addComponents(cIndex, array); return addComponents0(cIndex, array);
return this;
} }
if (buffers instanceof Collection) { if (buffers instanceof Collection) {
@ -268,16 +224,39 @@ public class DefaultCompositeByteBuf extends AbstractByteBuf implements Composit
for (ByteBuf b: col) { for (ByteBuf b: col) {
array[i ++] = b; array[i ++] = b;
} }
addComponents(cIndex, array); return addComponents0(cIndex, array);
return this;
} }
List<ByteBuf> list = new ArrayList<ByteBuf>(); List<ByteBuf> list = new ArrayList<ByteBuf>();
for (ByteBuf b: buffers) { for (ByteBuf b: buffers) {
list.add(b); list.add(b);
} }
addComponents(cIndex, list.toArray(new ByteBuf[list.size()])); return addComponents0(cIndex, list.toArray(new ByteBuf[list.size()]));
return this; }
/**
* This should only be called as last operation from a method as this may adjust the underlying
* array of components and so affect the index etc.
*/
private void consolidateIfNeeded() {
// Consolidate if the number of components will exceed the allowed maximum by the current
// operation.
final int numComponents = components.size();
if (numComponents > maxNumComponents) {
final int capacity = components.get(numComponents - 1).endOffset;
ByteBuf consolidated = components.get(numComponents - 1).buf.unsafe().newBuffer(capacity);
for (int i = 0; i < numComponents; i ++) {
ByteBuf b = components.get(i).buf;
consolidated.writeBytes(b);
b.unsafe().release();
}
Component c = new Component(consolidated);
c.endOffset = c.length;
components.clear();
components.add(c);
}
} }
private void checkComponentIndex(int cIndex) { private void checkComponentIndex(int cIndex) {