Merge pull request #51 from netty/composite-buffer

Composite buffer API updates
This commit is contained in:
Chris Vest 2021-04-29 11:33:53 +02:00 committed by GitHub
commit c665db6ec7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 372 additions and 336 deletions

View File

@ -37,7 +37,7 @@ import java.nio.ByteOrder;
* When the buffer is initially allocated, a pairing {@link #close()} call will deallocate it.
* In this state, the buffer {@linkplain #isOwned() is "owned"}.
* <p>
* The buffer can also be {@linkplain #acquire() acquired} when it's about to be involved in a complicated life time.
* The buffer can also be {@linkplain #acquire() acquired} when it's about to be involved in a complicated lifetime.
* The {@link #acquire()} call increments the reference count of the buffer,
* and a pairing {@link #close()} call will decrement the reference count.
* Each acquire lends out the buffer, and the buffer is said to be in a "borrowed" state.
@ -58,14 +58,15 @@ import java.nio.ByteOrder;
* then the ownership of that buffer must be sent to that thread.
* This can be done with the {@link #send()} method.
* The send method consumes the buffer, if it is in an owned state, and produces a {@link Send} object.
* The {@link Send} object can then be shared in a thread-safe way (so called "safe publication"),
* The {@link Send} object can then be shared in a thread-safe way (so-called "safe publication"),
* with the intended recipient thread.
* <p>
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
* all {@linkplain #slice() slices} must have been closed.
* And if this buffer is a constituent of a {@linkplain Buffer#compose(BufferAllocator, Deref...) composite buffer},
* then that composite buffer must be closed.
* And if this buffer is a constituent of a
* {@linkplain CompositeBuffer#compose(BufferAllocator, Buffer...) composite buffer}, then that composite buffer must
* be closed.
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
*
@ -79,7 +80,7 @@ import java.nio.ByteOrder;
* <ul><li>These accessor methods are typically called {@code getX} or {@code setX}.</li></ul>
* </ol>
*
* A buffer contain two mutable offset positions: one for reading and one for writing.
* A buffer contains two mutable offset positions: one for reading and one for writing.
* These positions use <a href="https://en.wikipedia.org/wiki/Zero-based_numbering">zero-based indexing</a>,
* such that the first byte of data in the buffer is placed at offset {@code 0},
* and the last byte in the buffer is at offset {@link #capacity() capacity - 1}.
@ -109,7 +110,7 @@ import java.nio.ByteOrder;
* <ul>
* <li>
* Slices create a new view onto the memory, that is shared between the slice and the buffer.
* As long as both the slice and the originating buffer are alive, neither will have ownership of the memory.
* As long as both the slice, and the originating buffer are alive, neither will have ownership of the memory.
* Since the memory is shared, changes to the data made through one will be visible through the other.
* </li>
* <li>
@ -120,7 +121,7 @@ import java.nio.ByteOrder;
* </li>
* </ul>
*
* These differences means that slicing is mostly suitable for when you temporarily want to share a focused area of a
* These differences mean that slicing is mostly suitable for when you temporarily want to share a focused area of a
* buffer.
* Examples of this include doing IO, or decoding a bounded part of a larger message.
* On the other hand, bifurcate is suitable for when you want to hand over a region of a buffer to some other,
@ -129,93 +130,6 @@ import java.nio.ByteOrder;
* further processing, as bifurcated buffer regions, once their data has been received in its entirety.
*/
public interface Buffer extends Rc<Buffer>, BufferAccessors {
/**
* Compose the given sequence of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer increments the reference count on all the constituent buffers,
* and holds a reference to them until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* {@linkplain Buffer#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an
* {@linkplain Rc#isOwned() owned state}.
* This means that the composite buffer must be the only reference to the constituent buffers.
* <p>
* All of the constituent buffers must have the same {@linkplain Buffer#order() byte order}.
* An exception will be thrown if you attempt to compose buffers that have different byte orders,
* and changing the byte order of the constituent buffers so they become inconsistent after construction,
* will result in unspecified behaviour.
* <p>
* The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed
* as a single connected chunk of memory.
* Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity,
* and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a write
* offset of zero.
* Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity,
* and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read
* offset of zero.
* Furthermore, the sum of the read offsets must be less than or equal to the sum of the write offsets.
* <p>
* Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant
* offsets in the constituent buffers.
* <p>
* It is not a requirement that the buffers have the same size.
* <p>
* It is not a requirement that the buffers are allocated by this allocator, but if
* {@link Buffer#ensureWritable(int)} is called on the composed buffer, and the composed buffer needs to be
* expanded, then this allocator instance will be used for allocation the extra memory.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param bufs The buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
*/
@SafeVarargs
static Buffer compose(BufferAllocator allocator, Deref<Buffer>... bufs) {
return new CompositeBuffer(allocator, bufs);
}
/**
* Extend the given composite buffer with the given extension buffer.
* This works as if the extension had originally been included at the end of the list of constituent buffers when
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(BufferAllocator, Deref...)
* @param composite The composite buffer (from a prior {@link #compose(BufferAllocator, Deref...)} call) to extend
* with the given extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
static void extendComposite(Buffer composite, Buffer extension) {
if (!isComposite(composite)) {
throw new IllegalArgumentException(
"Expected the first buffer to be a composite buffer, " +
"but it is a " + composite.getClass() + " buffer: " + composite + '.');
}
CompositeBuffer compositeBuffer = (CompositeBuffer) composite;
compositeBuffer.extendWith(extension);
}
/**
* Check if the given buffer is a {@linkplain #compose(BufferAllocator, Deref...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Deref...)},
* {@code false} otherwise.
*/
static boolean isComposite(Buffer composite) {
return composite.getClass() == CompositeBuffer.class;
}
/**
* Change the default byte order of this buffer, and return this buffer.
*
@ -331,7 +245,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
* or if the resulting end positions reaches beyond the end of either this buffer, or the destination array.
*/
void copyInto(int srcPos, byte[] dest, int destPos, int length);
@ -352,7 +266,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
* or if the resulting end positions reaches beyond the end of either this buffer, or the destination array.
*/
void copyInto(int srcPos, ByteBuffer dest, int destPos, int length);
@ -373,7 +287,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
* or if the resulting end positions reaches beyond the end of either this buffer, or the destination array.
*/
void copyInto(int srcPos, Buffer dest, int destPos, int length);
@ -408,9 +322,9 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* Open a cursor to iterate the readable bytes of this buffer. The {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() witer offset} are not modified by the cursor.
* <p>
* Care should be taken to ensure that the buffers lifetime extends beyond the cursor and the iteration, and that
* Care should be taken to ensure that the buffer's lifetime extends beyond the cursor and the iteration, and that
* the {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() writer offset} are not modified
* while the iteration takes place. Otherwise unpredictable behaviour might result.
* while the iteration takes place. Otherwise, unpredictable behaviour might result.
*
* @return A {@link ByteCursor} for iterating the readable bytes of this buffer.
*/
@ -421,16 +335,16 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* The {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() witer offset} are not modified by
* the cursor.
* <p>
* Care should be taken to ensure that the buffers lifetime extends beyond the cursor and the iteration, and that
* Care should be taken to ensure that the buffer's lifetime extends beyond the cursor and the iteration, and that
* the {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() writer offset} are not modified
* while the iteration takes place. Otherwise unpredictable behaviour might result.
* while the iteration takes place. Otherwise, unpredictable behaviour might result.
*
* @param fromOffset The offset into the buffer where iteration should start.
* The first byte read from the iterator will be the byte at this offset.
* @param length The number of bytes to iterate.
* @return A {@link ByteCursor} for the given stretch of bytes of this buffer.
* @throws IllegalArgumentException if the length is negative, or if the region given by the {@code fromOffset} and
* the {@code length} reaches outside of the bounds of this buffer.
* the {@code length} reaches outside the bounds of this buffer.
*/
ByteCursor openCursor(int fromOffset, int length);
@ -439,9 +353,9 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* The {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() witer offset} are not modified by
* the cursor.
* <p>
* Care should be taken to ensure that the buffers lifetime extends beyond the cursor and the iteration, and that
* Care should be taken to ensure that the buffer's lifetime extends beyond the cursor and the iteration, and that
* the {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() writer offset} are not modified
* while the iteration takes place. Otherwise unpredictable behaviour might result.
* while the iteration takes place. Otherwise, unpredictable behaviour might result.
*
* @return A {@link ByteCursor} for the readable bytes of this buffer.
*/
@ -455,16 +369,16 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* The {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() witer offset} are not modified by
* the cursor.
* <p>
* Care should be taken to ensure that the buffers lifetime extends beyond the cursor and the iteration, and that
* Care should be taken to ensure that the buffer's lifetime extends beyond the cursor and the iteration, and that
* the {@linkplain #readerOffset() reader offset} and {@linkplain #writerOffset() writer offset} are not modified
* while the iteration takes place. Otherwise unpredictable behaviour might result.
* while the iteration takes place. Otherwise, unpredictable behaviour might result.
*
* @param fromOffset The offset into the buffer where iteration should start.
* The first byte read from the iterator will be the byte at this offset.
* @param length The number of bytes to iterate.
* @return A {@link ByteCursor} for the given stretch of bytes of this buffer.
* @throws IllegalArgumentException if the length is negative, or if the region given by the {@code fromOffset} and
* the {@code length} reaches outside of the bounds of this buffer.
* the {@code length} reaches outside the bounds of this buffer.
*/
ByteCursor openReverseCursor(int fromOffset, int length);
@ -580,7 +494,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* {@linkplain #send() sent} to other threads.
* <p>
* The returned buffer will adopt the {@link #readerOffset()} of this buffer, and have its {@link #writerOffset()}
* and {@link #capacity()} both set to the equal to the write offset of this buffer.
* and {@link #capacity()} both set to the equal to the write-offset of this buffer.
* <p>
* The memory region in the returned buffer will become inaccessible through this buffer. This buffer will have its
* capacity reduced by the capacity of the returned buffer, and the read and write offsets of this buffer will both
@ -604,7 +518,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* }</pre>
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
* all of the bifurcated parts have been closed.
* all the bifurcated parts have been closed.
* <p>
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
@ -635,7 +549,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <p>
* The memory region in the returned buffer will become inaccessible through this buffer. If the
* {@link #readerOffset()} or {@link #writerOffset()} of this buffer lie prior to the {@code splitOffset},
* then those offsets will be moved forward so they land on offset 0 after the bifurcation.
* then those offsets will be moved forward, so they land on offset 0 after the bifurcation.
* <p>
* Effectively, the following transformation takes place:
* <pre>{@code
@ -655,7 +569,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* }</pre>
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
* all of the bifurcated parts have been closed.
* all the bifurcated parts have been closed.
* <p>
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
@ -731,7 +645,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <strong>Note</strong> that the {@link ReadableComponent} instance passed to the consumer could be reused for
* multiple calls, so the data must be extracted from the component in the context of the iteration.
* <p>
* The {@link ByteBuffer} instances obtained from the component, share life time with that internal component.
* The {@link ByteBuffer} instances obtained from the component, share lifetime with that internal component.
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
* <p>
@ -773,7 +687,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <strong>Note</strong> that the {@link WritableComponent} instance passed to the consumer could be reused for
* multiple calls, so the data must be extracted from the component in the context of the iteration.
* <p>
* The {@link ByteBuffer} instances obtained from the component, share life time with that internal component.
* The {@link ByteBuffer} instances obtained from the component, share lifetime with that internal component.
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
* <p>

View File

@ -24,7 +24,70 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Stream;
final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implements Buffer {
/**
* The {@code CompositeBuffer} is a concrete {@link Buffer} implementation that make a number of other buffers appear
* as one. A composite buffer behaves the same as a normal, non-composite buffer in every way, so you normally don't
* need to handle them specially.
* <p>
* A composite buffer is constructed using one of the {@code compose} methods:
* <ul>
* <li>
* {@link #compose(BufferAllocator, Buffer...)} creates a composite buffer from the given set of buffers.
* The reference count to the passed buffers will be increased, and the resulting composite buffer may or may
* not have ownership.
* </li>
* <li>
* {@link #compose(BufferAllocator, Send[])} creates a composite buffer from the buffers that are sent to it via
* the passed in send objects. Since {@link Send#receive()} transfers ownership, the resulting composite buffer
* will have ownership, because it is guaranteed that there are no other references to its constituent buffers.
* </li>
* <li>
* {@link #compose(BufferAllocator)} creates and empty, zero capacity, composite buffer. Such empty buffers may
* change their {@linkplain #order() byte order} or {@linkplain #readOnly() read-only} states when they gain
* their first component.
* </li>
* </ul>
* Composite buffers can later be extended with internally allocated components, with {@link #ensureWritable(int)},
* or with externally allocated buffers, using {@link #extendWith(Buffer)}.
*
* <h3>Constituent buffer requirements</h3>
*
* The buffers that a being composed to form the composite buffer, need to live up to a number of requirements.
* Basically, if we imagine that the constituent buffers have their memory regions concatenated together, then the
* result needs to make sense.
* <p>
* All the constituent buffers must have the same {@linkplain Buffer#order() byte order}.
* An exception will be thrown if you attempt to compose buffers that have different byte orders,
* and changing the byte order of the constituent buffers so that they become inconsistent after construction,
* will result in unspecified behaviour.
* <p>
* The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed
* as a single connected chunk of memory.
* Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity,
* and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a
* write-offset of zero.
* Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity,
* and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read
* offset of zero.
* Furthermore, the sum of the read offsets must be less than or equal to the sum of the write-offsets.
* <p>
* Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant
* offsets in the constituent buffers.
* <p>
* It is not a requirement that the buffers have the same size.
* <p>
* It is not a requirement that the buffers are allocated by this allocator, but if
* {@link Buffer#ensureWritable(int)} is called on the composed buffer, and the composed buffer needs to be
* expanded, then this allocator instance will be used for allocation the extra memory.
*
* <h3>Ownership and Send</h3>
*
* {@linkplain Buffer#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an
* {@linkplain Rc#isOwned() owned state}.
* This means that the composite buffer must be the only reference to the constituent buffers.
*/
public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implements Buffer {
/**
* The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}.
* We set the max composite buffer capacity to the same, since it would otherwise be impossible to create a
@ -45,6 +108,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
return "COMPOSITE_DROP";
}
};
private static final Buffer[] EMPTY_BUFFER_ARRAY = new Buffer[0];
private final BufferAllocator allocator;
private final TornBufferAccessors tornBufAccessors;
@ -58,11 +122,111 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
private boolean closed;
private boolean readOnly;
CompositeBuffer(BufferAllocator allocator, Deref<Buffer>[] refs) {
this(allocator, filterExternalBufs(refs), COMPOSITE_DROP, false);
/**
* Compose the given sequence of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer increments the reference count on all the constituent buffers,
* and holds a reference to them until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* See the class documentation for more information on what is required of the given buffers for composition to be
* allowed.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param bufs The buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
*/
public static CompositeBuffer compose(BufferAllocator allocator, Buffer... bufs) {
Stream<Buffer> bufferStream = Arrays.stream(bufs)
.map(buf -> buf.acquire()); // Increments reference counts.
return new CompositeBuffer(allocator, filterExternalBufs(bufferStream), COMPOSITE_DROP, false);
}
private static Buffer[] filterExternalBufs(Deref<Buffer>[] refs) {
/**
* Compose the given sequence of sends of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer holds a reference to all the constituent buffers,
* until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* See the class documentation for more information on what is required of the given buffers for composition to be
* allowed.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param sends The sent buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
* @throws IllegalStateException if one of the sends have already been received. The remaining buffers and sends
* will be closed and descarded, respectively.
*/
@SafeVarargs
public static CompositeBuffer compose(BufferAllocator allocator, Send<Buffer>... sends) {
Buffer[] bufs = new Buffer[sends.length];
IllegalStateException ise = null;
for (int i = 0; i < sends.length; i++) {
if (ise != null) {
sends[i].discard();
} else {
try {
bufs[i] = sends[i].receive();
} catch (IllegalStateException e) {
ise = e;
for (int j = 0; j < i; j++) {
bufs[j].close();
}
}
}
}
if (ise != null) {
throw ise;
}
return new CompositeBuffer(allocator, filterExternalBufs(Arrays.stream(bufs)), COMPOSITE_DROP, false);
}
/**
* Create an empty composite buffer, that has no components. The buffer can be extended with components using either
* {@link #ensureWritable(int)} or {@link #extendWith(Buffer)}.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @return A composite buffer that has no components, and has a capacity of zero.
*/
public static CompositeBuffer compose(BufferAllocator allocator) {
return new CompositeBuffer(allocator, EMPTY_BUFFER_ARRAY, COMPOSITE_DROP, false);
}
/**
* Check if the given buffer is a {@linkplain #compose(BufferAllocator, Buffer...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Buffer...)},
* {@code false} otherwise.
*/
public static boolean isComposite(Buffer composite) {
return composite.getClass() == CompositeBuffer.class;
}
private static Buffer[] filterExternalBufs(Stream<Buffer> refs) {
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
// we make sure that the number of composite buffers will never become greater than the number of bytes in
@ -70,11 +234,10 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
// will never overflow their component counts.
// Allocating a new array unconditionally also prevents external modification of the array.
Buffer[] bufs = Arrays.stream(refs)
.map(r -> r.get()) // Increments reference counts.
.filter(CompositeBuffer::discardEmpty)
.flatMap(CompositeBuffer::flattenBuffer)
.toArray(Buffer[]::new);
Buffer[] bufs = refs
.filter(CompositeBuffer::discardEmpty)
.flatMap(CompositeBuffer::flattenBuffer)
.toArray(Buffer[]::new);
// Make sure there are no duplicates among the buffers.
Set<Buffer> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
@ -213,7 +376,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer order(ByteOrder order) {
public CompositeBuffer order(ByteOrder order) {
if (this.order != order) {
this.order = order;
for (Buffer buf : bufs) {
@ -239,7 +402,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer readerOffset(int index) {
public CompositeBuffer readerOffset(int index) {
prepRead(index, 0);
int indexLeft = index;
for (Buffer buf : bufs) {
@ -256,7 +419,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writerOffset(int index) {
public CompositeBuffer writerOffset(int index) {
checkWriteBounds(index, 0);
int indexLeft = index;
for (Buffer buf : bufs) {
@ -268,7 +431,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer fill(byte value) {
public CompositeBuffer fill(byte value) {
for (Buffer buf : bufs) {
buf.fill(value);
}
@ -281,7 +444,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer readOnly(boolean readOnly) {
public CompositeBuffer readOnly(boolean readOnly) {
for (Buffer buf : bufs) {
buf.readOnly(readOnly);
}
@ -295,7 +458,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer slice(int offset, int length) {
public CompositeBuffer slice(int offset, int length) {
checkWriteBounds(offset, length);
if (offset < 0 || length < 0) {
throw new IllegalArgumentException(
@ -669,7 +832,17 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
unsafeExtendWith(extension);
}
void extendWith(Buffer extension) {
/**
* Extend this composite buffer with the given extension buffer.
* This works as if the extension had originally been included at the end of the list of constituent buffers when
* the composite buffer was created.
* The extension buffer is added to the end of this composite buffer, which is modified in-place.
*
* @see #compose(BufferAllocator, Buffer...)
* @see #compose(BufferAllocator, Send...)
* @param extension The buffer to extend the composite buffer with.
*/
public void extendWith(Buffer extension) {
Objects.requireNonNull(extension, "Extension buffer cannot be null.");
if (!isOwned()) {
throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state.");
@ -701,7 +874,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
Buffer[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
if (extension instanceof CompositeBuffer) {
// If the extension is itself a composite buffer, then extend this one by all of the constituent
// If the extension is itself a composite buffer, then extend this one by all the constituent
// component buffers.
CompositeBuffer compositeExtension = (CompositeBuffer) extension;
Buffer[] addedBuffers = compositeExtension.bufs;
@ -749,7 +922,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer bifurcate(int splitOffset) {
public CompositeBuffer bifurcate(int splitOffset) {
if (splitOffset < 0) {
throw new IllegalArgumentException("The split offset cannot be negative: " + splitOffset + '.');
}
@ -904,25 +1077,25 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writeByte(byte value) {
public CompositeBuffer writeByte(byte value) {
prepWrite(Byte.BYTES).writeByte(value);
return this;
}
@Override
public Buffer setByte(int woff, byte value) {
public CompositeBuffer setByte(int woff, byte value) {
prepWrite(woff, Byte.BYTES).setByte(subOffset, value);
return this;
}
@Override
public Buffer writeUnsignedByte(int value) {
public CompositeBuffer writeUnsignedByte(int value) {
prepWrite(Byte.BYTES).writeUnsignedByte(value);
return this;
}
@Override
public Buffer setUnsignedByte(int woff, int value) {
public CompositeBuffer setUnsignedByte(int woff, int value) {
prepWrite(woff, Byte.BYTES).setUnsignedByte(subOffset, value);
return this;
}
@ -938,13 +1111,13 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writeChar(char value) {
public CompositeBuffer writeChar(char value) {
prepWrite(2).writeChar(value);
return this;
}
@Override
public Buffer setChar(int woff, char value) {
public CompositeBuffer setChar(int woff, char value) {
prepWrite(woff, 2).setChar(subOffset, value);
return this;
}
@ -970,25 +1143,25 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writeShort(short value) {
public CompositeBuffer writeShort(short value) {
prepWrite(Short.BYTES).writeShort(value);
return this;
}
@Override
public Buffer setShort(int woff, short value) {
public CompositeBuffer setShort(int woff, short value) {
prepWrite(woff, Short.BYTES).setShort(subOffset, value);
return this;
}
@Override
public Buffer writeUnsignedShort(int value) {
public CompositeBuffer writeUnsignedShort(int value) {
prepWrite(Short.BYTES).writeUnsignedShort(value);
return this;
}
@Override
public Buffer setUnsignedShort(int woff, int value) {
public CompositeBuffer setUnsignedShort(int woff, int value) {
prepWrite(woff, Short.BYTES).setUnsignedShort(subOffset, value);
return this;
}
@ -1014,25 +1187,25 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writeMedium(int value) {
public CompositeBuffer writeMedium(int value) {
prepWrite(3).writeMedium(value);
return this;
}
@Override
public Buffer setMedium(int woff, int value) {
public CompositeBuffer setMedium(int woff, int value) {
prepWrite(woff, 3).setMedium(subOffset, value);
return this;
}
@Override
public Buffer writeUnsignedMedium(int value) {
public CompositeBuffer writeUnsignedMedium(int value) {
prepWrite(3).writeUnsignedMedium(value);
return this;
}
@Override
public Buffer setUnsignedMedium(int woff, int value) {
public CompositeBuffer setUnsignedMedium(int woff, int value) {
prepWrite(woff, 3).setUnsignedMedium(subOffset, value);
return this;
}
@ -1058,25 +1231,25 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writeInt(int value) {
public CompositeBuffer writeInt(int value) {
prepWrite(Integer.BYTES).writeInt(value);
return this;
}
@Override
public Buffer setInt(int woff, int value) {
public CompositeBuffer setInt(int woff, int value) {
prepWrite(woff, Integer.BYTES).setInt(subOffset, value);
return this;
}
@Override
public Buffer writeUnsignedInt(long value) {
public CompositeBuffer writeUnsignedInt(long value) {
prepWrite(Integer.BYTES).writeUnsignedInt(value);
return this;
}
@Override
public Buffer setUnsignedInt(int woff, long value) {
public CompositeBuffer setUnsignedInt(int woff, long value) {
prepWrite(woff, Integer.BYTES).setUnsignedInt(subOffset, value);
return this;
}
@ -1092,13 +1265,13 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writeFloat(float value) {
public CompositeBuffer writeFloat(float value) {
prepWrite(Float.BYTES).writeFloat(value);
return this;
}
@Override
public Buffer setFloat(int woff, float value) {
public CompositeBuffer setFloat(int woff, float value) {
prepWrite(woff, Float.BYTES).setFloat(subOffset, value);
return this;
}
@ -1114,13 +1287,13 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writeLong(long value) {
public CompositeBuffer writeLong(long value) {
prepWrite(Long.BYTES).writeLong(value);
return this;
}
@Override
public Buffer setLong(int woff, long value) {
public CompositeBuffer setLong(int woff, long value) {
prepWrite(woff, Long.BYTES).setLong(subOffset, value);
return this;
}
@ -1136,13 +1309,13 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
@Override
public Buffer writeDouble(double value) {
public CompositeBuffer writeDouble(double value) {
prepWrite(Double.BYTES).writeDouble(value);
return this;
}
@Override
public Buffer setDouble(int woff, double value) {
public CompositeBuffer setDouble(int woff, double value) {
prepWrite(woff, Double.BYTES).setDouble(subOffset, value);
return this;
}

View File

@ -1,52 +0,0 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.util.function.Supplier;
/**
* A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object.
* <p>
* <strong>Note:</strong> Callers must ensure that they close any references they obtain.
* <p>
* Deref itself does not specify if a reference can be obtained more than once.
* For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once.
* Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple
* times.
*
* @param <T> The concrete type of reference counted object that can be obtained.
*/
public interface Deref<T extends Rc<T>> extends Supplier<T> {
/**
* Acquire a reference to the reference counted object.
* <p>
* <strong>Note:</strong> This call increments the reference count of the acquired object, and must be paired with
* a {@link Rc#close()} call.
* Using a try-with-resources clause is the easiest way to ensure this.
*
* @return A reference to the reference counted object.
*/
@Override
T get();
/**
* Determine if the object in this {@code Deref} is an instance of the given class.
*
* @param cls The type to check.
* @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type.
*/
boolean referentIsInstanceOf(Class<?> cls);
}

View File

@ -26,7 +26,7 @@ package io.netty.buffer.api;
*
* @param <I> The concrete subtype.
*/
public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
public interface Rc<I extends Rc<I>> extends AutoCloseable {
/**
* Increment the reference count.
* <p>
@ -36,16 +36,6 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
*/
I acquire();
@Override
default I get() {
return acquire();
}
@Override
default boolean referentIsInstanceOf(Class<?> cls) {
return cls.isInstance(this);
}
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>

View File

@ -32,7 +32,7 @@ import java.util.function.Supplier;
*
* @param <T>
*/
public interface Send<T extends Rc<T>> extends Deref<T> {
public interface Send<T extends Rc<T>> {
/**
* Construct a {@link Send} based on the given {@link Supplier}.
* The supplier will be called only once, in the receiving thread.
@ -117,8 +117,12 @@ public interface Send<T extends Rc<T>> extends Deref<T> {
}
}
@Override
default T get() {
return receive();
}
/**
* Determine if the object received from this {@code Send} is an instance of the given class.
*
* @param cls The type to check.
* @return {@code true} if the object received from this {@code Send} can be assigned fields or variables of the
* given type, otherwise false.
*/
boolean referentIsInstanceOf(Class<?> cls);
}

View File

@ -43,11 +43,6 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
return (I) copy;
}
Owned<T> unsafeUnwrapOwned() {
gateReception();
return outgoing;
}
private void gateReception() {
if ((boolean) RECEIVED.getAndSet(this, true)) {
throw new IllegalStateException("This object has already been received.");

View File

@ -115,7 +115,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst, bufSecond);
}
});
}
@ -131,7 +131,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst, bufSecond);
}
});
}
@ -147,7 +147,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst, bufSecond);
}
});
}
@ -163,7 +163,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst, bufSecond);
}
});
}
@ -180,7 +180,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -197,7 +197,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -214,7 +214,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -231,7 +231,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}

View File

@ -69,8 +69,8 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
Buffer c = allocator.allocate(8);
Buffer x = Buffer.compose(allocator, b, c)) {
buf = Buffer.compose(allocator, a, x);
Buffer x = CompositeBuffer.compose(allocator, b, c)) {
buf = CompositeBuffer.compose(allocator, a, x);
}
assertThat(buf.countComponents()).isEqualTo(3);
assertThat(buf.countReadableComponents()).isZero();
@ -122,7 +122,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
a.writeInt(1);
b.writeInt(2);
c.writeInt(3);
composite = Buffer.compose(allocator, a, b, c);
composite = CompositeBuffer.compose(allocator, a, b, c);
}
var list = new LinkedList<Integer>(List.of(1, 2, 3));
int count = composite.forEachReadable(0, (index, component) -> {
@ -159,7 +159,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
a.writeInt(1);
b.writeInt(2);
c.writeInt(3);
composite = Buffer.compose(allocator, a, b, c);
composite = CompositeBuffer.compose(allocator, a, b, c);
}
int readPos = composite.readerOffset();
int writePos = composite.writerOffset();
@ -197,7 +197,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(4);
Buffer b = allocator.allocate(4);
Buffer c = allocator.allocate(4)) {
buf = Buffer.compose(allocator, a, b, c);
buf = CompositeBuffer.compose(allocator, a, b, c);
}
int i = 1;
while (buf.writableBytes() > 0) {
@ -269,7 +269,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
Buffer c = allocator.allocate(8)) {
buf = Buffer.compose(allocator, a, b, c);
buf = CompositeBuffer.compose(allocator, a, b, c);
}
buf.order(BIG_ENDIAN);
buf.forEachWritable(0, (index, component) -> {

View File

@ -40,7 +40,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (Buffer b = allocator.allocate(8)) {
assertTrue(a.isOwned());
assertTrue(b.isOwned());
composite = Buffer.compose(allocator, a, b);
composite = CompositeBuffer.compose(allocator, a, b);
assertFalse(composite.isOwned());
assertFalse(a.isOwned());
assertFalse(b.isOwned());
@ -58,13 +58,14 @@ public class BufferCompositionTest extends BufferTestSupport {
public void compositeBuffersCannotHaveDuplicateComponents() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4)) {
var e = assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, a, a));
var e = assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, a));
assertThat(e).hasMessageContaining("duplicate");
try (Buffer composite = Buffer.compose(allocator, a)) {
try (CompositeBuffer composite = CompositeBuffer.compose(allocator, a)) {
a.close();
try {
e = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, a));
e = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(a));
assertThat(e).hasMessageContaining("duplicate");
} finally {
a.acquire();
@ -76,7 +77,7 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void compositeBufferFromSends() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator,
Buffer composite = CompositeBuffer.compose(allocator,
allocator.allocate(8).send(),
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
@ -89,18 +90,19 @@ public class BufferCompositionTest extends BufferTestSupport {
public void compositeBufferMustNotBeAllowedToContainThemselves() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer a = allocator.allocate(4);
Buffer buf = Buffer.compose(allocator, a);
CompositeBuffer buf = CompositeBuffer.compose(allocator, a);
try (buf; a) {
a.close();
try {
assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(buf, buf));
assertThrows(IllegalArgumentException.class, () -> buf.extendWith(buf));
assertTrue(buf.isOwned());
try (Buffer composite = Buffer.compose(allocator, buf)) {
try (Buffer composite = CompositeBuffer.compose(allocator, buf)) {
// the composing increments the reference count of constituent buffers...
// counter-act this so it can be extended:
a.close(); // buf is now owned so it can be extended.
// counter-act this, so it can be extended:
a.close(); // buf is now owned, so it can be extended.
try {
assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(buf, composite));
assertThrows(IllegalArgumentException.class,
() -> buf.extendWith(composite));
} finally {
a.acquire(); // restore the reference count to align with our try-with-resources structure.
}
@ -119,7 +121,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer composite;
try (Buffer a = allocator.allocate(4, BIG_ENDIAN)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeInt(0x01020304);
@ -136,7 +138,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer composite;
try (Buffer a = allocator.allocate(4, LITTLE_ENDIAN)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeInt(0x05060708);
@ -150,7 +152,7 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void emptyCompositeBufferMustUseNativeByteOrder() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
Buffer composite = CompositeBuffer.compose(allocator)) {
assertThat(composite.order()).isEqualTo(ByteOrder.nativeOrder());
}
}
@ -160,8 +162,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(a, b));
assertThat(exc).hasMessageContaining("Expected").hasMessageContaining("composite");
assertThrows(ClassCastException.class, () -> ((CompositeBuffer) a).extendWith(b));
}
}
@ -170,9 +171,9 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
Buffer composed = Buffer.compose(allocator, a)) {
CompositeBuffer composed = CompositeBuffer.compose(allocator, a)) {
try (Buffer ignore = composed.acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> Buffer.extendComposite(composed, b));
var exc = assertThrows(IllegalStateException.class, () -> composed.extendWith(b));
assertThat(exc).hasMessageContaining("owned");
}
}
@ -181,13 +182,13 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingCompositeBufferWithItselfMustThrow() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
var exc = assertThrows(IllegalArgumentException.class,
() -> Buffer.extendComposite(composite, composite));
() -> composite.extendWith(composite));
assertThat(exc).hasMessageContaining("cannot be extended");
}
}
@ -196,20 +197,20 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingWithZeroCapacityBufferHasNoEffect() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
Buffer.extendComposite(composite, composite);
CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
composite.extendWith(composite);
assertThat(composite.capacity()).isZero();
assertThat(composite.countComponents()).isZero();
}
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer a = allocator.allocate(1);
Buffer composite = Buffer.compose(allocator, a);
CompositeBuffer composite = CompositeBuffer.compose(allocator, a);
a.close();
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
assertThat(composite.countComponents()).isOne();
try (Buffer b = Buffer.compose(allocator)) {
Buffer.extendComposite(composite, b);
try (Buffer b = CompositeBuffer.compose(allocator)) {
composite.extendWith(b);
}
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
@ -220,18 +221,18 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingCompositeBufferWithNullMustThrow() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
assertThrows(NullPointerException.class, () -> Buffer.extendComposite(composite, null));
CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
assertThrows(NullPointerException.class, () -> composite.extendWith(null));
}
}
@Test
public void extendingCompositeBufferMustIncreaseCapacityByGivenBigEndianBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
assertThat(composite.capacity()).isZero();
try (Buffer buf = allocator.allocate(8, BIG_ENDIAN)) {
Buffer.extendComposite(composite, buf);
composite.extendWith(buf);
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
@ -242,10 +243,10 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingCompositeBufferMustIncreaseCapacityByGivenLittleEndianBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer composite = Buffer.compose(allocator)) {
CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
assertThat(composite.capacity()).isZero();
try (Buffer buf = allocator.allocate(8, LITTLE_ENDIAN)) {
Buffer.extendComposite(composite, buf);
composite.extendWith(buf);
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
@ -256,13 +257,14 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingBigEndianCompositeBufferMustThrowIfExtensionIsLittleEndian() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8, BIG_ENDIAN)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
try (Buffer b = allocator.allocate(8, LITTLE_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(b));
assertThat(exc).hasMessageContaining("byte order");
}
}
@ -272,13 +274,14 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void extendingLittleEndianCompositeBufferMustThrowIfExtensionIsBigEndian() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8, LITTLE_ENDIAN)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
try (Buffer b = allocator.allocate(8, BIG_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(b));
assertThat(exc).hasMessageContaining("byte order");
}
}
@ -288,9 +291,9 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void emptyCompositeBufferMustAllowExtendingWithBufferWithBigEndianByteOrder() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (Buffer composite = Buffer.compose(allocator)) {
try (CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8, BIG_ENDIAN)) {
Buffer.extendComposite(composite, b);
composite.extendWith(b);
assertThat(composite.order()).isEqualTo(BIG_ENDIAN);
}
}
@ -300,9 +303,9 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void emptyCompositeBufferMustAllowExtendingWithBufferWithLittleEndianByteOrder() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (Buffer composite = Buffer.compose(allocator)) {
try (CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8, LITTLE_ENDIAN)) {
Buffer.extendComposite(composite, b);
composite.extendWith(b);
assertThat(composite.order()).isEqualTo(LITTLE_ENDIAN);
}
}
@ -312,9 +315,9 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void emptyCompositeBufferMustAllowExtendingWithReadOnlyBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (Buffer composite = Buffer.compose(allocator)) {
try (CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8).readOnly(true)) {
Buffer.extendComposite(composite, b);
composite.extendWith(b);
assertTrue(composite.readOnly());
}
}
@ -324,15 +327,15 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void whenExtendingCompositeBufferWithWriteOffsetAtCapacityExtensionWriteOffsetCanBeNonZero() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeLong(0);
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
Buffer.extendComposite(composite, b);
composite.extendWith(b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
@ -343,18 +346,19 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void whenExtendingCompositeBufferWithWriteOffsetLessThanCapacityExtensionWriteOffsetMustZero() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeInt(0);
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(b));
assertThat(exc).hasMessageContaining("unwritten gap");
b.writerOffset(0);
Buffer.extendComposite(composite, b);
composite.extendWith(b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(4);
}
@ -365,9 +369,9 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void whenExtendingCompositeBufferWithReadOffsetAtCapacityExtensionReadOffsetCanBeNonZero() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeLong(0);
@ -375,7 +379,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
Buffer.extendComposite(composite, b);
composite.extendWith(b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
@ -386,9 +390,9 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void whenExtendingCompositeBufferWithReadOffsetLessThanCapacityExtensionReadOffsetMustZero() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
composite.writeLong(0);
@ -396,10 +400,11 @@ public class BufferCompositionTest extends BufferTestSupport {
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
var exc = assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(b));
assertThat(exc).hasMessageContaining("unread gap");
b.readerOffset(0);
Buffer.extendComposite(composite, b);
composite.extendWith(b);
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
assertThat(composite.readerOffset()).isEqualTo(4);
@ -413,7 +418,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4, BIG_ENDIAN);
Buffer b = allocator.allocate(4, LITTLE_ENDIAN)) {
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b));
}
}
@ -422,7 +427,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4).readOnly(true);
Buffer b = allocator.allocate(4).readOnly(true);
Buffer composite = Buffer.compose(allocator, a, b)) {
Buffer composite = CompositeBuffer.compose(allocator, a, b)) {
assertTrue(composite.readOnly());
verifyWriteInaccessible(composite);
}
@ -433,22 +438,22 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8).readOnly(true);
Buffer b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, a, b));
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, b, a));
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, a, b, a));
assertThrows(IllegalArgumentException.class, () -> Buffer.compose(allocator, b, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, b, a));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b, a));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, b, a, b));
}
}
@Test
public void compositeWritableBufferCannotBeExtendedWithReadOnlyBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite; Buffer b = allocator.allocate(8).readOnly(true)) {
assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
assertThrows(IllegalArgumentException.class, () -> composite.extendWith(b));
}
}
}
@ -456,12 +461,12 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void compositeReadOnlyBufferCannotBeExtendedWithWritableBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8).readOnly(true)) {
composite = Buffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a);
}
try (composite; Buffer b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> Buffer.extendComposite(composite, b));
assertThrows(IllegalArgumentException.class, () -> composite.extendWith(b));
}
}
}

View File

@ -33,7 +33,7 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
assertThrows(IllegalStateException.class, () -> slice.ensureWritable(1));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
try (Buffer compose = Buffer.compose(allocator, buf)) {
try (Buffer compose = CompositeBuffer.compose(allocator, buf)) {
assertThrows(IllegalStateException.class, () -> compose.ensureWritable(1));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
@ -92,7 +92,7 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
@Test
public void ensureWritableMustExpandCapacityOfEmptyCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer buf = Buffer.compose(allocator)) {
Buffer buf = CompositeBuffer.compose(allocator)) {
assertThat(buf.writableBytes()).isEqualTo(0);
buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);

View File

@ -82,7 +82,7 @@ public class BufferReadOnlyTest extends BufferTestSupport {
@Test
public void readOnlyBufferMustRemainReadOnlyAfterSendForEmptyCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer buf = Buffer.compose(allocator)) {
Buffer buf = CompositeBuffer.compose(allocator)) {
buf.readOnly(true);
var send = buf.send();
try (Buffer receive = send.receive()) {

View File

@ -310,7 +310,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
assertEquals(0, slice.capacity()); // We haven't written anything, so the slice is empty.
int sliceBorrows = slice.countBorrows();
assertEquals(borrows + 2, buf.countBorrows());
try (Buffer ignored1 = Buffer.compose(allocator, buf, slice)) {
try (Buffer ignored1 = CompositeBuffer.compose(allocator, buf, slice)) {
assertEquals(borrows + 3, buf.countBorrows());
// Note: Slice is empty; not acquired by the composite buffer.
assertEquals(sliceBorrows, slice.countBorrows());
@ -337,7 +337,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
assertEquals(1, slice.capacity());
int sliceBorrows = slice.countBorrows();
assertEquals(borrows + 2, buf.countBorrows());
try (Buffer ignored1 = Buffer.compose(allocator, buf, slice)) {
try (Buffer ignored1 = CompositeBuffer.compose(allocator, buf, slice)) {
assertEquals(borrows + 3, buf.countBorrows());
assertEquals(sliceBorrows + 1, slice.countBorrows());
}
@ -656,7 +656,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@Test
public void bifurcateOnEmptyBigEndianCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer buf = Buffer.compose(allocator).order(BIG_ENDIAN)) {
Buffer buf = CompositeBuffer.compose(allocator).order(BIG_ENDIAN)) {
verifyBifurcateEmptyCompositeBuffer(buf);
}
}
@ -664,7 +664,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
@Test
public void bifurcateOnEmptyLittleEndianCompositeBuffer() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer buf = Buffer.compose(allocator).order(LITTLE_ENDIAN)) {
Buffer buf = CompositeBuffer.compose(allocator).order(LITTLE_ENDIAN)) {
verifyBifurcateEmptyCompositeBuffer(buf);
}
}

View File

@ -166,7 +166,7 @@ public abstract class BufferTestSupport {
int half = size / 2;
try (Buffer firstHalf = a.allocate(half);
Buffer secondHalf = b.allocate(size - half)) {
return Buffer.compose(a, firstHalf, secondHalf);
return CompositeBuffer.compose(a, firstHalf, secondHalf);
}
}
@ -190,7 +190,7 @@ public abstract class BufferTestSupport {
try (Buffer a = alloc.allocate(part);
Buffer b = alloc.allocate(part);
Buffer c = alloc.allocate(size - part * 2)) {
return Buffer.compose(alloc, a, b, c);
return CompositeBuffer.compose(alloc, a, b, c);
}
}
@ -229,7 +229,7 @@ public abstract class BufferTestSupport {
if (size < 2) {
return allocator.allocate(size);
}
var buf = Buffer.compose(allocator);
var buf = CompositeBuffer.compose(allocator);
buf.ensureWritable(size);
return buf;
}
@ -364,8 +364,8 @@ public abstract class BufferTestSupport {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, new byte[1], 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, ByteBuffer.allocate(1), 0, 1));
if (Buffer.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> Buffer.extendComposite(buf, target));
if (CompositeBuffer.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> ((CompositeBuffer) buf).extendWith(target));
}
}

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.benchmarks;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.CompositeBuffer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -62,14 +63,14 @@ public class ByteIterationBenchmark {
allocator = BufferAllocator.heap();
try (var a = allocator.allocate(SIZE / 2);
var b = allocator.allocate(SIZE / 2)) {
buf = Buffer.compose(allocator, a, b);
buf = CompositeBuffer.compose(allocator, a, b);
}
break;
case "composite-direct":
allocator = BufferAllocator.direct();
try (var a = allocator.allocate(SIZE / 2);
var b = allocator.allocate(SIZE / 2)) {
buf = Buffer.compose(allocator, a, b);
buf = CompositeBuffer.compose(allocator, a, b);
}
break;
default:

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.examples;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Scope;
import java.util.concurrent.ThreadLocalRandom;
@ -44,7 +45,7 @@ public final class ComposingAndSlicingExample {
private static Buffer createBigBuffer(BufferAllocator allocator) {
try (Scope scope = new Scope()) {
return Buffer.compose(allocator,
return CompositeBuffer.compose(allocator,
scope.add(allocator.allocate(64)),
scope.add(allocator.allocate(64)),
scope.add(allocator.allocate(64)),

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.examples;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Send;
import java.nio.channels.FileChannel;
@ -37,7 +38,7 @@ public final class FileCopyExample {
try (BufferAllocator allocator = BufferAllocator.pooledDirect();
var input = FileChannel.open(Path.of("/dev/urandom"), READ);
var output = FileChannel.open(Path.of("random.bin"), CREATE, TRUNCATE_EXISTING, WRITE)) {
Send<Buffer> done = Buffer.compose(allocator).send();
Send<Buffer> done = CompositeBuffer.compose(allocator).send();
var reader = executor.submit(() -> {
for (int i = 0; i < 1024; i++) {

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Send;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@ -82,11 +83,11 @@ public abstract class AlternativeMessageDecoder extends ChannelHandlerAdapter {
}
private void processRead(ChannelHandlerContext ctx, Buffer input) {
if (collector.isOwned() && Buffer.isComposite(collector) && input.isOwned()
if (collector.isOwned() && CompositeBuffer.isComposite(collector) && input.isOwned()
&& (collector.writableBytes() == 0 || input.writerOffset() == 0)
&& (collector.readableBytes() == 0 || input.readerOffset() == 0)
&& collector.order() == input.order()) {
Buffer.extendComposite(collector, input);
((CompositeBuffer) collector).extendWith(input);
drainCollector(ctx);
return;
}

View File

@ -18,6 +18,7 @@ package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
@ -94,7 +95,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
* Cumulate {@link Buffer}s by merge them into one {@link Buffer}'s, using memory copies.
*/
public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> {
if (cumulation.readableBytes() == 0 && !Buffer.isComposite(cumulation)) {
if (cumulation.readableBytes() == 0 && !CompositeBuffer.isComposite(cumulation)) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.close();
return in;
@ -128,7 +129,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
}
Buffer composite;
try (in) {
if (Buffer.isComposite(cumulation) && cumulation.isOwned()) {
if (CompositeBuffer.isComposite(cumulation) && cumulation.isOwned()) {
composite = cumulation;
if (composite.writerOffset() != composite.capacity()) {
// Writer index must equal capacity if we are going to "write"
@ -137,9 +138,9 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
cumulation.close();
}
} else {
composite = Buffer.compose(alloc, cumulation);
composite = CompositeBuffer.compose(alloc, cumulation);
}
Buffer.extendComposite(composite, in);
((CompositeBuffer) composite).extendWith(in);
return composite;
}
};
@ -224,7 +225,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
}
private static Buffer newEmptyBuffer() {
return Buffer.compose(BufferAllocator.heap());
return CompositeBuffer.compose(BufferAllocator.heap());
}
@Override

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api.examples.bytetomessagedecoder;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.buffer.api.BufferAllocator.heap;
import static io.netty.buffer.api.BufferTestSupport.assertEquals;
import static io.netty.buffer.api.CompositeBuffer.compose;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
@ -421,7 +423,7 @@ public class ByteToMessageDecoderTest {
@Test
public void releaseWhenCompositeCumulateThrows() {
Buffer in = heap().allocate(12, LITTLE_ENDIAN).writerOffset(12);
try (Buffer cumulation = Buffer.compose(heap(), heap().allocate(1, BIG_ENDIAN).writeByte((byte) 0).send())) {
try (Buffer cumulation = compose(heap(), heap().allocate(1, BIG_ENDIAN).writeByte((byte) 0).send())) {
ByteToMessageDecoder.COMPOSITE_CUMULATOR.cumulate(heap(), cumulation, in);
fail();
} catch (IllegalArgumentException expected) {