Make bifurcate and ensureWritable more flexible

This supports more use cases.
The ensureWritable method can now amortise its allocation cost by allocating more than what is strictly necessary to satisfy the immediate call.
The bifurcate method can now split at a given offset.
This commit is contained in:
Chris Vest 2021-04-22 16:57:31 +02:00
parent a2d49fed3e
commit 7775460984
9 changed files with 299 additions and 54 deletions

View File

@ -54,8 +54,6 @@ import java.nio.ByteOrder;
* such as with the {@link #getByte(int)} method, * such as with the {@link #getByte(int)} method,
* from multiple threads. * from multiple threads.
* <p> * <p>
* Confined buffers will initially be confined to the thread that allocates them.
* <p>
* If a buffer needs to be accessed by a different thread, * If a buffer needs to be accessed by a different thread,
* then the ownership of that buffer must be sent to that thread. * then the ownership of that buffer must be sent to that thread.
* This can be done with the {@link #send()} method. * This can be done with the {@link #send()} method.
@ -101,6 +99,34 @@ import java.nio.ByteOrder;
* 0 <= readerOffset <= writerOffset <= capacity * 0 <= readerOffset <= writerOffset <= capacity
* </pre> * </pre>
* *
* <h3 name="slice-bifurcate">Slice vs. Bifurcate</h3>
*
* The {@link #slice()} and {@link #bifurcate()} methods both return new buffers on the memory of the buffer they're
* called on.
* However, there are also important differences between the two, as they are aimed at different use cases that were
* previously (in the {@code ByteBuf} API) covered by {@code slice()} alone.
*
* <ul>
* <li>
* Slices create a new view onto the memory, that is shared between the slice and the buffer.
* As long as both the slice and the originating buffer are alive, neither will have ownership of the memory.
* Since the memory is shared, changes to the data made through one will be visible through the other.
* </li>
* <li>
* Bifurcation breaks the ownership of the memory in two.
* Both resulting buffers retain ownership of their respective region of memory.
* They can do this because the regions are guaranteed to not overlap; data changes through one buffer will not
* be visible through the other.
* </li>
* </ul>
*
* These differences means that slicing is mostly suitable for when you temporarily want to share a focused area of a
* buffer.
* Examples of this include doing IO, or decoding a bounded part of a larger message.
* On the other hand, bifurcate is suitable for when you want to hand over a region of a buffer to some other,
* perhaps unknown, piece of code, and relinquish your ownership of that buffer region in the process.
* Examples of include aggregating messages into an accumulator buffer, and sending messages down the pipeline for
* further processing, as bifurcated buffer regions, once their data has been received in its entirety.
*/ */
public interface Buffer extends Rc<Buffer>, BufferAccessors { public interface Buffer extends Rc<Buffer>, BufferAccessors {
/** /**
@ -432,7 +458,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* If this buffer already has the necessary space, then this method returns immediately. * If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then it will be expanded using the * If this buffer does not already have the necessary space, then it will be expanded using the
* {@link BufferAllocator} the buffer was created with. * {@link BufferAllocator} the buffer was created with.
* This method is the same as calling {@link #ensureWritable(int, boolean)} where {@code allowCompaction} is * This method is the same as calling {@link #ensureWritable(int, int, boolean)} where {@code allowCompaction} is
* {@code false}. * {@code false}.
* *
* @param size The requested number of bytes of space that should be available for writing. * @param size The requested number of bytes of space that should be available for writing.
@ -440,7 +466,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* or is {@linkplain #readOnly() read-only}. * or is {@linkplain #readOnly() read-only}.
*/ */
default void ensureWritable(int size) { default void ensureWritable(int size) {
ensureWritable(size, true); ensureWritable(size, 1, true);
} }
/** /**
@ -472,13 +498,17 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* </ul> * </ul>
* *
* @param size The requested number of bytes of space that should be available for writing. * @param size The requested number of bytes of space that should be available for writing.
* @param minimumGrowth The minimum number of bytes to grow by. If it is determined that memory should be allocated
* and copied, make sure that the new memory allocation is bigger than the old one by at least
* this many bytes. This way, the buffer can grow by more than what is immediately necessary,
* thus amortising the costs of allocating and copying.
* @param allowCompaction {@code true} if the method is allowed to modify the * @param allowCompaction {@code true} if the method is allowed to modify the
* {@linkplain #readerOffset() reader offset} and * {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}. * {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state, * @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* * or is {@linkplain #readOnly() read-only}. * or is {@linkplain #readOnly() read-only}.
*/ */
void ensureWritable(int size, boolean allowCompaction); void ensureWritable(int size, int minimumGrowth, boolean allowCompaction);
/** /**
* Returns a slice of this buffer's readable bytes. * Returns a slice of this buffer's readable bytes.
@ -492,6 +522,9 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <p> * <p>
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, * The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
* so that the entire contents of the slice is ready to be read. * so that the entire contents of the slice is ready to be read.
* <p>
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
* and bifurcate.
* *
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
* that is a view of the readable region of this buffer. * that is a view of the readable region of this buffer.
@ -511,6 +544,9 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <p> * <p>
* The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice, * The slice is created with a {@linkplain #writerOffset() write offset} equal to the length of the slice,
* so that the entire contents of the slice is ready to be read. * so that the entire contents of the slice is ready to be read.
* <p>
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
* and bifurcate.
* *
* @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()}, * @return A new buffer instance, with independent {@link #readerOffset()} and {@link #writerOffset()},
* that is a view of the given region of this buffer. * that is a view of the given region of this buffer.
@ -558,10 +594,64 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* simply split its internal array in two. * simply split its internal array in two.
* <p> * <p>
* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}. * Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
* <p>
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
* and bifurcate.
* *
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer. * @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
*/ */
Buffer bifurcate(); default Buffer bifurcate() {
return bifurcate(writerOffset());
}
/**
* Split the buffer into two, at the given {@code splitOffset}.
* <p>
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* <p>
* The region of this buffer that precede the {@code splitOffset}, will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
* {@linkplain #send() sent} to other threads.
* <p>
* The returned buffer will adopt the {@link #readerOffset()} and {@link #writerOffset()} of this buffer,
* but truncated to fit within the capacity dictated by the {@code splitOffset}.
* <p>
* The memory region in the returned buffer will become inaccessible through this buffer. If the
* {@link #readerOffset()} or {@link #writerOffset()} of this buffer lie prior to the {@code splitOffset},
* then those offsets will be moved forward so they land on offset 0 after the bifurcation.
* <p>
* Effectively, the following transformation takes place:
* <pre>{@code
* This buffer:
* +--------------------------------+
* 0| |splitOffset |cap
* +---------------+----------------+
* / / \ \
* / / \ \
* / / \ \
* / / \ \
* / / \ \
* +---------------+ +---------------+
* | |cap | |cap
* +---------------+ +---------------+
* Returned buffer. This buffer.
* }</pre>
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
* all of the bifurcated parts have been closed.
* <p>
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
* simply split its internal array in two.
* <p>
* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
* <p>
* See the <a href="#slice-bifurcate">Slice vs. Bifurcate</a> section for details on the difference between slice
* and bifurcate.
*
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
*/
Buffer bifurcate(int splitOffset);
/** /**
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer. * Discards the read bytes, and moves the buffer contents to the beginning of the buffer.

View File

@ -606,13 +606,16 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
} }
@Override @Override
public void ensureWritable(int size, boolean allowCompaction) { public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
if (!isOwned()) { if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable."); throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
} }
if (size < 0) { if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
} }
if (minimumGrowth < 0) {
throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.');
}
if (readOnly) { if (readOnly) {
throw bufferIsReadOnly(); throw bufferIsReadOnly();
} }
@ -649,13 +652,20 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
// Now we have enough space. // Now we have enough space.
return; return;
} }
} else if (bufs.length == 1) {
// If we only have a single component buffer, then we can safely compact that in-place.
bufs[0].compact();
computeBufferOffsets();
if (writableBytes() >= size) {
// Now we have enough space.
return;
}
} }
} }
long newSize = capacity() + (long) size; int growth = Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize); BufferAllocator.checkSize(capacity() + (long) growth);
int growth = size - writableBytes(); Buffer extension = allocator.allocate(growth, order());
Buffer extension = bufs.length == 0? allocator.allocate(growth) : allocator.allocate(growth, order());
unsafeExtendWith(extension); unsafeExtendWith(extension);
} }
@ -739,7 +749,14 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
} }
@Override @Override
public Buffer bifurcate() { public Buffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
if (capacity() < splitOffset) {
throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " +
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
}
if (!isOwned()) { if (!isOwned()) {
throw new IllegalStateException("Cannot bifurcate a buffer that is not owned."); throw new IllegalStateException("Cannot bifurcate a buffer that is not owned.");
} }
@ -748,12 +765,12 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order); return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
} }
int i = searchOffsets(woff); int i = searchOffsets(splitOffset);
int off = woff - offsets[i]; int off = splitOffset - offsets[i];
Buffer[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i); Buffer[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i);
bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length); bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length);
if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) { if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) {
bifs[bifs.length - 1] = bufs[0].bifurcate(); bifs[bifs.length - 1] = bufs[0].bifurcate(off);
} }
computeBufferOffsets(); computeBufferOffsets();
try { try {

View File

@ -101,7 +101,7 @@ public interface Send<T extends Rc<T>> extends Deref<T> {
* @param <R> The result type of the mapping function. * @param <R> The result type of the mapping function.
* @return A new {@link Send} instance that will deliver an object that is the result of the mapping. * @return A new {@link Send} instance that will deliver an object that is the result of the mapping.
*/ */
default <R extends Rc<R>> Send<R> map(Class<R> type, Function<T, ? extends R> mapper) { default <R extends Rc<R>> Send<R> map(Class<R> type, Function<T, R> mapper) {
return sending(type, () -> mapper.apply(receive())); return sending(type, () -> mapper.apply(receive()));
} }

View File

@ -364,7 +364,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
} }
@Override @Override
public void ensureWritable(int size, boolean allowCompaction) { public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
if (!isOwned()) { if (!isOwned()) {
throw attachTrace(new IllegalStateException( throw attachTrace(new IllegalStateException(
"Buffer is not owned. Only owned buffers can call ensureWritable.")); "Buffer is not owned. Only owned buffers can call ensureWritable."));
@ -372,6 +372,9 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
if (size < 0) { if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
} }
if (minimumGrowth < 0) {
throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.');
}
if (rmem != wmem) { if (rmem != wmem) {
throw bufferIsReadOnly(); throw bufferIsReadOnly();
} }
@ -387,7 +390,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
} }
// Allocate a bigger buffer. // Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes(); long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize); BufferAllocator.checkSize(newSize);
ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize); ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize);
buffer.order(order()); buffer.order(order());
@ -414,26 +417,33 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
} }
@Override @Override
public Buffer bifurcate() { public Buffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
if (capacity() < splitOffset) {
throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " +
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
}
if (!isOwned()) { if (!isOwned()) {
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
} }
var drop = (ArcDrop<NioBuffer>) unsafeGetDrop(); var drop = (ArcDrop<NioBuffer>) unsafeGetDrop();
unsafeSetDrop(new ArcDrop<>(drop)); unsafeSetDrop(new ArcDrop<>(drop));
var bifurcatedBuffer = rmem.slice(0, woff); var bifurcatedBuffer = rmem.slice(0, splitOffset);
// TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop. // TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop.
var bifurcatedBuf = new NioBuffer(base, bifurcatedBuffer, control, new ArcDrop<>(drop.increment())); var bifurcatedBuf = new NioBuffer(base, bifurcatedBuffer, control, new ArcDrop<>(drop.increment()));
bifurcatedBuf.woff = woff; bifurcatedBuf.woff = Math.min(woff, splitOffset);
bifurcatedBuf.roff = roff; bifurcatedBuf.roff = Math.min(roff, splitOffset);
bifurcatedBuf.order(order()); bifurcatedBuf.order(order());
boolean readOnly = readOnly(); boolean readOnly = readOnly();
bifurcatedBuf.readOnly(readOnly); bifurcatedBuf.readOnly(readOnly);
rmem = rmem.slice(woff, rmem.capacity() - woff); rmem = rmem.slice(splitOffset, rmem.capacity() - splitOffset);
if (!readOnly) { if (!readOnly) {
wmem = rmem; wmem = rmem;
} }
woff = 0; woff = Math.max(woff, splitOffset) - splitOffset;
roff = 0; roff = Math.max(roff, splitOffset) - splitOffset;
return bifurcatedBuf; return bifurcatedBuf;
} }

