From 000f2a0934196c94601f6df9b075a25e542e0b3c Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 15 Sep 2021 16:38:43 +0200 Subject: [PATCH] Add CompositeBuffer.decomposeBuffer method (#11683) Motivation: It may in some cases be useful to unwrap a composite buffer and work on the array of buffers directly. The decomposeBuffer method makes this possible safely, by killing the composite buffer in the process. Modification: Add a CompositeBuffer.decomposeBuffer method, which returns the array of the constituent component buffers of the composite buffer, and at the same time closes the composite buffer without closing its components. The caller effectively takes ownership of the component buffers away from the composite buffer. Result: This API makes buffer composition fully reversible. --- .../io/netty/buffer/api/CompositeBuffer.java | 40 ++++++++++- .../io/netty/buffer/api/internal/Statics.java | 5 ++ .../api/tests/BufferCompositionTest.java | 69 +++++++++++++++++++ 3 files changed, 113 insertions(+), 1 deletion(-) diff --git a/buffer/src/main/java/io/netty/buffer/api/CompositeBuffer.java b/buffer/src/main/java/io/netty/buffer/api/CompositeBuffer.java index be484f8ab2..77bcce8e18 100644 --- a/buffer/src/main/java/io/netty/buffer/api/CompositeBuffer.java +++ b/buffer/src/main/java/io/netty/buffer/api/CompositeBuffer.java @@ -122,7 +122,7 @@ public final class CompositeBuffer extends ResourceSupport obj) { return ResourceSupport.countBorrows(obj); } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + static void unsafeSetDrop(ResourceSupport obj, Drop replacement) { + obj.unsafeSetDrop((Drop) replacement); + } } diff --git a/buffer/src/test/java/io/netty/buffer/api/tests/BufferCompositionTest.java b/buffer/src/test/java/io/netty/buffer/api/tests/BufferCompositionTest.java index 1736ebbfdf..7d01cf3a73 100644 --- a/buffer/src/test/java/io/netty/buffer/api/tests/BufferCompositionTest.java +++ b/buffer/src/test/java/io/netty/buffer/api/tests/BufferCompositionTest.java @@ -20,8 +20,10 @@ import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.BufferClosedException; import io.netty.buffer.api.BufferReadOnlyException; import io.netty.buffer.api.CompositeBuffer; +import io.netty.buffer.api.Drop; import io.netty.buffer.api.Send; import io.netty.buffer.api.internal.ResourceSupport; +import io.netty.buffer.api.internal.Statics; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -29,6 +31,7 @@ import org.junit.jupiter.params.provider.MethodSource; import static io.netty.buffer.api.internal.Statics.acquire; import static io.netty.buffer.api.internal.Statics.isOwned; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -563,4 +566,70 @@ public class BufferCompositionTest extends BufferTestSupport { } } } + + @Test + public void decomposeOfEmptyBufferMustGiveEmptyArray() { + CompositeBuffer composite = CompositeBuffer.compose(BufferAllocator.onHeapUnpooled()); + Buffer[] components = composite.decomposeBuffer(); + assertThat(components.length).isZero(); + verifyInaccessible(composite); + assertThrows(IllegalStateException.class, () -> composite.close()); + } + + @Test + public void decomposeOfCompositeBufferMustGiveComponentArray() { + try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled()) { + CompositeBuffer composite = CompositeBuffer.compose( + allocator, + allocator.allocate(3).send(), + allocator.allocate(3).send(), + allocator.allocate(2).send()); + composite.writeLong(0x0102030405060708L); + assertThat(composite.readInt()).isEqualTo(0x01020304); + Buffer[] components = composite.decomposeBuffer(); + assertThat(components.length).isEqualTo(3); + verifyInaccessible(composite); + assertThat(components[0].readableBytes()).isZero(); + assertThat(components[0].writableBytes()).isZero(); + assertThat(components[1].readableBytes()).isEqualTo(2); + assertThat(components[1].writableBytes()).isZero(); + assertThat(components[2].readableBytes()).isEqualTo(2); + assertThat(components[2].writableBytes()).isZero(); + assertThat(components[1].readShort()).isEqualTo((short) 0x0506); + assertThat(components[2].readShort()).isEqualTo((short) 0x0708); + } + } + + @Test + public void failureInDecomposeMustCloseConstituentBuffers() { + try (BufferAllocator allocator = BufferAllocator.onHeapUnpooled()) { + CompositeBuffer composite = CompositeBuffer.compose( + allocator, + allocator.allocate(3).send(), + allocator.allocate(3).send(), + allocator.allocate(2).send()); + Drop throwingDrop = obj -> { + throw new RuntimeException("Expected."); + }; + try { + Statics.unsafeSetDrop(composite, throwingDrop); + } catch (Exception e) { + composite.close(); + throw e; + } + Buffer[] inners = new Buffer[3]; + assertThat(composite.countWritableComponents()).isEqualTo(3); + int count = composite.forEachWritable(0, (i, component) -> { + inners[i] = (Buffer) component; + return true; + }); + assertThat(count).isEqualTo(3); + var re = assertThrows(RuntimeException.class, () -> composite.decomposeBuffer()); + assertThat(re.getMessage()).isEqualTo("Expected."); + // The failure to decompose the buffer should have closed the inner buffers we extracted earlier. + for (Buffer inner : inners) { + assertFalse(inner.isAccessible()); + } + } + } }