diff --git a/src/main/java/io/netty/buffer/api/Allocator.java b/src/main/java/io/netty/buffer/api/Allocator.java index c24e096..726027e 100644 --- a/src/main/java/io/netty/buffer/api/Allocator.java +++ b/src/main/java/io/netty/buffer/api/Allocator.java @@ -107,7 +107,7 @@ public interface Allocator extends AutoCloseable { * @return A buffer composed of, and backed by, the given buffers. * @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain Buf#order() byte order}. */ - default Buf compose(Buf... bufs) { + default Buf compose(Deref... bufs) { return new CompositeBuf(this, bufs); } @@ -117,8 +117,8 @@ public interface Allocator extends AutoCloseable { * the composite buffer was created. * The composite buffer is modified in-place. * - * @see #compose(Buf...) - * @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given + * @see #compose(Deref...) + * @param composite The composite buffer (from a prior {@link #compose(Deref...)} call) to extend with the given * extension buffer. * @param extension The buffer to extend the composite buffer with. */ @@ -133,9 +133,9 @@ public interface Allocator extends AutoCloseable { } /** - * Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not. + * Check if the given buffer is a {@linkplain #compose(Deref...) composite} buffer or not. * @param composite The buffer to check. - * @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise. + * @return {@code true} if the given buffer was created with {@link #compose(Deref...)}, {@code false} otherwise. */ static boolean isComposite(Buf composite) { return composite.getClass() == CompositeBuf.class; diff --git a/src/main/java/io/netty/buffer/api/Buf.java b/src/main/java/io/netty/buffer/api/Buf.java index e775785..27524d8 100644 --- a/src/main/java/io/netty/buffer/api/Buf.java +++ b/src/main/java/io/netty/buffer/api/Buf.java @@ -70,7 +70,7 @@ import java.nio.ByteOrder; * To send a buffer to another thread, the buffer must not have any outstanding borrows. * That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()}; * all {@linkplain #slice() slices} must have been closed. - * And if this buffer is a constituent of a {@linkplain Allocator#compose(Buf...) composite buffer}, + * And if this buffer is a constituent of a {@linkplain Allocator#compose(Deref...) composite buffer}, * then that composite buffer must be closed. * And if this buffer is itself a composite buffer, then it must own all of its constituent buffers. * The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not. @@ -439,6 +439,8 @@ public interface Buf extends Rc, BufAccessors { /** * Split the buffer into two, at the {@linkplain #writerOffset() write offset} position. *

+ * The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown. + *

* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new * buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently * {@linkplain #send() sent} to other threads. diff --git a/src/main/java/io/netty/buffer/api/BufHolder.java b/src/main/java/io/netty/buffer/api/BufHolder.java index aa88892..cd039a9 100644 --- a/src/main/java/io/netty/buffer/api/BufHolder.java +++ b/src/main/java/io/netty/buffer/api/BufHolder.java @@ -83,10 +83,10 @@ public abstract class BufHolder> implements Rc { return buf.countBorrows(); } + @SuppressWarnings("unchecked") @Override public Send send() { - var send = buf.send(); - return () -> receive(send.receive()); + return buf.send().map((Class) getClass(), this::receive); } /** diff --git a/src/main/java/io/netty/buffer/api/ComponentProcessor.java b/src/main/java/io/netty/buffer/api/ComponentProcessor.java index f8208e7..53dd1cf 100644 --- a/src/main/java/io/netty/buffer/api/ComponentProcessor.java +++ b/src/main/java/io/netty/buffer/api/ComponentProcessor.java @@ -121,6 +121,7 @@ public interface ComponentProcessor { * @return A new {@link ByteBuffer}, with its own position and limit, for this memory component. */ ByteBuffer readableBuffer(); + // todo for Unsafe-based impl, DBB.attachment needs to keep underlying memory alive } /** diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index cff03fc..0188d5f 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -54,11 +54,11 @@ final class CompositeBuf extends RcSupport implements Buf { private boolean closed; private boolean readOnly; - CompositeBuf(Allocator allocator, Buf[] bufs) { - this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP); + CompositeBuf(Allocator allocator, Deref[] refs) { + this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false); } - private static Buf[] filterExternalBufs(Buf[] bufs) { + private static Buf[] filterExternalBufs(Deref[] refs) { // We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway, // and also, by ensuring that all constituent buffers contribute to the size of the composite buffer, // we make sure that the number of composite buffers will never become greater than the number of bytes in @@ -66,56 +66,90 @@ final class CompositeBuf extends RcSupport implements Buf { // This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable, // will never overflow their component counts. // Allocating a new array unconditionally also prevents external modification of the array. - bufs = Arrays.stream(bufs) - .filter(b -> b.capacity() > 0) + Buf[] bufs = Arrays.stream(refs) + .map(r -> r.get()) // Increments reference counts. + .filter(CompositeBuf::discardEmpty) .flatMap(CompositeBuf::flattenBuffer) .toArray(Buf[]::new); // Make sure there are no duplicates among the buffers. Set duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>()); duplicatesCheck.addAll(Arrays.asList(bufs)); if (duplicatesCheck.size() < bufs.length) { + for (Buf buf : bufs) { + buf.close(); // Undo the increment we did with Deref.get(). + } throw new IllegalArgumentException( "Cannot create composite buffer with duplicate constituent buffer components."); } return bufs; } + private static boolean discardEmpty(Buf buf) { + if (buf.capacity() > 0) { + return true; + } else { + // If we filter a buffer out, then we must make sure to close it since we incremented the reference count + // with Deref.get() earlier. + buf.close(); + return false; + } + } + private static Stream flattenBuffer(Buf buf) { if (buf instanceof CompositeBuf) { - return Stream.of(((CompositeBuf) buf).bufs); + // Extract components and move our reference count from the composite onto the components. + var composite = (CompositeBuf) buf; + var bufs = composite.bufs; + for (Buf b : bufs) { + b.acquire(); + } + buf.close(); // Important: acquire on components *before* closing composite. + return Stream.of(bufs); } return Stream.of(buf); } - private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop drop) { + private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop drop, + boolean acquireBufs) { super(drop); this.allocator = allocator; this.isSendable = isSendable; - for (Buf buf : bufs) { - buf.acquire(); - } - if (bufs.length > 0) { - ByteOrder targetOrder = bufs[0].order(); + if (acquireBufs) { for (Buf buf : bufs) { - if (buf.order() != targetOrder) { - throw new IllegalArgumentException("Constituent buffers have inconsistent byte order."); - } + buf.acquire(); } - order = bufs[0].order(); + } + try { + if (bufs.length > 0) { + ByteOrder targetOrder = bufs[0].order(); + for (Buf buf : bufs) { + if (buf.order() != targetOrder) { + throw new IllegalArgumentException("Constituent buffers have inconsistent byte order."); + } + } + order = bufs[0].order(); - boolean targetReadOnly = bufs[0].readOnly(); - for (Buf buf : bufs) { - if (buf.readOnly() != targetReadOnly) { - throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state."); + boolean targetReadOnly = bufs[0].readOnly(); + for (Buf buf : bufs) { + if (buf.readOnly() != targetReadOnly) { + throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state."); + } } + readOnly = targetReadOnly; + } else { + order = ByteOrder.nativeOrder(); } - readOnly = targetReadOnly; - } else { - order = ByteOrder.nativeOrder(); + this.bufs = bufs; + computeBufferOffsets(); + tornBufAccessors = new TornBufAccessors(this); + } catch (Exception e) { + // Always close bufs on exception, regardless of acquireBufs value. + // If acquireBufs is false, it just means the ref count increments happened prior to this constructor call. + for (Buf buf : bufs) { + buf.close(); + } + throw e; } - this.bufs = bufs; - computeBufferOffsets(); - tornBufAccessors = new TornBufAccessors(this); } private void computeBufferOffsets() { @@ -292,7 +326,7 @@ final class CompositeBuf extends RcSupport implements Buf { slices = new Buf[] { choice.slice(subOffset, 0) }; } - return new CompositeBuf(allocator, false, slices, drop); + return new CompositeBuf(allocator, false, slices, drop, true); } catch (Throwable throwable) { // We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer: close(); @@ -718,7 +752,7 @@ final class CompositeBuf extends RcSupport implements Buf { } if (bufs.length == 0) { // Bifurcating a zero-length buffer is trivial. - return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order); + return new CompositeBuf(allocator, true, bufs, unsafeGetDrop(), true).order(order); } int i = searchOffsets(woff); @@ -730,7 +764,7 @@ final class CompositeBuf extends RcSupport implements Buf { } computeBufferOffsets(); try { - var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop()); + var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop(), true); compositeBuf.order = order; // Preserve byte order even if bifs array is empty. return compositeBuf; } finally { @@ -1133,7 +1167,7 @@ final class CompositeBuf extends RcSupport implements Buf { for (int i = 0; i < sends.length; i++) { received[i] = sends[i].receive(); } - var composite = new CompositeBuf(allocator, true, received, drop); + var composite = new CompositeBuf(allocator, true, received, drop, true); composite.readOnly = readOnly; drop.attach(composite); return composite; diff --git a/src/main/java/io/netty/buffer/api/Deref.java b/src/main/java/io/netty/buffer/api/Deref.java new file mode 100644 index 0000000..7dd206e --- /dev/null +++ b/src/main/java/io/netty/buffer/api/Deref.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import java.util.function.Supplier; + +/** + * A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object. + *

+ * Note: Callers must ensure that they close any references they obtain. + *

+ * Deref itself does not specify if a reference can be obtained more than once. + * For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once. + * Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple + * times. + * + * @param The concrete type of reference counted object that can be obtained. + */ +public interface Deref> extends Supplier { + /** + * Acquire a reference to the reference counted object. + *

+ * Note: This call increments the reference count of the acquired object, and must be paired with + * a {@link Rc#close()} call. + * Using a try-with-resources clause is the easiest way to ensure this. + * + * @return A reference to the reference counted object. + */ + @Override + T get(); + + /** + * Determine if the object in this {@code Deref} is an instance of the given class. + * + * @param cls The type to check. + * @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type. + */ + boolean isInstanceOf(Class cls); +} diff --git a/src/main/java/io/netty/buffer/api/Rc.java b/src/main/java/io/netty/buffer/api/Rc.java index def5f7d..2cc50f7 100644 --- a/src/main/java/io/netty/buffer/api/Rc.java +++ b/src/main/java/io/netty/buffer/api/Rc.java @@ -26,7 +26,7 @@ package io.netty.buffer.api; * * @param The concrete subtype. */ -public interface Rc> extends AutoCloseable { +public interface Rc> extends AutoCloseable, Deref { /** * Increment the reference count. *

@@ -36,6 +36,16 @@ public interface Rc> extends AutoCloseable { */ I acquire(); + @Override + default I get() { + return acquire(); + } + + @Override + default boolean isInstanceOf(Class cls) { + return cls.isInstance(this); + } + /** * Decrement the reference count, and despose of the resource if the last reference is closed. *

diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index eaf8ca5..1541810 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -79,7 +79,7 @@ public abstract class RcSupport, T extends RcSupport> impl } var owned = prepareSend(); acquires = -2; // Close without dropping. This also ignore future double-free attempts. - return new TransferSend(owned, drop); + return new TransferSend(owned, drop, getClass()); } /** diff --git a/src/main/java/io/netty/buffer/api/Send.java b/src/main/java/io/netty/buffer/api/Send.java index 96d4a7e..b272740 100644 --- a/src/main/java/io/netty/buffer/api/Send.java +++ b/src/main/java/io/netty/buffer/api/Send.java @@ -15,6 +15,10 @@ */ package io.netty.buffer.api; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + /** * A Send object is a temporary holder of an {@link Rc}, used for transferring the ownership of the Rc from one thread * to another. @@ -28,8 +32,43 @@ package io.netty.buffer.api; * * @param */ -@FunctionalInterface -public interface Send> { +public interface Send> extends Deref { + /** + * Construct a {@link Send} based on the given {@link Supplier}. + * The supplier will be called only once, in the receiving thread. + * + * @param concreteObjectType The concrete type of the object being sent. Specifically, the object returned from the + * {@link Supplier#get()} method must be an instance of this class. + * @param supplier The supplier of the object being sent, which will be called when the object is ready to be + * received. + * @param The type of object being sent. + * @return A {@link Send} which will deliver an object of the given type, from the supplier. + */ + static > Send sending(Class concreteObjectType, Supplier supplier) { + return new Send() { + private final AtomicBoolean gate = new AtomicBoolean(); + @Override + public T receive() { + if (gate.getAndSet(true)) { + throw new IllegalStateException("This object has already been received."); + } + return supplier.get(); + } + + @Override + public boolean isInstanceOf(Class cls) { + return cls.isAssignableFrom(concreteObjectType); + } + + @Override + public void discard() { + if (!gate.getAndSet(true)) { + supplier.get().close(); + } + } + }; + } + /** * Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the * sent Rc in the sending thread happens-before the return of this method. @@ -40,4 +79,33 @@ public interface Send> { * @throws IllegalStateException If this method is called more than once. */ T receive(); + + /** + * Apply a mapping function to the object being sent. The mapping will occur when the object is received. + * + * @param type The result type of the mapping function. + * @param mapper The mapping function to apply to the object being sent. + * @param The result type of the mapping function. + * @return A new {@link Send} instance that will deliver an object that is the result of the mapping. + */ + default > Send map(Class type, Function mapper) { + return sending(type, () -> mapper.apply(receive())); + } + + /** + * Discard this {@link Send} and the object it contains. + * This has no effect if the send has already been received. + */ + default void discard() { + try { + receive().close(); + } catch (IllegalStateException ignore) { + // Don't do anything if the send has already been consumed. + } + } + + @Override + default T get() { + return receive(); + } } diff --git a/src/main/java/io/netty/buffer/api/TransferSend.java b/src/main/java/io/netty/buffer/api/TransferSend.java index 24098f4..0d89826 100644 --- a/src/main/java/io/netty/buffer/api/TransferSend.java +++ b/src/main/java/io/netty/buffer/api/TransferSend.java @@ -24,12 +24,14 @@ class TransferSend, T extends Rc> implements Send { private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class); private final Owned outgoing; private final Drop drop; + private final Class concreteType; @SuppressWarnings("unused") private volatile boolean received; // Accessed via VarHandle - TransferSend(Owned outgoing, Drop drop) { + TransferSend(Owned outgoing, Drop drop, Class concreteType) { this.outgoing = outgoing; this.drop = drop; + this.concreteType = concreteType; } @SuppressWarnings("unchecked") @@ -47,8 +49,22 @@ class TransferSend, T extends Rc> implements Send { } private void gateReception() { - if (!RECEIVED.compareAndSet(this, false, true)) { + if ((boolean) RECEIVED.getAndSet(this, true)) { throw new IllegalStateException("This object has already been received."); } } + + @Override + public boolean isInstanceOf(Class cls) { + return cls.isAssignableFrom(concreteType); + } + + @Override + public void discard() { + if (!(boolean) RECEIVED.getAndSet(this, true)) { + var copy = outgoing.transferOwnership(drop); + drop.attach(copy); + copy.close(); + } + } } diff --git a/src/test/java/io/netty/buffer/api/BufRefTest.java b/src/test/java/io/netty/buffer/api/BufRefTest.java index 85e5fd4..0bc8d0a 100644 --- a/src/test/java/io/netty/buffer/api/BufRefTest.java +++ b/src/test/java/io/netty/buffer/api/BufRefTest.java @@ -100,7 +100,7 @@ class BufRefTest { try (Allocator allocator = Allocator.heap(); BufRef refA = new BufRef(allocator.allocate(8).send())) { refA.contents().writeInt(42); - var send = refA.send(); + Send send = refA.send(); assertThrows(IllegalStateException.class, () -> refA.contents().readInt()); try (BufRef refB = send.receive()) { assertThat(refB.contents().readInt()).isEqualTo(42); diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index db8fd28..a6dd3ea 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -887,10 +887,15 @@ public class BufTest { try (Buf ignored = buf.acquire()) { assertEquals(borrows + 1, buf.countBorrows()); try (Buf slice = buf.slice()) { + assertEquals(0, slice.capacity()); // We haven't written anything, so the slice is empty. + int sliceBorrows = slice.countBorrows(); assertEquals(borrows + 2, buf.countBorrows()); try (Buf ignored1 = allocator.compose(buf, slice)) { assertEquals(borrows + 3, buf.countBorrows()); + // Note: Slice is empty; not acquired by the composite buffer. + assertEquals(sliceBorrows, slice.countBorrows()); } + assertEquals(sliceBorrows, slice.countBorrows()); assertEquals(borrows + 2, buf.countBorrows()); } assertEquals(borrows + 1, buf.countBorrows()); @@ -1647,6 +1652,18 @@ public class BufTest { } } + @Test + public void compositeBufferFromSends() { + try (Allocator allocator = Allocator.heap(); + Buf composite = allocator.compose( + allocator.allocate(8).send(), + allocator.allocate(8).send(), + allocator.allocate(8).send())) { + assertEquals(24, composite.capacity()); + assertTrue(composite.isOwned()); + } + } + @Test public void compositeBufferMustNotBeAllowedToContainThemselves() { try (Allocator allocator = Allocator.heap()) { @@ -2649,6 +2666,7 @@ public class BufTest { } } } + // todo read only buffer must have zero writable bytes @ParameterizedTest @MethodSource("nonCompositeAllocators") diff --git a/src/test/java/io/netty/buffer/api/examples/AsyncExample.java b/src/test/java/io/netty/buffer/api/examples/AsyncExample.java index 16edfd3..9c5211a 100644 --- a/src/test/java/io/netty/buffer/api/examples/AsyncExample.java +++ b/src/test/java/io/netty/buffer/api/examples/AsyncExample.java @@ -18,6 +18,7 @@ package io.netty.buffer.api.examples; import io.netty.buffer.api.Allocator; import io.netty.buffer.api.Buf; +import static java.lang.System.out; import static java.util.concurrent.CompletableFuture.completedFuture; public final class AsyncExample { @@ -33,9 +34,9 @@ public final class AsyncExample { } }).thenAcceptAsync(send -> { try (Buf buf = send.receive()) { - System.out.println("First thread id was " + buf.readLong()); - System.out.println("Then sent to " + buf.readLong()); - System.out.println("And now in thread " + threadId()); + out.println("First thread id was " + buf.readLong()); + out.println("Then sent to " + buf.readLong()); + out.println("And now in thread " + threadId()); } }).get(); }