diff --git a/src/main/java/io/netty/buffer/api/Buffer.java b/src/main/java/io/netty/buffer/api/Buffer.java index 4a67118..bdf8346 100644 --- a/src/main/java/io/netty/buffer/api/Buffer.java +++ b/src/main/java/io/netty/buffer/api/Buffer.java @@ -64,7 +64,7 @@ import java.nio.ByteOrder; * To send a buffer to another thread, the buffer must not have any outstanding borrows. * That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()}; * all {@linkplain #slice() slices} must have been closed. - * And if this buffer is a constituent of a {@linkplain Buffer#compose(BufferAllocator, Deref...) composite buffer}, + * And if this buffer is a constituent of a {@linkplain Buffer#compose(BufferAllocator, Buffer...) composite buffer}, * then that composite buffer must be closed. * And if this buffer is itself a composite buffer, then it must own all of its constituent buffers. * The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not. @@ -180,19 +180,87 @@ public interface Buffer extends Rc, BufferAccessors { * @throws IllegalArgumentException if the given buffers have an inconsistent * {@linkplain Buffer#order() byte order}. */ - @SafeVarargs - static Buffer compose(BufferAllocator allocator, Deref... bufs) { + static Buffer compose(BufferAllocator allocator, Buffer... bufs) { return new CompositeBuffer(allocator, bufs); } + /** + * Compose the given sequence of sends of buffers and present them as a single buffer. + *

+ * Note: The composite buffer holds a reference to all the constituent buffers, + * until the composite buffer is deallocated. + * This means the constituent buffers must still have their outside-reference count decremented as normal. + * If the buffers are allocated for the purpose of participating in the composite buffer, + * then they should be closed as soon as the composite buffer has been created, like in this example: + *

{@code
+     *     try (Buffer a = allocator.allocate(size);
+     *          Buffer b = allocator.allocate(size)) {
+     *         return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
+     *     } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
+     * }
+ *

+ * {@linkplain Buffer#send() Sending} a composite buffer implies sending all of its constituent buffers. + * For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an + * {@linkplain Rc#isOwned() owned state}. + * This means that the composite buffer must be the only reference to the constituent buffers. + *

+ * All of the constituent buffers must have the same {@linkplain Buffer#order() byte order}. + * An exception will be thrown if you attempt to compose buffers that have different byte orders, + * and changing the byte order of the constituent buffers so they become inconsistent after construction, + * will result in unspecified behaviour. + *

+ * The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed + * as a single connected chunk of memory. + * Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity, + * and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a write + * offset of zero. + * Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity, + * and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read + * offset of zero. + * Furthermore, the sum of the read offsets must be less than or equal to the sum of the write offsets. + *

+ * Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant + * offsets in the constituent buffers. + *

+ * It is not a requirement that the buffers have the same size. + *

+ * It is not a requirement that the buffers are allocated by this allocator, but if + * {@link Buffer#ensureWritable(int)} is called on the composed buffer, and the composed buffer needs to be + * expanded, then this allocator instance will be used for allocation the extra memory. + * + * @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service + * {@link #ensureWritable(int)} calls. + * @param sends The sent buffers to compose into a single buffer view. + * @return A buffer composed of, and backed by, the given buffers. + * @throws IllegalArgumentException if the given buffers have an inconsistent + * {@linkplain Buffer#order() byte order}. + */ + @SafeVarargs + static Buffer compose(BufferAllocator allocator, Send... sends) { + return new CompositeBuffer(allocator, sends); + } + + /** + * Create an empty composite buffer, that has no components. The buffer can be extended with components using either + * {@link #ensureWritable(int)} or {@link #extendComposite(Buffer, Buffer)}. + * + * @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service + * {@link #ensureWritable(int)} calls. + * @return A composite buffer that has no components, and has a capacity of zero. + */ + static Buffer compose(BufferAllocator allocator) { + return new CompositeBuffer(allocator, new Buffer[0]); + } + /** * Extend the given composite buffer with the given extension buffer. * This works as if the extension had originally been included at the end of the list of constituent buffers when * the composite buffer was created. * The composite buffer is modified in-place. * - * @see #compose(BufferAllocator, Deref...) - * @param composite The composite buffer (from a prior {@link #compose(BufferAllocator, Deref...)} call) to extend + * @see #compose(BufferAllocator, Buffer...) + * @see #compose(BufferAllocator, Send...) + * @param composite The composite buffer (from a prior {@link #compose(BufferAllocator, Buffer...)} call) to extend * with the given extension buffer. * @param extension The buffer to extend the composite buffer with. */ @@ -207,9 +275,9 @@ public interface Buffer extends Rc, BufferAccessors { } /** - * Check if the given buffer is a {@linkplain #compose(BufferAllocator, Deref...) composite} buffer or not. + * Check if the given buffer is a {@linkplain #compose(BufferAllocator, Buffer...) composite} buffer or not. * @param composite The buffer to check. - * @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Deref...)}, + * @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Buffer...)}, * {@code false} otherwise. */ static boolean isComposite(Buffer composite) { diff --git a/src/main/java/io/netty/buffer/api/CompositeBuffer.java b/src/main/java/io/netty/buffer/api/CompositeBuffer.java index 7c72a1c..f2e9164 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuffer.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuffer.java @@ -58,11 +58,17 @@ final class CompositeBuffer extends RcSupport implement private boolean closed; private boolean readOnly; - CompositeBuffer(BufferAllocator allocator, Deref[] refs) { - this(allocator, filterExternalBufs(refs), COMPOSITE_DROP, false); + CompositeBuffer(BufferAllocator allocator, Buffer[] refs) { + this(allocator, filterExternalBufs(Arrays.stream(refs) + .map(buf -> buf.acquire() /* Increments reference counts. */)), COMPOSITE_DROP, false); } - private static Buffer[] filterExternalBufs(Deref[] refs) { + CompositeBuffer(BufferAllocator allocator, Send[] refs) { + this(allocator, filterExternalBufs(Arrays.stream(refs) + .map(buf -> buf.receive())), COMPOSITE_DROP, false); + } + + private static Buffer[] filterExternalBufs(Stream refs) { // We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway, // and also, by ensuring that all constituent buffers contribute to the size of the composite buffer, // we make sure that the number of composite buffers will never become greater than the number of bytes in @@ -70,11 +76,10 @@ final class CompositeBuffer extends RcSupport implement // This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable, // will never overflow their component counts. // Allocating a new array unconditionally also prevents external modification of the array. - Buffer[] bufs = Arrays.stream(refs) - .map(r -> r.get()) // Increments reference counts. - .filter(CompositeBuffer::discardEmpty) - .flatMap(CompositeBuffer::flattenBuffer) - .toArray(Buffer[]::new); + Buffer[] bufs = refs + .filter(CompositeBuffer::discardEmpty) + .flatMap(CompositeBuffer::flattenBuffer) + .toArray(Buffer[]::new); // Make sure there are no duplicates among the buffers. Set duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>()); duplicatesCheck.addAll(Arrays.asList(bufs)); diff --git a/src/main/java/io/netty/buffer/api/Deref.java b/src/main/java/io/netty/buffer/api/Deref.java deleted file mode 100644 index 4a1e7b7..0000000 --- a/src/main/java/io/netty/buffer/api/Deref.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright 2020 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.buffer.api; - -import java.util.function.Supplier; - -/** - * A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object. - *

- * Note: Callers must ensure that they close any references they obtain. - *

- * Deref itself does not specify if a reference can be obtained more than once. - * For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once. - * Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple - * times. - * - * @param The concrete type of reference counted object that can be obtained. - */ -public interface Deref> extends Supplier { - /** - * Acquire a reference to the reference counted object. - *

- * Note: This call increments the reference count of the acquired object, and must be paired with - * a {@link Rc#close()} call. - * Using a try-with-resources clause is the easiest way to ensure this. - * - * @return A reference to the reference counted object. - */ - @Override - T get(); - - /** - * Determine if the object in this {@code Deref} is an instance of the given class. - * - * @param cls The type to check. - * @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type. - */ - boolean referentIsInstanceOf(Class cls); -} diff --git a/src/main/java/io/netty/buffer/api/Rc.java b/src/main/java/io/netty/buffer/api/Rc.java index c445cf9..6ec164e 100644 --- a/src/main/java/io/netty/buffer/api/Rc.java +++ b/src/main/java/io/netty/buffer/api/Rc.java @@ -26,7 +26,7 @@ package io.netty.buffer.api; * * @param The concrete subtype. */ -public interface Rc> extends AutoCloseable, Deref { +public interface Rc> extends AutoCloseable { /** * Increment the reference count. *

@@ -36,16 +36,6 @@ public interface Rc> extends AutoCloseable, Deref { */ I acquire(); - @Override - default I get() { - return acquire(); - } - - @Override - default boolean referentIsInstanceOf(Class cls) { - return cls.isInstance(this); - } - /** * Decrement the reference count, and despose of the resource if the last reference is closed. *

diff --git a/src/main/java/io/netty/buffer/api/Send.java b/src/main/java/io/netty/buffer/api/Send.java index 4e4d62d..a269d22 100644 --- a/src/main/java/io/netty/buffer/api/Send.java +++ b/src/main/java/io/netty/buffer/api/Send.java @@ -32,7 +32,7 @@ import java.util.function.Supplier; * * @param */ -public interface Send> extends Deref { +public interface Send> { /** * Construct a {@link Send} based on the given {@link Supplier}. * The supplier will be called only once, in the receiving thread. @@ -117,8 +117,12 @@ public interface Send> extends Deref { } } - @Override - default T get() { - return receive(); - } + /** + * Determine if the object received from this {@code Send} is an instance of the given class. + * + * @param cls The type to check. + * @return {@code true} if the object received from this {@code Send} can be assigned fields or variables of the + * given type, otherwise false. + */ + boolean referentIsInstanceOf(Class cls); } diff --git a/src/main/java/io/netty/buffer/api/TransferSend.java b/src/main/java/io/netty/buffer/api/TransferSend.java index a7cc7e7..d73e104 100644 --- a/src/main/java/io/netty/buffer/api/TransferSend.java +++ b/src/main/java/io/netty/buffer/api/TransferSend.java @@ -43,11 +43,6 @@ class TransferSend, T extends Rc> implements Send { return (I) copy; } - Owned unsafeUnwrapOwned() { - gateReception(); - return outgoing; - } - private void gateReception() { if ((boolean) RECEIVED.getAndSet(this, true)) { throw new IllegalStateException("This object has already been received.");