Merge pull request #27 from netty/send-deref

Introduce Deref abstraction
This commit is contained in:
Chris Vest 2021-02-12 17:22:34 +01:00 committed by GitHub
commit 5f1f0bae38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 250 additions and 48 deletions

View File

@ -107,7 +107,7 @@ public interface Allocator extends AutoCloseable {
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain Buf#order() byte order}.
*/
default Buf compose(Buf... bufs) {
default Buf compose(Deref<Buf>... bufs) {
return new CompositeBuf(this, bufs);
}
@ -117,8 +117,8 @@ public interface Allocator extends AutoCloseable {
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(Buf...)
* @param composite The composite buffer (from a prior {@link #compose(Buf...)} call) to extend with the given
* @see #compose(Deref...)
* @param composite The composite buffer (from a prior {@link #compose(Deref...)} call) to extend with the given
* extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
@ -133,9 +133,9 @@ public interface Allocator extends AutoCloseable {
}
/**
* Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not.
* Check if the given buffer is a {@linkplain #compose(Deref...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise.
* @return {@code true} if the given buffer was created with {@link #compose(Deref...)}, {@code false} otherwise.
*/
static boolean isComposite(Buf composite) {
return composite.getClass() == CompositeBuf.class;

View File

@ -70,7 +70,7 @@ import java.nio.ByteOrder;
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
* all {@linkplain #slice() slices} must have been closed.
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Buf...) composite buffer},
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Deref...) composite buffer},
* then that composite buffer must be closed.
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
@ -439,6 +439,8 @@ public interface Buf extends Rc<Buf>, BufAccessors {
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
* <p>
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* <p>
* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
* {@linkplain #send() sent} to other threads.

View File

@ -83,10 +83,10 @@ public abstract class BufHolder<T extends BufHolder<T>> implements Rc<T> {
return buf.countBorrows();
}
@SuppressWarnings("unchecked")
@Override
public Send<T> send() {
var send = buf.send();
return () -> receive(send.receive());
return buf.send().map((Class<T>) getClass(), this::receive);
}
/**

View File

@ -121,6 +121,7 @@ public interface ComponentProcessor {
* @return A new {@link ByteBuffer}, with its own position and limit, for this memory component.
*/
ByteBuffer readableBuffer();
// todo for Unsafe-based impl, DBB.attachment needs to keep underlying memory alive
}
/**

View File

@ -54,11 +54,11 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private boolean closed;
private boolean readOnly;
CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP);
CompositeBuf(Allocator allocator, Deref<Buf>[] refs) {
this(allocator, true, filterExternalBufs(refs), COMPOSITE_DROP, false);
}
private static Buf[] filterExternalBufs(Buf[] bufs) {
private static Buf[] filterExternalBufs(Deref<Buf>[] refs) {
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
// we make sure that the number of composite buffers will never become greater than the number of bytes in
@ -66,56 +66,90 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
// will never overflow their component counts.
// Allocating a new array unconditionally also prevents external modification of the array.
bufs = Arrays.stream(bufs)
.filter(b -> b.capacity() > 0)
Buf[] bufs = Arrays.stream(refs)
.map(r -> r.get()) // Increments reference counts.
.filter(CompositeBuf::discardEmpty)
.flatMap(CompositeBuf::flattenBuffer)
.toArray(Buf[]::new);
// Make sure there are no duplicates among the buffers.
Set<Buf> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
if (duplicatesCheck.size() < bufs.length) {
for (Buf buf : bufs) {
buf.close(); // Undo the increment we did with Deref.get().
}
throw new IllegalArgumentException(
"Cannot create composite buffer with duplicate constituent buffer components.");
}
return bufs;
}
private static boolean discardEmpty(Buf buf) {
if (buf.capacity() > 0) {
return true;
} else {
// If we filter a buffer out, then we must make sure to close it since we incremented the reference count
// with Deref.get() earlier.
buf.close();
return false;
}
}
private static Stream<Buf> flattenBuffer(Buf buf) {
if (buf instanceof CompositeBuf) {
return Stream.of(((CompositeBuf) buf).bufs);
// Extract components and move our reference count from the composite onto the components.
var composite = (CompositeBuf) buf;
var bufs = composite.bufs;
for (Buf b : bufs) {
b.acquire();
}
buf.close(); // Important: acquire on components *before* closing composite.
return Stream.of(bufs);
}
return Stream.of(buf);
}
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop,
boolean acquireBufs) {
super(drop);
this.allocator = allocator;
this.isSendable = isSendable;
for (Buf buf : bufs) {
buf.acquire();
}
if (bufs.length > 0) {
ByteOrder targetOrder = bufs[0].order();
if (acquireBufs) {
for (Buf buf : bufs) {
if (buf.order() != targetOrder) {
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
}
buf.acquire();
}
order = bufs[0].order();
}
try {
if (bufs.length > 0) {
ByteOrder targetOrder = bufs[0].order();
for (Buf buf : bufs) {
if (buf.order() != targetOrder) {
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
}
}
order = bufs[0].order();
boolean targetReadOnly = bufs[0].readOnly();
for (Buf buf : bufs) {
if (buf.readOnly() != targetReadOnly) {
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
boolean targetReadOnly = bufs[0].readOnly();
for (Buf buf : bufs) {
if (buf.readOnly() != targetReadOnly) {
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
}
}
readOnly = targetReadOnly;
} else {
order = ByteOrder.nativeOrder();
}
readOnly = targetReadOnly;
} else {
order = ByteOrder.nativeOrder();
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
} catch (Exception e) {
// Always close bufs on exception, regardless of acquireBufs value.
// If acquireBufs is false, it just means the ref count increments happened prior to this constructor call.
for (Buf buf : bufs) {
buf.close();
}
throw e;
}
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
}
private void computeBufferOffsets() {
@ -292,7 +326,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
slices = new Buf[] { choice.slice(subOffset, 0) };
}
return new CompositeBuf(allocator, false, slices, drop);
return new CompositeBuf(allocator, false, slices, drop, true);
} catch (Throwable throwable) {
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
close();
@ -718,7 +752,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
if (bufs.length == 0) {
// Bifurcating a zero-length buffer is trivial.
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order);
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop(), true).order(order);
}
int i = searchOffsets(woff);
@ -730,7 +764,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
computeBufferOffsets();
try {
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop());
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop(), true);
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
return compositeBuf;
} finally {
@ -1133,7 +1167,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
for (int i = 0; i < sends.length; i++) {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(allocator, true, received, drop);
var composite = new CompositeBuf(allocator, true, received, drop, true);
composite.readOnly = readOnly;
drop.attach(composite);
return composite;

View File

@ -0,0 +1,52 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.util.function.Supplier;
/**
* A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object.
* <p>
* <strong>Note:</strong> Callers must ensure that they close any references they obtain.
* <p>
* Deref itself does not specify if a reference can be obtained more than once.
* For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once.
* Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple
* times.
*
* @param <T> The concrete type of reference counted object that can be obtained.
*/
public interface Deref<T extends Rc<T>> extends Supplier<T> {
/**
* Acquire a reference to the reference counted object.
* <p>
* <strong>Note:</strong> This call increments the reference count of the acquired object, and must be paired with
* a {@link Rc#close()} call.
* Using a try-with-resources clause is the easiest way to ensure this.
*
* @return A reference to the reference counted object.
*/
@Override
T get();
/**
* Determine if the object in this {@code Deref} is an instance of the given class.
*
* @param cls The type to check.
* @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type.
*/
boolean isInstanceOf(Class<?> cls);
}

View File

@ -26,7 +26,7 @@ package io.netty.buffer.api;
*
* @param <I> The concrete subtype.
*/
public interface Rc<I extends Rc<I>> extends AutoCloseable {
public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
/**
* Increment the reference count.
* <p>
@ -36,6 +36,16 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
*/
I acquire();
@Override
default I get() {
return acquire();
}
@Override
default boolean isInstanceOf(Class<?> cls) {
return cls.isInstance(this);
}
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>

View File

@ -79,7 +79,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
}
var owned = prepareSend();
acquires = -2; // Close without dropping. This also ignore future double-free attempts.
return new TransferSend<I, T>(owned, drop);
return new TransferSend<I, T>(owned, drop, getClass());
}
/**

View File

@ -15,6 +15,10 @@
*/
package io.netty.buffer.api;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A Send object is a temporary holder of an {@link Rc}, used for transferring the ownership of the Rc from one thread
* to another.
@ -28,8 +32,43 @@ package io.netty.buffer.api;
*
* @param <T>
*/
@FunctionalInterface
public interface Send<T extends Rc<T>> {
public interface Send<T extends Rc<T>> extends Deref<T> {
/**
* Construct a {@link Send} based on the given {@link Supplier}.
* The supplier will be called only once, in the receiving thread.
*
* @param concreteObjectType The concrete type of the object being sent. Specifically, the object returned from the
* {@link Supplier#get()} method must be an instance of this class.
* @param supplier The supplier of the object being sent, which will be called when the object is ready to be
* received.
* @param <T> The type of object being sent.
* @return A {@link Send} which will deliver an object of the given type, from the supplier.
*/
static <T extends Rc<T>> Send<T> sending(Class<T> concreteObjectType, Supplier<? extends T> supplier) {
return new Send<T>() {
private final AtomicBoolean gate = new AtomicBoolean();
@Override
public T receive() {
if (gate.getAndSet(true)) {
throw new IllegalStateException("This object has already been received.");
}
return supplier.get();
}
@Override
public boolean isInstanceOf(Class<?> cls) {
return cls.isAssignableFrom(concreteObjectType);
}
@Override
public void discard() {
if (!gate.getAndSet(true)) {
supplier.get().close();
}
}
};
}
/**
* Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the
* sent Rc in the sending thread happens-before the return of this method.
@ -40,4 +79,33 @@ public interface Send<T extends Rc<T>> {
* @throws IllegalStateException If this method is called more than once.
*/
T receive();
/**
* Apply a mapping function to the object being sent. The mapping will occur when the object is received.
*
* @param type The result type of the mapping function.
* @param mapper The mapping function to apply to the object being sent.
* @param <R> The result type of the mapping function.
* @return A new {@link Send} instance that will deliver an object that is the result of the mapping.
*/
default <R extends Rc<R>> Send<R> map(Class<R> type, Function<T, ? extends R> mapper) {
return sending(type, () -> mapper.apply(receive()));
}
/**
* Discard this {@link Send} and the object it contains.
* This has no effect if the send has already been received.
*/
default void discard() {
try {
receive().close();
} catch (IllegalStateException ignore) {
// Don't do anything if the send has already been consumed.
}
}
@Override
default T get() {
return receive();
}
}

View File

@ -24,12 +24,14 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class);
private final Owned<T> outgoing;
private final Drop<T> drop;
private final Class<?> concreteType;
@SuppressWarnings("unused")
private volatile boolean received; // Accessed via VarHandle
TransferSend(Owned<T> outgoing, Drop<T> drop) {
TransferSend(Owned<T> outgoing, Drop<T> drop, Class<?> concreteType) {
this.outgoing = outgoing;
this.drop = drop;
this.concreteType = concreteType;
}
@SuppressWarnings("unchecked")
@ -47,8 +49,22 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
}
private void gateReception() {
if (!RECEIVED.compareAndSet(this, false, true)) {
if ((boolean) RECEIVED.getAndSet(this, true)) {
throw new IllegalStateException("This object has already been received.");
}
}
@Override
public boolean isInstanceOf(Class<?> cls) {
return cls.isAssignableFrom(concreteType);
}
@Override
public void discard() {
if (!(boolean) RECEIVED.getAndSet(this, true)) {
var copy = outgoing.transferOwnership(drop);
drop.attach(copy);
copy.close();
}
}
}

View File

@ -100,7 +100,7 @@ class BufRefTest {
try (Allocator allocator = Allocator.heap();
BufRef refA = new BufRef(allocator.allocate(8).send())) {
refA.contents().writeInt(42);
var send = refA.send();
Send<BufRef> send = refA.send();
assertThrows(IllegalStateException.class, () -> refA.contents().readInt());
try (BufRef refB = send.receive()) {
assertThat(refB.contents().readInt()).isEqualTo(42);

View File

@ -887,10 +887,15 @@ public class BufTest {
try (Buf ignored = buf.acquire()) {
assertEquals(borrows + 1, buf.countBorrows());
try (Buf slice = buf.slice()) {
assertEquals(0, slice.capacity()); // We haven't written anything, so the slice is empty.
int sliceBorrows = slice.countBorrows();
assertEquals(borrows + 2, buf.countBorrows());
try (Buf ignored1 = allocator.compose(buf, slice)) {
assertEquals(borrows + 3, buf.countBorrows());
// Note: Slice is empty; not acquired by the composite buffer.
assertEquals(sliceBorrows, slice.countBorrows());
}
assertEquals(sliceBorrows, slice.countBorrows());
assertEquals(borrows + 2, buf.countBorrows());
}
assertEquals(borrows + 1, buf.countBorrows());
@ -1647,6 +1652,18 @@ public class BufTest {
}
}
@Test
public void compositeBufferFromSends() {
try (Allocator allocator = Allocator.heap();
Buf composite = allocator.compose(
allocator.allocate(8).send(),
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
assertEquals(24, composite.capacity());
assertTrue(composite.isOwned());
}
}
@Test
public void compositeBufferMustNotBeAllowedToContainThemselves() {
try (Allocator allocator = Allocator.heap()) {
@ -2649,6 +2666,7 @@ public class BufTest {
}
}
}
// todo read only buffer must have zero writable bytes
@ParameterizedTest
@MethodSource("nonCompositeAllocators")

View File

@ -18,6 +18,7 @@ package io.netty.buffer.api.examples;
import io.netty.buffer.api.Allocator;
import io.netty.buffer.api.Buf;
import static java.lang.System.out;
import static java.util.concurrent.CompletableFuture.completedFuture;
public final class AsyncExample {
@ -33,9 +34,9 @@ public final class AsyncExample {
}
}).thenAcceptAsync(send -> {
try (Buf buf = send.receive()) {
System.out.println("First thread id was " + buf.readLong());
System.out.println("Then sent to " + buf.readLong());
System.out.println("And now in thread " + threadId());
out.println("First thread id was " + buf.readLong());
out.println("Then sent to " + buf.readLong());
out.println("And now in thread " + threadId());
}
}).get();
}