Introduce Deref abstraction

Motivation:
Sometimes, we wish to operate on both buffers and anything that can produce a buffer.
For instance, when making a composite buffer, we could compose either buffers or sends.

Modification:
Introduce a Deref interface, which is extended by both Rc and Send.
A Deref can be used to acquire an Rc instance, and in doing so will also acquire a reference to the Rc.
That is, dereferencing increases the reference count.
For Rc itself, this just delegates to Rc.acquire, while for Send it delegates to Send.receive, and can only be called once.
The Allocator.compose method has been changed to take Derefs.
This allows us to compose either Bufs or Sends of bufs.
Or a mix.
Extra care and caution has been added to the code, to make sure the reference counts are managed correctly when composing buffers, now that it's a more complicated operation.
A handful of convenience methods for working with Sends have also been added to the Send interface.

Result:
We can now build a composite buffer out of sends of buffers.
This commit is contained in:
Chris Vest 2021-02-11 14:08:22 +01:00
parent 1a741baaf0
commit 492977d9be
13 changed files with 250 additions and 48 deletions

View File

@ -107,7 +107,7 @@ public interface Allocator extends AutoCloseable {
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain Buf#order() byte order}.
*/
default Buf compose(Buf... bufs) {
default Buf compose(Deref<Buf>... bufs) {
return new CompositeBuf(this, bufs);
}
@ -117,8 +117,8 @@ public interface Allocator extends AutoCloseable {
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(Buf...)
* @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given
* @see #compose(Deref...)
* @param composite The composite buffer (from a prior {@link #compose(Deref...)} call) to extend with the given
* extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
@ -133,9 +133,9 @@ public interface Allocator extends AutoCloseable {
}
/**
* Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not.
* Check if the given buffer is a {@linkplain #compose(Deref...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise.
* @return {@code true} if the given buffer was created with {@link #compose(Deref...)}, {@code false} otherwise.
*/
static boolean isComposite(Buf composite) {
return composite.getClass() == CompositeBuf.class;

View File

@ -70,7 +70,7 @@ import java.nio.ByteOrder;
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
* all {@linkplain #slice() slices} must have been closed.
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Buf...) composite buffer},
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Deref...) composite buffer},
* then that composite buffer must be closed.
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
@ -439,6 +439,8 @@ public interface Buf extends Rc<Buf>, BufAccessors {
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
* <p>
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* <p>
* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
* {@linkplain #send() sent} to other threads.

View File

@ -83,10 +83,10 @@ public abstract class BufHolder<T extends BufHolder<T>> implements Rc<T> {
return buf.countBorrows();
}
@SuppressWarnings("unchecked")
@Override
public Send<T> send() {
var send = buf.send();
return () -> receive(send.receive());
return buf.send().map((Class<T>) getClass(), this::receive);
}
/**

View File

@ -121,6 +121,7 @@ public interface ComponentProcessor {
* @return A new {@link ByteBuffer}, with its own position and limit, for this memory component.
*/
ByteBuffer readableBuffer();
// todo for Unsafe-based impl, DBB.attachment needs to keep underlying memory alive
}
/**

View File

@ -54,11 +54,11 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private boolean closed;
private boolean readOnly;
CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP);
CompositeBuf(Allocator allocator, Deref<Buf>[] refs) {
this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false);
}
private static Buf[] filterExternalBufs(Buf[] bufs) {
private static Buf[] filterExternalBufs(Deref<Buf>[] refs) {
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
// we make sure that the number of composite buffers will never become greater than the number of bytes in
@ -66,34 +66,60 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
// will never overflow their component counts.
// Allocating a new array unconditionally also prevents external modification of the array.
bufs = Arrays.stream(bufs)
.filter(b -> b.capacity() > 0)
Buf[] bufs = Arrays.stream(refs)
.map(r -> r.get()) // Increments reference counts.
.filter(CompositeBuf::discardEmpty)
.flatMap(CompositeBuf::flattenBuffer)
.toArray(Buf[]::new);
// Make sure there are no duplicates among the buffers.
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
if (duplicatesCheck.size() < bufs.length) {
for (Buf buf : bufs) {
buf.close(); // Undo the increment we did with Deref.get().
}
throw new IllegalArgumentException(
"Cannot create composite buffer with duplicate constituent buffer components.");
}
return bufs;
}
private static boolean discardEmpty(Buf buf) {
if (buf.capacity() > 0) {
return true;
} else {
// If we filter a buffer out, then we must make sure to close it since we incremented the reference count
// with Deref.get() earlier.
buf.close();
return false;
}
}
private static Stream<Buf> flattenBuffer(Buf buf) {
if (buf instanceof CompositeBuf) {
return Stream.of(((CompositeBuf) buf).bufs);
// Extract components and move our reference count from the composite onto the components.
var composite = (CompositeBuf) buf;
var bufs = composite.bufs;
for (Buf b : bufs) {
b.acquire();
}
buf.close(); // Important: acquire on components *before* closing composite.
return Stream.of(bufs);
}
return Stream.of(buf);
}
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop,
boolean acquireBufs) {
super(drop);
this.allocator = allocator;
this.isSendable = isSendable;
if (acquireBufs) {
for (Buf buf : bufs) {
buf.acquire();
}
}
try {
if (bufs.length > 0) {
ByteOrder targetOrder = bufs[0].order();
for (Buf buf : bufs) {
@ -116,6 +142,14 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
} catch (Exception e) {
// Always close bufs on exception, regardless of acquireBufs value.
// If acquireBufs is false, it just means the ref count increments happened prior to this constructor call.
for (Buf buf : bufs) {
buf.close();
}
throw e;
}
}
private void computeBufferOffsets() {
@ -292,7 +326,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
slices = new Buf[] { choice.slice(subOffset, 0) };
}
return new CompositeBuf(allocator, false, slices, drop);
return new CompositeBuf(allocator, false, slices, drop, true);
} catch (Throwable throwable) {
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
close();
@ -718,7 +752,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
if (bufs.length == 0) {
// Bifurcating a zero-length buffer is trivial.
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order);
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop(), true).order(order);
}
int i = searchOffsets(woff);
@ -730,7 +764,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
computeBufferOffsets();
try {
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop());
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop(), true);
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
return compositeBuf;
} finally {
@ -1133,7 +1167,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
for (int i = 0; i < sends.length; i++) {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(allocator, true, received, drop);
var composite = new CompositeBuf(allocator, true, received, drop, true);
composite.readOnly = readOnly;
drop.attach(composite);
return composite;

View File

@ -0,0 +1,52 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.util.function.Supplier;
/**
* A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object.
* <p>
* <strong>Note:</strong> Callers must ensure that they close any references they obtain.
* <p>
* Deref itself does not specify if a reference can be obtained more than once.
* For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once.
* Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple
* times.
*
* @param <T> The concrete type of reference counted object that can be obtained.
*/
public interface Deref<T extends Rc<T>> extends Supplier<T> {
/**
* Acquire a reference to the reference counted object.
* <p>
* <strong>Note:</strong> This call increments the reference count of the acquired object, and must be paired with
* a {@link Rc#close()} call.
* Using a try-with-resources clause is the easiest way to ensure this.
*
* @return A reference to the reference counted object.
*/
@Override
T get();
/**
* Determine if the object in this {@code Deref} is an instance of the given class.
*
* @param cls The type to check.
* @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type.
*/
boolean isInstanceOf(Class<?> cls);
}

View File

@ -26,7 +26,7 @@ package io.netty.buffer.api;
*
* @param <I> The concrete subtype.
*/
public interface Rc<I extends Rc<I>> extends AutoCloseable {
public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
/**
* Increment the reference count.
* <p>
@ -36,6 +36,16 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
*/
I acquire();
@Override
default I get() {
return acquire();
}
@Override
default boolean isInstanceOf(Class<?> cls) {
return cls.isInstance(this);
}
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>

View File

@ -79,7 +79,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
}
var owned = prepareSend();
acquires = -2; // Close without dropping. This also ignore future double-free attempts.
return new TransferSend<I, T>(owned, drop);
return new TransferSend<I, T>(owned, drop, getClass());
}
/**

View File

@ -15,6 +15,10 @@
*/
package io.netty.buffer.api;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A Send object is a temporary holder of an {@link Rc}, used for transferring the ownership of the Rc from one thread
* to another.
@ -28,8 +32,43 @@ package io.netty.buffer.api;
*
* @param <T>
*/
@FunctionalInterface
public interface Send<T extends Rc<T>> {
public interface Send<T extends Rc<T>> extends Deref<T> {
/**
* Construct a {@link Send} based on the given {@link Supplier}.
* The supplier will be called only once, in the receiving thread.
*
* @param concreteObjectType The concrete type of the object being sent. Specifically, the object returned from the
* {@link Supplier#get()} method must be an instance of this class.
* @param supplier The supplier of the object being sent, which will be called when the object is ready to be
* received.
* @param <T> The type of object being sent.
* @return A {@link Send} which will deliver an object of the given type, from the supplier.
*/
static <T extends Rc<T>> Send<T> sending(Class<T> concreteObjectType, Supplier<? extends T> supplier) {
return new Send<T>() {
private final AtomicBoolean gate = new AtomicBoolean();
@Override
public T receive() {
if (gate.getAndSet(true)) {
throw new IllegalStateException("This object has already been received.");
}
return supplier.get();
}
@Override
public boolean isInstanceOf(Class<?> cls) {
return cls.isAssignableFrom(concreteObjectType);
}
@Override
public void discard() {
if (!gate.getAndSet(true)) {
supplier.get().close();
}
}
};
}
/**
* Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the
* sent Rc in the sending thread happens-before the return of this method.
@ -40,4 +79,33 @@ public interface Send<T extends Rc<T>> {
* @throws IllegalStateException If this method is called more than once.
*/
T receive();
/**
* Apply a mapping function to the object being sent. The mapping will occur when the object is received.
*
* @param type The result type of the mapping function.
* @param mapper The mapping function to apply to the object being sent.
* @param <R> The result type of the mapping function.
* @return A new {@link Send} instance that will deliver an object that is the result of the mapping.
*/
default <R extends Rc<R>> Send<R> map(Class<R> type, Function<T, ? extends R> mapper) {
return sending(type, () -> mapper.apply(receive()));
}
/**
* Discard this {@link Send} and the object it contains.
* This has no effect if the send has already been received.
*/
default void discard() {
try {
receive().close();
} catch (IllegalStateException ignore) {
// Don't do anything if the send has already been consumed.
}
}
@Override
default T get() {
return receive();
}
}

View File

@ -24,12 +24,14 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class);
private final Owned<T> outgoing;
private final Drop<T> drop;
private final Class<?> concreteType;
@SuppressWarnings("unused")
private volatile boolean received; // Accessed via VarHandle
TransferSend(Owned<T> outgoing, Drop<T> drop) {
TransferSend(Owned<T> outgoing, Drop<T> drop, Class<?> concreteType) {
this.outgoing = outgoing;
this.drop = drop;
this.concreteType = concreteType;
}
@SuppressWarnings("unchecked")
@ -47,8 +49,22 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
}
private void gateReception() {
if (!RECEIVED.compareAndSet(this, false, true)) {
if ((boolean) RECEIVED.getAndSet(this, true)) {
throw new IllegalStateException("This object has already been received.");
}
}
@Override
public boolean isInstanceOf(Class<?> cls) {
return cls.isAssignableFrom(concreteType);
}
@Override
public void discard() {
if (!(boolean) RECEIVED.getAndSet(this, true)) {
var copy = outgoing.transferOwnership(drop);
drop.attach(copy);
copy.close();
}
}
}

View File

@ -100,7 +100,7 @@ class BufRefTest {
try (Allocator allocator = Allocator.heap();
BufRef refA = new BufRef(allocator.allocate(8).send())) {
refA.contents().writeInt(42);
var send = refA.send();
Send<BufRef> send = refA.send();
assertThrows(IllegalStateException.class, () -> refA.contents().readInt());
try (BufRef refB = send.receive()) {
assertThat(refB.contents().readInt()).isEqualTo(42);

View File

@ -887,10 +887,15 @@ public class BufTest {
try (Buf ignored = buf.acquire()) {
assertEquals(borrows + 1, buf.countBorrows());
try (Buf slice = buf.slice()) {
assertEquals(0, slice.capacity()); // We haven't written anything, so the slice is empty.
int sliceBorrows = slice.countBorrows();
assertEquals(borrows + 2, buf.countBorrows());
try (Buf ignored1 = allocator.compose(buf, slice)) {
assertEquals(borrows + 3, buf.countBorrows());
// Note: Slice is empty; not acquired by the composite buffer.
assertEquals(sliceBorrows, slice.countBorrows());
}
assertEquals(sliceBorrows, slice.countBorrows());
assertEquals(borrows + 2, buf.countBorrows());
}
assertEquals(borrows + 1, buf.countBorrows());
@ -1647,6 +1652,18 @@ public class BufTest {
}
}
@Test
public void compositeBufferFromSends() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose(
allocator.allocate(8).send(),
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
assertEquals(24, composite.capacity());
assertTrue(composite.isOwned());
}
}
@Test
public void compositeBufferMustNotBeAllowedToContainThemselves() {
try (Allocator allocator = Allocator.heap()) {
@ -2649,6 +2666,7 @@ public class BufTest {
}
}
}
// todo read only buffer must have zero writable bytes
@ParameterizedTest
@MethodSource("nonCompositeAllocators")

View File

@ -18,6 +18,7 @@ package io.netty.buffer.api.examples;
import io.netty.buffer.api.Allocator;
import io.netty.buffer.api.Buf;
import static java.lang.System.out;
import static java.util.concurrent.CompletableFuture.completedFuture;
public final class AsyncExample {
@ -33,9 +34,9 @@ public final class AsyncExample {
}
}).thenAcceptAsync(send -> {
try (Buf buf = send.receive()) {
System.out.println("First thread id was " + buf.readLong());
System.out.println("Then sent to " + buf.readLong());
System.out.println("And now in thread " + threadId());
out.println("First thread id was " + buf.readLong());
out.println("Then sent to " + buf.readLong());
out.println("And now in thread " + threadId());
}
}).get();
}