Make CompositeBuffer part of the public API

And move the composite buffer related methods to it.
This commit is contained in:
Chris Vest 2021-04-27 12:10:44 +02:00
parent 86c929dd5a
commit 2f12455fa9
15 changed files with 324 additions and 285 deletions

View File

@ -37,7 +37,7 @@ import java.nio.ByteOrder;
* When the buffer is initially allocated, a pairing {@link #close()} call will deallocate it.
* In this state, the buffer {@linkplain #isOwned() is "owned"}.
* <p>
* The buffer can also be {@linkplain #acquire() acquired} when it's about to be involved in a complicated life time.
* The buffer can also be {@linkplain #acquire() acquired} when it's about to be involved in a complicated lifetime.
* The {@link #acquire()} call increments the reference count of the buffer,
* and a pairing {@link #close()} call will decrement the reference count.
* Each acquire lends out the buffer, and the buffer is said to be in a "borrowed" state.
@ -58,14 +58,15 @@ import java.nio.ByteOrder;
* then the ownership of that buffer must be sent to that thread.
* This can be done with the {@link #send()} method.
* The send method consumes the buffer, if it is in an owned state, and produces a {@link Send} object.
* The {@link Send} object can then be shared in a thread-safe way (so called "safe publication"),
* The {@link Send} object can then be shared in a thread-safe way (so-called "safe publication"),
* with the intended recipient thread.
* <p>
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
* all {@linkplain #slice() slices} must have been closed.
* And if this buffer is a constituent of a {@linkplain Buffer#compose(BufferAllocator, Buffer...) composite buffer},
* then that composite buffer must be closed.
* And if this buffer is a constituent of a
* {@linkplain CompositeBuffer#compose(BufferAllocator, Buffer...) composite buffer}, then that composite buffer must
* be closed.
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
*
@ -79,7 +80,7 @@ import java.nio.ByteOrder;
* <ul><li>These accessor methods are typically called {@code getX} or {@code setX}.</li></ul>
* </ol>
*
* A buffer contain two mutable offset positions: one for reading and one for writing.
* A buffer contains two mutable offset positions: one for reading and one for writing.
* These positions use <a href="https://en.wikipedia.org/wiki/Zero-based_numbering">zero-based indexing</a>,
* such that the first byte of data in the buffer is placed at offset {@code 0},
* and the last byte in the buffer is at offset {@link #capacity() capacity - 1}.
@ -109,7 +110,7 @@ import java.nio.ByteOrder;
* <ul>
* <li>
* Slices create a new view onto the memory, that is shared between the slice and the buffer.
* As long as both the slice and the originating buffer are alive, neither will have ownership of the memory.
* As long as both the slice, and the originating buffer are alive, neither will have ownership of the memory.
* Since the memory is shared, changes to the data made through one will be visible through the other.
* </li>
* <li>
@ -120,7 +121,7 @@ import java.nio.ByteOrder;
* </li>
* </ul>
*
* These differences means that slicing is mostly suitable for when you temporarily want to share a focused area of a
* These differences mean that slicing is mostly suitable for when you temporarily want to share a focused area of a
* buffer.
* Examples of this include doing IO, or decoding a bounded part of a larger message.
* On the other hand, bifurcate is suitable for when you want to hand over a region of a buffer to some other,
@ -129,161 +130,6 @@ import java.nio.ByteOrder;
* further processing, as bifurcated buffer regions, once their data has been received in its entirety.
*/
public interface Buffer extends Rc<Buffer>, BufferAccessors {
/**
* Compose the given sequence of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer increments the reference count on all the constituent buffers,
* and holds a reference to them until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* {@linkplain Buffer#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an
* {@linkplain Rc#isOwned() owned state}.
* This means that the composite buffer must be the only reference to the constituent buffers.
* <p>
* All of the constituent buffers must have the same {@linkplain Buffer#order() byte order}.
* An exception will be thrown if you attempt to compose buffers that have different byte orders,
* and changing the byte order of the constituent buffers so they become inconsistent after construction,
* will result in unspecified behaviour.
* <p>
* The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed
* as a single connected chunk of memory.
* Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity,
* and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a write
* offset of zero.
* Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity,
* and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read
* offset of zero.
* Furthermore, the sum of the read offsets must be less than or equal to the sum of the write offsets.
* <p>
* Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant
* offsets in the constituent buffers.
* <p>
* It is not a requirement that the buffers have the same size.
* <p>
* It is not a requirement that the buffers are allocated by this allocator, but if
* {@link Buffer#ensureWritable(int)} is called on the composed buffer, and the composed buffer needs to be
* expanded, then this allocator instance will be used for allocation the extra memory.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param bufs The buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
*/
static Buffer compose(BufferAllocator allocator, Buffer... bufs) {
return new CompositeBuffer(allocator, bufs);
}
/**
* Compose the given sequence of sends of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer holds a reference to all the constituent buffers,
* until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* {@linkplain Buffer#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an
* {@linkplain Rc#isOwned() owned state}.
* This means that the composite buffer must be the only reference to the constituent buffers.
* <p>
* All of the constituent buffers must have the same {@linkplain Buffer#order() byte order}.
* An exception will be thrown if you attempt to compose buffers that have different byte orders,
* and changing the byte order of the constituent buffers so they become inconsistent after construction,
* will result in unspecified behaviour.
* <p>
* The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed
* as a single connected chunk of memory.
* Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity,
* and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a write
* offset of zero.
* Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity,
* and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read
* offset of zero.
* Furthermore, the sum of the read offsets must be less than or equal to the sum of the write offsets.
* <p>
* Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant
* offsets in the constituent buffers.
* <p>
* It is not a requirement that the buffers have the same size.
* <p>
* It is not a requirement that the buffers are allocated by this allocator, but if
* {@link Buffer#ensureWritable(int)} is called on the composed buffer, and the composed buffer needs to be
* expanded, then this allocator instance will be used for allocation the extra memory.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param sends The sent buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
*/
@SafeVarargs
static Buffer compose(BufferAllocator allocator, Send<Buffer>... sends) {
return new CompositeBuffer(allocator, sends);
}
/**
* Create an empty composite buffer, that has no components. The buffer can be extended with components using either
* {@link #ensureWritable(int)} or {@link #extendComposite(Buffer, Buffer)}.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @return A composite buffer that has no components, and has a capacity of zero.
*/
static Buffer compose(BufferAllocator allocator) {
return new CompositeBuffer(allocator, new Buffer[0]);
}
/**
* Extend the given composite buffer with the given extension buffer.
* This works as if the extension had originally been included at the end of the list of constituent buffers when
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(BufferAllocator, Buffer...)
* @see #compose(BufferAllocator, Send...)
* @param composite The composite buffer (from a prior {@link #compose(BufferAllocator, Buffer...)} call) to extend
* with the given extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
static void extendComposite(Buffer composite, Buffer extension) {
if (!isComposite(composite)) {
throw new IllegalArgumentException(
"Expected the first buffer to be a composite buffer, " +
"but it is a " + composite.getClass() + " buffer: " + composite + '.');
}
CompositeBuffer compositeBuffer = (CompositeBuffer) composite;
compositeBuffer.extendWith(extension);
}
/**
* Check if the given buffer is a {@linkplain #compose(BufferAllocator, Buffer...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Buffer...)},
* {@code false} otherwise.
*/
static boolean isComposite(Buffer composite) {
return composite.getClass() == CompositeBuffer.class;
}
/**
* Change the default byte order of this buffer, and return this buffer.
*
@ -399,7 +245,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
* or if the resulting end positions reaches beyond the end of either this buffer, or the destination array.
*/
void copyInto(int srcPos, byte[] dest, int destPos, int length);
@ -420,7 +266,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
* or if the resulting end positions reaches beyond the end of either this buffer, or the destination array.
*/
void copyInto(int srcPos, ByteBuffer dest, int destPos, int length);
@ -441,7 +287,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
* or if the resulting end positions reaches beyond the end of either this buffer, or the destination array.
*/
void copyInto(int srcPos, Buffer dest, int destPos, int length);
@ -476,9 +322,9 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* Open a cursor to iterate the readable bytes of this buffer. The {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() witer offset} are not modified by the cursor.
* <p>
* Care should be taken to ensure that the buffers lifetime extends beyond the cursor and the iteration, and that
* Care should be taken to ensure that the buffer's lifetime extends beyond the cursor and the iteration, and that
* the {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() writer offset} are not modified
* while the iteration takes place. Otherwise unpredictable behaviour might result.
* while the iteration takes place. Otherwise, unpredictable behaviour might result.
*
* @return A {@link ByteCursor} for iterating the readable bytes of this buffer.
*/
@ -489,16 +335,16 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* The {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() witer offset} are not modified by
* the cursor.
* <p>
* Care should be taken to ensure that the buffers lifetime extends beyond the cursor and the iteration, and that
* Care should be taken to ensure that the buffer's lifetime extends beyond the cursor and the iteration, and that
* the {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() writer offset} are not modified
* while the iteration takes place. Otherwise unpredictable behaviour might result.
* while the iteration takes place. Otherwise, unpredictable behaviour might result.
*
* @param fromOffset The offset into the buffer where iteration should start.
* The first byte read from the iterator will be the byte at this offset.
* @param length The number of bytes to iterate.
* @return A {@link ByteCursor} for the given stretch of bytes of this buffer.
* @throws IllegalArgumentException if the length is negative, or if the region given by the {@code fromOffset} and
* the {@code length} reaches outside of the bounds of this buffer.
* the {@code length} reaches outside the bounds of this buffer.
*/
ByteCursor openCursor(int fromOffset, int length);
@ -507,9 +353,9 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* The {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() witer offset} are not modified by
* the cursor.
* <p>
* Care should be taken to ensure that the buffers lifetime extends beyond the cursor and the iteration, and that
* Care should be taken to ensure that the buffer's lifetime extends beyond the cursor and the iteration, and that
* the {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() writer offset} are not modified
* while the iteration takes place. Otherwise unpredictable behaviour might result.
* while the iteration takes place. Otherwise, unpredictable behaviour might result.
*
* @return A {@link ByteCursor} for the readable bytes of this buffer.
*/
@ -523,16 +369,16 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* The {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() witer offset} are not modified by
* the cursor.
* <p>
* Care should be taken to ensure that the buffers lifetime extends beyond the cursor and the iteration, and that
* Care should be taken to ensure that the buffer's lifetime extends beyond the cursor and the iteration, and that
* the {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() writer offset} are not modified
* while the iteration takes place. Otherwise unpredictable behaviour might result.
* while the iteration takes place. Otherwise, unpredictable behaviour might result.
*
* @param fromOffset The offset into the buffer where iteration should start.
* The first byte read from the iterator will be the byte at this offset.
* @param length The number of bytes to iterate.
* @return A {@link ByteCursor} for the given stretch of bytes of this buffer.
* @throws IllegalArgumentException if the length is negative, or if the region given by the {@code fromOffset} and
* the {@code length} reaches outside of the bounds of this buffer.
* the {@code length} reaches outside the bounds of this buffer.
*/
ByteCursor openReverseCursor(int fromOffset, int length);
@ -648,7 +494,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* {@linkplain #send() sent} to other threads.
* <p>
* The returned buffer will adopt the {@link #readerOffset()} of this buffer, and have its {@link #writerOffset()}
* and {@link #capacity()} both set to the equal to the write offset of this buffer.
* and {@link #capacity()} both set to the equal to the write-offset of this buffer.
* <p>
* The memory region in the returned buffer will become inaccessible through this buffer. This buffer will have its
* capacity reduced by the capacity of the returned buffer, and the read and write offsets of this buffer will both
@ -672,7 +518,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* }</pre>
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
* all of the bifurcated parts have been closed.
* all the bifurcated parts have been closed.
* <p>
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
@ -703,7 +549,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <p>
* The memory region in the returned buffer will become inaccessible through this buffer. If the
* {@link #readerOffset()} or {@link #writerOffset()} of this buffer lie prior to the {@code splitOffset},
* then those offsets will be moved forward so they land on offset 0 after the bifurcation.
* then those offsets will be moved forward, so they land on offset 0 after the bifurcation.
* <p>
* Effectively, the following transformation takes place:
* <pre>{@code
@ -723,7 +569,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* }</pre>
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
* all of the bifurcated parts have been closed.
* all the bifurcated parts have been closed.
* <p>
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
@ -799,7 +645,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <strong>Note</strong> that the {@link ReadableComponent} instance passed to the consumer could be reused for
* multiple calls, so the data must be extracted from the component in the context of the iteration.
* <p>
* The {@link ByteBuffer} instances obtained from the component, share life time with that internal component.
* The {@link ByteBuffer} instances obtained from the component, share lifetime with that internal component.
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
* <p>
@ -841,7 +687,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <strong>Note</strong> that the {@link WritableComponent} instance passed to the consumer could be reused for
* multiple calls, so the data must be extracted from the component in the context of the iteration.
* <p>
* The {@link ByteBuffer} instances obtained from the component, share life time with that internal component.
* The {@link ByteBuffer} instances obtained from the component, share lifetime with that internal component.
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
* <p>

View File

@ -24,7 +24,70 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implements Buffer {
/**
* The {@code CompositeBuffer} is a concrete {@link Buffer} implementation that make a number of other buffers appear
* as one. A composite buffer behaves the same as a normal, non-composite buffer in every way, so you normally don't
* need to handle them specially.
* <p>
* A composite buffer is constructed using one of the {@code compose} methods:
* <ul>
* <li>
* {@link #compose(BufferAllocator, Buffer...)} creates a composite buffer from the given set of buffers.
* The reference count to the passed buffers will be increased, and the resulting composite buffer may or may
* not have ownership.
* </li>
* <li>
* {@link #compose(BufferAllocator, Send[])} creates a composite buffer from the buffers that are sent to it via
* the passed in send objects. Since {@link Send#receive()} transfers ownership, the resulting composite buffer
* will have ownership, because it is guaranteed that there are no other references to its constituent buffers.
* </li>
* <li>
* {@link #compose(BufferAllocator)} creates and empty, zero capacity, composite buffer. Such empty buffers may
* change their {@linkplain #order() byte order} or {@linkplain #readOnly() read-only} states when they gain
* their first component.
* </li>
* </ul>
* Composite buffers can later be extended with internally allocated components, with {@link #ensureWritable(int)},
* or with externally allocated buffers, using {@link #extendComposite(Buffer, Buffer)}.
*
* <h3>Constituent buffer requirements</h3>
*
* The buffers that a being composed to form the composite buffer, need to live up to a number of requirements.
* Basically, if we imagine that the constituent buffers have their memory regions concatenated together, then the
* result needs to make sense.
* <p>
* All the constituent buffers must have the same {@linkplain Buffer#order() byte order}.
* An exception will be thrown if you attempt to compose buffers that have different byte orders,
* and changing the byte order of the constituent buffers so that they become inconsistent after construction,
* will result in unspecified behaviour.
* <p>
* The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed
* as a single connected chunk of memory.
* Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity,
* and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a
* write-offset of zero.
* Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity,
* and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read
* offset of zero.
* Furthermore, the sum of the read offsets must be less than or equal to the sum of the write-offsets.
* <p>
* Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant
* offsets in the constituent buffers.
* <p>
* It is not a requirement that the buffers have the same size.
* <p>
* It is not a requirement that the buffers are allocated by this allocator, but if
* {@link Buffer#ensureWritable(int)} is called on the composed buffer, and the composed buffer needs to be
* expanded, then this allocator instance will be used for allocation the extra memory.
*
* <h3>Ownership and Send</h3>
*
* {@linkplain Buffer#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an
* {@linkplain Rc#isOwned() owned state}.
* This means that the composite buffer must be the only reference to the constituent buffers.
*/
public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implements Buffer {
/**
* The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}.
* We set the max composite buffer capacity to the same, since it would otherwise be impossible to create a
@ -45,6 +108,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
return "COMPOSITE_DROP";
}
};
private static final Buffer[] EMPTY_BUFFER_ARRAY = new Buffer[0];
private final BufferAllocator allocator;
private final TornBufferAccessors tornBufAccessors;
@ -58,14 +122,130 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
private boolean closed;
private boolean readOnly;
CompositeBuffer(BufferAllocator allocator, Buffer[] refs) {
this(allocator, filterExternalBufs(Arrays.stream(refs)
.map(buf -> buf.acquire() /* Increments reference counts. */)), COMPOSITE_DROP, false);
/**
* Compose the given sequence of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer increments the reference count on all the constituent buffers,
* and holds a reference to them until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* See the class documentation for more information on what is required of the given buffers for composition to be
* allowed.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param bufs The buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
*/
public static Buffer compose(BufferAllocator allocator, Buffer... bufs) {
Stream<Buffer> bufferStream = Arrays.stream(bufs)
.map(buf -> buf.acquire()); // Increments reference counts.
return new CompositeBuffer(allocator, filterExternalBufs(bufferStream), COMPOSITE_DROP, false);
}
CompositeBuffer(BufferAllocator allocator, Send<Buffer>[] refs) {
this(allocator, filterExternalBufs(Arrays.stream(refs)
.map(buf -> buf.receive())), COMPOSITE_DROP, false);
/**
* Compose the given sequence of sends of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer holds a reference to all the constituent buffers,
* until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* See the class documentation for more information on what is required of the given buffers for composition to be
* allowed.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param sends The sent buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
* @throws IllegalStateException if one of the sends have already been received. The remaining buffers and sends
* will be closed and descarded, respectively.
*/
@SafeVarargs
public static Buffer compose(BufferAllocator allocator, Send<Buffer>... sends) {
Buffer[] bufs = new Buffer[sends.length];
IllegalStateException ise = null;
for (int i = 0; i < sends.length; i++) {
if (ise != null) {
sends[i].discard();
} else {
try {
bufs[i] = sends[i].receive();
} catch (IllegalStateException e) {
ise = e;
for (int j = 0; j < i; j++) {
bufs[j].close();
}
}
}
}
if (ise != null) {
throw ise;
}
return compose(allocator, bufs);
}
/**
* Create an empty composite buffer, that has no components. The buffer can be extended with components using either
* {@link #ensureWritable(int)} or {@link #extendComposite(Buffer, Buffer)}.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @return A composite buffer that has no components, and has a capacity of zero.
*/
public static Buffer compose(BufferAllocator allocator) {
return new CompositeBuffer(allocator, EMPTY_BUFFER_ARRAY, COMPOSITE_DROP, false);
}
/**
* Extend the given composite buffer with the given extension buffer.
* This works as if the extension had originally been included at the end of the list of constituent buffers when
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(BufferAllocator, Buffer...)
* @see #compose(BufferAllocator, Send...)
* @param composite The composite buffer (from a prior {@link #compose(BufferAllocator, Buffer...)} call) to extend
* with the given extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
public static void extendComposite(Buffer composite, Buffer extension) {
if (!isComposite(composite)) {
throw new IllegalArgumentException(
"Expected the first buffer to be a composite buffer, " +
"but it is a " + composite.getClass() + " buffer: " + composite + '.');
}
CompositeBuffer compositeBuffer = (CompositeBuffer) composite;
compositeBuffer.extendWith(extension);
}
/**
* Check if the given buffer is a {@linkplain #compose(BufferAllocator, Buffer...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Buffer...)},
* {@code false} otherwise.
*/
public static boolean isComposite(Buffer composite) {
return composite.getClass() == CompositeBuffer.class;
}
private static Buffer[] filterExternalBufs(Stream<Buffer> refs) {
@ -706,7 +886,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
Buffer[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
if (extension instanceof CompositeBuffer) {
// If the extension is itself a composite buffer, then extend this one by all of the constituent
// If the extension is itself a composite buffer, then extend this one by all the constituent
// component buffers.
CompositeBuffer compositeExtension = (CompositeBuffer) extension;
Buffer[] addedBuffers = compositeExtension.bufs;

View File

@ -115,7 +115,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst, bufSecond);
}
});
}
@ -131,7 +131,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst, bufSecond);
}
});
}
@ -147,7 +147,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst, bufSecond);
}
});
}
@ -163,7 +163,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst, bufSecond);
}
});
}
@ -180,7 +180,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -197,7 +197,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -214,7 +214,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -231,7 +231,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}

