Remove acquire from the public API

This is a step toward effectively eliminating reference counting.
Reference counting is only needed when the memory in buffers can be shared.
If we remove all forms of sharing, then the buffers would be in an owned state at all times.
Then we would no longer need to worry about the state of the buffers before calling, e.g. `ensureWritable` and methods like that.

Just removing `acquire` is not enough; we also need to remove the `slice` method.
In this commit we are, however, starting with `acquire` because doing so requires rearranging the type hierarchy and the generics we have in play.
This was not an easy exercise, but for the purpose of record keeping, it's useful to have that work separate from the work of removing `slice`.
This commit is contained in:
Chris Vest 2021-05-26 17:13:29 +02:00
parent aaf8e294cc
commit f0ee2e1467
39 changed files with 467 additions and 692 deletions

View File

@ -34,7 +34,7 @@ We have been collaborating with the panama-foreign project, providing feedback t
Our new buffer API is being designed with a future in mind, where access to Unsafe and JNI, is no longer possible.
This is, however, not the implementation we are going to provide at first.
The APIs from panama-foreign are still not finished, and likely wont be in time for the release of JDK 17.
With this in mind, Netty 5 will likely baseline on Java 11 anyway.
With this in mind, Netty 5 will baseline on Java 11.
== Where we are going
@ -71,32 +71,15 @@ Hopefully youll be able to see these principles reflected in the new API.
In this section well outline the major changes, and most prominent points of interest in the new API.
=== An explicit concept of ownership
In the new API we make it explicit that a buffer can have an “owner”.
This is the case when the buffer isnt borrowed out anywhere else; the reference count is one.
In this state the buffer is said to be “owned”.
This is important because certain operations can only be called when the buffer is in an owned state, as they could otherwise violate our safety requirements.
All of these operations will call this out in their javadocs.
These operations either change the ownership of the buffer, or they change the internal memory allocation of the buffer, in some way.
For instance, `ensureWritable()` requires ownership because it may replace the internal memory allocation with a new larger one.
The `split()` method requires ownership because it changes how the buffer ownership itself is set up.
The `send()` method is used for transferring ownership between threads, which obviously requires ownership to begin with.
[source,java]
----
if (buf.isOwned()) {
buf.compact(); // compact() requires ownership because it moves data.
}
----
=== Reference counting
Buffers are now `AutoCloseable` and can be used in try-with-resources clauses.
Every allocation and acquire call on a buffer (any `Rc` object, really) must be paired with a `close()`, and every `receive()` call on a `Send` must also be paired with a `close()`.
Every method that increments the reference count will document this fact.
When the reference count is not incremented, then there is obviously no need to decrement it either.
Buffers can thus be passed through a pipeline without having to do much reference counting work.
Every allocation and acquire call on a buffer (any `Resource` object, really) must be paired with a `close()`, and every `receive()` call on a `Send` must also be paired with a `close()`.
While referene counting is a useful thing for tracking resource life-cycles internally, it is not itself exposed in the public API.
Instead, the public API effectively has a boolean open/closed state.
This simplifies the API a great deal; buffers are created, and in the end they are closed.
The code in between needs to be arranged such that it just avoids ever holding on to buffers that might be closed.
[source,java]
----
@ -105,15 +88,13 @@ try (Buffer buf = allocator.allocate(8)) {
} // buf is deallocated here.
----
The updates to the reference counts are not thread-safe, because the buffers themselves - their contents and their offsets - are not thread-safe.
The change of the open/closed state is not thread-safe, because the buffers themselves - their contents and their offsets - are not thread-safe.
This is a deviation from how ByteBuf works, where the updates are atomic, and the reference count checks on memory accesses are “optimistic” in that they permit data races to occur.
This codifies that buffers cannot be modified by more than one thread at a time, and that buffers should be shared via safe publication.
Using the `send()` mechanism helps with the thread-safe transfer of buffer ownership.
A buffers contents can still be access from multiple threads via the `get*` methods.
However, the buffer should be effectively read-only while it is exposed like that, as accesses would otherwise be racy.
See https://github.com/netty/netty-incubator-buffer-api/blob/main/src/main/java/io/netty/buffer/api/Rc.java
If these simple rules and patterns are followed strictly, then memory leaks should not occur.
=== Cleaner attached by default
@ -126,20 +107,13 @@ Note, however, that the buffers are still reference counted, because this has mo
Off-heap (or direct) buffers can give the GC an inaccurate picture of memory usage, which in turn can lead to abrupt bouts of poor performance when the system is under load.
The cleaner is a fall back that will likely also be used as part of leak detection.
=== Slices are always retained
=== Slices are gone
The existing ByteBuf API has both slice() and retainedSlice() methods, where the latter increments the reference count of the parent buffer, and the former does not.
In the new API the slice() method always increments the reference count of the buffer being sliced.
The slice itself has its own independent positions, and its own reference count.
[source,java]
----
try (Buffer slice = buf.slice()) {
// process slice of readable data.
}
----
There is currently no duplicate() methods on the API, because it is not clear if they are really needed, but if we were to add them, they would work in the same way as slice() does.
The existing ByteBuf API has a number of methods that allow multiple buffers to share access to the same memory.
It turns out that this capability is at the heart of why reference counting is a necessary part of the ByteBuf API.
By removing the various `slice()` and `duplicate()` methods, along with the `retain()`/`release()` family of methods, we also remove the ability for buffers to share memory.
This allows us to simplify the reference counting concept to a simple boolean open/closed state.
Buffers are created, and at the end of their life, they are closed, which releases their memory back to where it came from.
=== Buffer interface
@ -229,24 +203,14 @@ Thats why the method to compose buffers takes a `BufferAllocator` as a first
----
try (Buffer x = allocator.allocate(128);
Buffer y = allocator.allocate(128)) {
return CompositeBuffer.compose(allocator, x, y);
return CompositeBuffer.compose(allocator, x.send(), y.send());
}
----
The static `compose()` method will create a composite buffer, even when only given a single buffer, or no buffers.
The composite buffer acquires a reference on each of its constituent component buffers.
This means that, for instance, newly allocated buffers will not be owned by the composite buffer unless the reference outside of the composite buffer is closed.
In the above example, the reference counts for the buffers x and y are initially 1, then gets incremented to 2 by creating the composite buffer, and it drops back down to 1 at the end of the try-with-resources clause.
When the method returns, the composite buffer will be the only thing holding on to the two buffers, and it will thus have ownership over them.
A composite buffer can only be owned if all of its constituent buffers are owned.
Conversely, by acquiring a reference to each constituent component buffer, the composite buffer prevents them from being owned elsewhere.
This is important because buffers cannot change their size, or transfer their ownership elsewhere, unless they are already owned.
If the constituent component buffers of a composite buffer could change their size, they would be able to break the offset computations inside of the composite buffer, and break the illusion that the composite buffer is just like one large buffer.
A composite buffer can also be composed out of `Send<Buffer>` instances.
This ensures the composite buffer gets an exclusive reference to the sent components.
The composite buffer takes ownership of each of its constituent component buffers, via the `Send<Buffer>` arguments.
This guarantees that the composite cannot be brought into a state that is invalid, through direct manipulation of its components.
Although there is in principle is no need for integrating code to know whether a buffer is composite, it is still possible to query, in case it is helpful for some optimisations.
This is done with the `countComponents()`, `countReadableComponents()`, and `countWritableComponents()` family of methods.
@ -260,10 +224,6 @@ That is, you can pass composite buffers to the `CompositeBuffer.compose()` metho
However, the new composite buffer will end up with the flattened concatenation of all constituent components.
This means the number of indirections will not increase in the new buffer.
Because the new composite buffer increases the reference counts on all of its components, and because a composite buffer can only be owned when all of its constituent components are owned, the ownership model ends up working just fine with this flattening.
This also means that a composite buffer that is composed of other composite buffers, do not increase the reference counts of those other composite buffers only their components have their reference counts increased.
This wont make any difference in how the buffers behave, but it may cause some surprises to the few who are inspecting the `countBorrows()`.
=== Iterating components
The `forEachReadable()` and `forEachWritable()` methods iterate a buffers readable and writable areas, respectively.
@ -354,15 +314,15 @@ The MemorySegment APIs that are being developed in the OpenJDK project will use
The `get`/`set`/`read`/`writeBoolean` accessor methods are being removed with no replacement planned.
They have ambiguous meaning when working with buffers that are fundamentally byte-granular.
=== Splitting buffer ownership with split()
=== Splitting buffers with split()
The more explicit concept of ownership, and how ownership is now a requirement for calling some Buffer methods, may get in the way in some cases.
For instance, in Netty, the `ByteToMessageDecoder` collects data into a collecting buffer, from which data frames are sliced off and then sent off to be processed in parallel in other threads.
With the removal of the `slice()` family of methods, we are in need of an alternative way to process a buffer in parts.
For instance, in Netty, the `ByteToMessageDecoder` collects data into a collecting buffer, from which data frames are produced and then sent off to be processed further down a pipeline, potentially in parallel in other threads.
Since slices are now always retaining, they would effectively lock out all methods that require ownership.
Since slices would cause memory to be shared, they would effectively lock out all methods that require ownership.
This would be a problem for such a collecting buffer, since it needs to grow dynamically to accommodate the largest message or frame size.
To address this, the new API introduces a `Buffer.split()` (https://github.com/netty/netty-incubator-buffer-api/blob/main/src/main/java/io/netty/buffer/api/Buffer.java#L481) method.
To address this, the new API introduces a `Buffer.split()` (https://github.com/netty/netty-incubator-buffer-api/blob/main/src/main/java/io/netty/buffer/api/Buffer.java#L529) method.
This method splits the ownership of a buffer in two.
All the read and readable bytes are returned in a new, independent buffer, and the existing buffer gets truncated at the head by a corresponding amount.
The capacities and offsets of both buffers are adjusted such that they cannot access each others memory.

View File

@ -32,43 +32,17 @@ import java.nio.ByteOrder;
* <h3>Life cycle and reference counting</h3>
*
* The buffer has a life cycle, where it is allocated, used, and deallocated.
* The reference count controls this life cycle.
* <p>
* When the buffer is initially allocated, a pairing {@link #close()} call will deallocate it.
* In this state, the buffer {@linkplain #isOwned() is "owned"}.
* <p>
* The buffer can also be {@linkplain #acquire() acquired} when it's about to be involved in a complicated lifetime.
* The {@link #acquire()} call increments the reference count of the buffer,
* and a pairing {@link #close()} call will decrement the reference count.
* Each acquire lends out the buffer, and the buffer is said to be in a "borrowed" state.
* <p>
* Certain operations, such as {@link #send()}, are only available on owned buffers.
* If a buffer is {@linkplain #send() sent} elsewhere, the {@linkplain #close() close} method on the given instance
* will become a no-op.
* The buffer can be thought of as a view onto memory, and calling {@link #send()} on the buffer will effectively close
* that view, and recreate it upon reception at its destination.
*
* <h3>Thread-safety</h3>
*
* Buffers are not thread-safe.
* The reference counting implied by the {@link Rc} interface is itself not thread-safe,
* and buffers additionally contain other mutable data that is not thread-safe.
* Depending on the buffer implementation, the buffer may impose confinement restrictions as well,
* so that the buffer cannot even be read using absolute offsets,
* such as with the {@link #getByte(int)} method,
* from multiple threads.
* <p>
* If a buffer needs to be accessed by a different thread,
* then the ownership of that buffer must be sent to that thread.
* This can be done with the {@link #send()} method.
* The send method consumes the buffer, if it is in an owned state, and produces a {@link Send} object.
* The {@link Send} object can then be shared in a thread-safe way (so-called "safe publication"),
* with the intended recipient thread.
* <p>
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
* all {@linkplain #slice() slices} must have been closed.
* And if this buffer is a constituent of a
* {@linkplain CompositeBuffer#compose(BufferAllocator, Buffer...) composite buffer}, then that composite buffer must
* be closed.
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
* The {@linkplain #isAccessible() accessibility state} implied by the {@link Resource} interface is itself not
* thread-safe, and buffers additionally contain other mutable data that is not thread-safe.
*
* <h3>Accessing data</h3>
*
@ -137,7 +111,7 @@ import java.nio.ByteOrder;
* The {@link BufferAllocator} has a {@link BufferAllocator#constBufferSupplier(byte[])} method that solves this, and
* prevents these bugs from occurring.
*/
public interface Buffer extends Rc<Buffer>, BufferAccessors {
public interface Buffer extends Resource<Buffer>, BufferAccessors {
/**
* Change the default byte order of this buffer, and return this buffer.
*
@ -412,7 +386,6 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
/**
* Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of
* bytes.
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then it will be expanded using the
* {@link BufferAllocator} the buffer was created with.
@ -420,8 +393,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* {@code false}.
*
* @param size The requested number of bytes of space that should be available for writing.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* or is {@linkplain #readOnly() read-only}.
* @throws IllegalStateException if this buffer is not in a bad state, or is {@linkplain #readOnly() read-only}.
*/
default void ensureWritable(int size) {
ensureWritable(size, 1, true);
@ -430,7 +402,6 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
/**
* Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of
* bytes.
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then space will be made available in one or all of
* the following available ways:
@ -463,8 +434,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* @param allowCompaction {@code true} if the method is allowed to modify the
* {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* or is {@linkplain #readOnly() read-only}.
* @throws IllegalStateException if this buffer is not in a bad state, or is {@linkplain #readOnly() read-only}.
*/
void ensureWritable(int size, int minimumGrowth, boolean allowCompaction);
@ -514,8 +484,6 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
* <p>
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* <p>
* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be independently
* {@linkplain #send() sent} to other threads.
@ -565,8 +533,6 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
/**
* Split the buffer into two, at the given {@code splitOffset}.
* <p>
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* <p>
* The region of this buffer that precede the {@code splitOffset}, will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be independently
* {@linkplain #send() sent} to other threads.
@ -615,8 +581,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
/**
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
*
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* or is {@linkplain #readOnly() read-only}.
* @throws IllegalStateException if this buffer is not in a bad state, or is {@linkplain #readOnly() read-only}.
*/
void compact();
@ -675,7 +640,8 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <p>
* The {@link ByteBuffer} instances obtained from the component, share lifetime with that internal component.
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
* such changes are {@link #split(int)}, {@link #split()}, {@link #compact()}, {@link #ensureWritable(int)},
* {@link #ensureWritable(int, int, boolean)}, and {@link #send()}.
* <p>
* The best way to ensure this doesn't cause any trouble, is to use the buffers directly as part of the iteration,
* or immediately after the iteration while we are still in the scope of the method that triggered the iteration.
@ -717,7 +683,8 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
* <p>
* The {@link ByteBuffer} instances obtained from the component, share lifetime with that internal component.
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
* such changes are {@link #split(int)}, {@link #split()}, {@link #compact()}, {@link #ensureWritable(int)},
* {@link #ensureWritable(int, int, boolean)}, and {@link #send()}.
* <p>
* The best way to ensure this doesn't cause any trouble, is to use the buffers directly as part of the iteration,
* or immediately after the iteration while we are still in the scope of the method that triggered the iteration.

View File

@ -74,8 +74,8 @@ public interface BufferAllocator extends AutoCloseable {
* byte contents. The buffer has the same capacity as the byte array length, and its write offset is placed at the
* end, and its read offset is at the beginning, such that the entire buffer contents are readable.
* <p>
* The buffers produced by the supplier will have {@linkplain Buffer#isOwned() ownership}, and closing them will
* make them {@linkplain Buffer#isAccessible() inaccessible}, just like a normally allocated buffer.
* The buffers produced by the supplier will each have their own independent life-cycle, and closing them will
* make them {@linkplain Buffer#isAccessible() inaccessible}, just like normally allocated buffers.
* <p>
* The buffers produced are "constants", in the sense that they are {@linkplain Buffer#readOnly() read-only}.
* <p>

View File

@ -15,6 +15,7 @@
*/
package io.netty.buffer.api;
import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.buffer.api.internal.Statics;
import java.lang.invoke.VarHandle;
@ -31,12 +32,12 @@ import static java.lang.invoke.MethodHandles.lookup;
* as inspiration.
* <p>
* If you just want an object that is a reference to a buffer, then the {@link BufferRef} can be used for that purpose.
* If you have an advanced use case where you wish to implement {@link Rc}, and tightly control lifetimes, then
* {@link RcSupport} can be of help.
* If you have an advanced use case where you wish to implement {@link Resource}, and tightly control lifetimes, then
* {@link ResourceSupport} can be of help.
*
* @param <T> The concrete {@link BufferHolder} type.
*/
public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
public abstract class BufferHolder<T extends BufferHolder<T>> implements Resource<T> {
private static final VarHandle BUF = Statics.findVarHandle(lookup(), BufferHolder.class, "buf", Buffer.class);
private Buffer buf;
@ -48,7 +49,7 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
* @param buf The {@linkplain Buffer buffer} to be held by this holder.
*/
protected BufferHolder(Buffer buf) {
this.buf = Objects.requireNonNull(buf, "The buffer cannot be null.").acquire();
this.buf = Objects.requireNonNull(buf, "The buffer cannot be null.");
}
/**
@ -62,23 +63,11 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
buf = Objects.requireNonNull(send, "The send cannot be null.").receive();
}
@SuppressWarnings("unchecked")
@Override
public T acquire() {
buf.acquire();
return (T) this;
}
@Override
public void close() {
buf.close();
}
@Override
public boolean isOwned() {
return buf.isOwned();
}
@SuppressWarnings("unchecked")
@Override
public Send<T> send() {
@ -95,24 +84,6 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
*/
protected abstract T receive(Buffer buf);
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufferHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and increases the reference count of the new buffer.
* <p>
* The buffer assignment is performed using a plain store.
*
* @param newBuf The new {@link Buffer} instance that is replacing the currently held buffer.
*/
protected final void replaceBuf(Buffer newBuf) {
try (var ignore = buf) {
buf = newBuf.acquire();
}
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
@ -125,28 +96,10 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
*
* @param send The new {@link Buffer} instance that is replacing the currently held buffer.
*/
protected final void replaceBuf(Send<Buffer> send) {
try (var ignore = buf) {
protected final void replaceBuffer(Send<Buffer> send) {
buf.close();
buf = send.receive();
}
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufferHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and increases the reference count of the new buffer.
* <p>
* The buffer assignment is performed using a volatile store.
*
* @param newBuf The new {@link Buffer} instance that is replacing the currently held buffer.
*/
protected final void replaceBufVolatile(Buffer newBuf) {
var prev = (Buffer) BUF.getAndSet(this, newBuf.acquire());
prev.close();
}
/**
* Replace the underlying referenced buffer with the given buffer.
@ -160,7 +113,7 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
*
* @param send The {@link Send} with the new {@link Buffer} instance that is replacing the currently held buffer.
*/
protected final void replaceBufVolatile(Send<Buffer> send) {
protected final void replaceBufferVolatile(Send<Buffer> send) {
var prev = (Buffer) BUF.getAndSet(this, send.receive());
prev.close();
}
@ -172,7 +125,7 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
*
* @return The {@link Buffer} instance being held by this {@linkplain T buffer holder}.
*/
protected final Buffer getBuf() {
protected final Buffer getBuffer() {
return buf;
}
@ -183,7 +136,7 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Rc<T> {
*
* @return The {@link Buffer} instance being held by this {@linkplain T buffer holder}.
*/
protected final Buffer getBufVolatile() {
protected final Buffer getBufferVolatile() {
return (Buffer) BUF.getVolatile(this);
}

View File

@ -27,7 +27,7 @@ public final class BufferRef extends BufferHolder<BufferRef> {
*
* @param buf The buffer to reference.
*/
public BufferRef(Buffer buf) {
private BufferRef(Buffer buf) {
super(buf);
// BufferRef is meant to be atomic, so we need to add a fence to get the semantics of a volatile store.
VarHandle.fullFence();
@ -49,20 +49,6 @@ public final class BufferRef extends BufferHolder<BufferRef> {
return new BufferRef(buf);
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and increases the reference count of the new buffer.
* <p>
* The buffer assignment is performed using a volatile store.
*
* @param newBuf The new {@link Buffer} instance that is replacing the currently held buffer.
*/
public void replace(Buffer newBuf) {
replaceBufVolatile(newBuf);
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
@ -74,7 +60,7 @@ public final class BufferRef extends BufferHolder<BufferRef> {
* @param send The {@link Send} with the new {@link Buffer} instance that is replacing the currently held buffer.
*/
public void replace(Send<Buffer> send) {
replaceBufVolatile(send);
replaceBufferVolatile(send);
}
/**
@ -83,6 +69,6 @@ public final class BufferRef extends BufferHolder<BufferRef> {
* @return The buffer held by the reference.
*/
public Buffer contents() {
return getBufVolatile();
return getBufferVolatile();
}
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer.api;
import io.netty.buffer.api.internal.ResourceSupport;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
@ -32,11 +34,6 @@ import java.util.stream.Stream;
* A composite buffer is constructed using one of the {@code compose} methods:
* <ul>
* <li>
* {@link #compose(BufferAllocator, Buffer...)} creates a composite buffer from the given set of buffers.
* The reference count to the passed buffers will be increased, and the resulting composite buffer may or may
* not have ownership.
* </li>
* <li>
* {@link #compose(BufferAllocator, Send[])} creates a composite buffer from the buffers that are sent to it via
* the passed in send objects. Since {@link Send#receive()} transfers ownership, the resulting composite buffer
* will have ownership, because it is guaranteed that there are no other references to its constituent buffers.
@ -48,7 +45,7 @@ import java.util.stream.Stream;
* </li>
* </ul>
* Composite buffers can later be extended with internally allocated components, with {@link #ensureWritable(int)},
* or with externally allocated buffers, using {@link #extendWith(Buffer)}.
* or with externally allocated buffers, using {@link #extendWith(Send)}.
*
* <h3>Constituent buffer requirements</h3>
*
@ -82,12 +79,12 @@ import java.util.stream.Stream;
*
* <h3>Ownership and Send</h3>
*
* {@linkplain Buffer#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an
* {@linkplain Rc#isOwned() owned state}.
* This means that the composite buffer must be the only reference to the constituent buffers.
* {@linkplain Resource#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in a
* state that permits them being sent. This should be the case by default, as it shouldn't be possible to create
* composite buffers that can't be sent.
*/
public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implements Buffer {
public final class CompositeBuffer extends ResourceSupport<Buffer, CompositeBuffer> implements Buffer {
/**
* The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}.
* We set the max composite buffer capacity to the same, since it would otherwise be impossible to create a
@ -122,37 +119,6 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
private boolean closed;
private boolean readOnly;
/**
* Compose the given sequence of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer increments the reference count on all the constituent buffers,
* and holds a reference to them until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buffer a = allocator.allocate(size);
* Buffer b = allocator.allocate(size)) {
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* See the class documentation for more information on what is required of the given buffers for composition to be
* allowed.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @param bufs The buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent
* {@linkplain Buffer#order() byte order}.
*/
public static CompositeBuffer compose(BufferAllocator allocator, Buffer... bufs) {
Stream<Buffer> bufferStream = Arrays.stream(bufs)
.map(buf -> buf.acquire()); // Increments reference counts.
return new CompositeBuffer(allocator, filterExternalBufs(bufferStream), COMPOSITE_DROP, false);
}
/**
* Compose the given sequence of sends of buffers and present them as a single buffer.
* <p>
@ -201,25 +167,25 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
if (ise != null) {
throw ise;
}
return new CompositeBuffer(allocator, filterExternalBufs(Arrays.stream(bufs)), COMPOSITE_DROP, false);
return new CompositeBuffer(allocator, filterExternalBufs(Arrays.stream(bufs)), COMPOSITE_DROP);
}
/**
* Create an empty composite buffer, that has no components. The buffer can be extended with components using either
* {@link #ensureWritable(int)} or {@link #extendWith(Buffer)}.
* {@link #ensureWritable(int)} or {@link #extendWith(Send)}.
*
* @param allocator The allocator for the composite buffer. This allocator will be used e.g. to service
* {@link #ensureWritable(int)} calls.
* @return A composite buffer that has no components, and has a capacity of zero.
*/
public static CompositeBuffer compose(BufferAllocator allocator) {
return new CompositeBuffer(allocator, EMPTY_BUFFER_ARRAY, COMPOSITE_DROP, false);
return new CompositeBuffer(allocator, EMPTY_BUFFER_ARRAY, COMPOSITE_DROP);
}
/**
* Check if the given buffer is a {@linkplain #compose(BufferAllocator, Buffer...) composite} buffer or not.
* Check if the given buffer is a {@linkplain #compose(BufferAllocator, Send...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Buffer...)},
* @return {@code true} if the given buffer was created with {@link #compose(BufferAllocator, Send...)},
* {@code false} otherwise.
*/
public static boolean isComposite(Buffer composite) {
@ -267,24 +233,14 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
// Extract components and move our reference count from the composite onto the components.
var composite = (CompositeBuffer) buf;
var bufs = composite.bufs;
for (Buffer b : bufs) {
b.acquire();
}
buf.close(); // Important: acquire on components *before* closing composite.
return Stream.of(bufs);
}
return Stream.of(buf);
}
private CompositeBuffer(BufferAllocator allocator, Buffer[] bufs, Drop<CompositeBuffer> drop,
boolean acquireBufs) {
private CompositeBuffer(BufferAllocator allocator, Buffer[] bufs, Drop<CompositeBuffer> drop) {
super(drop);
this.allocator = allocator;
if (acquireBufs) {
for (Buffer buf : bufs) {
buf.acquire();
}
}
try {
if (bufs.length > 0) {
ByteOrder targetOrder = bufs[0].order();
@ -490,7 +446,7 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
// This is important because 1) slice() already acquired the buffers, and 2) if this slice is empty
// then we need to keep holding on to it to prevent this originating composite buffer from getting
// ownership. If it did, its behaviour would be inconsistent with that of a non-composite buffer.
return new CompositeBuffer(allocator, slices, COMPOSITE_DROP, false);
return new CompositeBuffer(allocator, slices, COMPOSITE_DROP);
}
@Override
@ -838,33 +794,37 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
* the composite buffer was created.
* The extension buffer is added to the end of this composite buffer, which is modified in-place.
*
* @see #compose(BufferAllocator, Buffer...)
* @see #compose(BufferAllocator, Send...)
* @param extension The buffer to extend the composite buffer with.
*/
public void extendWith(Buffer extension) {
public void extendWith(Send<Buffer> extension) {
Objects.requireNonNull(extension, "Extension buffer cannot be null.");
Buffer buffer = extension.receive();
if (!isOwned()) {
buffer.close();
throw new IllegalStateException("This buffer cannot be extended because it is not in an owned state.");
}
if (bufs.length > 0 && extension.order() != order()) {
if (bufs.length > 0 && buffer.order() != order()) {
buffer.close();
throw new IllegalArgumentException(
"This buffer uses " + order() + " byte order, and cannot be extended with " +
"a buffer that uses " + extension.order() + " byte order.");
"a buffer that uses " + buffer.order() + " byte order.");
}
if (bufs.length > 0 && extension.readOnly() != readOnly()) {
if (bufs.length > 0 && buffer.readOnly() != readOnly()) {
buffer.close();
throw new IllegalArgumentException(
"This buffer is " + (readOnly? "read-only" : "writable") + ", " +
"and cannot be extended with a buffer that is " +
(extension.readOnly()? "read-only." : "writable."));
(buffer.readOnly()? "read-only." : "writable."));
}
long extensionCapacity = extension.capacity();
long extensionCapacity = buffer.capacity();
if (extensionCapacity == 0) {
// Extending by a zero-sized buffer makes no difference. Especially since it's not allowed to change the
// capacity of buffers that are constiuents of composite buffers.
// This also ensures that methods like countComponents, and forEachReadable, do not have to worry about
// overflow in their component counters.
buffer.close();
return;
}
@ -873,10 +833,10 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
Buffer[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
if (extension instanceof CompositeBuffer) {
if (buffer instanceof CompositeBuffer) {
// If the extension is itself a composite buffer, then extend this one by all the constituent
// component buffers.
CompositeBuffer compositeExtension = (CompositeBuffer) extension;
CompositeBuffer compositeExtension = (CompositeBuffer) buffer;
Buffer[] addedBuffers = compositeExtension.bufs;
Set<Buffer> duplicatesCheck = Collections.newSetFromMap(new IdentityHashMap<>());
duplicatesCheck.addAll(Arrays.asList(bufs));
@ -884,24 +844,21 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
if (duplicatesCheck.size() < bufs.length + addedBuffers.length) {
throw extensionDuplicatesException();
}
for (Buffer addedBuffer : addedBuffers) {
addedBuffer.acquire();
}
int extendAtIndex = bufs.length;
bufs = Arrays.copyOf(bufs, extendAtIndex + addedBuffers.length);
System.arraycopy(addedBuffers, 0, bufs, extendAtIndex, addedBuffers.length);
computeBufferOffsets();
} else {
for (Buffer buf : restoreTemp) {
if (buf == extension) {
if (buf == buffer) {
throw extensionDuplicatesException();
}
}
unsafeExtendWith(extension.acquire());
unsafeExtendWith(buffer);
}
if (restoreTemp.length == 0) {
order = extension.order();
readOnly = extension.readOnly();
order = buffer.order();
readOnly = buffer.readOnly();
}
} catch (Exception e) {
bufs = restoreTemp;
@ -939,7 +896,7 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
checkSplit(splitOffset);
if (bufs.length == 0) {
// Splitting a zero-length buffer is trivial.
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
return new CompositeBuffer(allocator, bufs, unsafeGetDrop()).order(order);
}
int i = searchOffsets(splitOffset);
@ -954,16 +911,9 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
}
private CompositeBuffer buildSplitBuffer(Buffer[] splits) {
try {
var compositeBuf = new CompositeBuffer(allocator, splits, unsafeGetDrop(), true);
var compositeBuf = new CompositeBuffer(allocator, splits, unsafeGetDrop());
compositeBuf.order = order; // Preserve byte order even if splits array is empty.
return compositeBuf;
} finally {
// Drop our references to the buffers in the splits array. They belong to the new composite buffer now.
for (Buffer split : splits) {
split.close();
}
}
}
/**
@ -980,7 +930,7 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
checkSplit(splitOffset);
if (bufs.length == 0) {
// Splitting a zero-length buffer is trivial.
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
return new CompositeBuffer(allocator, bufs, unsafeGetDrop()).order(order);
}
int i = searchOffsets(splitOffset);
@ -1008,7 +958,7 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
checkSplit(splitOffset);
if (bufs.length == 0) {
// Splitting a zero-length buffer is trivial.
return new CompositeBuffer(allocator, bufs, unsafeGetDrop(), true).order(order);
return new CompositeBuffer(allocator, bufs, unsafeGetDrop()).order(order);
}
int i = searchOffsets(splitOffset);
@ -1415,7 +1365,7 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
for (int i = 0; i < sends.length; i++) {
received[i] = sends[i].receive();
}
var composite = new CompositeBuffer(allocator, received, drop, false);
var composite = new CompositeBuffer(allocator, received, drop);
composite.readOnly = readOnly;
drop.attach(composite);
return composite;
@ -1439,7 +1389,8 @@ public final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> im
private boolean allConstituentsAreOwned() {
boolean result = true;
for (Buffer buf : bufs) {
result &= buf.isOwned();
//noinspection unchecked
result &= ((ResourceSupport<Buffer, ?>) buf).isOwned();
}
return result;
}

View File

@ -16,24 +16,24 @@
package io.netty.buffer.api;
/**
* The Drop interface is used by {@link Rc} instances to implement their resource disposal mechanics. The {@link
* #drop(Object)} method will be called by the Rc when their last reference is closed.
* The Drop interface is used by {@link Resource} instances to implement their resource disposal mechanics.
* The {@link #drop(Object)} method will be called by the resource when they are closed.
*
* @param <T>
*/
@FunctionalInterface
public interface Drop<T> {
/**
* Dispose of the resources in the given Rc.
* Dispose of the resources in the given {@link Resource} instance.
*
* @param obj The Rc instance being dropped.
* @param obj The {@link Resource} instance being dropped.
*/
void drop(T obj);
/**
* Called when the resource changes owner.
*
* @param obj The new Rc instance with the new owner.
* @param obj The new {@link Resource} instance with the new owner.
*/
default void attach(T obj) {
}

View File

@ -16,23 +16,23 @@
package io.netty.buffer.api;
/**
* This interface encapsulates the ownership of an {@link Rc}, and exposes a method that may be used to transfer this
* ownership to the specified recipient thread.
* This interface encapsulates the ownership of a {@link Resource}, and exposes a method that may be used to transfer
* this ownership to the specified recipient thread.
*
* @param <T> The concrete type of {@link Rc} that is owned.
* @param <T> The concrete type of {@link Resource} that is owned.
*/
@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface Owned<T> {
/**
* Transfer the ownership of the owned Rc, to the calling thread. The owned Rc is invalidated but without
* disposing of its internal state. Then a new Rc with the given owner is produced in its stead.
* Transfer the ownership of the resource, to the calling thread. The resource instance is invalidated but without
* disposing of its internal state. Then a new resource instance with the given owner is produced in its stead.
* <p>
* This method is called by {@link Send} implementations. These implementations will ensure that the transfer of
* ownership (the calling of this method) happens-before the new owner begins accessing the new object. This ensures
* that the new Rc is safely published to the new owners.
* that the new resource instanec is safely published to the new owners.
*
* @param drop The drop object that knows how to dispose of the state represented by this Rc.
* @return A new Rc instance that is exactly the same as this Rc, except it has the new owner.
* @param drop The drop object that knows how to dispose of the state represented by this {@link Resource}.
* @return A new resource instance that is exactly the same as this resource.
*/
T transferOwnership(Drop<T> drop);
}

View File

@ -1,80 +0,0 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
/**
* An Rc is a reference counted, thread-confined, resource of sorts. Because these resources are thread-confined, the
* reference counting is NOT atomic. An Rc can only be accessed by one thread at a time - the owner thread that the
* resource is confined to.
* <p>
* When the last reference is closed (accounted for using {@link AutoCloseable} and try-with-resources statements,
* ideally), then the resource is desposed of, or released, or returned to the pool it came from. The precise action is
* implemented by the {@link Drop} instance given as an argument to the Rc constructor.
*
* @param <I> The concrete subtype.
*/
public interface Rc<I extends Rc<I>> extends AutoCloseable {
/**
* Increment the reference count.
* <p>
* Note, this method is not thread-safe because reference counted objects are meant to thread-confined.
*
* @return This Rc instance.
*/
I acquire();
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>
* Note, this method is not thread-safe because reference counted objects are meant to be thread-confined.
*
* @throws IllegalStateException If this Rc has already been closed.
*/
@Override
void close();
/**
* Send this reference counted object instance to another Thread, transferring the ownership to the recipient.
* <p>
* Note that the object must be {@linkplain #isOwned() owned}, and cannot have any outstanding borrows,
* when it's being sent.
* That is, all previous acquires must have been closed, and {@link #isOwned()} must return {@code true}.
* <p>
* This instance immediately becomes inaccessible, and all attempts at accessing this reference counted object
* will throw. Calling {@link #close()} will have no effect, so this method is safe to call within a
* try-with-resources statement.
*/
Send<I> send();
/**
* Check that this reference counted object is owned.
* <p>
* To be owned, the object must have no outstanding acquires, and no other implementation defined restrictions.
*
* @return {@code true} if this object can be {@linkplain #send() sent},
* or {@code false} if calling {@link #send()} would throw an exception.
*/
boolean isOwned();
/**
* Check if this object is accessible.
*
* @return {@code true} if this object is still valid and can be accessed,
* otherwise {@code false} if, for instance, this object has been dropped/deallocated,
* or been {@linkplain #send() sent} elsewhere.
*/
boolean isAccessible();
}

View File

@ -27,8 +27,8 @@ public interface ReadableComponentProcessor<E extends Exception> {
* {@link Buffer#forEachReadable(int, ReadableComponentProcessor) iteration}.
* <p>
* The component object itself is only valid during this call, but the {@link ByteBuffer byte buffers}, arrays, and
* native address pointers obtained from it, will be valid until any {@link Buffer#isOwned() ownership} requiring
* operation is performed on the buffer.
* native address pointers obtained from it, will be valid until any operation is performed on the buffer, which
* changes the internal memory.
*
* @param index The current index of the given buffer component, based on the initial index passed to the
* {@link Buffer#forEachReadable(int, ReadableComponentProcessor)} method.

View File

@ -0,0 +1,41 @@
package io.netty.buffer.api;
/**
* A resource that has a life-time, and can be {@linkplain #close() closed}.
* Resources are initially {@linkplain #isAccessible() accessible}, but closing them makes them inaccessible.
*/
public interface Resource<T extends Resource<T>> extends AutoCloseable {
/**
* Send this object instance to another Thread, transferring the ownership to the recipient.
* <p>
* The object must be in a state where it can be sent, which includes at least being
* {@linkplain #isAccessible() accessible}.
* <p>
* When sent, this instance will immediately become inaccessible, as if by {@linkplain #close() closing} it.
* All attempts at accessing an object that has been sent, even if that object has not yet been received, should
* cause an exception to be thrown.
* <p>
* Calling {@link #close()} on an object that has been sent will have no effect, so this method is safe to call
* within a try-with-resources statement.
*/
Send<T> send();
/**
* Close the resource, making it inaccessible.
* <p>
* Note, this method is not thread-safe unless otherwise specific.
*
* @throws IllegalStateException If this {@code Resource} has already been closed.
*/
@Override
void close();
/**
* Check if this object is accessible.
*
* @return {@code true} if this object is still valid and can be accessed,
* otherwise {@code false} if, for instance, this object has been dropped/deallocated,
* or been {@linkplain #send() sent} elsewhere.
*/
boolean isAccessible();
}

View File

@ -19,7 +19,7 @@ import java.util.ArrayDeque;
/**
* A scope is a convenient mechanism for capturing the life cycles of multiple reference counted objects. Once the scope
* is closed, all of the added objects will also be closed in reverse insert order. That is, the most recently added
* is closed, all the added objects will also be closed in reverse insert order. That is, the most recently added
* object will be closed first.
* <p>
* Scopes can be reused. After a scope has been closed, new objects can be added to it, and they will be closed when the
@ -31,18 +31,18 @@ import java.util.ArrayDeque;
* Note that scopes are not thread-safe. They are intended to be used from a single thread.
*/
public final class Scope implements AutoCloseable {
private final ArrayDeque<Rc<?>> deque = new ArrayDeque<>();
private final ArrayDeque<Resource<?>> deque = new ArrayDeque<>();
/**
* Add the given reference counted object to this scope, so that it will be {@linkplain Rc#close() closed} when this
* scope is {@linkplain #close() closed}.
* Add the given reference counted object to this scope, so that it will be {@linkplain Resource#close() closed}
* when this scope is {@linkplain #close() closed}.
*
* @param obj The reference counted object to add to this scope.
* @param <T> The type of the reference counted object.
* @return The same exact object that was added; further operations can be chained on the object after this method
* call.
*/
public <T extends Rc<T>> T add(T obj) {
public <T extends Resource<T>> T add(T obj) {
deque.addLast(obj);
return obj;
}
@ -52,7 +52,7 @@ public final class Scope implements AutoCloseable {
*/
@Override
public void close() {
Rc<?> obj;
Resource<?> obj;
while ((obj = deque.pollLast()) != null) {
obj.close();
}

View File

@ -20,19 +20,19 @@ import java.util.function.Function;
import java.util.function.Supplier;
/**
* A Send object is a temporary holder of an {@link Rc}, used for transferring the ownership of the Rc from one thread
* to another.
* A {@code Send} object is a temporary holder of a {@link Resource}, used for transferring the ownership of the
* resource from one thread to another.
* <p>
* Prior to the Send being created, the originating Rc is invalidated, to prevent access while it is being sent. This
* means it cannot be accessed, closed, or disposed of, while it is in-flight. Once the Rc is {@linkplain #receive()
* received}, the new ownership is established.
* Prior to the {@code Send} being created, the originating resource is invalidated, to prevent access while it is being
* sent. This means it cannot be accessed, closed, or disposed of, while it is in-flight. Once the resource is
* {@linkplain #receive() received}, the new ownership is established.
* <p>
* Care must be taken to ensure that the Rc is always received by some thread. Failure to do so can result in a resource
* leak.
* Care must be taken to ensure that the resource is always received by some thread.
* Failure to do so can result in a resource leak.
*
* @param <T>
*/
public interface Send<T extends Rc<T>> {
public interface Send<T extends Resource<T>> {
/**
* Construct a {@link Send} based on the given {@link Supplier}.
* The supplier will be called only once, in the receiving thread.
@ -44,9 +44,10 @@ public interface Send<T extends Rc<T>> {
* @param <T> The type of object being sent.
* @return A {@link Send} which will deliver an object of the given type, from the supplier.
*/
static <T extends Rc<T>> Send<T> sending(Class<T> concreteObjectType, Supplier<? extends T> supplier) {
static <T extends Resource<T>> Send<T> sending(Class<T> concreteObjectType, Supplier<? extends T> supplier) {
return new Send<T>() {
private final AtomicBoolean gate = new AtomicBoolean();
@Override
public T receive() {
if (gate.getAndSet(true)) {
@ -83,12 +84,12 @@ public interface Send<T extends Rc<T>> {
}
/**
* Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the
* sent Rc in the sending thread happens-before the return of this method.
* Receive the {@link Resource} instance being sent, and bind its ownership to the calling thread.
* The invalidation of the sent resource in the sending thread happens-before the return of this method.
* <p>
* This method can only be called once, and will throw otherwise.
*
* @return The sent Rc instance.
* @return The sent resource instance.
* @throws IllegalStateException If this method is called more than once.
*/
T receive();
@ -101,19 +102,19 @@ public interface Send<T extends Rc<T>> {
* @param <R> The result type of the mapping function.
* @return A new {@link Send} instance that will deliver an object that is the result of the mapping.
*/
default <R extends Rc<R>> Send<R> map(Class<R> type, Function<T, R> mapper) {
default <R extends Resource<R>> Send<R> map(Class<R> type, Function<T, R> mapper) {
return sending(type, () -> mapper.apply(receive()));
}
/**
* Discard this {@link Send} and the object it contains.
* This has no effect if the send has already been received.
* This has no effect if the send object has already been received.
*/
default void discard() {
try {
receive().close();
} catch (IllegalStateException ignore) {
// Don't do anything if the send has already been consumed.
// Don't do anything if the "Send" has already been consumed.
}
}

View File

@ -25,7 +25,7 @@ import io.netty.buffer.Unpooled;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.internal.Statics;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.util.ByteProcessor;
import io.netty.util.IllegalReferenceCountException;
@ -40,6 +40,8 @@ import java.nio.channels.ScatteringByteChannel;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.buffer.api.internal.Statics.asRS;
public final class ByteBufAdaptor extends ByteBuf {
private final ByteBufAllocatorAdaptor alloc;
private final Buffer buffer;
@ -80,7 +82,7 @@ public final class ByteBufAdaptor extends ByteBuf {
try {
buffer.ensureWritable(diff);
} catch (IllegalStateException e) {
if (!buffer.isOwned()) {
if (!asRS(buffer).isOwned()) {
throw new UnsupportedOperationException(e);
}
throw e;
@ -215,7 +217,7 @@ public final class ByteBufAdaptor extends ByteBuf {
checkAccess();
if (writableBytes() < minWritableBytes) {
try {
if (buffer.isOwned()) {
if (asRS(buffer).isOwned()) {
// Good place.
buffer.ensureWritable(minWritableBytes);
} else {
@ -1629,7 +1631,7 @@ public final class ByteBufAdaptor extends ByteBuf {
@Override
public ByteBuf retain(int increment) {
for (int i = 0; i < increment; i++) {
buffer.acquire();
asRS(buffer).acquire();
}
return this;
}
@ -1643,11 +1645,11 @@ public final class ByteBufAdaptor extends ByteBuf {
if (!buffer.isAccessible()) {
return -1;
}
if (buffer instanceof RcSupport) {
var rc = (RcSupport<?, ?>) buffer;
if (buffer instanceof ResourceSupport) {
var rc = (ResourceSupport<?, ?>) buffer;
return rc.countBorrows();
}
return buffer.isOwned()? 0 : 1;
return asRS(buffer).isOwned()? 0 : 1;
}
@Override

View File

@ -21,7 +21,7 @@ import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.ByteCursor;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.buffer.api.ReadableComponent;
import io.netty.buffer.api.ReadableComponentProcessor;
import io.netty.buffer.api.WritableComponent;
@ -39,7 +39,7 @@ import static io.netty.buffer.api.internal.Statics.bbslice;
import static io.netty.buffer.api.internal.Statics.bufferIsClosed;
import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly;
class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, ReadableComponent, WritableComponent {
class NioBuffer extends ResourceSupport<Buffer, NioBuffer> implements Buffer, ReadableComponent, WritableComponent {
private static final ByteBuffer CLOSED_BUFFER = ByteBuffer.allocate(0);
private final AllocatorControl control;

View File

@ -13,14 +13,18 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
package io.netty.buffer.api.internal;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.Resource;
import java.util.ArrayDeque;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;
abstract class LifecycleTracer {
public abstract class LifecycleTracer {
public static LifecycleTracer get() {
if (Trace.TRACE_LIFECYCLE_DEPTH == 0) {
return new NoOpTracer();
@ -30,36 +34,36 @@ abstract class LifecycleTracer {
return stackTracer;
}
abstract void acquire(int acquires);
public abstract void acquire(int acquires);
abstract void drop(int acquires);
public abstract void drop(int acquires);
abstract void close(int acquires);
public abstract void close(int acquires);
abstract <I extends Rc<I>, T extends RcSupport<I, T>> Owned<T> send(Owned<T> send, int acquires);
public abstract <I extends Resource<I>, T extends ResourceSupport<I, T>> Owned<T> send(Owned<T> send, int acquires);
abstract <E extends Throwable> E attachTrace(E throwable);
public abstract <E extends Throwable> E attachTrace(E throwable);
private static final class NoOpTracer extends LifecycleTracer {
@Override
void acquire(int acquires) {
public void acquire(int acquires) {
}
@Override
void drop(int acquires) {
public void drop(int acquires) {
}
@Override
void close(int acquires) {
public void close(int acquires) {
}
@Override
<I extends Rc<I>, T extends RcSupport<I, T>> Owned<T> send(Owned<T> send, int acquires) {
public <I extends Resource<I>, T extends ResourceSupport<I, T>> Owned<T> send(Owned<T> send, int acquires) {
return send;
}
@Override
<E extends Throwable> E attachTrace(E throwable) {
public <E extends Throwable> E attachTrace(E throwable) {
return throwable;
}
}
@ -76,7 +80,7 @@ abstract class LifecycleTracer {
private boolean dropped;
@Override
void acquire(int acquires) {
public void acquire(int acquires) {
Trace trace = WALKER.walk(new Trace("acquire", acquires));
addTrace(trace);
}
@ -91,20 +95,20 @@ abstract class LifecycleTracer {
}
@Override
void drop(int acquires) {
public void drop(int acquires) {
dropped = true;
addTrace(WALKER.walk(new Trace("drop", acquires)));
}
@Override
void close(int acquires) {
public void close(int acquires) {
if (!dropped) {
addTrace(WALKER.walk(new Trace("close", acquires)));
}
}
@Override
<I extends Rc<I>, T extends RcSupport<I, T>> Owned<T> send(Owned<T> send, int acquires) {
public <I extends Resource<I>, T extends ResourceSupport<I, T>> Owned<T> send(Owned<T> send, int acquires) {
Trace sendTrace = new Trace("send", acquires);
sendTrace.sent = true;
addTrace(WALKER.walk(sendTrace));
@ -118,7 +122,7 @@ abstract class LifecycleTracer {
}
@Override
<E extends Throwable> E attachTrace(E throwable) {
public <E extends Throwable> E attachTrace(E throwable) {
synchronized (traces) {
long timestamp = System.nanoTime();
for (Trace trace : traces) {

View File

@ -13,16 +13,27 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
package io.netty.buffer.api.internal;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.Resource;
import io.netty.buffer.api.Send;
import java.util.Objects;
public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> implements Rc<I> {
/**
* Internal support class for resources.
*
* @param <I> The public interface for the resource.
* @param <T> The concrete implementation of the resource.
*/
public abstract class ResourceSupport<I extends Resource<I>, T extends ResourceSupport<I, T>> implements Resource<I> {
private int acquires; // Closed if negative.
private Drop<T> drop;
private final LifecycleTracer tracer;
protected RcSupport(Drop<T> drop) {
protected ResourceSupport(Drop<T> drop) {
this.drop = drop;
tracer = LifecycleTracer.get();
}
@ -30,11 +41,10 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
/**
* Increment the reference count.
* <p>
* Note, this method is not thread-safe because Rc's are meant to thread-confined.
* Note, this method is not thread-safe because Resources are meant to thread-confined.
*
* @return This Rc instance.
* @return This {@link Resource} instance.
*/
@Override
public final I acquire() {
if (acquires < 0) {
throw attachTrace(new IllegalStateException("This resource is closed: " + this + '.'));
@ -50,9 +60,9 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
* <p>
* Note, this method is not thread-safe because Rc's are meant to be thread-confined.
* Note, this method is not thread-safe because Resources are meant to be thread-confined.
*
* @throws IllegalStateException If this Rc has already been closed.
* @throws IllegalStateException If this Resource has already been closed.
*/
@Override
public final void close() {
@ -68,11 +78,12 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
}
/**
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
* when the receiving thread is not known up front.
* Send this Resource instance to another Thread, transferring the ownership to the recipient.
* This method can be used when the receiving thread is not known up front.
* <p>
* This instance immediately becomes inaccessible, and all attempts at accessing this Rc will throw. Calling {@link
* #close()} will have no effect, so this method is safe to call within a try-with-resources statement.
* This instance immediately becomes inaccessible, and all attempts at accessing this resource will throw.
* Calling {@link #close()} will have no effect, so this method is safe to call within a try-with-resources
* statement.
*
* @throws IllegalStateException if this object has any outstanding acquires; that is, if this object has been
* {@link #acquire() acquired} more times than it has been {@link #close() closed}.
@ -92,8 +103,8 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
}
/**
* Create an {@link IllegalStateException} with a custom message, tailored to this particular {@link Rc} instance,
* for when the object cannot be sent for some reason.
* Create an {@link IllegalStateException} with a custom message, tailored to this particular
* {@link Resource} instance, for when the object cannot be sent for some reason.
* @return An {@link IllegalStateException} to be thrown when this object cannot be sent.
*/
protected IllegalStateException notSendableException() {
@ -101,7 +112,6 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
"Cannot send() a reference counted object with " + countBorrows() + " borrows: " + this + '.');
}
@Override
public boolean isOwned() {
return acquires == 0;
}
@ -124,11 +134,12 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
/**
* Prepare this instance for ownsership transfer. This method is called from {@link #send()} in the sending thread.
* This method should put this Rc in a deactivated state where it is no longer accessible from the currently owning
* thread. In this state, the Rc instance should only allow a call to {@link Owned#transferOwnership(Drop)} in
* the recipient thread.
* This method should put this resource in a deactivated state where it is no longer accessible from the currently
* owning thread.
* In this state, the resource instance should only allow a call to {@link Owned#transferOwnership(Drop)} in the
* recipient thread.
*
* @return This Rc instance in a deactivated state.
* @return This resource instance in a deactivated state.
*/
protected abstract Owned<T> prepareSend();

View File

@ -160,6 +160,14 @@ public interface Statics {
dest.position(destPos).put(bbslice(src, srcPos, length));
}
static <T extends ResourceSupport<Buffer, ?> & Buffer> T asRS(Buffer buf) {
if (!(buf instanceof ResourceSupport)) {
throw new IllegalArgumentException("Buffer instance is not an instance of ResourceSupport.");
}
//noinspection unchecked
return (T) buf;
}
static IllegalStateException bufferIsClosed() {
return new IllegalStateException("This buffer is closed.");
}

View File

@ -13,15 +13,18 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
package io.netty.buffer.api.internal;
import io.netty.buffer.api.internal.Statics;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.Resource;
import io.netty.buffer.api.Send;
import java.lang.invoke.VarHandle;
import static java.lang.invoke.MethodHandles.lookup;
class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
public class TransferSend<I extends Resource<I>, T extends ResourceSupport<I, T>> implements Send<I> {
private static final VarHandle RECEIVED = Statics.findVarHandle(lookup(), TransferSend.class, "received", boolean.class);
private final Owned<T> outgoing;
private final Drop<T> drop;
@ -29,7 +32,7 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
@SuppressWarnings("unused")
private volatile boolean received; // Accessed via VarHandle
TransferSend(Owned<T> outgoing, Drop<T> drop, Class<?> concreteType) {
public TransferSend(Owned<T> outgoing, Drop<T> drop, Class<?> concreteType) {
this.outgoing = outgoing;
this.drop = drop;
this.concreteType = concreteType;

View File

@ -21,7 +21,7 @@ import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.ByteCursor;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.buffer.api.ReadableComponent;
import io.netty.buffer.api.ReadableComponentProcessor;
import io.netty.buffer.api.WritableComponent;
@ -39,7 +39,7 @@ import static io.netty.buffer.api.internal.Statics.bufferIsClosed;
import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly;
import static io.netty.util.internal.PlatformDependent.BIG_ENDIAN_NATIVE_ORDER;
class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buffer, ReadableComponent,
class UnsafeBuffer extends ResourceSupport<Buffer, UnsafeBuffer> implements Buffer, ReadableComponent,
WritableComponent {
private static final int CLOSED_SIZE = -1;
private static final boolean ACCESS_UNALIGNED = PlatformDependent.isUnaligned();

View File

@ -27,15 +27,13 @@ module netty.incubator.buffer {
exports io.netty.buffer.api;
exports io.netty.buffer.api.adaptor;
exports io.netty.buffer.api.internal to
netty.incubator.buffer.memseg,
netty.incubator.buffer.tests;
uses MemoryManagers;
// Permit reflective access to non-public members.
// Also means we don't have to make all test methods etc. public for JUnit to access them.
opens io.netty.buffer.api;//todo remove
opens io.netty.buffer.api;
exports io.netty.buffer.api.internal;
opens io.netty.buffer.api.internal;//todo remove
provides MemoryManagers with
ByteBufferMemoryManagers,

View File

@ -31,7 +31,7 @@ import io.netty.buffer.api.WritableComponent;
import io.netty.buffer.api.WritableComponentProcessor;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.internal.ResourceSupport;
import jdk.incubator.foreign.MemorySegment;
import jdk.incubator.foreign.ResourceScope;
@ -53,8 +53,8 @@ import static jdk.incubator.foreign.MemoryAccess.setIntAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, ReadableComponent, WritableComponent,
BufferIntegratable {
class MemSegBuffer extends ResourceSupport<Buffer, MemSegBuffer> implements Buffer, ReadableComponent,
WritableComponent, BufferIntegratable {
private static final MemorySegment CLOSED_SEGMENT;
static final Drop<MemSegBuffer> SEGMENT_CLOSE;

View File

@ -118,7 +118,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return CompositeBuffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send());
}
});
}
@ -134,7 +134,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return CompositeBuffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send());
}
});
}
@ -150,7 +150,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return CompositeBuffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send());
}
});
}
@ -166,7 +166,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return CompositeBuffer.compose(a, bufFirst, bufSecond);
return CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send());
}
});
}
@ -183,7 +183,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).slice();
}
});
}
@ -200,7 +200,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).slice();
}
});
}
@ -217,7 +217,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).slice();
}
});
}
@ -234,7 +234,7 @@ public class BufferBulkAccessTest extends BufferTestSupport {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(CompositeBuffer.compose(a, bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(CompositeBuffer.compose(a, bufFirst.send(), bufSecond.send())).writerOffset(size).slice();
}
});
}

View File

@ -20,6 +20,7 @@ import io.netty.buffer.api.BufferAllocator;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static io.netty.buffer.api.internal.Statics.asRS;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -54,7 +55,7 @@ public class BufferCompactTest extends BufferTestSupport {
Buffer buf = allocator.allocate(8, BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L);
assertEquals((byte) 0x01, buf.readByte());
try (Buffer ignore = buf.acquire()) {
try (var ignore = asRS(buf).acquire()) {
assertThrows(IllegalStateException.class, () -> buf.compact());
assertEquals(1, buf.readerOffset());
}

View File

@ -73,8 +73,8 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
Buffer c = allocator.allocate(8);
Buffer x = CompositeBuffer.compose(allocator, b, c)) {
buf = CompositeBuffer.compose(allocator, a, x);
Buffer x = CompositeBuffer.compose(allocator, b.send(), c.send())) {
buf = CompositeBuffer.compose(allocator, a.send(), x.send());
}
assertThat(buf.countComponents()).isEqualTo(3);
assertThat(buf.countReadableComponents()).isZero();
@ -126,7 +126,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
a.writeInt(1);
b.writeInt(2);
c.writeInt(3);
composite = CompositeBuffer.compose(allocator, a, b, c);
composite = CompositeBuffer.compose(allocator, a.send(), b.send(), c.send());
}
var list = new LinkedList<Integer>(List.of(1, 2, 3));
int count = composite.forEachReadable(0, (index, component) -> {
@ -163,7 +163,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
a.writeInt(1);
b.writeInt(2);
c.writeInt(3);
composite = CompositeBuffer.compose(allocator, a, b, c);
composite = CompositeBuffer.compose(allocator, a.send(), b.send(), c.send());
}
int readPos = composite.readerOffset();
int writePos = composite.writerOffset();
@ -201,7 +201,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(4);
Buffer b = allocator.allocate(4);
Buffer c = allocator.allocate(4)) {
buf = CompositeBuffer.compose(allocator, a, b, c);
buf = CompositeBuffer.compose(allocator, a.send(), b.send(), c.send());
}
int i = 1;
while (buf.writableBytes() > 0) {
@ -273,7 +273,7 @@ public class BufferComponentIterationTest extends BufferTestSupport {
try (Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
Buffer c = allocator.allocate(8)) {
buf = CompositeBuffer.compose(allocator, a, b, c);
buf = CompositeBuffer.compose(allocator, a.send(), b.send(), c.send());
}
buf.order(BIG_ENDIAN);
buf.forEachWritable(0, (index, component) -> {

View File

@ -18,12 +18,14 @@ package io.netty.buffer.api.tests;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.Send;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteOrder;
import static io.netty.buffer.api.internal.Statics.asRS;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
@ -35,14 +37,14 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void compositeBufferCanOnlyBeOwnedWhenAllConstituentBuffersAreOwned() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer composite;
try (Buffer a = allocator.allocate(8)) {
var composite = asRS(CompositeBuffer.compose(allocator));
try (var a = asRS(allocator.allocate(8))) {
assertTrue(a.isOwned());
Buffer leakB;
try (Buffer b = allocator.allocate(8)) {
try (var b = asRS(allocator.allocate(8))) {
assertTrue(a.isOwned());
assertTrue(b.isOwned());
composite = CompositeBuffer.compose(allocator, a, b);
composite = asRS(CompositeBuffer.compose(allocator, a.send(), b.send()));
assertFalse(composite.isOwned());
assertFalse(a.isOwned());
assertFalse(b.isOwned());
@ -50,7 +52,7 @@ public class BufferCompositionTest extends BufferTestSupport {
}
assertFalse(composite.isOwned());
assertFalse(a.isOwned());
assertTrue(leakB.isOwned());
assertTrue(asRS(leakB).isOwned());
}
assertTrue(composite.isOwned());
}
@ -58,20 +60,16 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void compositeBuffersCannotHaveDuplicateComponents() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4)) {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Send<Buffer> a = allocator.allocate(4).send();
var e = assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, a));
assertThat(e).hasMessageContaining("duplicate");
try (CompositeBuffer composite = CompositeBuffer.compose(allocator, a)) {
a.close();
try {
Send<Buffer> b = allocator.allocate(4).send();
try (CompositeBuffer composite = CompositeBuffer.compose(allocator, b)) {
e = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(a));
() -> composite.extendWith(b));
assertThat(e).hasMessageContaining("duplicate");
} finally {
a.acquire();
}
}
}
}
@ -84,7 +82,7 @@ public class BufferCompositionTest extends BufferTestSupport {
allocator.allocate(8).send(),
allocator.allocate(8).send())) {
assertEquals(24, composite.capacity());
assertTrue(composite.isOwned());
assertTrue(asRS(composite).isOwned());
}
}
@ -92,26 +90,26 @@ public class BufferCompositionTest extends BufferTestSupport {
public void compositeBufferMustNotBeAllowedToContainThemselves() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer a = allocator.allocate(4);
CompositeBuffer buf = CompositeBuffer.compose(allocator, a);
CompositeBuffer buf = CompositeBuffer.compose(allocator, a.send());
try (buf; a) {
a.close();
try {
assertThrows(IllegalArgumentException.class, () -> buf.extendWith(buf));
assertThrows(IllegalArgumentException.class, () -> buf.extendWith(buf.send()));
assertTrue(buf.isOwned());
try (Buffer composite = CompositeBuffer.compose(allocator, buf)) {
try (Buffer composite = CompositeBuffer.compose(allocator, buf.send())) {
// the composing increments the reference count of constituent buffers...
// counter-act this, so it can be extended:
a.close(); // buf is now owned, so it can be extended.
try {
assertThrows(IllegalArgumentException.class,
() -> buf.extendWith(composite));
() -> buf.extendWith(composite.send()));
} finally {
a.acquire(); // restore the reference count to align with our try-with-resources structure.
asRS(a).acquire(); // restore the reference count to align with our try-with-resources structure.
}
}
assertTrue(buf.isOwned());
} finally {
a.acquire();
asRS(a).acquire();
}
}
}
@ -123,7 +121,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer composite;
try (Buffer a = allocator.allocate(4, BIG_ENDIAN)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite) {
composite.writeInt(0x01020304);
@ -137,19 +135,14 @@ public class BufferCompositionTest extends BufferTestSupport {
@ParameterizedTest
@MethodSource("allocators")
public void ensureWritableOnCompositeBuffersMustRespectExistingLittleEndianByteOrder(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer composite;
try (Buffer a = allocator.allocate(4, LITTLE_ENDIAN)) {
composite = CompositeBuffer.compose(allocator, a);
}
try (composite) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer composite = CompositeBuffer.compose(allocator, allocator.allocate(4, LITTLE_ENDIAN).send())) {
composite.writeInt(0x05060708);
composite.ensureWritable(4);
composite.writeInt(0x01020304);
assertEquals(0x0102030405060708L, composite.readLong());
}
}
}
@Test
public void emptyCompositeBufferMustUseNativeByteOrder() {
@ -164,7 +157,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8)) {
assertThrows(ClassCastException.class, () -> ((CompositeBuffer) a).extendWith(b));
assertThrows(ClassCastException.class, () -> ((CompositeBuffer) a).extendWith(b.send()));
}
}
@ -173,9 +166,9 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
CompositeBuffer composed = CompositeBuffer.compose(allocator, a)) {
CompositeBuffer composed = CompositeBuffer.compose(allocator, a.send())) {
try (Buffer ignore = composed.acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> composed.extendWith(b));
var exc = assertThrows(IllegalStateException.class, () -> composed.extendWith(b.send()));
assertThat(exc).hasMessageContaining("owned");
}
}
@ -186,11 +179,11 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite) {
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(composite));
() -> composite.extendWith(composite.send()));
assertThat(exc).hasMessageContaining("cannot be extended");
}
}
@ -200,19 +193,18 @@ public class BufferCompositionTest extends BufferTestSupport {
public void extendingWithZeroCapacityBufferHasNoEffect() {
try (BufferAllocator allocator = BufferAllocator.heap();
CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
composite.extendWith(composite);
composite.extendWith(CompositeBuffer.compose(allocator).send());
assertThat(composite.capacity()).isZero();
assertThat(composite.countComponents()).isZero();
}
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer a = allocator.allocate(1);
CompositeBuffer composite = CompositeBuffer.compose(allocator, a);
a.close();
CompositeBuffer composite = CompositeBuffer.compose(allocator, a.send());
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
assertThat(composite.countComponents()).isOne();
try (Buffer b = CompositeBuffer.compose(allocator)) {
composite.extendWith(b);
composite.extendWith(b.send());
}
assertTrue(composite.isOwned());
assertThat(composite.capacity()).isOne();
@ -234,7 +226,7 @@ public class BufferCompositionTest extends BufferTestSupport {
CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
assertThat(composite.capacity()).isZero();
try (Buffer buf = allocator.allocate(8, BIG_ENDIAN)) {
composite.extendWith(buf);
composite.extendWith(buf.send());
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
@ -248,7 +240,7 @@ public class BufferCompositionTest extends BufferTestSupport {
CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
assertThat(composite.capacity()).isZero();
try (Buffer buf = allocator.allocate(8, LITTLE_ENDIAN)) {
composite.extendWith(buf);
composite.extendWith(buf.send());
}
assertThat(composite.capacity()).isEqualTo(8);
composite.writeLong(0x0102030405060708L);
@ -261,12 +253,12 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8, BIG_ENDIAN)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite) {
try (Buffer b = allocator.allocate(8, LITTLE_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(b));
() -> composite.extendWith(b.send()));
assertThat(exc).hasMessageContaining("byte order");
}
}
@ -278,12 +270,12 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8, LITTLE_ENDIAN)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite) {
try (Buffer b = allocator.allocate(8, BIG_ENDIAN)) {
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(b));
() -> composite.extendWith(b.send()));
assertThat(exc).hasMessageContaining("byte order");
}
}
@ -295,7 +287,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8, BIG_ENDIAN)) {
composite.extendWith(b);
composite.extendWith(b.send());
assertThat(composite.order()).isEqualTo(BIG_ENDIAN);
}
}
@ -307,7 +299,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8, LITTLE_ENDIAN)) {
composite.extendWith(b);
composite.extendWith(b.send());
assertThat(composite.order()).isEqualTo(LITTLE_ENDIAN);
}
}
@ -319,7 +311,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (CompositeBuffer composite = CompositeBuffer.compose(allocator)) {
try (Buffer b = allocator.allocate(8).makeReadOnly()) {
composite.extendWith(b);
composite.extendWith(b.send());
assertTrue(composite.readOnly());
}
}
@ -331,13 +323,13 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite) {
composite.writeLong(0);
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
composite.extendWith(b);
composite.extendWith(b.send());
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
@ -350,17 +342,19 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite) {
composite.writeInt(0);
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(b));
() -> composite.extendWith(b.send()));
assertThat(exc).hasMessageContaining("unwritten gap");
b.writerOffset(0);
composite.extendWith(b);
}
try (Buffer b = allocator.allocate(8)) {
b.setInt(0, 1);
composite.extendWith(b.send());
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(4);
}
@ -373,7 +367,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite) {
composite.writeLong(0);
@ -381,7 +375,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (Buffer b = allocator.allocate(8)) {
b.writeInt(1);
b.readInt();
composite.extendWith(b);
composite.extendWith(b.send());
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
}
@ -394,7 +388,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite) {
composite.writeLong(0);
@ -403,10 +397,10 @@ public class BufferCompositionTest extends BufferTestSupport {
b.writeInt(1);
b.readInt();
var exc = assertThrows(IllegalArgumentException.class,
() -> composite.extendWith(b));
() -> composite.extendWith(b.send()));
assertThat(exc).hasMessageContaining("unread gap");
b.readerOffset(0);
composite.extendWith(b);
composite.extendWith(b.send());
assertThat(composite.capacity()).isEqualTo(16);
assertThat(composite.writerOffset()).isEqualTo(12);
assertThat(composite.readerOffset()).isEqualTo(4);
@ -420,7 +414,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4, BIG_ENDIAN);
Buffer b = allocator.allocate(4, LITTLE_ENDIAN)) {
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a.send(), b.send()));
}
}
@ -429,7 +423,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(4).makeReadOnly();
Buffer b = allocator.allocate(4).makeReadOnly();
Buffer composite = CompositeBuffer.compose(allocator, a, b)) {
Buffer composite = CompositeBuffer.compose(allocator, a.send(), b.send())) {
assertTrue(composite.readOnly());
verifyWriteInaccessible(composite);
}
@ -437,13 +431,27 @@ public class BufferCompositionTest extends BufferTestSupport {
@Test
public void composingReadOnlyAndWritableBuffersMustThrow() {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8).makeReadOnly();
try (BufferAllocator allocator = BufferAllocator.heap()) {
try (Buffer a = allocator.allocate(8).makeReadOnly();
Buffer b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, b, a));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a, b, a));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, b, a, b));
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, a.send(), b.send()));
}
try (Buffer a = allocator.allocate(8).makeReadOnly();
Buffer b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> CompositeBuffer.compose(allocator, b.send(), a.send()));
}
try (Buffer a = allocator.allocate(8).makeReadOnly();
Buffer b = allocator.allocate(8);
Buffer c = allocator.allocate(8).makeReadOnly()) {
assertThrows(IllegalArgumentException.class,
() -> CompositeBuffer.compose(allocator, a.send(), b.send(), c.send()));
}
try (Buffer a = allocator.allocate(8).makeReadOnly();
Buffer b = allocator.allocate(8);
Buffer c = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class,
() -> CompositeBuffer.compose(allocator, b.send(), a.send(), c.send()));
}
}
}
@ -452,10 +460,10 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8)) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite; Buffer b = allocator.allocate(8).makeReadOnly()) {
assertThrows(IllegalArgumentException.class, () -> composite.extendWith(b));
assertThrows(IllegalArgumentException.class, () -> composite.extendWith(b.send()));
}
}
}
@ -465,10 +473,10 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap()) {
CompositeBuffer composite;
try (Buffer a = allocator.allocate(8).makeReadOnly()) {
composite = CompositeBuffer.compose(allocator, a);
composite = CompositeBuffer.compose(allocator, a.send());
}
try (composite; Buffer b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> composite.extendWith(b));
assertThrows(IllegalArgumentException.class, () -> composite.extendWith(b.send()));
}
}
}
@ -478,7 +486,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
CompositeBuffer composite = CompositeBuffer.compose(allocator, a, b)) {
CompositeBuffer composite = CompositeBuffer.compose(allocator, a.send(), b.send())) {
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(0));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(4));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsFloor(7));
@ -494,7 +502,7 @@ public class BufferCompositionTest extends BufferTestSupport {
try (BufferAllocator allocator = BufferAllocator.heap();
Buffer a = allocator.allocate(8);
Buffer b = allocator.allocate(8);
CompositeBuffer composite = CompositeBuffer.compose(allocator, a, b)) {
CompositeBuffer composite = CompositeBuffer.compose(allocator, a.send(), b.send())) {
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(0));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(4));
assertThrows(IllegalStateException.class, () -> composite.splitComponentsCeil(7));

View File

@ -36,7 +36,7 @@ public class BufferEnsureWritableTest extends BufferTestSupport {
assertThrows(IllegalStateException.class, () -> slice.ensureWritable(1));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
try (Buffer compose = CompositeBuffer.compose(allocator, buf)) {
try (Buffer compose = CompositeBuffer.compose(allocator, buf.send())) {
assertThrows(IllegalStateException.class, () -> compose.ensureWritable(1));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}

View File

@ -26,6 +26,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteOrder;
import java.util.function.Supplier;
import static io.netty.buffer.api.internal.Statics.asRS;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
@ -164,7 +165,7 @@ public class BufferReadOnlyTest extends BufferTestSupport {
assertThat(buf.readerOffset()).isZero();
assertThat(buf.capacity()).isEqualTo(4);
assertThat(buf.writerOffset()).isEqualTo(4);
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
assertTrue(buf.isAccessible());
assertThat(buf.countComponents()).isOne();
assertEquals((byte) 1, buf.readByte());
@ -190,14 +191,14 @@ public class BufferReadOnlyTest extends BufferTestSupport {
Buffer b = a.split(8)) {
assertTrue(a.readOnly());
assertTrue(b.readOnly());
assertTrue(a.isOwned());
assertTrue(b.isOwned());
assertTrue(asRS(a).isOwned());
assertTrue(asRS(b).isOwned());
assertThat(a.capacity()).isEqualTo(8);
assertThat(b.capacity()).isEqualTo(8);
try (Buffer c = b.slice()) {
assertTrue(c.readOnly());
assertFalse(c.isOwned());
assertFalse(b.isOwned());
assertFalse(asRS(c).isOwned());
assertFalse(asRS(b).isOwned());
assertThat(c.capacity()).isEqualTo(8);
}
}

View File

@ -21,6 +21,8 @@ import io.netty.buffer.api.BufferRef;
import io.netty.buffer.api.Send;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicReference;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
@ -30,7 +32,7 @@ class BufferRefTest {
try (BufferAllocator allocator = BufferAllocator.heap()) {
BufferRef ref;
try (Buffer b = allocator.allocate(8)) {
ref = new BufferRef(b);
ref = new BufferRef(b.send());
}
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
@ -54,20 +56,23 @@ class BufferRefTest {
@Test
public void mustCloseOwnedBufferWhenReplaced() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer orig;
AtomicReference<Buffer> orig = new AtomicReference<>();
BufferRef ref;
try (Buffer buf = allocator.allocate(8)) {
ref = new BufferRef(orig = buf);
}
Send<Buffer> s = allocator.allocate(8).send();
ref = new BufferRef(Send.sending(Buffer.class, () -> {
Buffer b = s.receive();
orig.set(b);
return b;
}));
orig.writeInt(42);
orig.get().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
try (Buffer buf = allocator.allocate(8)) {
ref.replace(buf); // Pass replacement directly.
ref.replace(buf.send()); // Pass replacement directly.
}
assertThrows(IllegalStateException.class, () -> orig.writeInt(32));
assertThrows(IllegalStateException.class, () -> orig.get().writeInt(32));
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
@ -78,20 +83,23 @@ class BufferRefTest {
@Test
public void mustCloseOwnedBufferWhenReplacedFromSend() {
try (BufferAllocator allocator = BufferAllocator.heap()) {
Buffer orig;
AtomicReference<Buffer> orig = new AtomicReference<>();
BufferRef ref;
try (Buffer buf = allocator.allocate(8)) {
ref = new BufferRef(orig = buf);
}
Send<Buffer> s = allocator.allocate(8).send();
ref = new BufferRef(Send.sending(Buffer.class, () -> {
Buffer b = s.receive();
orig.set(b);
return b;
}));
orig.writeInt(42);
orig.get().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
try (Buffer buf = allocator.allocate(8)) {
ref.replace(buf.send()); // Pass replacement via send().
}
assertThrows(IllegalStateException.class, () -> orig.writeInt(32));
assertThrows(IllegalStateException.class, () -> orig.get().writeInt(32));
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();

View File

@ -22,10 +22,10 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteOrder;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import static io.netty.buffer.api.internal.Statics.asRS;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
@ -41,7 +41,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
Buffer buf = allocator.allocate(8)) {
buf.writeByte((byte) 1);
buf.writeByte((byte) 2);
try (Buffer inner = buf.acquire()) {
try (Buffer inner = asRS(buf).acquire()) {
inner.writeByte((byte) 3);
inner.writeByte((byte) 4);
inner.writeByte((byte) 5);
@ -75,7 +75,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator()) {
var buf = allocator.allocate(8);
buf.close();
assertThrows(IllegalStateException.class, buf::acquire);
assertThrows(IllegalStateException.class, () -> asRS(buf).acquire());
}
}
@ -166,10 +166,10 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer ignored = buf.slice()) {
assertFalse(buf.isOwned());
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, buf::send);
}
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
}
}
@ -179,10 +179,10 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer ignored = buf.slice(0, 8)) {
assertFalse(buf.isOwned());
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, buf::send);
}
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
}
}
@ -226,11 +226,11 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer slice = buf.slice()) {
assertFalse(buf.isOwned());
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, slice::send);
}
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
buf.send().receive().close();
}
}
@ -241,11 +241,11 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer slice = buf.slice(0, 8)) {
assertFalse(buf.isOwned());
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, slice::send);
}
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
}
}
@ -256,7 +256,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
Buffer buf = allocator.allocate(8)) {
assertThrows(IndexOutOfBoundsException.class, () -> buf.slice(-1, 1));
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
}
}
@ -268,7 +268,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
assertThrows(IllegalArgumentException.class, () -> buf.slice(0, -1));
assertThrows(IllegalArgumentException.class, () -> buf.slice(2, -1));
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
}
}
@ -281,7 +281,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
buf.slice(0, 8).close(); // This is still fine.
assertThrows(IndexOutOfBoundsException.class, () -> buf.slice(1, 8));
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
}
}
@ -292,7 +292,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
Buffer buf = allocator.allocate(8)) {
buf.slice(0, 0).close(); // This is fine.
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
}
}
@ -302,13 +302,13 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
int borrows = countBorrows(buf);
try (Buffer ignored = buf.acquire()) {
try (Buffer ignored = asRS(buf).acquire()) {
assertEquals(borrows + 1, countBorrows(buf));
try (Buffer slice = buf.slice()) {
assertEquals(0, slice.capacity()); // We haven't written anything, so the slice is empty.
int sliceBorrows = countBorrows(slice);
assertEquals(borrows + 2, countBorrows(buf));
try (Buffer ignored1 = CompositeBuffer.compose(allocator, buf, slice)) {
try (Buffer ignored1 = CompositeBuffer.compose(allocator, buf.send(), slice.send())) {
assertEquals(borrows + 3, countBorrows(buf));
// Note: Slice is empty; not acquired by the composite buffer.
assertEquals(sliceBorrows, countBorrows(slice));
@ -329,13 +329,13 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
Buffer buf = allocator.allocate(8)) {
buf.writeByte((byte) 1);
int borrows = countBorrows(buf);
try (Buffer ignored = buf.acquire()) {
try (Buffer ignored = asRS(buf).acquire()) {
assertEquals(borrows + 1, countBorrows(buf));
try (Buffer slice = buf.slice()) {
assertEquals(1, slice.capacity());
int sliceBorrows = countBorrows(slice);
assertEquals(borrows + 2, countBorrows(buf));
try (Buffer ignored1 = CompositeBuffer.compose(allocator, buf, slice)) {
try (Buffer ignored1 = CompositeBuffer.compose(allocator, buf.send(), slice.send())) {
assertEquals(borrows + 3, countBorrows(buf));
assertEquals(sliceBorrows + 1, countBorrows(slice));
}
@ -345,7 +345,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
assertEquals(borrows + 1, countBorrows(buf));
}
assertEquals(borrows, countBorrows(buf));
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
}
}
@ -358,9 +358,9 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (Buffer slice = buf.slice()) {
buf.close();
assertFalse(buf.isAccessible());
assertTrue(slice.isOwned());
assertTrue(asRS(slice).isOwned());
try (Buffer receive = slice.send().receive()) {
assertTrue(receive.isOwned());
assertTrue(asRS(receive).isOwned());
assertFalse(slice.isAccessible());
}
}
@ -425,7 +425,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.writeInt(1);
try (Buffer acquired = buf.acquire()) {
try (Buffer acquired = asRS(buf).acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> acquired.split());
assertThat(exc).hasMessageContaining("owned");
}
@ -437,7 +437,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
public void splitOnOffsetOfNonOwnedBufferMustThrow(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer acquired = buf.acquire()) {
try (Buffer acquired = asRS(buf).acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> acquired.split(4));
assertThat(exc).hasMessageContaining("owned");
}
@ -580,7 +580,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
buf.writeLong(0x0102030405060708L);
try (Buffer slice = buf.slice()) {
buf.close();
assertTrue(slice.isOwned());
assertTrue(asRS(slice).isOwned());
try (Buffer split = slice.split(4)) {
split.reset().ensureWritable(Long.BYTES);
slice.reset().ensureWritable(Long.BYTES);
@ -710,7 +710,7 @@ public class BufferReferenceCountingTest extends BufferTestSupport {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.makeReadOnly();
try (Buffer acquire = buf.acquire()) {
try (Buffer acquire = asRS(buf).acquire()) {
assertTrue(acquire.readOnly());
}
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import static io.netty.buffer.api.internal.Statics.asRS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -77,12 +78,12 @@ public class BufferSendTest extends BufferTestSupport {
void sendMustThrowWhenBufIsAcquired(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
try (Buffer ignored = buf.acquire()) {
assertFalse(buf.isOwned());
try (Buffer ignored = asRS(buf).acquire()) {
assertFalse(asRS(buf).isOwned());
assertThrows(IllegalStateException.class, buf::send);
}
// Now send() should work again.
assertTrue(buf.isOwned());
assertTrue(asRS(buf).isOwned());
buf.send().receive().close();
}
}

View File

@ -19,7 +19,7 @@ import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;
import io.netty.buffer.api.CompositeBuffer;
import io.netty.buffer.api.MemoryManagers;
import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.internal.ResourceSupport;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@ -43,6 +43,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;
import static io.netty.buffer.api.internal.Statics.asRS;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
@ -176,7 +177,7 @@ public abstract class BufferTestSupport {
int half = size / 2;
try (Buffer firstHalf = a.allocate(half);
Buffer secondHalf = b.allocate(size - half)) {
return CompositeBuffer.compose(a, firstHalf, secondHalf);
return CompositeBuffer.compose(a, firstHalf.send(), secondHalf.send());
}
}
@ -207,7 +208,7 @@ public abstract class BufferTestSupport {
try (Buffer a = alloc.allocate(part);
Buffer b = alloc.allocate(part);
Buffer c = alloc.allocate(size - part * 2)) {
return CompositeBuffer.compose(alloc, a, b, c);
return CompositeBuffer.compose(alloc, a.send(), b.send(), c.send());
}
}
@ -384,13 +385,13 @@ public abstract class BufferTestSupport {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, new byte[1], 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, ByteBuffer.allocate(1), 0, 1));
if (CompositeBuffer.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> ((CompositeBuffer) buf).extendWith(target));
assertThrows(IllegalStateException.class, () -> ((CompositeBuffer) buf).extendWith(target.send()));
}
}
assertThrows(IllegalStateException.class, () -> buf.split());
assertThrows(IllegalStateException.class, () -> buf.send());
assertThrows(IllegalStateException.class, () -> buf.acquire());
assertThrows(IllegalStateException.class, () -> asRS(buf).send());
assertThrows(IllegalStateException.class, () -> asRS(buf).acquire());
assertThrows(IllegalStateException.class, () -> buf.slice());
assertThrows(IllegalStateException.class, () -> buf.openCursor());
assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0));
@ -914,7 +915,7 @@ public abstract class BufferTestSupport {
buf.setDouble(0, 3.2);
assertThat(buf.getDouble(0)).isEqualTo(3.2);
if (buf.isOwned()) {
if (asRS(buf).isOwned()) {
buf.ensureWritable(1);
}
buf.fill((byte) 0);
@ -999,7 +1000,7 @@ public abstract class BufferTestSupport {
}
public static int countBorrows(Buffer buf) {
return ((RcSupport<?, ?>) buf).countBorrows();
return ((ResourceSupport<?, ?>) buf).countBorrows();
}
public static void assertEquals(Buffer expected, Buffer actual) {

View File

@ -17,7 +17,7 @@ package io.netty.buffer.api.tests;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.internal.ResourceSupport;
import io.netty.buffer.api.Scope;
import org.junit.jupiter.api.Test;
@ -29,12 +29,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ScopeTest {
@Test
void scopeMustCloseContainedRcsInReverseInsertOrder() {
void scopeMustCloseContainedRecouresInReverseInsertOrder() {
ArrayList<Integer> closeOrder = new ArrayList<>();
try (Scope scope = new Scope()) {
scope.add(new SomeRc(new OrderingDrop(1, closeOrder)));
scope.add(new SomeRc(new OrderingDrop(2, closeOrder)));
scope.add(new SomeRc(new OrderingDrop(3, closeOrder)));
scope.add(new SomeResource(new OrderingDrop(1, closeOrder)));
scope.add(new SomeResource(new OrderingDrop(2, closeOrder)));
scope.add(new SomeResource(new OrderingDrop(3, closeOrder)));
}
var itr = closeOrder.iterator();
assertTrue(itr.hasNext());
@ -46,18 +46,18 @@ public class ScopeTest {
assertFalse(itr.hasNext());
}
private static final class SomeRc extends RcSupport<SomeRc, SomeRc> {
SomeRc(Drop<SomeRc> drop) {
private static final class SomeResource extends ResourceSupport<SomeResource, SomeResource> {
SomeResource(Drop<SomeResource> drop) {
super(drop);
}
@Override
protected Owned<SomeRc> prepareSend() {
protected Owned<SomeResource> prepareSend() {
return null;
}
}
private static final class OrderingDrop implements Drop<SomeRc> {
private static final class OrderingDrop implements Drop<SomeResource> {
private final int order;
private final ArrayList<Integer> list;
@ -67,7 +67,7 @@ public class ScopeTest {
}
@Override
public void drop(SomeRc obj) {
public void drop(SomeResource obj) {
list.add(order);
}
}

View File

@ -63,14 +63,14 @@ public class ByteIterationBenchmark {
allocator = BufferAllocator.heap();
try (var a = allocator.allocate(SIZE / 2);
var b = allocator.allocate(SIZE / 2)) {
buf = CompositeBuffer.compose(allocator, a, b);
buf = CompositeBuffer.compose(allocator, a.send(), b.send());
}
break;
case "composite-direct":
allocator = BufferAllocator.direct();
try (var a = allocator.allocate(SIZE / 2);
var b = allocator.allocate(SIZE / 2)) {
buf = CompositeBuffer.compose(allocator, a, b);
buf = CompositeBuffer.compose(allocator, a.send(), b.send());
}
break;
default:

View File

@ -44,12 +44,10 @@ public final class ComposingAndSlicingExample {
}
private static Buffer createBigBuffer(BufferAllocator allocator) {
try (Scope scope = new Scope()) {
return CompositeBuffer.compose(allocator,
scope.add(allocator.allocate(64)),
scope.add(allocator.allocate(64)),
scope.add(allocator.allocate(64)),
scope.add(allocator.allocate(64)));
}
allocator.allocate(64).send(),
allocator.allocate(64).send(),
allocator.allocate(64).send(),
allocator.allocate(64).send());
}
}

View File

@ -77,45 +77,6 @@ public class SendExample {
executor.shutdown();
}
private static Future<?> beginTask(
ExecutorService executor, BufferAllocator allocator) {
try (Buffer buf = allocator.allocate(32)) {
// !!! pit-fall: Rc decrement in other thread.
return executor.submit(new Task(buf.acquire()));
}
}
private static class Task implements Runnable {
private final Buffer buf;
Task(Buffer buf) {
this.buf = buf;
}
@Override
public void run() {
try (buf) {
// !!! danger: access out-side owning thread.
while (buf.writableBytes() > 0) {
buf.writeByte((byte) 42);
}
}
}
}
}
static final class Ex3 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newSingleThreadExecutor();
BufferAllocator allocator = BufferAllocator.heap();
var future = beginTask(executor, allocator);
future.get();
allocator.close();
executor.shutdown();
}
private static Future<?> beginTask(
ExecutorService executor, BufferAllocator allocator) {
try (Buffer buf = allocator.allocate(32)) {
@ -141,7 +102,7 @@ public class SendExample {
}
}
static final class Ex4 {
static final class Ex3 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
BufferAllocator allocator = BufferAllocator.heap();
@ -180,7 +141,7 @@ public class SendExample {
}
}
static final class Ex5 {
static final class Ex4 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
BufferAllocator allocator = BufferAllocator.heap();
@ -220,7 +181,7 @@ public class SendExample {
}
}
static final class Ex6 {
static final class Ex5 {
public static void main(String[] args) throws Exception {
ExecutorService executor = newFixedThreadPool(4);
BufferAllocator allocator = BufferAllocator.heap();

View File

@ -83,25 +83,15 @@ public abstract class AlternativeMessageDecoder extends ChannelHandlerAdapter {
}
private void processRead(ChannelHandlerContext ctx, Buffer input) {
if (collector.isOwned() && CompositeBuffer.isComposite(collector) && input.isOwned()
if (CompositeBuffer.isComposite(collector)
&& (collector.writableBytes() == 0 || input.writerOffset() == 0)
&& (collector.readableBytes() == 0 || input.readerOffset() == 0)
&& collector.order() == input.order()) {
((CompositeBuffer) collector).extendWith(input);
((CompositeBuffer) collector).extendWith(input.send());
drainCollector(ctx);
return;
}
if (collector.isOwned()) {
collector.ensureWritable(input.readableBytes(), DEFAULT_CHUNK_SIZE, true);
} else {
int requiredCapacity = input.readableBytes() + collector.readableBytes();
int allocationSize = Math.max(requiredCapacity, DEFAULT_CHUNK_SIZE);
try (Buffer newBuffer = allocator.allocate(allocationSize, input.order())) {
newBuffer.writeBytes(collector);
collector.close();
collector = newBuffer.acquire();
}
}
collector.writeBytes(input);
drainCollector(ctx);
}

View File

@ -104,7 +104,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
// for whatever release (for example because of OutOfMemoryError)
try (in) {
final int required = in.readableBytes();
if (required > cumulation.writableBytes() || !cumulation.isOwned() || cumulation.readOnly()) {
if (required > cumulation.writableBytes() || cumulation.readOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
@ -129,7 +129,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
}
Buffer composite;
try (in) {
if (CompositeBuffer.isComposite(cumulation) && cumulation.isOwned()) {
if (CompositeBuffer.isComposite(cumulation)) {
composite = cumulation;
if (composite.writerOffset() != composite.capacity()) {
// Writer index must equal capacity if we are going to "write"
@ -138,9 +138,9 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
cumulation.close();
}
} else {
composite = CompositeBuffer.compose(alloc, cumulation);
composite = CompositeBuffer.compose(alloc, cumulation.send());
}
((CompositeBuffer) composite).extendWith(in);
((CompositeBuffer) composite).extendWith(in.send());
return composite;
}
};
@ -313,7 +313,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
}
protected final void discardSomeReadBytes() {
if (cumulation != null && !first && cumulation.isOwned()) {
if (cumulation != null && !first) {
// discard some bytes if possible to make more room in the
// buffer but only if the refCnt == 1 as otherwise the user may have
// used slice().retain() or duplicate().retain().

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static io.netty.buffer.api.internal.Statics.asRS;
import static io.netty.buffer.api.tests.BufferTestSupport.assertEquals;
import static io.netty.buffer.api.CompositeBuffer.compose;
import static java.nio.ByteOrder.BIG_ENDIAN;
@ -131,7 +132,7 @@ public class ByteToMessageDecoderTest {
@Override
protected void decode(ChannelHandlerContext ctx, Buffer in) {
Buffer buf = internalBuffer();
assertTrue(buf.isOwned());
buf.ensureWritable(8, 8, false); // Verify we have full access to the buffer.
in.readByte();
// Removal from pipeline should clear internal buffer
ctx.pipeline().remove(this);
@ -352,7 +353,7 @@ public class ByteToMessageDecoderTest {
assertThat(thrown).hasMessage("boom");
assertFalse(in.isAccessible());
assertTrue(oldCumulation.isOwned());
oldCumulation.ensureWritable(8, 8, false); // Will throw if we don't have full access to the buffer.
oldCumulation.close();
}
@ -407,13 +408,13 @@ public class ByteToMessageDecoderTest {
if (shouldFail) {
assertThat(thrown).hasMessage("boom");
assertTrue(oldCumulation.isOwned());
oldCumulation.ensureWritable(8, 8, false); // Will throw if we don't have full access to the buffer.
oldCumulation.close();
assertFalse(newCumulation.isAccessible());
} else {
assertNull(thrown);
assertFalse(oldCumulation.isAccessible());
assertTrue(newCumulation.isOwned());
newCumulation.ensureWritable(8, 8, false); // Will throw if we don't have full access to the buffer.
newCumulation.close();
}
}