View File

@ -490,7 +490,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
} }
@Override @Override
public void ensureWritable(int size, boolean allowCompaction) { public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
if (!isOwned()) { if (!isOwned()) {
throw attachTrace(new IllegalStateException( throw attachTrace(new IllegalStateException(
"Buffer is not owned. Only owned buffers can call ensureWritable.")); "Buffer is not owned. Only owned buffers can call ensureWritable."));
@ -498,6 +498,9 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
if (size < 0) { if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
} }
if (minimumGrowth < 0) {
throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.');
}
if (seg != wseg) { if (seg != wseg) {
throw bufferIsReadOnly(); throw bufferIsReadOnly();
} }
@ -513,7 +516,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
} }
// Allocate a bigger buffer. // Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes(); long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize); BufferAllocator.checkSize(newSize);
MemorySegment newSegment = (MemorySegment) alloc.allocateUntethered(this, (int) newSize); MemorySegment newSegment = (MemorySegment) alloc.allocateUntethered(this, (int) newSize);
@ -545,25 +548,32 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
} }
@Override @Override
public Buffer bifurcate() { public Buffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
if (capacity() < splitOffset) {
throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " +
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
}
if (!isOwned()) { if (!isOwned()) {
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
} }
var drop = (ArcDrop<MemSegBuffer>) unsafeGetDrop(); var drop = (ArcDrop<MemSegBuffer>) unsafeGetDrop();
unsafeSetDrop(new ArcDrop<>(drop)); unsafeSetDrop(new ArcDrop<>(drop));
var bifurcatedSeg = seg.asSlice(0, woff); var bifurcatedSeg = seg.asSlice(0, splitOffset);
var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop<>(drop.increment()), alloc); var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop<>(drop.increment()), alloc);
bifurcatedBuf.woff = woff; bifurcatedBuf.woff = Math.min(woff, splitOffset);
bifurcatedBuf.roff = roff; bifurcatedBuf.roff = Math.min(roff, splitOffset);
bifurcatedBuf.order(order); bifurcatedBuf.order(order);
boolean readOnly = readOnly(); boolean readOnly = readOnly();
bifurcatedBuf.readOnly(readOnly); bifurcatedBuf.readOnly(readOnly);
seg = seg.asSlice(woff, seg.byteSize() - woff); seg = seg.asSlice(splitOffset, seg.byteSize() - splitOffset);
if (!readOnly) { if (!readOnly) {
wseg = seg; wseg = seg;
} }
woff = 0; woff = Math.max(woff, splitOffset) - splitOffset;
roff = 0; roff = Math.max(roff, splitOffset) - splitOffset;
return bifurcatedBuf; return bifurcatedBuf;
} }

