Add Buf.ensureWritable

Motivation:
Having buffers that are able to expand to accommodate more data on demand is a great convenience.

Modification:
Composite and MemSeg buffers are now able to mutate their backing storage, to increase their capacity.
This required some tricky integration with allocators via AllocatorControl.
Basically, it's now possible to allocate memory that is NOT bound by any life time, so that it can be attached to the life time that already exists for the buffer being expanded.

Result:
Buffers can now be expanded via Buf.ensureWritable.
This commit is contained in:
Chris Vest 2020-11-16 18:00:32 +01:00
parent ec9395d36e
commit 9c54aa43b4
13 changed files with 437 additions and 140 deletions

View File

@ -59,6 +59,58 @@ public interface Allocator extends AutoCloseable {
return allocate(size).order(order);
}
/**
* Compose the given sequence of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer increments the reference count on all the constituent buffers,
* and holds a reference to them until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buf a = allocator.allocate(size);
* Buf b = allocator.allocate(size)) {
* return Buf.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* {@linkplain Buf#send() Sending} a composite buffer implies sending all of its constituent buffers.
* For sending to be possible, both the composite buffer itself, and all of its constituent buffers, must be in an
* {@linkplain Rc#isOwned() owned state}.
* This means that the composite buffer must be the only reference to the constituent buffers.
* <p>
* All of the constituent buffers must have the same {@linkplain Buf#order() byte order}.
* An exception will be thrown if you attempt to compose buffers that have different byte orders,
* and changing the byte order of the constituent buffers so they become inconsistent after construction,
* will result in unspecified behaviour.
* <p>
* The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed
* as a single connected chunk of memory.
* Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity,
* and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a write
* offset of zero.
* Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity,
* and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read
* offset of zero.
* Furthermore, the sum of the read offsets must be less than or equal to the sum of the write offsets.
* <p>
* Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant
* offsets in the constituent buffers.
* <p>
* It is not a requirement that the buffers have the same size.
* <p>
* It is not a requirement that the buffers are allocated by this allocator, but if {@link Buf#ensureWritable(int)}
* is called on the composed buffer, and the composed buffer needs to be expanded, then this allocator instance
* will be used for allocation the extra memory.
*
* @param bufs The buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain Buf#order() byte order}.
*/
default Buf compose(Buf... bufs) {
return new CompositeBuf(this, bufs);
}
/**
* Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be
* used after this method has been called on it.
@ -68,36 +120,15 @@ public interface Allocator extends AutoCloseable {
}
static Allocator heap() {
var man = MemoryManager.getHeapMemoryManager();
return new Allocator() {
@Override
public Buf allocate(int size) {
checkSize(size);
return man.allocateConfined(size, man.drop(), null);
}
};
return new ManagedAllocator(MemoryManager.getHeapMemoryManager(), null);
}
static Allocator direct() {
var man = MemoryManager.getNativeMemoryManager();
return new Allocator() {
@Override
public Buf allocate(int size) {
checkSize(size);
return man.allocateConfined(size, man.drop(), null);
}
};
return new ManagedAllocator(MemoryManager.getNativeMemoryManager(), null);
}
static Allocator directWithCleaner() {
var man = MemoryManager.getNativeMemoryManager();
return new Allocator() {
@Override
public Buf allocate(int size) {
checkSize(size);
return man.allocateConfined(size, man.drop(), Statics.CLEANER);
}
};
return new ManagedAllocator(MemoryManager.getNativeMemoryManager(), Statics.CLEANER);
}
static Allocator pooledHeap() {

View File

@ -0,0 +1,36 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.b2;
/**
* Methods for accessing and controlling the internals of an allocator.
* This interface is intended to be used by implementors of the {@link Allocator}, {@link Buf} and
* {@link MemoryManager} interfaces.
*/
public interface AllocatorControl {
/**
* Allocate a buffer that is not tethered to any particular {@link Drop} implementation,
* and return the recoverable memory object from it.
* <p>
* This allows a buffer to implement {@link Buf#ensureWritable(int)} by having new memory allocated to it,
* without that memory being attached to some other lifetime.
*
* @param originator The buffer that originated the request for an untethered memory allocated.
* @param size The size of the requested memory allocation, in bytes.
* @return A "recoverable memory" object that is the requested allocation.
*/
Object allocateUntethered(Buf originator, int size);
}

