Add splitComponentsFloor and splitComponentsCeil

These methods make it possible to accurately split composite buffers at component boundaries, either by rounding the offset down or up to the nearest component boundary, respectively.

Composite buffers already support the split method, but it is hard for client code to predict precisely where component boundaries are placed inside composite buffers.
When split is used with an offset that does not land exactly on a component boundary, then the internal component that the offset lands on will also be split.
This may make it harder to precisely reason about memory life cycles and reuse.
This commit is contained in:
Chris Vest 2021-05-07 10:41:16 +02:00
parent 83643a5dc9
commit 556d0acc89
3 changed files with 308 additions and 4 deletions

View File

@ -607,7 +607,8 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* See the <a href="#slice-split">Slice vs. Split</a> section for details on the difference between slice
* and split.
*
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
* @return A new buffer with independent and exclusive ownership over the bytes from the beginning to the given
* offset of this buffer.
*/
Buffer split(int splitOffset);

View File

@ -921,8 +921,7 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
computeBufferOffsets();
}
@Override
public CompositeBuffer split(int splitOffset) {
private void checkSplit(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
@ -933,6 +932,11 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
if (!isOwned()) {
throw new IllegalStateException("Cannot split a buffer that is not owned.");
}
}
@Override
public CompositeBuffer split(int splitOffset) {
checkSplit(splitOffset);
if (bufs.length == 0) {
// Splitting a zero-length buffer is trivial.
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
@ -946,6 +950,10 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
splits[splits.length - 1] = bufs[0].split(off);
}
computeBufferOffsets();
return buildSplitBuffer(splits);
}
private CompositeBuffer buildSplitBuffer(Buffer[] splits) {
try {
var compositeBuf = new CompositeBuffer(allocator, splits, unsafeGetDrop(), true);
compositeBuf.order = order; // Preserve byte order even if splits array is empty.
@ -958,6 +966,62 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
}
}
/**
* Split this buffer at a component boundary that is less than or equal to the given offset.
* <p>
* This method behaves the same as {@link #split(int)}, except no components are split.
*
* @param splitOffset The maximum split offset. The real split offset will be at a component boundary that is less
* than or equal to this offset.
* @return A new buffer with independent and exclusive ownership over the bytes from the beginning to a component
* boundary less than or equal to the given offset of this buffer.
*/
public CompositeBuffer splitComponentsFloor(int splitOffset) {
checkSplit(splitOffset);
if (bufs.length == 0) {
// Splitting a zero-length buffer is trivial.
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
}
int i = searchOffsets(splitOffset);
int off = splitOffset - offsets[i];
if (off == bufs[i].capacity()) {
i++;
}
Buffer[] splits = Arrays.copyOf(bufs, i);
bufs = Arrays.copyOfRange(bufs, i, bufs.length);
computeBufferOffsets();
return buildSplitBuffer(splits);
}
/**
* Split this buffer at a component boundary that is greater than or equal to the given offset.
* <p>
* This method behaves the same as {@link #split(int)}, except no components are split.
*
* @param splitOffset The minimum split offset. The real split offset will be at a component boundary that is
* greater than or equal to this offset.
* @return A new buffer with independent and exclusive ownership over the bytes from the beginning to a component
* boundary greater than or equal to the given offset of this buffer.
*/
public CompositeBuffer splitComponentsCeil(int splitOffset) {
checkSplit(splitOffset);
if (bufs.length == 0) {
// Splitting a zero-length buffer is trivial.
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
}
int i = searchOffsets(splitOffset);
int off = splitOffset - offsets[i];
if (0 < off && off <= bufs[i].capacity()) {
i++;
}
Buffer[] splits = Arrays.copyOf(bufs, i);
bufs = Arrays.copyOfRange(bufs, i, bufs.length);
computeBufferOffsets();
return buildSplitBuffer(splits);
}
@Override
public void compact() {
if (!isOwned()) {

View File

@ -29,7 +29,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class BufferCompositionTest extends BufferTestSupport {
@Test
public void compositeBufferCanOnlyBeOwnedWhenAllConstituentBuffersAreOwned() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
@ -470,4 +469,244 @@ public class BufferCompositionTest extends BufferTestSupport {
}
}
}
@Test
public void splitComponentsFloorMustThrowIfCompositeBufferIsNotOwned() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
CompositeBuffer composite = CompositeBuffer.compose(allocator, a, b)) {
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(0));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(4));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(7));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(8));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(9));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(12));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(16));
}
}
@Test
public void splitComponentsCeilMustThrowIfCompositeBufferIsNotOwned() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
CompositeBuffer composite = CompositeBuffer.compose(allocator, a, b)) {
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(0));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(4));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(7));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(8));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(9));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(12));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(16));
}
}
@Test
public void splitComponentsFloorMustThrowOnOutOfBounds() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
assertThrows(IllegalArgumentException.class, () -> composite.splitComponentsFloor(-1));
assertThrows(IllegalArgumentException.class, () -> composite.splitComponentsFloor(17));
try (CompositeBuffer split = composite.splitComponentsFloor(16)) {
assertThat(split.capacity()).isEqualTo(16);
assertThat(composite.capacity()).isZero();
}
}
}
@Test
public void splitComponentsCeilMustThrowOnOutOfBounds() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
assertThrows(IllegalArgumentException.class, () -> composite.splitComponentsCeil(-1));
assertThrows(IllegalArgumentException.class, () -> composite.splitComponentsCeil(17));
try (CompositeBuffer split = composite.splitComponentsCeil(16)) {
assertThat(split.capacity()).isEqualTo(16);
assertThat(composite.capacity()).isZero();
}
}
}
@Test
public void splitComponentsFloorMustGiveEmptyBufferForOffsetInFirstComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsFloor(4)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isZero();
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(16);
}
}
}
@Test
public void splitComponentsFloorMustGiveEmptyBufferForOffsetLastByteInFirstComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsFloor(7)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isZero();
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(16);
}
}
}
@Test
public void splitComponentsFloorMustGiveBufferWithFirstComponentForOffsetInSecondComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsFloor(12)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isEqualTo(8);
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(8);
}
}
}
@Test
public void splitComponentsFloorMustGiveBufferWithFirstComponentForOffsetOnFirstByteInSecondComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsFloor(8)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isEqualTo(8);
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(8);
}
}
}
@Test
public void splitComponentsCeilMustGiveBufferWithFirstComponenForOffsetInFirstComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsCeil(4)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isEqualTo(8);
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(8);
}
}
}
@Test
public void splitComponentsCeilMustGiveBufferWithFirstComponentFofOffsetOnLastByteInFirstComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsCeil(7)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isEqualTo(8);
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(8);
}
}
}
@Test
public void splitComponentsCeilMustGiveBufferWithFirstAndSecondComponentForfOffsetInSecondComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsCeil(12)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isEqualTo(16);
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(0);
}
}
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsCeil(12)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isEqualTo(16);
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(8);
}
}
}
@Test
public void splitComponentsCeilMustGiveBufferWithFirstComponentForfOffsetOnFirstByteInSecondComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsCeil(7)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isEqualTo(8);
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(8);
}
}
}
@Test
public void splitComponentsCeilMustGiveEmptyBufferForOffsetOnFirstByteInFirstComponent() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
try (CompositeBuffer split = composite.splitComponentsCeil(0)) {
assertTrue(split.isOwned());
assertTrue(split.isAccessible());
assertThat(split.capacity()).isZero();
assertTrue(composite.isOwned());
assertTrue(composite.isAccessible());
assertThat(composite.capacity()).isEqualTo(16);
}
}
}
}