View File

@ -398,7 +398,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
} }
@Override @Override
public void ensureWritable(int size, boolean allowCompaction) { public void ensureWritable(int size, int minimumGrowth, boolean allowCompaction) {
if (!isOwned()) { if (!isOwned()) {
throw attachTrace(new IllegalStateException( throw attachTrace(new IllegalStateException(
"Buffer is not owned. Only owned buffers can call ensureWritable.")); "Buffer is not owned. Only owned buffers can call ensureWritable."));
@ -406,6 +406,9 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
if (size < 0) { if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
} }
if (minimumGrowth < 0) {
throw new IllegalArgumentException("The minimum growth cannot be negative: " + minimumGrowth + '.');
}
if (rsize != wsize) { if (rsize != wsize) {
throw bufferIsReadOnly(); throw bufferIsReadOnly();
} }
@ -421,7 +424,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
} }
// Allocate a bigger buffer. // Allocate a bigger buffer.
long newSize = capacity() + size - (long) writableBytes(); long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize); BufferAllocator.checkSize(newSize);
UnsafeMemory memory = (UnsafeMemory) control.allocateUntethered(this, (int) newSize); UnsafeMemory memory = (UnsafeMemory) control.allocateUntethered(this, (int) newSize);
@ -455,27 +458,34 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
} }
@Override @Override
public Buffer bifurcate() { public Buffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
if (capacity() < splitOffset) {
throw new IllegalArgumentException("The split offset cannot be greater than the buffer capacity, " +
"but the split offset was " + splitOffset + ", and capacity is " + capacity() + '.');
}
if (!isOwned()) { if (!isOwned()) {
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
} }
var drop = (ArcDrop<UnsafeBuffer>) unsafeGetDrop(); var drop = (ArcDrop<UnsafeBuffer>) unsafeGetDrop();
unsafeSetDrop(new ArcDrop<>(drop)); unsafeSetDrop(new ArcDrop<>(drop));
// TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop. // TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop.
var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, woff, control, new ArcDrop<>(drop.increment())); var bifurcatedBuf = new UnsafeBuffer(memory, baseOffset, splitOffset, control, new ArcDrop<>(drop.increment()));
bifurcatedBuf.woff = woff; bifurcatedBuf.woff = Math.min(woff, splitOffset);
bifurcatedBuf.roff = roff; bifurcatedBuf.roff = Math.min(roff, splitOffset);
bifurcatedBuf.order(order()); bifurcatedBuf.order(order());
boolean readOnly = readOnly(); boolean readOnly = readOnly();
bifurcatedBuf.readOnly(readOnly); bifurcatedBuf.readOnly(readOnly);
rsize -= woff; rsize -= splitOffset;
baseOffset += woff; baseOffset += splitOffset;
address += woff; address += splitOffset;
if (!readOnly) { if (!readOnly) {
wsize = rsize; wsize = rsize;
} }
woff = 0; woff = Math.max(woff, splitOffset) - splitOffset;
roff = 0; roff = Math.max(roff, splitOffset) - splitOffset;
return bifurcatedBuf; return bifurcatedBuf;
} }