View File

@ -36,7 +36,7 @@ import java.nio.ByteOrder;
* The reference count controls this life cycle.
* <p>
* When the buffer is initially allocated, a pairing {@link #close()} call will deallocate it.
* In this state, the buffer is "owned".
* In this state, the buffer {@linkplain #isOwned() is "owned"}.
* <p>
* The buffer can also be {@linkplain #acquire() acquired} when it's about to be involved in a complicated life time.
* The {@link #acquire()} call increments the reference count of the buffer,
@ -67,10 +67,10 @@ import java.nio.ByteOrder;
* To send a buffer to another thread, the buffer must not have any outstanding borrows.
* That is to say, all {@linkplain #acquire() acquires} must have been paired with a {@link #close()};
* all {@linkplain #slice() slices} must have been closed.
* And if this buffer is a constituent of a {@linkplain #compose(Buf...) composite buffer},
* And if this buffer is a constituent of a {@linkplain Allocator#compose(Buf...) composite buffer},
* then that composite buffer must be closed.
* And if this buffer is itself a composite buffer, then it must own all of its constituent buffers.
* The {@link #isSendable()} method can be used on any buffer to check if it can be sent or not.
* The {@link #isOwned()} method can be used on any buffer to check if it can be sent or not.
*
* <h3>Accessing data</h3>
*
@ -104,51 +104,6 @@ import java.nio.ByteOrder;
*
*/
public interface Buf extends Rc<Buf>, BufAccessors {
/**
* Compose the given sequence of buffers and present them as a single buffer.
* <p>
* <strong>Note:</strong> The composite buffer increments the reference count on all the constituent buffers,
* and holds a reference to them until the composite buffer is deallocated.
* This means the constituent buffers must still have their outside-reference count decremented as normal.
* If the buffers are allocated for the purpose of participating in the composite buffer,
* then they should be closed as soon as the composite buffer has been created, like in this example:
* <pre>{@code
* try (Buf a = allocator.allocate(size);
* Buf b = allocator.allocate(size)) {
* return Buf.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>
* {@linkplain #send() Sending} a composite buffer implies sending all of its constituent buffers.
* <p>
* All of the constituent buffers must have the same {@linkplain #order() byte order}.
* An exception will be thrown if you attempt to compose buffers that have different byte orders,
* and changing the byte order of the constituent buffers so they become inconsistent after construction,
* will result in unspecified behaviour.
* <p>
* The read and write offsets of the constituent buffers must be arranged such that there are no "gaps" when viewed
* as a single connected chunk of memory.
* Specifically, there can be at most one buffer whose write offset is neither zero nor at capacity,
* and all buffers prior to it must have their write offsets at capacity, and all buffers after it must have a write
* offset of zero.
* Likewise, there can be at most one buffer whose read offset is neither zero nor at capacity,
* and all buffers prior to it must have their read offsets at capacity, and all buffers after it must have a read
* offset of zero.
* Furthermore, the sum of the read offsets must be less than or equal to the sum of the write offsets.
* <p>
* Reads and writes to the composite buffer that modifies the read or write offsets, will also modify the relevant
* offsets in the constituent buffers.
* <p>
* It is not a requirement that the buffers have the same size.
*
* @param bufs The buffers to compose into a single buffer view.
* @return A buffer composed of, and backed by, the given buffers.
* @throws IllegalArgumentException if the given buffers have an inconsistent {@linkplain #order() byte order}.
*/
static Buf compose(Buf... bufs) {
return new CompositeBuf(bufs);
}
/**
* Change the default byte order of this buffer, and return this buffer.
*
@ -404,4 +359,18 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* the {@code length} reaches outside of the bounds of this buffer.
*/
ByteIterator iterateReverse(int fromOffset, int length);
/**
* Ensure that this buffer has {@linkplain #writableBytes() available space for writing} the given number of
* bytes.
* The buffer must be in {@linkplain #isOwned() an owned state}, or an exception will be thrown.
* If this buffer already has the necessary space, then this method returns immediately.
* If this buffer does not already have the necessary space, then it will be expanded using the {@link Allocator}
* the buffer was created with.
*
* @param size The requested number of bytes of space that should be available for writing.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #countBorrows()} is not {@code 0}.
*/
void ensureWritable(int size);
}

View File

@ -38,21 +38,23 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
};
private final Allocator allocator;
private final TornBufAccessors tornBufAccessors;
private final boolean isSendable;
private final Buf[] bufs;
private final int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts.
private final int capacity;
private Buf[] bufs;
private int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts.
private int capacity;
private int roff;
private int woff;
private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
CompositeBuf(Buf[] bufs) {
this(true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
}
private CompositeBuf(boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
super(drop);
this.allocator = allocator;
this.isSendable = isSendable;
for (Buf buf : bufs) {
buf.acquire();
@ -93,6 +95,12 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
assert roff <= woff:
"The given buffers place the read offset ahead of the write offset: " + Arrays.toString(bufs) + '.';
}
this.bufs = bufs;
computeBufferOffsets();
tornBufAccessors = new TornBufAccessors(this);
}
private void computeBufferOffsets() {
offsets = new int[bufs.length];
long cap = 0;
for (int i = 0; i < bufs.length; i++) {
@ -106,8 +114,6 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
"but the sum of the constituent buffer capacities was " + cap + '.');
}
capacity = (int) cap;
this.bufs = bufs;
tornBufAccessors = new TornBufAccessors(this);
}
@Override
@ -210,7 +216,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
slices = new Buf[] { choice.slice(subOffset, 0) };
}
return new CompositeBuf(false, slices, drop).writerOffset(length);
return new CompositeBuf(allocator, false, slices, drop).writerOffset(length);
} catch (Throwable throwable) {
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
close();
@ -460,6 +466,31 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
};
}
@Override
public void ensureWritable(int size) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (writableBytes() < size) {
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
int minBumpSize = 256;
int growth = Math.min(Integer.MAX_VALUE - 8, Math.min(size - writableBytes(), minBumpSize));
if (bufs.length == 0) {
bufs = new Buf[] { allocator.allocate(growth) };
} else if (bufs[bufs.length - 1].capacity() < minBumpSize) {
bufs[bufs.length - 1].ensureWritable(growth);
} else {
bufs = Arrays.copyOf(bufs, bufs.length + 1);
bufs[bufs.length - 1] = allocator.allocate(growth);
}
computeBufferOffsets();
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
@Override
public byte readByte() {
@ -754,7 +785,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
for (int i = 0; i < sends.length; i++) {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(true, received, drop);
var composite = new CompositeBuf(allocator, true, received, drop);
drop.accept(composite);
return composite;
}
@ -771,8 +802,16 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
@Override
public boolean isSendable() {
return isSendable && super.isSendable();
public boolean isOwned() {
return isSendable && super.isOwned() && allConstituentsAreOwned();
}
private boolean allConstituentsAreOwned() {
boolean result = true;
for (Buf buf : bufs) {
result &= buf.isOwned();
}
return result;
}
long readPassThrough() {

View File

@ -0,0 +1,43 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.b2;
import java.lang.ref.Cleaner;
import static io.netty.buffer.b2.Statics.NO_OP_DROP;
class ManagedAllocator implements Allocator, AllocatorControl {
private final MemoryManager manager;
private final Cleaner cleaner;
ManagedAllocator(MemoryManager manager, Cleaner cleaner) {
this.manager = manager;
this.cleaner = cleaner;
}
@Override
public Buf allocate(int size) {
Allocator.checkSize(size);
return manager.allocateConfined(this, size, manager.drop(), cleaner);
}
@Override
public Object allocateUntethered(Buf originator, int size) {
Allocator.checkSize(size);
var buf = manager.allocateConfined(this, size, NO_OP_DROP, null);
return manager.unwrapRecoverableMemory(buf);
}
}

View File

@ -53,18 +53,20 @@ import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset_LE;
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
static final Drop<MemSegBuf> SEGMENT_CLOSE = buf -> buf.seg.close();
final MemorySegment seg;
private final AllocatorControl alloc;
private final boolean isSendable;
private MemorySegment seg;
private boolean isBigEndian;
private int roff;
private int woff;
MemSegBuf(MemorySegment segmet, Drop<MemSegBuf> drop) {
this(segmet, drop, true);
MemSegBuf(MemorySegment segmet, Drop<MemSegBuf> drop, AllocatorControl alloc) {
this(segmet, drop, alloc, true);
}
private MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop, boolean isSendable) {
private MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop, AllocatorControl alloc, boolean isSendable) {
super(drop);
this.alloc = alloc;
seg = segment;
this.isSendable = isSendable;
isBigEndian = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
@ -134,7 +136,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
acquire();
Drop<MemSegBuf> drop = b -> close();
var sendable = false; // Sending implies ownership change, which we can't do for slices.
return new MemSegBuf(slice, drop, sendable).writerOffset(length).order(order());
return new MemSegBuf(slice, drop, alloc, sendable).writerOffset(length).order(order());
}
@Override
@ -299,6 +301,26 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
};
}
@Override
public void ensureWritable(int size) {
if (!isOwned()) {
throw new IllegalStateException("Buffer is not owned. Only owned buffers can call ensureWritable.");
}
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (writableBytes() < size) {
long newSize = capacity() + (long) size;
Allocator.checkSize(newSize);
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
var newSegment = recoverableMemory.segment;
newSegment.copyFrom(seg);
unsafeGetDrop().drop(this); // Release old memory segment.
seg = newSegment;
unsafeGetDrop().accept(this);
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
@Override
public byte readByte() {
@ -774,7 +796,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public MemSegBuf transferOwnership(Drop<MemSegBuf> drop) {
var newSegment = isConfined? transferSegment.handoff(Thread.currentThread()) : transferSegment;
MemSegBuf copy = new MemSegBuf(newSegment, drop);
MemSegBuf copy = new MemSegBuf(newSegment, drop, alloc);
copy.isBigEndian = outer.isBigEndian;
copy.roff = outer.roff;
copy.woff = outer.woff;
@ -793,8 +815,8 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
@Override
public boolean isSendable() {
return isSendable && super.isSendable();
public boolean isOwned() {
return isSendable && super.isOwned();
}
private void checkRead(int index, int size) {
@ -814,4 +836,22 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(seg.byteSize() - 1) + "].");
}
Object recoverableMemory() {
return new RecoverableMemory(seg, alloc);
}
static final class RecoverableMemory {
private final MemorySegment segment;
private final AllocatorControl alloc;
RecoverableMemory(MemorySegment segment, AllocatorControl alloc) {
this.segment = segment;
this.alloc = alloc;
}
Buf recover(Drop<MemSegBuf> drop) {
return new MemSegBuf(segment, drop, alloc);
}
}
}

View File

@ -29,8 +29,8 @@ public interface MemoryManager {
}
boolean isNative();
Buf allocateConfined(long size, Drop<Buf> drop, Cleaner cleaner);
Buf allocateShared(long size, Drop<Buf> drop, Cleaner cleaner);
Buf allocateConfined(AllocatorControl alloc, long size, Drop<Buf> drop, Cleaner cleaner);
Buf allocateShared(AllocatorControl allo, long size, Drop<Buf> drop, Cleaner cleaner);
Drop<Buf> drop();
Object unwrapRecoverableMemory(Buf buf);
Buf recoverMemory(Object recoverableMemory, Drop<Buf> drop);
@ -40,21 +40,21 @@ public interface MemoryManager {
public abstract boolean isNative();
@Override
public Buf allocateConfined(long size, Drop<Buf> drop, Cleaner cleaner) {
public Buf allocateConfined(AllocatorControl alloc, long size, Drop<Buf> drop, Cleaner cleaner) {
var segment = createSegment(size);
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuf(segment, convert(drop));
return new MemSegBuf(segment, convert(drop), alloc);
}
@Override
public Buf allocateShared(long size, Drop<Buf> drop, Cleaner cleaner) {
public Buf allocateShared(AllocatorControl alloc, long size, Drop<Buf> drop, Cleaner cleaner) {
var segment = createSegment(size).share();
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuf(segment, convert(drop));
return new MemSegBuf(segment, convert(drop), alloc);
}
protected abstract MemorySegment createSegment(long size);
@ -67,13 +67,13 @@ public interface MemoryManager {
@Override
public Object unwrapRecoverableMemory(Buf buf) {
var b = (MemSegBuf) buf;
return b.seg;
return b.recoverableMemory();
}
@Override
public Buf recoverMemory(Object recoverableMemory, Drop<Buf> drop) {
var segment = (MemorySegment) recoverableMemory;
return new MemSegBuf(segment, convert(drop));
var recovery = (MemSegBuf.RecoverableMemory) recoverableMemory;
return recovery.recover(convert(drop));
}
@SuppressWarnings("unchecked")

View File

@ -49,9 +49,9 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
/**
* Send this reference counted object instance to another Thread, transferring the ownership to the recipient.
* <p>
* Note that the object must be {@linkplain #isSendable() sendable}, and cannot have any outstanding borrows,
* Note that the object must be {@linkplain #isOwned() owned}, and cannot have any outstanding borrows,
* when it's being sent.
* That is, all previous acquires must have been closed, and {@link #isSendable()} must return {@code true}.
* That is, all previous acquires must have been closed, and {@link #isOwned()} must return {@code true}.
* <p>
* This instance immediately becomes inaccessible, and all attempts at accessing this reference counted object
* will throw. Calling {@link #close()} will have no effect, so this method is safe to call within a
@ -60,22 +60,21 @@ public interface Rc<I extends Rc<I>> extends AutoCloseable {
Send<I> send();
/**
* Check that this reference counted object is sendable.
* Check that this reference counted object is owned.
* <p>
* To be sendable, the object must have no outstanding acquires, and no other implementation defined restrictions.
* To be owned, the object must have no outstanding acquires, and no other implementation defined restrictions.
*
* @return {@code true} if this object can be {@linkplain #send() sent},
* or {@code false} if calling {@link #send()} would throw an exception.
*/
boolean isSendable();
boolean isOwned();
/**
* Count the number of borrows of this object.
* If the number of borrows is {@code 0}, then the object is in an "owned" state.
* If the number of borrows is greater than {@code 0}, then the object is in a "borrowed" state.
* If this returns a negative number, then this object has been deallocated.
* Note that even if the number of borrows is {@code 0}, this object might not be {@linkplain #isOwned() owned}
* because there could be other restrictions involved in ownership.
*
* @return The number of borrows, if any, of this object, or a negative number if this object has been deallocated.
* @return The number of borrows, if any, of this object.
*/
int countBorrows();
}

View File

@ -72,7 +72,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
*/
@Override
public final Send<I> send() {
if (!isSendable()) {
if (!isOwned()) {
throw notSendableException();
}
var owned = prepareSend();
@ -86,13 +86,13 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
}
@Override
public boolean isSendable() {
public boolean isOwned() {
return acquires == 0;
}
@Override
public int countBorrows() {
return acquires;
return Math.max(acquires, 0);
}
/**
@ -105,6 +105,15 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
*/
protected abstract Owned<T> prepareSend();
/**
* Get access to the underlying {@link Drop} object.
* This method is unsafe because it open the possibility of bypassing and overriding resource lifetimes.
* @return The {@link Drop} object used by this reference counted object.
*/
protected Drop<T> unsafeGetDrop() {
return drop;
}
@SuppressWarnings("unchecked")
private I self() {
return (I) this;

View File

@ -20,13 +20,14 @@ import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import static io.netty.buffer.b2.Statics.NO_OP_DROP;
import static java.lang.invoke.MethodHandles.lookup;
class SizeClassedMemoryPool implements Allocator, Drop<Buf> {
class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
private static final VarHandle CLOSE = Statics.findVarHandle(
lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
private final MemoryManager manager;
private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<Buf>>> pool;
private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<Send<Buf>>> pool;
@SuppressWarnings("unused")
private volatile boolean closed;
@ -50,8 +51,8 @@ class SizeClassedMemoryPool implements Allocator, Drop<Buf> {
return manager;
}
protected Buf createBuf(long size, Drop<Buf> drop) {
var buf = manager.allocateShared(size, drop, null);
protected Buf createBuf(int size, Drop<Buf> drop) {
var buf = manager.allocateShared(this, size, drop, null);
drop.accept(buf);
return buf;
}
@ -94,6 +95,21 @@ class SizeClassedMemoryPool implements Allocator, Drop<Buf> {
}
}
@Override
public Object allocateUntethered(Buf originator, int size) {
var sizeClassPool = getSizeClassPool(size);
Send<Buf> send = sizeClassPool.poll();
Buf untetheredBuf;
if (send != null) {
var transfer = (TransferSend<Buf, Buf>) send;
var owned = transfer.unsafeUnwrapOwned();
untetheredBuf = owned.transferOwnership(NO_OP_DROP);
} else {
untetheredBuf = createBuf(size, NO_OP_DROP);
}
return manager.unwrapRecoverableMemory(untetheredBuf);
}
void recoverLostMemory(Object memory) {
var drop = getDrop();
var buf = manager.recoverMemory(memory, drop);
@ -101,7 +117,7 @@ class SizeClassedMemoryPool implements Allocator, Drop<Buf> {
buf.close();
}
private ConcurrentLinkedQueue<Send<Buf>> getSizeClassPool(long size) {
private ConcurrentLinkedQueue<Send<Buf>> getSizeClassPool(int size) {
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
}

View File

@ -25,6 +25,8 @@ interface Statics {
Cleaner CLEANER = Cleaner.create();
LongAdder MEM_USAGE_NATIVE = new LongAdder();
ConcurrentHashMap<Long,Runnable> CLEANUP_ACTIONS = new ConcurrentHashMap<>();
Drop<Buf> NO_OP_DROP = buf -> {
};
static VarHandle findVarHandle(Lookup lookup, Class<?> recv, String name, Class<?> type) {
try {

View File

@ -35,11 +35,20 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
@SuppressWarnings("unchecked")
@Override
public I receive() {
if (!RECEIVED.compareAndSet(this, false, true)) {
throw new IllegalStateException("This object has already been received.");
}
gateReception();
var copy = outgoing.transferOwnership(drop);
drop.accept(copy);
return (I) copy;
}
Owned<T> unsafeUnwrapOwned() {
gateReception();
return outgoing;
}
private void gateReception() {
if (!RECEIVED.compareAndSet(this, false, true)) {
throw new IllegalStateException("This object has already been received.");
}
}
}

View File

@ -19,6 +19,7 @@ import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@ -110,7 +111,7 @@ public class BufTest {
int half = size / 2;
try (Buf firstHalf = a.allocate(half);
Buf secondHalf = b.allocate(size - half)) {
return Buf.compose(firstHalf, secondHalf);
return a.compose(firstHalf, secondHalf);
}
}
@ -134,7 +135,7 @@ public class BufTest {
try (Buf a = alloc.allocate(part);
Buf b = alloc.allocate(part);
Buf c = alloc.allocate(size - part * 2)) {
return Buf.compose(a, b, c);
return alloc.compose(a, b, c);
}
}
@ -284,11 +285,11 @@ public class BufTest {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
try (Buf ignored = buf.acquire()) {
assertFalse(buf.isSendable());
assertFalse(buf.isOwned());
assertThrows(IllegalStateException.class, buf::send);
}
// Now send() should work again.
assertTrue(buf.isSendable());
assertTrue(buf.isOwned());
buf.send().receive().close();
}
}
@ -494,7 +495,7 @@ public class BufTest {
Buf buf = allocator.allocate(8)) {
int borrows = buf.countBorrows();
try (Buf ignored = buf.slice()) {
assertFalse(buf.isSendable());
assertFalse(buf.isOwned());
assertThrows(IllegalStateException.class, buf::send);
}
assertEquals(borrows, buf.countBorrows());
@ -508,7 +509,7 @@ public class BufTest {
Buf buf = allocator.allocate(8)) {
int borrows = buf.countBorrows();
try (Buf ignored = buf.slice(0, 8)) {
assertFalse(buf.isSendable());
assertFalse(buf.isOwned());
assertThrows(IllegalStateException.class, buf::send);
}
assertEquals(borrows, buf.countBorrows());
@ -555,11 +556,11 @@ public class BufTest {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
try (Buf slice = buf.slice()) {
assertFalse(buf.isSendable());
assertFalse(buf.isOwned());
assertThrows(IllegalStateException.class, slice::send);
}
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isSendable());
assertTrue(buf.isOwned());
buf.send().receive().close();
}
}
@ -570,11 +571,11 @@ public class BufTest {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
try (Buf slice = buf.slice(0, 8)) {
assertFalse(buf.isSendable());
assertFalse(buf.isOwned());
assertThrows(IllegalStateException.class, slice::send);
}
// Verify that the slice is closed properly afterwards.
assertTrue(buf.isSendable());
assertTrue(buf.isOwned());
}
}
@ -638,7 +639,7 @@ public class BufTest {
assertEquals(borrows + 1, buf.countBorrows());
try (Buf slice = buf.slice()) {
assertEquals(borrows + 2, buf.countBorrows());
try (Buf ignored1 = Buf.compose(buf, slice)) {
try (Buf ignored1 = allocator.compose(buf, slice)) {
assertEquals(borrows + 3, buf.countBorrows());
}
assertEquals(borrows + 2, buf.countBorrows());
@ -772,7 +773,7 @@ public class BufTest {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buf.compose(bufFirst, bufSecond);
return a.compose(bufFirst, bufSecond);
}
});
}
@ -788,7 +789,7 @@ public class BufTest {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buf.compose(bufFirst, bufSecond);
return a.compose(bufFirst, bufSecond);
}
});
}
@ -804,7 +805,7 @@ public class BufTest {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buf.compose(bufFirst, bufSecond);
return a.compose(bufFirst, bufSecond);
}
});
}
@ -820,7 +821,7 @@ public class BufTest {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buf.compose(bufFirst, bufSecond);
return a.compose(bufFirst, bufSecond);
}
});
}
@ -837,7 +838,7 @@ public class BufTest {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buf.compose(bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(a.compose(bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -854,7 +855,7 @@ public class BufTest {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buf.compose(bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(a.compose(bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -871,7 +872,7 @@ public class BufTest {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buf.compose(bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(a.compose(bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -888,7 +889,7 @@ public class BufTest {
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buf.compose(bufFirst, bufSecond)).writerOffset(size).slice();
return scope.add(a.compose(bufFirst, bufSecond)).writerOffset(size).slice();
}
});
}
@ -1346,6 +1347,109 @@ public class BufTest {
}
}
@Test
public void compositeBufferCanOnlyBeOwnedWhenAllConstituentBuffersAreOwned() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
assertTrue(a.isOwned());
Buf leakB;
try (Buf b = allocator.allocate(8)) {
assertTrue(a.isOwned());
assertTrue(b.isOwned());
composite = allocator.compose(a, b);
assertFalse(composite.isOwned());
assertFalse(a.isOwned());
assertFalse(b.isOwned());
leakB = b;
}
assertFalse(composite.isOwned());
assertFalse(a.isOwned());
assertTrue(leakB.isOwned());
}
assertTrue(composite.isOwned());
}
}
@ParameterizedTest
@MethodSource("allocators")
public void ensureWritableMustThrowForBorrowedBuffers(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
try (Buf slice = buf.slice()) {
assertThrows(IllegalStateException.class, () -> slice.ensureWritable(1));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
try (Buf compose = allocator.compose(buf)) {
assertThrows(IllegalStateException.class, () -> compose.ensureWritable(1));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableMustThrowForNegativeSize(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(-1));
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableMustThrowIfRequestedSizeWouldGrowBeyondMaxAllowed(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(512)) {
assertThrows(IllegalArgumentException.class, () -> buf.ensureWritable(Integer.MAX_VALUE - 8));
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableMustNotThrowWhenSpaceIsAlreadyAvailable(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.ensureWritable(8);
buf.writeLong(1);
assertThrows(IndexOutOfBoundsException.class, () -> buf.writeByte((byte) 1));
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableMustExpandBufferCapacity(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
assertThat(buf.writableBytes()).isEqualTo(8);
buf.writeLong(0x0102030405060708L);
assertThat(buf.writableBytes()).isEqualTo(0);
buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
assertThat(buf.readableBytes()).isEqualTo(16);
assertThat(buf.readLong()).isEqualTo(0x0102030405060708L);
assertThat(buf.readLong()).isEqualTo(0xA1A2A3A4A5A6A7A8L);
assertThrows(IndexOutOfBoundsException.class, buf::readByte);
// Is it implementation dependent if the capacity increased by *exactly* the requested size, or more.
}
}
@Test
public void ensureWritableMustExpandCapacityOfEmptyCompositeBuffer() {
try (Allocator allocator = Allocator.heap();
Buf buf = allocator.compose()) {
assertThat(buf.writableBytes()).isEqualTo(0);
buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
assertThat(buf.readableBytes()).isEqualTo(8);
assertThat(buf.readLong()).isEqualTo(0xA1A2A3A4A5A6A7A8L);
assertThrows(IndexOutOfBoundsException.class, buf::readByte);
// Is it implementation dependent if the capacity increased by *exactly* the requested size, or more.
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
@ParameterizedTest
@MethodSource("allocators")