netty-incubator-buffer-api/src/main/java/io/netty/buffer/api/Send.java
Chris Vest 492977d9be Introduce Deref abstraction
Motivation:
Sometimes, we wish to operate on both buffers and anything that can produce a buffer.
For instance, when making a composite buffer, we could compose either buffers or sends.

Modification:
Introduce a Deref interface, which is extended by both Rc and Send.
A Deref can be used to acquire an Rc instance, and in doing so will also acquire a reference to the Rc.
That is, dereferencing increases the reference count.
For Rc itself, this just delegates to Rc.acquire, while for Send it delegates to Send.receive, and can only be called once.
The Allocator.compose method has been changed to take Derefs.
This allows us to compose either Bufs or Sends of bufs.
Or a mix.
Extra care and caution has been added to the code, to make sure the reference counts are managed correctly when composing buffers, now that it's a more complicated operation.
A handful of convenience methods for working with Sends have also been added to the Send interface.

Result:
We can now build a composite buffer out of sends of buffers.
2021-02-11 14:26:57 +01:00

112 lines
4.3 KiB
Java

/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* A Send object is a temporary holder of an {@link Rc}, used for transferring the ownership of the Rc from one thread
* to another.
* <p>
* Prior to the Send being created, the originating Rc is invalidated, to prevent access while it is being sent. This
* means it cannot be accessed, closed, or disposed of, while it is in-flight. Once the Rc is {@linkplain #receive()
* received}, the new ownership is established.
* <p>
* Care must be taken to ensure that the Rc is always received by some thread. Failure to do so can result in a resource
* leak.
*
* @param <T>
*/
public interface Send<T extends Rc<T>> extends Deref<T> {
/**
* Construct a {@link Send} based on the given {@link Supplier}.
* The supplier will be called only once, in the receiving thread.
*
* @param concreteObjectType The concrete type of the object being sent. Specifically, the object returned from the
* {@link Supplier#get()} method must be an instance of this class.
* @param supplier The supplier of the object being sent, which will be called when the object is ready to be
* received.
* @param <T> The type of object being sent.
* @return A {@link Send} which will deliver an object of the given type, from the supplier.
*/
static <T extends Rc<T>> Send<T> sending(Class<T> concreteObjectType, Supplier<? extends T> supplier) {
return new Send<T>() {
private final AtomicBoolean gate = new AtomicBoolean();
@Override
public T receive() {
if (gate.getAndSet(true)) {
throw new IllegalStateException("This object has already been received.");
}
return supplier.get();
}
@Override
public boolean isInstanceOf(Class<?> cls) {
return cls.isAssignableFrom(concreteObjectType);
}
@Override
public void discard() {
if (!gate.getAndSet(true)) {
supplier.get().close();
}
}
};
}
/**
* Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the
* sent Rc in the sending thread happens-before the return of this method.
* <p>
* This method can only be called once, and will throw otherwise.
*
* @return The sent Rc instance.
* @throws IllegalStateException If this method is called more than once.
*/
T receive();
/**
* Apply a mapping function to the object being sent. The mapping will occur when the object is received.
*
* @param type The result type of the mapping function.
* @param mapper The mapping function to apply to the object being sent.
* @param <R> The result type of the mapping function.
* @return A new {@link Send} instance that will deliver an object that is the result of the mapping.
*/
default <R extends Rc<R>> Send<R> map(Class<R> type, Function<T, ? extends R> mapper) {
return sending(type, () -> mapper.apply(receive()));
}
/**
* Discard this {@link Send} and the object it contains.
* This has no effect if the send has already been received.
*/
default void discard() {
try {
receive().close();
} catch (IllegalStateException ignore) {
// Don't do anything if the send has already been consumed.
}
}
@Override
default T get() {
return receive();
}
}