Add CompositeBuffer.decomposeBuffer method (#11683)

Motivation:
It may in some cases be useful to unwrap a composite buffer and work on the array of buffers directly.
The decomposeBuffer method makes this possible safely, by killing the composite buffer in the process.

Modification:
Add a CompositeBuffer.decomposeBuffer method, which returns the array of the constituent component buffers of the composite buffer, and at the same time closes the composite buffer without closing its components.
The caller effectively takes ownership of the component buffers away from the composite buffer.

Result:
This API makes buffer composition fully reversible.
This commit is contained in:
Chris Vest 2021-09-15 16:38:43 +02:00 committed by GitHub
parent cf1ab852d1
commit 000f2a0934
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 113 additions and 1 deletions

View File

@ -122,7 +122,7 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
private int capacity;
private int roff;
private int woff;
private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
private int subOffset; // The next offset *within* a constituent buffer to read from or write to.
private boolean closed;
private boolean readOnly;
@ -378,6 +378,9 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
@Override
public CompositeBuffer fill(byte value) {
if (closed) {
throw bufferIsClosed(this);
}
for (Buffer buf : bufs) {
buf.fill(value);
}
@ -406,6 +409,9 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
"Offset and length cannot be negative, but offset was " +
offset + ", and length was " + length + '.');
}
if (closed) {
throw bufferIsClosed(this);
}
Buffer choice = (Buffer) chooseBuffer(offset, 0);
Buffer[] copies;
@ -509,6 +515,9 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
throw new IllegalArgumentException("The fromOffset+length is beyond the end of the buffer: " +
"fromOffset=" + fromOffset + ", length=" + length + '.');
}
if (closed) {
throw bufferIsClosed(this);
}
int startBufferIndex = searchOffsets(fromOffset);
int off = fromOffset - offsets[startBufferIndex];
Buffer startBuf = bufs[startBufferIndex];
@ -528,6 +537,9 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
throw new IllegalArgumentException("The fromOffset-length would underflow the buffer: " +
"fromOffset=" + fromOffset + ", length=" + length + '.');
}
if (closed) {
throw bufferIsClosed(this);
}
int startBufferIndex = searchOffsets(fromOffset);
int off = fromOffset - offsets[startBufferIndex];
Buffer startBuf = bufs[startBufferIndex];
@ -791,6 +803,32 @@ public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuff
return buildSplitBuffer(splits);
}
/**
* Break a composite buffer into its constituent components.
* <p>
* This "consumes" the composite buffer, leaving the composite buffer instance as if it had been closed.
* The buffers in the returned array are not closed, and become owned by the caller.
*
* @return An array of the constituent buffer components.
*/
public Buffer[] decomposeBuffer() {
Buffer[] result = bufs;
bufs = EMPTY_BUFFER_ARRAY;
try {
close();
} catch (Throwable e) {
for (Buffer buffer : result) {
try {
buffer.close();
} catch (Throwable ex) {
e.addSuppressed(ex);
}
}
throw e;
}
return result;
}
@Override
public CompositeBuffer compact() {
if (!isOwned()) {

View File

@ -206,4 +206,9 @@ public interface Statics {
static int countBorrows(ResourceSupport<?, ?> obj) {
return ResourceSupport.countBorrows(obj);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
static void unsafeSetDrop(ResourceSupport<?, ?> obj, Drop<?> replacement) {
obj.unsafeSetDrop((Drop) replacement);
}
}

View File

@ -20,8 +20,10 @@ import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.BufferClosedException;
import io.netty.buffer.api.BufferReadOnlyException;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Send;
import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.buffer.api.internal.Statics;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -29,6 +31,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import static io.netty.buffer.api.internal.Statics.acquire;
import static io.netty.buffer.api.internal.Statics.isOwned;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -563,4 +566,70 @@ public class BufferCompositionTest extends BufferTestSupport {
}
}
}
@Test
public void decomposeOfEmptyBufferMustGiveEmptyArray() {
CompositeBuffer composite = CompositeBuffer.compose(BufferAllocator.onHeapUnpooled());
Buffer[] components = composite.decomposeBuffer();
assertThat(components.length).isZero();
verifyInaccessible(composite);
assertThrows(IllegalStateException.class, () -> composite.close());
}
@Test
public void decomposeOfCompositeBufferMustGiveComponentArray() {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled()) {
CompositeBuffer composite = CompositeBuffer.compose(
allocator,
allocator.allocate(3).send(),
allocator.allocate(3).send(),
allocator.allocate(2).send());
composite.writeLong(0x0102030405060708L);
assertThat(composite.readInt()).isEqualTo(0x01020304);
Buffer[] components = composite.decomposeBuffer();
assertThat(components.length).isEqualTo(3);
verifyInaccessible(composite);
assertThat(components[0].readableBytes()).isZero();
assertThat(components[0].writableBytes()).isZero();
assertThat(components[1].readableBytes()).isEqualTo(2);
assertThat(components[1].writableBytes()).isZero();
assertThat(components[2].readableBytes()).isEqualTo(2);
assertThat(components[2].writableBytes()).isZero();
assertThat(components[1].readShort()).isEqualTo((short) 0x0506);
assertThat(components[2].readShort()).isEqualTo((short) 0x0708);
}
}
@Test
public void failureInDecomposeMustCloseConstituentBuffers() {
try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled()) {
CompositeBuffer composite = CompositeBuffer.compose(
allocator,
allocator.allocate(3).send(),
allocator.allocate(3).send(),
allocator.allocate(2).send());
Drop<Object> throwingDrop = obj -> {
throw new RuntimeException("Expected.");
};
try {
Statics.unsafeSetDrop(composite, throwingDrop);
} catch (Exception e) {
composite.close();
throw e;
}
Buffer[] inners = new Buffer[3];
assertThat(composite.countWritableComponents()).isEqualTo(3);
int count = composite.forEachWritable(0, (i, component) -> {
inners[i] = (Buffer) component;
return true;
});
assertThat(count).isEqualTo(3);
var re = assertThrows(RuntimeException.class, () -> composite.decomposeBuffer());
assertThat(re.getMessage()).isEqualTo("Expected.");
// The failure to decompose the buffer should have closed the inner buffers we extracted earlier.
for (Buffer inner : inners) {
assertFalse(inner.isAccessible());
}
}
}
}