View File

@ -69,8 +69,8 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
Buffer c = allocator.allocate(8);
Buffer x = Buffer.compose(allocator, b, c)) {
buf = Buffer.compose(allocator, a, x);
Buffer x = CompositeBuffer.compose(allocator, b, c)) {
buf = CompositeBuffer.compose(allocator, a, x);
}
assertThat(buf.countComponents()).isEqualTo(3);
assertThat(buf.countReadableComponents()).isZero();
@ -122,7 +122,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
a.writeInt(1);
b.writeInt(2);
c.writeInt(3);
composite = Buffer.compose(allocator, a, b, c);
composite = CompositeBuffer.compose(allocator, a, b, c);
}
var list = new LinkedList<Integer>(List.of(1, 2, 3));
int count = composite.forEachReadable(0, (index, component) -> {
@ -159,7 +159,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
a.writeInt(1);
b.writeInt(2);
c.writeInt(3);
composite = Buffer.compose(allocator, a, b, c);
composite = CompositeBuffer.compose(allocator, a, b, c);
}
int readPos = composite.readerOffset();
int writePos = composite.writerOffset();
@ -197,7 +197,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(4);
Buffer b = allocator.allocate(4);
Buffer c = allocator.allocate(4)) {
buf = Buffer.compose(allocator, a, b, c);
buf = CompositeBuffer.compose(allocator, a, b, c);
}
int i = 1;
while (buf.writableBytes() > 0) {
@ -269,7 +269,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
Buffer c = allocator.allocate(8)) {
buf = Buffer.compose(allocator, a, b, c);
buf = CompositeBuffer.compose(allocator, a, b, c);
}
buf.order(BIG_ENDIAN);
buf.forEachWritable(0, (index, component) -> {

View File

@ -40,7 +40,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (Buffer b = allocator.allocate(8)) {
assertTrue(a.isOwned());
assertTrue(b.isOwned());
composite = Buffer.compose(allocator, a, b);
composite = CompositeBuffer.compose(allocator, a, b);
assertFalse(composite.isOwned());
assertFalse(a.isOwned());
assertFalse(b.isOwned());
@ -58,13 +58,14 @@ public class BufferCompositionTest extends BufferTestSupport {
public void compositeBuffersCannotHaveDuplicateComponents() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4)) {
var e = assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, a, a));
var e = assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, a));
assertThat(e).hasMessageContaining("duplicate");
try (Buffer composite = Buffer.compose(allocator, a)) {
try (Buffer composite = CompositeBuffer.compose(allocator, a)) {
a.close();
try {
e = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, a));
e = assertThrows(IllegalArgumentException.class,
() -> CompositeBuffer.extendComposite(composite, a));
assertThat(e).hasMessageContaining("duplicate");
} finally {
a.acquire();
@ -76,7 +77,7 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void compositeBufferFromSends() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator,
Buffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
@ -89,18 +90,19 @@ public class BufferCompositionTest extends BufferTestSupport {
public void compositeBufferMustNotBeAllowedToContainThemselves() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer a = allocator.allocate(4);
Buffer buf = Buffer.compose(allocator, a);
Buffer buf = CompositeBuffer.compose(allocator, a);
try (buf; a) {
a.close();
try {
assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(buf, buf));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.extendComposite(buf, buf));
assertTrue(buf.isOwned());
try (Buffer composite = Buffer.compose(allocator, buf)) {
try (Buffer composite = CompositeBuffer.compose(allocator, buf)) {
// the composing increments the reference count of constituent buffers...
// counter-act this so it can be extended:
a.close(); // buf is now owned so it can be extended.
try {
assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(buf, composite));
assertThrows(IllegalArgumentException.class,
() -> CompositeBuffer.extendComposite(buf, composite));
} finally {
a.acquire(); // restore the reference count to align with our try-with-resources structure.
}
@ -119,7 +121,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer composite;
try (Buffer a = allocator.allocate(4, BIG_ENDIAN)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeInt(0x01020304);
@ -136,7 +138,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer composite;
try (Buffer a = allocator.allocate(4, LITTLE_ENDIAN)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeInt(0x05060708);
@ -150,7 +152,7 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void emptyCompositeBufferMustUseNativeByteOrder() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
Buffer composite = CompositeBuffer.compose(allocator)) {
assertThat(composite.order()).isEqualTo(ByteOrder.nativeOrder());
}
}
@ -160,7 +162,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(a, b));
var exc = assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.extendComposite(a, b));
assertThat(exc).hasMessageContaining("Expected").hasMessageContaining("composite");
}
}
@ -170,9 +172,9 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
Buffer composed = Buffer.compose(allocator, a)) {
Buffer composed = CompositeBuffer.compose(allocator, a)) {
try (Buffer ignore = composed.acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> Buffer.extendComposite(composed, b));
var exc = assertThrows(IllegalStateException.class, () -> CompositeBuffer.extendComposite(composed, b));
assertThat(exc).hasMessageContaining("owned");
}
}
@ -183,11 +185,11 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
var exc = assertThrows(IllegalArgumentException.class,
() -> Buffer.extendComposite(composite, composite));
() -> CompositeBuffer.extendComposite(composite, composite));
assertThat(exc).hasMessageContaining("cannot be extended");
}
}
@ -196,20 +198,20 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingWithZeroCapacityBufferHasNoEffect() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
Buffer.extendComposite(composite, composite);
Buffer composite = CompositeBuffer.compose(allocator)) {
CompositeBuffer.extendComposite(composite, composite);
assertThat(composite.capacity()).isZero();
assertThat(composite.countComponents()).isZero();
}
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer a = allocator.allocate(1);
Buffer composite = Buffer.compose(allocator, a);
Buffer composite = CompositeBuffer.compose(allocator, a);
a.close();
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
assertThat(composite.countComponents()).isOne();
try (Buffer b = Buffer.compose(allocator)) {
Buffer.extendComposite(composite, b);
try (Buffer b = CompositeBuffer.compose(allocator)) {
CompositeBuffer.extendComposite(composite, b);
}
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
@ -220,18 +222,18 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingCompositeBufferWithNullMustThrow() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
assertThrows(NullPointerException.class, () -> Buffer.extendComposite(composite, null));
Buffer composite = CompositeBuffer.compose(allocator)) {
assertThrows(NullPointerException.class, () -> CompositeBuffer.extendComposite(composite, null));
}
}
@Test
public void extendingCompositeBufferMustIncreaseCapacityByGivenBigEndianBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
Buffer composite = CompositeBuffer.compose(allocator)) {
assertThat(composite.capacity()).isZero();
try (Buffer buf = allocator.allocate(8, BIG_ENDIAN)) {
Buffer.extendComposite(composite, buf);
CompositeBuffer.extendComposite(composite, buf);
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
@ -242,10 +244,10 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingCompositeBufferMustIncreaseCapacityByGivenLittleEndianBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
Buffer composite = CompositeBuffer.compose(allocator)) {
assertThat(composite.capacity()).isZero();
try (Buffer buf = allocator.allocate(8, LITTLE_ENDIAN)) {
Buffer.extendComposite(composite, buf);
CompositeBuffer.extendComposite(composite, buf);
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
@ -258,11 +260,12 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8, BIG_ENDIAN)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
try (Buffer b = allocator.allocate(8, LITTLE_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
var exc = assertThrows(IllegalArgumentException.class,
() -> CompositeBuffer.extendComposite(composite, b));
assertThat(exc).hasMessageContaining("byte order");
}
}
@ -274,11 +277,12 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8, LITTLE_ENDIAN)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
try (Buffer b = allocator.allocate(8, BIG_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
var exc = assertThrows(IllegalArgumentException.class,
() -> CompositeBuffer.extendComposite(composite, b));
assertThat(exc).hasMessageContaining("byte order");
}
}
@ -288,9 +292,9 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void emptyCompositeBufferMustAllowExtendingWithBufferWithBigEndianByteOrder() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (Buffer composite = Buffer.compose(allocator)) {
try (Buffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8, BIG_ENDIAN)) {
Buffer.extendComposite(composite, b);
CompositeBuffer.extendComposite(composite, b);
assertThat(composite.order()).isEqualTo(BIG_ENDIAN);
}
}
@ -300,9 +304,9 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void emptyCompositeBufferMustAllowExtendingWithBufferWithLittleEndianByteOrder() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (Buffer composite = Buffer.compose(allocator)) {
try (Buffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8, LITTLE_ENDIAN)) {
Buffer.extendComposite(composite, b);
CompositeBuffer.extendComposite(composite, b);
assertThat(composite.order()).isEqualTo(LITTLE_ENDIAN);
}
}
@ -312,9 +316,9 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void emptyCompositeBufferMustAllowExtendingWithReadOnlyBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (Buffer composite = Buffer.compose(allocator)) {
try (Buffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8).readOnly(true)) {
Buffer.extendComposite(composite, b);
CompositeBuffer.extendComposite(composite, b);
assertTrue(composite.readOnly());
}
}
@ -326,13 +330,13 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeLong(0);
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
Buffer.extendComposite(composite, b);
CompositeBuffer.extendComposite(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
@ -345,16 +349,17 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeInt(0);
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
var exc = assertThrows(IllegalArgumentException.class,
() -> CompositeBuffer.extendComposite(composite, b));
assertThat(exc).hasMessageContaining("unwritten gap");
b.writerOffset(0);
Buffer.extendComposite(composite, b);
CompositeBuffer.extendComposite(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(4);
}
@ -367,7 +372,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeLong(0);
@ -375,7 +380,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
Buffer.extendComposite(composite, b);
CompositeBuffer.extendComposite(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
@ -388,7 +393,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeLong(0);
@ -396,10 +401,11 @@ public class BufferCompositionTest extends BufferTestSupport {
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
var exc = assertThrows(IllegalArgumentException.class,
() -> CompositeBuffer.extendComposite(composite, b));
assertThat(exc).hasMessageContaining("unread gap");
b.readerOffset(0);
Buffer.extendComposite(composite, b);
CompositeBuffer.extendComposite(composite, b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
assertThat(composite.readerOffset()).isEqualTo(4);
@ -413,7 +419,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4, BIG_ENDIAN);
Buffer b = allocator.allocate(4, LITTLE_ENDIAN)) {
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b));
}
}
@ -422,7 +428,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4).readOnly(true);
Buffer b = allocator.allocate(4).readOnly(true);
Buffer composite = Buffer.compose(allocator, a, b)) {
Buffer composite = CompositeBuffer.compose(allocator, a, b)) {
assertTrue(composite.readOnly());
verifyWriteInaccessible(composite);
}
@ -433,10 +439,10 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8).readOnly(true);
Buffer b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, a, b));
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, b, a));
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, a, b, a));
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, b, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, b, a));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b, a));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, b, a, b));
}
}
@ -445,10 +451,10 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite; Buffer b = allocator.allocate(8).readOnly(true)) {
assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.extendComposite(composite, b));
}
}
}
@ -458,10 +464,10 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8).readOnly(true)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite; Buffer b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.extendComposite(composite, b));
}
}
}

