Remove Deref

This abstraction was only used to allow composing over both buffers and sends of buffers, but we can also do that with method overloads.

The Deref had weird semantics and consequences that didn't make much sense.
In other words, it did not pay a return on its complexity cost.
This commit is contained in:
Chris Vest 2021-04-27 11:04:06 +02:00
parent ec0dbb6b5e
commit 86c929dd5a
6 changed files with 98 additions and 88 deletions

View File

@ -64,7 +64,7 @@ import java.nio.ByteOrder;
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
* all {@linkplain #slice() slices} must have been closed.
* And if this buffer is a constituent of a {@linkplain Buffer#compose(BufferAllocator, Deref...) composite buffer},
* And if this buffer is a constituent of a {@linkplain Buffer#compose(BufferAllocator, Buffer...) composite buffer},
* then that composite buffer must be closed.
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
@ -180,19 +180,87 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
*/
@SafeVarargs
static Buffer compose(BufferAllocator allocator, Deref<Buffer>... bufs) {
static Buffer compose(BufferAllocator allocator, Buffer... bufs) {
return new CompositeBuffer(allocator, bufs);
}
/**
* Compose the given sequence of sends of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer holds a reference to all the constituent buffers,
* until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* {@linkplain Buffer#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an
* {@linkplain Rc#isOwned() owned state}.
* This means that the composite buffer must be the only reference to the constituent buffers.
* <p>
* All of the constituent buffers must have the same {@linkplain Buffer#order() byte order}.
* An exception will be thrown if you attempt to compose buffers that have different byte orders,
* and changing the byte order of the constituent buffers so they become inconsistent after construction,
* will result in unspecified behaviour.
* <p>
* The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed
* as a single connected chunk of memory.
* Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity,
* and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a write
* offset of zero.
* Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity,
* and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read
* offset of zero.
* Furthermore, the sum of the read offsets must be less than or equal to the sum of the write offsets.
* <p>
* Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant
* offsets in the constituent buffers.
* <p>
* It is not a requirement that the buffers have the same size.
* <p>
* It is not a requirement that the buffers are allocated by this allocator, but if
* {@link Buffer#ensureWritable(int)} is called on the composed buffer, and the composed buffer needs to be
* expanded, then this allocator instance will be used for allocation the extra memory.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param sends The sent buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
*/
@SafeVarargs
static Buffer compose(BufferAllocator allocator, Send<Buffer>... sends) {
return new CompositeBuffer(allocator, sends);
}
/**
* Create an empty composite buffer, that has no components. The buffer can be extended with components using either
* {@link #ensureWritable(int)} or {@link #extendComposite(Buffer, Buffer)}.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @return A composite buffer that has no components, and has a capacity of zero.
*/
static Buffer compose(BufferAllocator allocator) {
return new CompositeBuffer(allocator, new Buffer[0]);
}
/**
* Extend the given composite buffer with the given extension buffer.
* This works as if the extension had originally been included at the end of the list of constituent buffers when
* the composite buffer was created.
* The composite buffer is modified in-place.
*
* @see #compose(BufferAllocator, Deref...)
* @param composite The composite buffer (from a prior {@link #compose(BufferAllocator, Deref...)} call) to extend
* @see #compose(BufferAllocator, Buffer...)
* @see #compose(BufferAllocator, Send...)
* @param composite The composite buffer (from a prior {@link #compose(BufferAllocator, Buffer...)} call) to extend
* with the given extension buffer.
* @param extension The buffer to extend the composite buffer with.
*/
@ -207,9 +275,9 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
}
/**
* Check if the given buffer is a {@linkplain #compose(BufferAllocator, Deref...) composite} buffer or not.
* Check if the given buffer is a {@linkplain #compose(BufferAllocator, Buffer...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Deref...)},
* @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Buffer...)},
* {@code false} otherwise.
*/
static boolean isComposite(Buffer composite) {

View File

@ -58,11 +58,17 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
private boolean closed;
private boolean readOnly;
CompositeBuffer(BufferAllocator allocator, Deref<Buffer>[] refs) {
this(allocator, filterExternalBufs(refs), COMPOSITE_DROP, false);
CompositeBuffer(BufferAllocator allocator, Buffer[] refs) {
this(allocator, filterExternalBufs(Arrays.stream(refs)
.map(buf -> buf.acquire() /* Increments reference counts. */)), COMPOSITE_DROP, false);
}
private static Buffer[] filterExternalBufs(Deref<Buffer>[] refs) {
CompositeBuffer(BufferAllocator allocator, Send<Buffer>[] refs) {
this(allocator, filterExternalBufs(Arrays.stream(refs)
.map(buf -> buf.receive())), COMPOSITE_DROP, false);
}
private static Buffer[] filterExternalBufs(Stream<Buffer> refs) {
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
// we make sure that the number of composite buffers will never become greater than the number of bytes in
@ -70,11 +76,10 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
// will never overflow their component counts.
// Allocating a new array unconditionally also prevents external modification of the array.
Buffer[] bufs = Arrays.stream(refs)
.map(r -> r.get()) // Increments reference counts.
.filter(CompositeBuffer::discardEmpty)
.flatMap(CompositeBuffer::flattenBuffer)
.toArray(Buffer[]::new);
Buffer[] bufs = refs
.filter(CompositeBuffer::discardEmpty)
.flatMap(CompositeBuffer::flattenBuffer)
.toArray(Buffer[]::new);
// Make sure there are no duplicates among the buffers.
Set<Buffer> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));

View File

@ -1,52 +0,0 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.util.function.Supplier;
/**
* A Deref provides the capability to acquire a reference to a {@linkplain Rc reference counted} object.
* <p>
* <strong>Note:</strong> Callers must ensure that they close any references they obtain.
* <p>
* Deref itself does not specify if a reference can be obtained more than once.
* For instance, any {@link Send} object is also a {@code Deref}, but the reference can only be acquired once.
* Meanwhile, {@link Rc} objects are themselves their own {@code Derefs}, and permit references to be acquired multiple
* times.
*
* @param <T> The concrete type of reference counted object that can be obtained.
*/
public interface Deref<T extends Rc<T>> extends Supplier<T> {
/**
* Acquire a reference to the reference counted object.
* <p>
* <strong>Note:</strong> This call increments the reference count of the acquired object, and must be paired with
* a {@link Rc#close()} call.
* Using a try-with-resources clause is the easiest way to ensure this.
*
* @return A reference to the reference counted object.
*/
@Override
T get();
/**
* Determine if the object in this {@code Deref} is an instance of the given class.
*
* @param cls The type to check.
* @return {@code true} if the object in this {@code Deref} can be assigned fields or variables of the given type.
*/
boolean referentIsInstanceOf(Class<?> cls);
}

View File

@ -26,7 +26,7 @@ package io.netty.buffer.api;
*
* @param <I> The concrete subtype.
*/
public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
public interface Rc<I extends Rc<I>> extends AutoCloseable {
/**
* Increment the reference count.
* <p>
@ -36,16 +36,6 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable, Deref<I> {
*/
I acquire();
@Override
default I get() {
return acquire();
}
@Override
default boolean referentIsInstanceOf(Class<?> cls) {
return cls.isInstance(this);
}
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>

View File

@ -32,7 +32,7 @@ import java.util.function.Supplier;
*
* @param <T>
*/
public interface Send<T extends Rc<T>> extends Deref<T> {
public interface Send<T extends Rc<T>> {
/**
* Construct a {@link Send} based on the given {@link Supplier}.
* The supplier will be called only once, in the receiving thread.
@ -117,8 +117,12 @@ public interface Send<T extends Rc<T>> extends Deref<T> {
}
}
@Override
default T get() {
return receive();
}
/**
* Determine if the object received from this {@code Send} is an instance of the given class.
*
* @param cls The type to check.
* @return {@code true} if the object received from this {@code Send} can be assigned fields or variables of the
* given type, otherwise false.
*/
boolean referentIsInstanceOf(Class<?> cls);
}

View File

@ -43,11 +43,6 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
return (I) copy;
}
Owned<T> unsafeUnwrapOwned() {
gateReception();
return outgoing;
}
private void gateReception() {
if ((boolean) RECEIVED.getAndSet(this, true)) {
throw new IllegalStateException("This object has already been received.");