View File

@ -53,8 +53,8 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
@MethodSource("allocators") @MethodSource("allocators")
public void ensureWritableMustThrowIfRequestedSizeWouldGrowBeyondMaxAllowed(Fixture fixture) { public void ensureWritableMustThrowIfRequestedSizeWouldGrowBeyondMaxAllowed(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator(); try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(512)) { Buffer buf = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 8)); assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 7));
} }
} }
@ -130,15 +130,31 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
while (buf.readableBytes() > 0) { while (buf.readableBytes() > 0) {
buf.readByte(); buf.readByte();
} }
buf.ensureWritable(4, true); buf.ensureWritable(4, 4, true);
buf.writeInt(42); buf.writeInt(42);
assertThat(buf.capacity()).isEqualTo(64); assertThat(buf.capacity()).isEqualTo(64);
buf.writerOffset(60).readerOffset(60); buf.writerOffset(60).readerOffset(60);
buf.ensureWritable(8, true); buf.ensureWritable(8, 8, true);
buf.writeLong(42); buf.writeLong(42);
// Don't assert the capacity on this one, because single-component // Don't assert the capacity on this one, because single-component
// composite buffers may choose to allocate rather than compact. // composite buffers may choose to allocate rather than compact.
} }
} }
@ParameterizedTest
@MethodSource("allocators")
public void ensureWritableWithLargeMinimumGrowthMustGrowByAtLeastThatMuch(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(16)) {
buf.writeLong(0).writeInt(0);
buf.readLong();
buf.readInt(); // Compaction is now possible as well.
buf.ensureWritable(8, 32, true); // We don't need to allocate.
assertThat(buf.capacity()).isEqualTo(16);
buf.writeByte((byte) 1);
buf.ensureWritable(16, 32, true); // Now we DO need to allocate, because we can't compact.
assertThat(buf.capacity()).isEqualTo(16 /* existing capacity */ + 32 /* minimum growth */);
}
}
} }

