Make CompositeyteBuf and MessageBuf call release() on its elements when it is deallocated
This commit is contained in:
parent
bf0bfe9a69
commit
aca0d5fa68
@ -105,7 +105,7 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
|
||||
@Override
|
||||
public CompositeByteBuf addComponent(ByteBuf buffer) {
|
||||
addComponent0(components.size(), buffer, false);
|
||||
addComponent0(components.size(), buffer);
|
||||
consolidateIfNeeded();
|
||||
return this;
|
||||
}
|
||||
@ -126,12 +126,12 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
|
||||
@Override
|
||||
public CompositeByteBuf addComponent(int cIndex, ByteBuf buffer) {
|
||||
addComponent0(cIndex, buffer, false);
|
||||
addComponent0(cIndex, buffer);
|
||||
consolidateIfNeeded();
|
||||
return this;
|
||||
}
|
||||
|
||||
private int addComponent0(int cIndex, ByteBuf buffer, boolean addedBySelf) {
|
||||
private int addComponent0(int cIndex, ByteBuf buffer) {
|
||||
checkComponentIndex(cIndex);
|
||||
|
||||
if (buffer == null) {
|
||||
@ -150,7 +150,7 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
}
|
||||
|
||||
// No need to consolidate - just add a component to the list.
|
||||
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice(), addedBySelf);
|
||||
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
|
||||
if (cIndex == components.size()) {
|
||||
components.add(c);
|
||||
if (cIndex == 0) {
|
||||
@ -198,7 +198,7 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
break;
|
||||
}
|
||||
if (b.isReadable()) {
|
||||
cIndex = addComponent0(cIndex, b, false) + 1;
|
||||
cIndex = addComponent0(cIndex, b) + 1;
|
||||
int size = components.size();
|
||||
if (cIndex > size) {
|
||||
cIndex = size;
|
||||
@ -276,7 +276,7 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
consolidated.writeBytes(b);
|
||||
c.freeIfNecessary();
|
||||
}
|
||||
Component c = new Component(consolidated, true);
|
||||
Component c = new Component(consolidated);
|
||||
c.endOffset = c.length;
|
||||
components.clear();
|
||||
components.add(c);
|
||||
@ -450,13 +450,13 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
if (nComponents < maxNumComponents) {
|
||||
padding = allocBuffer(paddingLength);
|
||||
padding.setIndex(0, paddingLength);
|
||||
addComponent0(0, padding, true);
|
||||
addComponent0(0, padding);
|
||||
} else {
|
||||
padding = allocBuffer(paddingLength);
|
||||
padding.setIndex(0, paddingLength);
|
||||
// FIXME: No need to create a padding buffer and consolidate.
|
||||
// Just create a big single buffer and put the current content there.
|
||||
addComponent0(components.size(), padding, true);
|
||||
addComponent0(components.size(), padding);
|
||||
consolidateIfNeeded();
|
||||
}
|
||||
} else if (newCapacity < oldCapacity) {
|
||||
@ -470,7 +470,7 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
}
|
||||
|
||||
// Replace the last component with the trimmed slice.
|
||||
Component newC = new Component(c.buf.slice(0, c.length - bytesToTrim), c.allocatedBySelf);
|
||||
Component newC = new Component(c.buf.slice(0, c.length - bytesToTrim));
|
||||
newC.offset = c.offset;
|
||||
newC.endOffset = newC.offset + newC.length;
|
||||
i.set(newC);
|
||||
@ -1144,7 +1144,7 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
}
|
||||
|
||||
components.clear();
|
||||
components.add(new Component(consolidated, true));
|
||||
components.add(new Component(consolidated));
|
||||
updateComponentOffsets(0);
|
||||
return this;
|
||||
}
|
||||
@ -1169,7 +1169,7 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
}
|
||||
|
||||
components.subList(cIndex + 1, endCIndex).clear();
|
||||
components.set(cIndex, new Component(consolidated, true));
|
||||
components.set(cIndex, new Component(consolidated));
|
||||
updateComponentOffsets(cIndex);
|
||||
return this;
|
||||
}
|
||||
@ -1243,7 +1243,7 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
// new slice would be empty, so remove instead
|
||||
components.remove(0);
|
||||
} else {
|
||||
Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment), c.allocatedBySelf);
|
||||
Component newC = new Component(c.buf.slice(adjustment, c.length - adjustment));
|
||||
components.set(0, newC);
|
||||
}
|
||||
|
||||
@ -1271,21 +1271,15 @@ public class DefaultCompositeByteBuf extends AbstractReferenceCountedByteBuf imp
|
||||
private final class Component {
|
||||
final ByteBuf buf;
|
||||
final int length;
|
||||
final boolean allocatedBySelf;
|
||||
int offset;
|
||||
int endOffset;
|
||||
|
||||
Component(ByteBuf buf, boolean allocatedBySelf) {
|
||||
Component(ByteBuf buf) {
|
||||
this.buf = buf;
|
||||
length = buf.readableBytes();
|
||||
this.allocatedBySelf = allocatedBySelf;
|
||||
}
|
||||
|
||||
void freeIfNecessary() {
|
||||
if (!allocatedBySelf) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Unwrap so that we can free slices, too.
|
||||
ByteBuf buf;
|
||||
for (buf = this.buf; buf.unwrap() != null; buf = buf.unwrap()) {
|
||||
|
@ -77,9 +77,22 @@ final class DefaultMessageBuf<T> extends AbstractMessageBuf<T> {
|
||||
|
||||
@Override
|
||||
protected void deallocate() {
|
||||
int head = this.head;
|
||||
int tail = this.tail;
|
||||
if (head != tail) {
|
||||
this.head = this.tail = 0;
|
||||
final int mask = elements.length - 1;
|
||||
int i = head;
|
||||
do {
|
||||
BufUtil.release(elements[i]);
|
||||
elements[i] = null;
|
||||
i = i + 1 & mask;
|
||||
} while (i != tail);
|
||||
}
|
||||
|
||||
elements = null;
|
||||
head = 0;
|
||||
tail = 0;
|
||||
this.head = 0;
|
||||
this.tail = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -124,6 +124,9 @@ final class QueueBackedMessageBuf<T> extends AbstractMessageBuf<T> {
|
||||
|
||||
@Override
|
||||
protected void deallocate() {
|
||||
for (T e: queue) {
|
||||
BufUtil.release(e);
|
||||
}
|
||||
queue = null;
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,11 @@ package io.netty.buffer;
|
||||
* {@link #retain()} increases the reference count, and {@link #release()} decreases the reference count.
|
||||
* If the reference count is decreased to {@code 0}, the object will be deallocated explicitly, and accessing
|
||||
* the deallocated object will usually result in an access violation.
|
||||
* </p>
|
||||
* <p>
|
||||
* If this object is a container such as {@link MessageBuf} and {@link CompositeByteBuf}, its elements whose type is
|
||||
* {@link ReferenceCounted} will also be released when this object is deallocated.
|
||||
* </p>
|
||||
*/
|
||||
public interface ReferenceCounted {
|
||||
/**
|
||||
@ -40,14 +45,16 @@ public interface ReferenceCounted {
|
||||
void retain(int increment);
|
||||
|
||||
/**
|
||||
* Decreases the reference count by {@code 1}.
|
||||
* Decreases the reference count by {@code 1} and deallocates this object if the reference count reaches at
|
||||
* {@code 0}.
|
||||
*
|
||||
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
|
||||
*/
|
||||
boolean release();
|
||||
|
||||
/**
|
||||
* Decreases the reference count by the specified {@code decrement}.
|
||||
* Decreases the reference count by the specified {@code decrement} and deallocates this object if the reference
|
||||
* count reaches at {@code 0}.
|
||||
*
|
||||
* @return {@code true} if and only if the reference count became {@code 0} and this object has been deallocated
|
||||
*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user