View File

@ -33,7 +33,7 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
assertThrows(IllegalStateException.class, () -> slice.ensureWritable(1));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
try (Buffer compose = Buffer.compose(allocator, buf)) {
try (Buffer compose = CompositeBuffer.compose(allocator, buf)) {
assertThrows(IllegalStateException.class, () -> compose.ensureWritable(1));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
@ -92,7 +92,7 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
@Test
public void ensureWritableMustExpandCapacityOfEmptyCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer buf = Buffer.compose(allocator)) {
Buffer buf = CompositeBuffer.compose(allocator)) {
assertThat(buf.writableBytes()).isEqualTo(0);
buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);

View File

@ -82,7 +82,7 @@ public class BufferReadOnlyTest extends BufferTestSupport {
@Test
public void readOnlyBufferMustRemainReadOnlyAfterSendForEmptyCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer buf = Buffer.compose(allocator)) {
Buffer buf = CompositeBuffer.compose(allocator)) {
buf.readOnly(true);
var send = buf.send();
try (Buffer receive = send.receive()) {

View File

@ -310,7 +310,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
assertEquals(0, slice.capacity()); // We haven't written anything, so the slice is empty.
int sliceBorrows = slice.countBorrows();
assertEquals(borrows + 2, buf.countBorrows());
try (Buffer ignored1 = Buffer.compose(allocator, buf, slice)) {
try (Buffer ignored1 = CompositeBuffer.compose(allocator, buf, slice)) {
assertEquals(borrows + 3, buf.countBorrows());
// Note: Slice is empty; not acquired by the composite buffer.
assertEquals(sliceBorrows, slice.countBorrows());
@ -337,7 +337,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
assertEquals(1, slice.capacity());
int sliceBorrows = slice.countBorrows();
assertEquals(borrows + 2, buf.countBorrows());
try (Buffer ignored1 = Buffer.compose(allocator, buf, slice)) {
try (Buffer ignored1 = CompositeBuffer.compose(allocator, buf, slice)) {
assertEquals(borrows + 3, buf.countBorrows());
assertEquals(sliceBorrows + 1, slice.countBorrows());
}
@ -656,7 +656,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@Test
public void bifurcateOnEmptyBigEndianCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer buf = Buffer.compose(allocator).order(BIG_ENDIAN)) {
Buffer buf = CompositeBuffer.compose(allocator).order(BIG_ENDIAN)) {
verifyBifurcateEmptyCompositeBuffer(buf);
}
}
@ -664,7 +664,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@Test
public void bifurcateOnEmptyLittleEndianCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer buf = Buffer.compose(allocator).order(LITTLE_ENDIAN)) {
Buffer buf = CompositeBuffer.compose(allocator).order(LITTLE_ENDIAN)) {
verifyBifurcateEmptyCompositeBuffer(buf);
}
}

View File

@ -166,7 +166,7 @@ public abstract class BufferTestSupport {
int half = size / 2;
try (Buffer firstHalf = a.allocate(half);
Buffer secondHalf = b.allocate(size - half)) {
return Buffer.compose(a, firstHalf, secondHalf);
return CompositeBuffer.compose(a, firstHalf, secondHalf);
}
}
@ -190,7 +190,7 @@ public abstract class BufferTestSupport {
try (Buffer a = alloc.allocate(part);
Buffer b = alloc.allocate(part);
Buffer c = alloc.allocate(size - part * 2)) {
return Buffer.compose(alloc, a, b, c);
return CompositeBuffer.compose(alloc, a, b, c);
}
}
@ -229,7 +229,7 @@ public abstract class BufferTestSupport {
if (size < 2) {
return allocator.allocate(size);
}
var buf = Buffer.compose(allocator);
var buf = CompositeBuffer.compose(allocator);
buf.ensureWritable(size);
return buf;
}
@ -364,8 +364,8 @@ public abstract class BufferTestSupport {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, new byte[1], 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, ByteBuffer.allocate(1), 0, 1));
if (Buffer.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> Buffer.extendComposite(buf, target));
if (CompositeBuffer.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> CompositeBuffer.extendComposite(buf, target));
}
}

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.benchmarks;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.CompositeBuffer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -62,14 +63,14 @@ public class ByteIterationBenchmark {
allocator = BufferAllocator.heap();
try (var a = allocator.allocate(SIZE / 2);
var b = allocator.allocate(SIZE / 2)) {
buf = Buffer.compose(allocator, a, b);
buf = CompositeBuffer.compose(allocator, a, b);
}
break;
case "composite-direct":
allocator = BufferAllocator.direct();
try (var a = allocator.allocate(SIZE / 2);
var b = allocator.allocate(SIZE / 2)) {
buf = Buffer.compose(allocator, a, b);
buf = CompositeBuffer.compose(allocator, a, b);
}
break;
default:

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.examples;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Scope;
import java.util.concurrent.ThreadLocalRandom;
@ -44,7 +45,7 @@ public final class ComposingAndSlicingExample {
private static Buffer createBigBuffer(BufferAllocator allocator) {
try (Scope scope = new Scope()) {
return Buffer.compose(allocator,
return CompositeBuffer.compose(allocator,
scope.add(allocator.allocate(64)),
scope.add(allocator.allocate(64)),
scope.add(allocator.allocate(64)),

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.examples;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Send;
import java.nio.channels.FileChannel;
@ -37,7 +38,7 @@ public final class FileCopyExample {
try (BufferAllocator allocator = BufferAllocator.pooledDirect();
var input = FileChannel.open(Path.of("/dev/urandom"), READ);
var output = FileChannel.open(Path.of("random.bin"), CREATE, TRUNCATE_EXISTING, WRITE)) {
Send<Buffer> done = Buffer.compose(allocator).send();
Send<Buffer> done = CompositeBuffer.compose(allocator).send();
var reader = executor.submit(() -> {
for (int i = 0; i < 1024; i++) {

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Send;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@ -82,11 +83,11 @@ public abstract class AlternativeMessageDecoder extends ChannelHandlerAdapter {
}
private void processRead(ChannelHandlerContext ctx, Buffer input) {
if (collector.isOwned() && Buffer.isComposite(collector) && input.isOwned()
if (collector.isOwned() && CompositeBuffer.isComposite(collector) && input.isOwned()
&& (collector.writableBytes() == 0 || input.writerOffset() == 0)
&& (collector.readableBytes() == 0 || input.readerOffset() == 0)
&& collector.order() == input.order()) {
Buffer.extendComposite(collector, input);
CompositeBuffer.extendComposite(collector, input);
drainCollector(ctx);
return;
}

View File

@ -18,6 +18,7 @@ package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
@ -94,7 +95,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
* Cumulate {@link Buffer}s by merge them into one {@link Buffer}'s, using memory copies.
*/
public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> {
if (cumulation.readableBytes() == 0 && !Buffer.isComposite(cumulation)) {
if (cumulation.readableBytes() == 0 && !CompositeBuffer.isComposite(cumulation)) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.close();
return in;
@ -128,7 +129,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
}
Buffer composite;
try (in) {
if (Buffer.isComposite(cumulation) && cumulation.isOwned()) {
if (CompositeBuffer.isComposite(cumulation) && cumulation.isOwned()) {
composite = cumulation;
if (composite.writerOffset() != composite.capacity()) {
// Writer index must equal capacity if we are going to "write"
@ -137,9 +138,9 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
cumulation.close();
}
} else {
composite = Buffer.compose(alloc, cumulation);
composite = CompositeBuffer.compose(alloc, cumulation);
}
Buffer.extendComposite(composite, in);
CompositeBuffer.extendComposite(composite, in);
return composite;
}
};
@ -224,7 +225,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
}
private static Buffer newEmptyBuffer() {
return Buffer.compose(BufferAllocator.heap());
return CompositeBuffer.compose(BufferAllocator.heap());
}
@Override

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.buffer.api.BufferAllocator.heap;
import static io.netty.buffer.api.BufferTestSupport.assertEquals;
import static io.netty.buffer.api.CompositeBuffer.compose;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
@ -421,7 +423,7 @@ public class ByteToMessageDecoderTest {
@Test
public void releaseWhenCompositeCumulateThrows() {
Buffer in = heap().allocate(12, LITTLE_ENDIAN).writerOffset(12);
try (Buffer cumulation = Buffer.compose(heap(), heap().allocate(1, BIG_ENDIAN).writeByte((byte) 0).send())) {
try (Buffer cumulation = compose(heap(), heap().allocate(1, BIG_ENDIAN).writeByte((byte) 0).send())) {
ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(heap(), cumulation, in);
fail();
} catch (IllegalArgumentException expected) {