From 492977d9be0886e10c295c9a49b425ef705879b4 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 11 Feb 2021 14:08:22 +0100 Subject: [PATCH] Introduce Deref abstraction Motivation: Sometimes, we wish to operate on both buffers and anything that can produce a buffer. For instance, when making a composite buffer, we could compose either buffers or sends. Modification: Introduce a Deref interface, which is extended by both Rc and Send. A Deref can be used to acquire an Rc instance, and in doing so will also acquire a reference to the Rc. That is, dereferencing increases the reference count. For Rc itself, this just delegates to Rc.acquire, while for Send it delegates to Send.receive, and can only be called once. The Allocator.compose method has been changed to take Derefs. This allows us to compose either Bufs or Sends of bufs. Or a mix. Extra care and caution has been added to the code, to make sure the reference counts are managed correctly when composing buffers, now that it's a more complicated operation. A handful of convenience methods for working with Sends have also been added to the Send interface. Result: We can now build a composite buffer out of sends of buffers. --- .../java/io/netty/buffer/api/Allocator.java | 10 +- src/main/java/io/netty/buffer/api/Buf.java | 4 +- .../java/io/netty/buffer/api/BufHolder.java | 4 +- .../netty/buffer/api/ComponentProcessor.java | 1 + .../io/netty/buffer/api/CompositeBuf.java | 94 +++++++++++++------ src/main/java/io/netty/buffer/api/Deref.java | 52 ++++++++++ src/main/java/io/netty/buffer/api/Rc.java | 12 ++- .../java/io/netty/buffer/api/RcSupport.java | 2 +- src/main/java/io/netty/buffer/api/Send.java | 72 +++++++++++++- .../io/netty/buffer/api/TransferSend.java | 20 +++- .../java/io/netty/buffer/api/BufRefTest.java | 2 +- .../java/io/netty/buffer/api/BufTest.java | 18 ++++ .../buffer/api/examples/AsyncExample.java | 7 +- 13 files changed, 250 insertions(+), 48 deletions(-) create mode 100644 src/main/java/io/netty/buffer/api/Deref.java diff --git a/src/main/java/io/netty/buffer/api/Allocator.java b/src/main/java/io/netty/buffer/api/Allocator.java index c24e096..726027e 100644 --- a/src/main/java/io/netty/buffer/api/Allocator.java +++ b/src/main/java/io/netty/buffer/api/Allocator.java @@ -107,7 +107,7 @@ public interface Allocator extends AutoCloseable { * @return A buffer composed of, and backed by, the given buffers. * @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain Buf#order() byte order}. */ - default Buf compose(Buf... bufs) { + default Buf compose(Deref... bufs) { return new CompositeBuf(this, bufs); } @@ -117,8 +117,8 @@ public interface Allocator extends AutoCloseable { * the composite buffer was created. * The composite buffer is modified in-place. * - * @see #compose(Buf...) - * @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given + * @see #compose(Deref...) + * @param composite The composite buffer (from a prior {@link #compose(Deref...)} call) to extend with the given * extension buffer. * @param extension The buffer to extend the composite buffer with. */ @@ -133,9 +133,9 @@ public interface Allocator extends AutoCloseable { } /** - * Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not. + * Check if the given buffer is a {@linkplain #compose(Deref...) composite} buffer or not. * @param composite The buffer to check. - * @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise. + * @return {@code true} if the given buffer was created with {@link #compose(Deref...)}, {@code false} otherwise. */ static boolean isComposite(Buf composite) { return composite.getClass() == CompositeBuf.class; diff --git a/src/main/java/io/netty/buffer/api/Buf.java b/src/main/java/io/netty/buffer/api/Buf.java index e775785..27524d8 100644 --- a/src/main/java/io/netty/buffer/api/Buf.java +++ b/src/main/java/io/netty/buffer/api/Buf.java @@ -70,7 +70,7 @@ import java.nio.ByteOrder; * To send a buffer to another thread, the buffer must not have any outstanding borrows. * That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()}; * all {@linkplain #slice() slices} must have been closed. - * And if this buffer is a constituent of a {@linkplain Allocator#compose(Buf...) composite buffer}, + * And if this buffer is a constituent of a {@linkplain Allocator#compose(Deref...) composite buffer}, * then that composite buffer must be closed. * And if this buffer is itself a composite buffer, then it must own all of its constituent buffers. * The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not. @@ -439,6 +439,8 @@ public interface Buf extends Rc, BufAccessors { /** * Split the buffer into two, at the {@linkplain #writerOffset() write offset} position. *