View File

@ -401,6 +401,26 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
} }
} }
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateWithNegativeOffsetMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.bifurcate(0).close();
assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(-1));
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateWithOversizedOffsetMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> buf.bifurcate(9));
buf.bifurcate(8).close();
}
}
@ParameterizedTest @ParameterizedTest
@MethodSource("allocators") @MethodSource("allocators")
public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) { public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) {
@ -414,6 +434,53 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
} }
} }
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateOnOffsetOfNonOwnedBufferMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer acquired = buf.acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate(4));
assertThat(exc).hasMessageContaining("owned");
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateOnOffsetMustTruncateGreaterOffsets(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.writeInt(0x01020304);
buf.writeByte((byte) 0x05);
buf.readInt();
try (Buffer bif = buf.bifurcate(2)) {
assertThat(buf.readerOffset()).isEqualTo(2);
assertThat(buf.writerOffset()).isEqualTo(3);
assertThat(bif.readerOffset()).isEqualTo(2);
assertThat(bif.writerOffset()).isEqualTo(2);
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bifurcateOnOffsetMustExtendLesserOffsets(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.writeInt(0x01020304);
buf.readInt();
try (Buffer bif = buf.bifurcate(6)) {
assertThat(buf.readerOffset()).isEqualTo(0);
assertThat(buf.writerOffset()).isEqualTo(0);
assertThat(bif.readerOffset()).isEqualTo(4);
assertThat(bif.writerOffset()).isEqualTo(4);
}
}
}
@ParameterizedTest @ParameterizedTest
@MethodSource("allocators") @MethodSource("allocators")
public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) { public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) {
@ -507,6 +574,27 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
} }
} }
@ParameterizedTest
@MethodSource("allocators")
public void mustBePossibleToBifurcateOwnedSlices(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer buf = allocator.allocate(16).order(BIG_ENDIAN);
buf.writeLong(0x0102030405060708L);
try (Buffer slice = buf.slice()) {
buf.close();
assertTrue(slice.isOwned());
try (Buffer bifurcate = slice.bifurcate(4)) {
bifurcate.reset().ensureWritable(Long.BYTES);
slice.reset().ensureWritable(Long.BYTES);
assertThat(bifurcate.capacity()).isEqualTo(Long.BYTES);
assertThat(slice.capacity()).isEqualTo(Long.BYTES);
assertThat(bifurcate.getLong(0)).isEqualTo(0x01020304_00000000L);
assertThat(slice.getLong(0)).isEqualTo(0x05060708_00000000L);
}
}
}
}
@ParameterizedTest @ParameterizedTest
@MethodSource("allocators") @MethodSource("allocators")
public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) { public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) {

View File

@ -92,13 +92,13 @@ public abstract class BufferTestSupport {
if ("nosample".equalsIgnoreCase(sampleSetting)) { if ("nosample".equalsIgnoreCase(sampleSetting)) {
return fixture -> true; return fixture -> true;
} }
Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS); Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS); // New seed every day.
SplittableRandom rng = new SplittableRandom(today.hashCode()); SplittableRandom rng = new SplittableRandom(today.hashCode());
AtomicInteger counter = new AtomicInteger(); AtomicInteger counter = new AtomicInteger();
return fixture -> { return fixture -> {
boolean res = counter.getAndIncrement() < 1 || rng.nextInt(0, 100) <= 2; // Filter out 95% of tests.
return res; return counter.getAndIncrement() < 1 || rng.nextInt(0, 100) < 5;
}; // Filter out 97% of tests. };
} }
static Fixture[] allocators() { static Fixture[] allocators() {
@ -977,6 +977,10 @@ public abstract class BufferTestSupport {
return bs; return bs;
} }
public static void assertEquals(Buffer expected, Buffer actual) {
assertThat(toByteArray(actual)).containsExactly(toByteArray(expected));
}
public static void assertEquals(byte expected, byte actual) { public static void assertEquals(byte expected, byte actual) {
if (expected != actual) { if (expected != actual) {
fail(String.format("expected: %1$s (0x%1$X) but was: %2$s (0x%2$X)", expected, actual)); fail(String.format("expected: %1$s (0x%1$X) but was: %2$s (0x%2$X)", expected, actual));