+ * The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown. + *

* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new * buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently * {@linkplain #send() sent} to other threads. diff --git a/src/main/java/io/netty/buffer/api/BufHolder.java b/src/main/java/io/netty/buffer/api/BufHolder.java index aa88892..cd039a9 100644 --- a/src/main/java/io/netty/buffer/api/BufHolder.java +++ b/src/main/java/io/netty/buffer/api/BufHolder.java @@ -83,10 +83,10 @@ public abstract class BufHolder> implements Rc { return buf.countBorrows(); } + @SuppressWarnings("unchecked") @Override public Send send() { - var send = buf.send(); - return () -> receive(send.receive()); + return buf.send().map((Class) getClass(), this::receive); } /** diff --git a/src/main/java/io/netty/buffer/api/ComponentProcessor.java b/src/main/java/io/netty/buffer/api/ComponentProcessor.java index f8208e7..53dd1cf 100644 --- a/src/main/java/io/netty/buffer/api/ComponentProcessor.java +++ b/src/main/java/io/netty/buffer/api/ComponentProcessor.java @@ -121,6 +121,7 @@ public interface ComponentProcessor { * @return A new {@link ByteBuffer}, with its own position and limit, for this memory component. */ ByteBuffer readableBuffer(); + // todo for Unsafe-based impl, DBB.attachment needs to keep underlying memory alive } /** diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index cff03fc..0188d5f 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -54,11 +54,11 @@ final class CompositeBuf extends RcSupport implements Buf { private boolean closed; private boolean readOnly; - CompositeBuf(Allocator allocator, Buf[] bufs) { - this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP); + CompositeBuf(Allocator allocator, Deref[] refs) { + this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false); } - private static Buf[] filterExternalBufs(Buf[] bufs) { + private static Buf[] filterExternalBufs(Deref[] refs) { // We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway, // and also, by ensuring that all constituent buffers contribute to the size of the composite buffer, // we make sure that the number of composite buffers will never become greater than the number of bytes in @@ -66,56 +66,90 @@ final class CompositeBuf extends RcSupport implements Buf { // This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable, // will never overflow their component counts. // Allocating a new array unconditionally also prevents external modification of the array. - bufs = Arrays.stream(bufs) - .filter(b -> b.capacity() > 0) + Buf[] bufs = Arrays.stream(refs) + .map(r -> r.get()) // Increments reference counts. + .filter(CompositeBuf::discardEmpty) .flatMap(CompositeBuf::flattenBuffer) .toArray(Buf[]::new); // Make sure there are no duplicates among the buffers. Set duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>()); duplicatesCheck.addAll(Arrays.asList(bufs)); if (duplicatesCheck.size() < bufs.length) { + for (Buf buf : bufs) { + buf.close(); // Undo the increment we did with Deref.get(). + } throw new IllegalArgumentException( "Cannot create composite buffer with duplicate constituent buffer components."); } return bufs; } + private static boolean discardEmpty(Buf buf) { + if (buf.capacity() > 0) { + return true; + } else { + // If we filter a buffer out, then we must make sure to close it since we incremented the reference count + // with Deref.get() earlier. + buf.close(); + return false; + } + } + private static Stream flattenBuffer(Buf buf) { if (buf instanceof CompositeBuf) { - return Stream.of(((CompositeBuf) buf).bufs); + // Extract components and move our reference count from the composite onto the components. + var composite = (CompositeBuf) buf; + var bufs = composite.bufs; + for (Buf b : bufs) { + b.acquire(); + } + buf.close(); // Important: acquire on components *before* closing composite. + return Stream.of(bufs); } return Stream.of(buf); } - private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop drop) { + private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop drop, + boolean acquireBufs) { super(drop); this.allocator = allocator; this.isSendable = isSendable; - for (Buf buf : bufs) { - buf.acquire(); - } - if (bufs.length > 0) { - ByteOrder targetOrder = bufs[0].order(); + if (acquireBufs) { for (Buf buf : bufs) { - if (buf.order() != targetOrder) { - throw new IllegalArgumentException("Constituent buffers have inconsistent byte order."); - } + buf.acquire(); } - order = bufs[0].order(); + } + try { + if (bufs.length > 0) { + ByteOrder targetOrder = bufs[0].order(); + for (Buf buf : bufs) { + if (buf.order() != targetOrder) { + throw new IllegalArgumentException("Constituent buffers have inconsistent byte order."); + } + } + order = bufs[0].order(); - boolean targetReadOnly = bufs[0].readOnly(); - for (Buf buf : bufs) { - if (buf.readOnly() != targetReadOnly) { - throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state."); + boolean targetReadOnly = bufs[0].readOnly(); + for (Buf buf : bufs) { + if (buf.readOnly() != targetReadOnly) { + throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state."); + } } + readOnly = targetReadOnly; + } else { + order = ByteOrder.nativeOrder(); } - readOnly = targetReadOnly; - } else { - order = ByteOrder.nativeOrder(); + this.bufs = bufs; + computeBufferOffsets(); + tornBufAccessors = new TornBufAccessors(this); + } catch (Exception e) { + // Always close bufs on exception, regardless of acquireBufs value. + // If acquireBufs is false, it just means the ref count increments happened prior to this constructor call. + for (Buf buf : bufs) { + buf.close(); + } + throw e; } - this.bufs = bufs; - computeBufferOffsets(); - tornBufAccessors = new TornBufAccessors(this); } private void computeBufferOffsets() { @@ -292,7 +326,7 @@ final class CompositeBuf extends RcSupport implements Buf { slices = new Buf[] { choice.slice(subOffset, 0) }; } - return new CompositeBuf(allocator, false, slices, drop); + return new CompositeBuf(allocator, false, slices, drop, true); } catch (Throwable throwable) { // We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer: close(); @@ -718,7 +752,7 @@ final class CompositeBuf extends RcSupport implements Buf { } if (bufs.length == 0) { // Bifurcating a zero-length buffer is trivial. - return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order); + return new CompositeBuf(allocator, true, bufs, unsafeGetDrop(), true).order(order); } int i = searchOffsets(woff); @@ -730,7 +764,7 @@ final class CompositeBuf extends RcSupport implements Buf { } computeBufferOffsets(); try { - var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop()); + var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop(), true); compositeBuf.order = order; // Preserve byte order even if bifs array is empty. return compositeBuf; } finally { @@ -1133,7 +1167,7 @@ final class CompositeBuf extends RcSupport implements Buf { for (int i = 0; i < sends.length; i++) { received[i] = sends[i].receive(); } - var composite = new CompositeBuf(allocator, true, received, drop); + var composite = new CompositeBuf(allocator, true, received, drop, true); composite.readOnly = readOnly; drop.attach(composite); return composite; diff --git a/src/main/java/io/netty/buffer/api/Deref.java b/src/main/java/io/netty/buffer/api/Deref.java new file mode 100644 index 0000000..7dd206e --- /dev/null +++ b/src/main/java/io/netty/buffer/api/Deref.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import java.util.function.Supplier; + +/** + * A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object. + *

+ * Note: Callers must ensure that they close any references they obtain. + *

+ * Deref itself does not specify if a reference can be obtained more than once. + * For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once. + * Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple + * times. + * + * @param The concrete type of reference counted object that can be obtained. + */ +public interface Deref> extends Supplier { + /** + * Acquire a reference to the reference counted object. + *

+ * Note: This call increments the reference count of the acquired object, and must be paired with + * a {@link Rc#close()} call. + * Using a try-with-resources clause is the easiest way to ensure this. + * + * @return A reference to the reference counted object. + */ + @Override + T get(); + + /** + * Determine if the object in this {@code Deref} is an instance of the given class. + * + * @param cls The type to check. + * @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type. + */ + boolean isInstanceOf(Class cls); +} diff --git a/src/main/java/io/netty/buffer/api/Rc.java b/src/main/java/io/netty/buffer/api/Rc.java index def5f7d..2cc50f7 100644 --- a/src/main/java/io/netty/buffer/api/Rc.java +++ b/src/main/java/io/netty/buffer/api/Rc.java @@ -26,7 +26,7 @@ package io.netty.buffer.api; * * @param The concrete subtype. */ -public interface Rc> extends AutoCloseable { +public interface Rc> extends AutoCloseable, Deref { /** * Increment the reference count. *

@@ -36,6 +36,16 @@ public interface Rc> extends AutoCloseable { */ I acquire(); + @Override + default I get() { + return acquire(); + } + + @Override + default boolean isInstanceOf(Class cls) { + return cls.isInstance(this); + } + /** * Decrement the reference count, and despose of the resource if the last reference is closed. *

diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index eaf8ca5..1541810 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -79,7 +79,7 @@ public abstract class RcSupport, T extends RcSupport> impl } var owned = prepareSend(); acquires = -2; // Close without dropping. This also ignore future double-free attempts. - return new TransferSend(owned, drop); + return new TransferSend(owned, drop, getClass()); } /** diff --git a/src/main/java/io/netty/buffer/api/Send.java b/src/main/java/io/netty/buffer/api/Send.java index 96d4a7e..b272740 100644 --- a/src/main/java/io/netty/buffer/api/Send.java +++ b/src/main/java/io/netty/buffer/api/Send.java @@ -15,6 +15,10 @@ */ package io.netty.buffer.api; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + /** * A Send object is a temporary holder of an {@link Rc}, used for transferring the ownership of the Rc from one thread * to another. @@ -28,8 +32,43 @@ package io.netty.buffer.api; * * @param */ -@FunctionalInterface -public interface Send> { +public interface Send> extends Deref { + /** + * Construct a {@link Send} based on the given {@link Supplier}. + * The supplier will be called only once, in the receiving thread. + * + * @param concreteObjectType The concrete type of the object being sent. Specifically, the object returned from the + * {@link Supplier#get()} method must be an instance of this class. + * @param supplier The supplier of the object being sent, which will be called when the object is ready to be + * received. + * @param The type of object being sent. + * @return A {@link Send} which will deliver an object of the given type, from the supplier. + */ + static > Send sending(Class concreteObjectType, Supplier supplier) { + return new Send() { + private final AtomicBoolean gate = new AtomicBoolean(); + @Override + public T receive() { + if (gate.getAndSet(true)) { + throw new IllegalStateException("This object has already been received."); + } + return supplier.get(); + } + + @Override + public boolean isInstanceOf(Class cls) { + return cls.isAssignableFrom(concreteObjectType); + } + + @Override + public void discard() { + if (!gate.getAndSet(true)) { + supplier.get().close(); + } + } + }; + } + /** * Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the * sent Rc in the sending thread happens-before the return of this method. @@ -40,4 +79,33 @@ public interface Send> { * @throws IllegalStateException If this method is called more than once. */ T receive(); + + /** + * Apply a mapping function to the object being sent. The mapping will occur when the object is received. + * + * @param type The result type of the mapping function. + * @param mapper The mapping function to apply to the object being sent. + * @param The result type of the mapping function. + * @return A new {@link Send} instance that will deliver an object that is the result of the mapping. + */ + default > Send map(Class type, Function mapper) { + return sending(type, () -> mapper.apply(receive())); + } + + /** + * Discard this {@link Send} and the object it contains. + * This has no effect if the send has already been received. + */ + default void discard() { + try { + receive().close(); + } catch (IllegalStateException ignore) { + // Don't do anything if the send has already been consumed. + } + } + + @Override + default T get() { + return receive(); + } } diff --git a/src/main/java/io/netty/buffer/api/TransferSend.java b/src/main/java/io/netty/buffer/api/TransferSend.java index 24098f4..0d89826 100644 --- a/src/main/java/io/netty/buffer/api/TransferSend.java +++ b/src/main/java/io/netty/buffer/api/TransferSend.java @@ -24,12 +24,14 @@ class TransferSend, T extends Rc> implements Send { private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class); private final Owned outgoing; private final Drop drop; + private final Class concreteType; @SuppressWarnings("unused") private volatile boolean received; // Accessed via VarHandle - TransferSend(Owned outgoing, Drop drop) { + TransferSend(Owned outgoing, Drop drop, Class concreteType) { this.outgoing = outgoing; this.drop = drop; + this.concreteType = concreteType; } @SuppressWarnings("unchecked") @@ -47,8 +49,22 @@ class TransferSend, T extends Rc> implements Send { } private void gateReception() { - if (!RECEIVED.compareAndSet(this, false, true)) { + if ((boolean) RECEIVED.getAndSet(this, true)) { throw new IllegalStateException("This object has already been received."); } } + + @Override + public boolean isInstanceOf(Class cls) { + return cls.isAssignableFrom(concreteType); + } + + @Override + public void discard() { + if (!(boolean) RECEIVED.getAndSet(this, true)) { + var copy = outgoing.transferOwnership(drop); + drop.attach(copy); + copy.close(); + } + } } diff --git a/src/test/java/io/netty/buffer/api/BufRefTest.java b/src/test/java/io/netty/buffer/api/BufRefTest.java index 85e5fd4..0bc8d0a 100644 --- a/src/test/java/io/netty/buffer/api/BufRefTest.java +++ b/src/test/java/io/netty/buffer/api/BufRefTest.java @@ -100,7 +100,7 @@ class BufRefTest { try (Allocator allocator = Allocator.heap(); BufRef refA = new BufRef(allocator.allocate(8).send())) { refA.contents().writeInt(42); - var send = refA.send(); + Send send = refA.send(); assertThrows(IllegalStateException.class, () -> refA.contents().readInt()); try (BufRef refB = send.receive()) { assertThat(refB.contents().readInt()).isEqualTo(42); diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index db8fd28..a6dd3ea 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -887,10 +887,15 @@ public class BufTest { try (Buf ignored = buf.acquire()) { assertEquals(borrows + 1, buf.countBorrows()); try (Buf slice = buf.slice()) { + assertEquals(0, slice.capacity()); // We haven't written anything, so the slice is empty. + int sliceBorrows = slice.countBorrows(); assertEquals(borrows + 2, buf.countBorrows()); try (Buf ignored1 = allocator.compose(buf, slice)) { assertEquals(borrows + 3, buf.countBorrows()); + // Note: Slice is empty; not acquired by the composite buffer. + assertEquals(sliceBorrows, slice.countBorrows()); } + assertEquals(sliceBorrows, slice.countBorrows()); assertEquals(borrows + 2, buf.countBorrows()); } assertEquals(borrows + 1, buf.countBorrows()); @@ -1647,6 +1652,18 @@ public class BufTest { } } + @Test + public void compositeBufferFromSends() { + try (Allocator allocator = Allocator.heap(); + Buf composite = allocator.compose( + allocator.allocate(8).send(), + allocator.allocate(8).send(), + allocator.allocate(8).send())) { + assertEquals(24, composite.capacity()); + assertTrue(composite.isOwned()); + } + } + @Test public void compositeBufferMustNotBeAllowedToContainThemselves() { try (Allocator allocator = Allocator.heap()) { @@ -2649,6 +2666,7 @@ public class BufTest { } } } + // todo read only buffer must have zero writable bytes @ParameterizedTest @MethodSource("nonCompositeAllocators") diff --git a/src/test/java/io/netty/buffer/api/examples/AsyncExample.java b/src/test/java/io/netty/buffer/api/examples/AsyncExample.java index 16edfd3..9c5211a 100644 --- a/src/test/java/io/netty/buffer/api/examples/AsyncExample.java +++ b/src/test/java/io/netty/buffer/api/examples/AsyncExample.java @@ -18,6 +18,7 @@ package io.netty.buffer.api.examples; import io.netty.buffer.api.Allocator; import io.netty.buffer.api.Buf; +import static java.lang.System.out; import static java.util.concurrent.CompletableFuture.completedFuture; public final class AsyncExample { @@ -33,9 +34,9 @@ public final class AsyncExample { } }).thenAcceptAsync(send -> { try (Buf buf = send.receive()) { - System.out.println("First thread id was " + buf.readLong()); - System.out.println("Then sent to " + buf.readLong()); - System.out.println("And now in thread " + threadId()); + out.println("First thread id was " + buf.readLong()); + out.println("Then sent to " + buf.readLong()); + out.println("And now in thread " + threadId()); } }).get(); }