Add a Buf.bifurcate method
Motivation: There are use cases that involve accumulating data into a buffer, then carving out prefix slices and sending them off on their own journey for further processing. Modification: Add a Buf.bifurcate API, that split a buffer, and its ownership, in two. Internally, the API will inject and maintain an atomically reference counted Drop instance, so that the original memory segment is not released until all bifurcated parts are closed. This works particularly well for composite buffers, where only the buffer (if any) wherein the bifurcation point lands, will actually have its memory split. A composite buffer can otherwise just crack its buffer array in two. Result: We now have a safe way of breaking the single ownership of some memory into multiple parts, that can be sent and owned independently.
This commit is contained in:
parent
deeea157c0
commit
b749106c0c
@ -28,7 +28,7 @@ public interface AllocatorControl {
|
||||
* This allows a buffer to implement {@link Buf#ensureWritable(int)} by having new memory allocated to it,
|
||||
* without that memory being attached to some other lifetime.
|
||||
*
|
||||
* @param originator The buffer that originated the request for an untethered memory allocated.
|
||||
* @param originator The buffer that originated the request for an untethered memory allocated.
|
||||
* @param size The size of the requested memory allocation, in bytes.
|
||||
* @return A "recoverable memory" object that is the requested allocation.
|
||||
*/
|
||||
|
@ -373,4 +373,48 @@ public interface Buf extends Rc<Buf>, BufAccessors {
|
||||
* That is, if {@link #countBorrows()} is not {@code 0}.
|
||||
*/
|
||||
void ensureWritable(int size);
|
||||
|
||||
/**
|
||||
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
|
||||
* <p>
|
||||
* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new
|
||||
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
|
||||
* {@linkplain #send() sent} to other threads.
|
||||
* <p>
|
||||
* The returned buffer will adopt the {@link #readerOffset()} of this buffer, and have its {@link #writerOffset()}
|
||||
* and {@link #capacity()} both set to the equal to the write offset of this buffer.
|
||||
* <p>
|
||||
* The memory region in the returned buffer will become inaccessible through this buffer. This buffer will have its
|
||||
* capacity reduced by the capacity of the returned buffer, and the read and write offsets of this buffer will both
|
||||
* become zero, even though their position in memory remain unchanged.
|
||||
* <p>
|
||||
* Effectively, the following transformation takes place:
|
||||
* <pre>{@code
|
||||
* This buffer:
|
||||
* +------------------------------------------+
|
||||
* 0| |r/o |w/o |cap
|
||||
* +---+---------------------+----------------+
|
||||
* / / / \ \
|
||||
* / / / \ \
|
||||
* / / / \ \
|
||||
* / / / \ \
|
||||
* / / / \ \
|
||||
* +---+---------------------+ +---------------+
|
||||
* | |r/o |w/o & cap |r/o & w/o |cap
|
||||
* +---+---------------------+ +---------------+
|
||||
* Returned buffer. This buffer.
|
||||
* }</pre>
|
||||
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
|
||||
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
|
||||
* all of the bifurcated parts have been closed.
|
||||
* <p>
|
||||
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
|
||||
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
|
||||
* simply split its internal array in two.
|
||||
* <p>
|
||||
* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
|
||||
*
|
||||
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
|
||||
*/
|
||||
Buf bifurcate();
|
||||
}
|
||||
|
@ -27,12 +27,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
* non-composite copy of the buffer.
|
||||
*/
|
||||
private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8;
|
||||
private static final Drop<CompositeBuf> COMPOSITE_DROP = new Drop<CompositeBuf>() {
|
||||
@Override
|
||||
public void drop(CompositeBuf obj) {
|
||||
for (Buf buf : obj.bufs) {
|
||||
buf.close();
|
||||
}
|
||||
private static final Drop<CompositeBuf> COMPOSITE_DROP = buf -> {
|
||||
for (Buf b : buf.bufs) {
|
||||
b.close();
|
||||
}
|
||||
};
|
||||
|
||||
@ -45,6 +42,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
private int roff;
|
||||
private int woff;
|
||||
private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
|
||||
private ByteOrder order;
|
||||
|
||||
CompositeBuf(Allocator allocator, Buf[] bufs) {
|
||||
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
|
||||
@ -64,6 +62,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
|
||||
}
|
||||
}
|
||||
order = bufs[0].order();
|
||||
} else {
|
||||
order = ByteOrder.nativeOrder();
|
||||
}
|
||||
this.bufs = bufs;
|
||||
computeBufferOffsets();
|
||||
@ -129,6 +130,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
|
||||
@Override
|
||||
public Buf order(ByteOrder order) {
|
||||
this.order = order;
|
||||
for (Buf buf : bufs) {
|
||||
buf.order(order);
|
||||
}
|
||||
@ -137,7 +139,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
|
||||
@Override
|
||||
public ByteOrder order() {
|
||||
return bufs.length > 0? bufs[0].order() : ByteOrder.nativeOrder();
|
||||
return order;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -543,6 +545,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
"This buffer uses " + order() + " byte order, and cannot be extended with " +
|
||||
"a buffer that uses " + extension.order() + " byte order.");
|
||||
}
|
||||
if (bufs.length == 0) {
|
||||
order = extension.order();
|
||||
}
|
||||
long newSize = capacity() + (long) extension.capacity();
|
||||
Allocator.checkSize(newSize);
|
||||
|
||||
@ -561,6 +566,36 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
computeBufferOffsets();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buf bifurcate() {
|
||||
if (!isOwned()) {
|
||||
throw new IllegalStateException("Cannot bifurcate a buffer that is not owned.");
|
||||
}
|
||||
if (bufs.length == 0) {
|
||||
// Bifurcating a zero-length buffer is trivial.
|
||||
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order);
|
||||
}
|
||||
|
||||
int i = searchOffsets(woff);
|
||||
int off = woff - offsets[i];
|
||||
Buf[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i);
|
||||
bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length);
|
||||
if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) {
|
||||
bifs[bifs.length - 1] = bufs[0].bifurcate();
|
||||
}
|
||||
computeBufferOffsets();
|
||||
try {
|
||||
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop());
|
||||
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
|
||||
return compositeBuf;
|
||||
} finally {
|
||||
// Drop our references to the buffers in the bifs array. They belong to the new composite buffer now.
|
||||
for (Buf bif : bifs) {
|
||||
bif.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
|
||||
@Override
|
||||
public byte readByte() {
|
||||
@ -856,7 +891,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
received[i] = sends[i].receive();
|
||||
}
|
||||
var composite = new CompositeBuf(allocator, true, received, drop);
|
||||
drop.accept(composite);
|
||||
drop.reconnect(composite);
|
||||
return composite;
|
||||
}
|
||||
};
|
||||
|
@ -15,16 +15,13 @@
|
||||
*/
|
||||
package io.netty.buffer.api;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* The Drop interface is used by {@link Rc} instances to implement their resource disposal mechanics. The {@link
|
||||
* #drop(Object)} method will be called by the Rc when their last reference is closed.
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface Drop<T> extends Consumer<T> {
|
||||
public interface Drop<T> {
|
||||
/**
|
||||
* Dispose of the resources in the given Rc.
|
||||
*
|
||||
@ -37,7 +34,6 @@ public interface Drop<T> extends Consumer<T> {
|
||||
*
|
||||
* @param obj The new Rc instance with the new owner.
|
||||
*/
|
||||
@Override
|
||||
default void accept(T obj) {
|
||||
default void reconnect(T obj) {
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ class NativeMemoryCleanerDrop implements Drop<Buf> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(Buf buf) {
|
||||
public void reconnect(Buf buf) {
|
||||
// Unregister old cleanable, if any, to avoid uncontrolled build-up.
|
||||
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
|
||||
if (c != null) {
|
||||
|
@ -15,9 +15,12 @@
|
||||
*/
|
||||
package io.netty.buffer.api;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.function.Function;
|
||||
|
||||
public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> implements Rc<I> {
|
||||
private int acquires; // Closed if negative.
|
||||
private final Drop<T> drop;
|
||||
private Drop<T> drop;
|
||||
|
||||
protected RcSupport(Drop<T> drop) {
|
||||
this.drop = drop;
|
||||
@ -114,6 +117,12 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
|
||||
return drop;
|
||||
}
|
||||
|
||||
protected Drop<T> unsafeExchangeDrop(Drop<T> replacement) {
|
||||
Objects.requireNonNull(replacement, "Replacement drop cannot be null.");
|
||||
drop = replacement;
|
||||
return replacement;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private I self() {
|
||||
return (I) this;
|
||||
|
@ -54,7 +54,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
|
||||
|
||||
protected Buf createBuf(int size, Drop<Buf> drop) {
|
||||
var buf = manager.allocateShared(this, size, drop, null);
|
||||
drop.accept(buf);
|
||||
drop.reconnect(buf);
|
||||
return buf;
|
||||
}
|
||||
|
||||
@ -119,7 +119,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
|
||||
public void recoverMemory(Object memory) {
|
||||
var drop = getDrop();
|
||||
var buf = manager.recoverMemory(memory, drop);
|
||||
drop.accept(buf);
|
||||
drop.reconnect(buf);
|
||||
buf.close();
|
||||
}
|
||||
|
||||
|
@ -37,7 +37,7 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
|
||||
public I receive() {
|
||||
gateReception();
|
||||
var copy = outgoing.transferOwnership(drop);
|
||||
drop.accept(copy);
|
||||
drop.reconnect(copy);
|
||||
return (I) copy;
|
||||
}
|
||||
|
||||
|
61
src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java
Normal file
61
src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java
Normal file
@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api.memseg;
|
||||
|
||||
import io.netty.buffer.api.Drop;
|
||||
|
||||
public class BifurcatedDrop<T> implements Drop<T> {
|
||||
private final T originalBuf;
|
||||
private final Drop<T> delegate;
|
||||
private int count;
|
||||
private Exception closeTrace;
|
||||
|
||||
public BifurcatedDrop(T originalBuf, Drop<T> delegate) {
|
||||
this.originalBuf = originalBuf;
|
||||
this.delegate = delegate;
|
||||
count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop.
|
||||
}
|
||||
|
||||
public synchronized void increment() {
|
||||
checkValidState();
|
||||
count++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void drop(T obj) {
|
||||
checkValidState();
|
||||
if (--count == 0) {
|
||||
closeTrace = new Exception("close: " + delegate);
|
||||
delegate.reconnect(originalBuf);
|
||||
delegate.drop(originalBuf);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reconnect(T obj) {
|
||||
delegate.reconnect(obj);
|
||||
}
|
||||
|
||||
Drop<T> unwrap() {
|
||||
return delegate;
|
||||
}
|
||||
|
||||
private void checkValidState() {
|
||||
if (count == 0) {
|
||||
throw new IllegalStateException("Underlying resources have already been freed.", closeTrace);
|
||||
}
|
||||
}
|
||||
}
|
@ -315,12 +315,49 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
|
||||
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
|
||||
var newSegment = recoverableMemory.segment;
|
||||
newSegment.copyFrom(seg);
|
||||
alloc.recoverMemory(recoverableMemory()); // Release old memory segment.
|
||||
|
||||
// Release old memory segment:
|
||||
var drop = unsafeGetDrop();
|
||||
if (drop instanceof BifurcatedDrop) {
|
||||
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
|
||||
drop.drop(this);
|
||||
drop = ((BifurcatedDrop<MemSegBuf>) drop).unwrap();
|
||||
unsafeExchangeDrop(drop);
|
||||
} else {
|
||||
alloc.recoverMemory(recoverableMemory());
|
||||
}
|
||||
|
||||
seg = newSegment;
|
||||
unsafeGetDrop().accept(this);
|
||||
drop.reconnect(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buf bifurcate() {
|
||||
if (!isOwned()) {
|
||||
throw new IllegalStateException("Cannot bifurcate a buffer that is not owned.");
|
||||
}
|
||||
var drop = unsafeGetDrop();
|
||||
if (seg.ownerThread() != null) {
|
||||
seg = seg.share();
|
||||
drop.reconnect(this);
|
||||
}
|
||||
if (drop instanceof BifurcatedDrop) {
|
||||
((BifurcatedDrop<?>) drop).increment();
|
||||
} else {
|
||||
drop = unsafeExchangeDrop(new BifurcatedDrop<MemSegBuf>(new MemSegBuf(seg, drop, alloc), drop));
|
||||
}
|
||||
var bifurcatedSeg = seg.asSlice(0, woff);
|
||||
var bifurcatedBuf = new MemSegBuf(bifurcatedSeg, drop, alloc);
|
||||
bifurcatedBuf.woff = woff;
|
||||
bifurcatedBuf.roff = roff;
|
||||
bifurcatedBuf.order(order);
|
||||
seg = seg.asSlice(woff, seg.byteSize() - woff);
|
||||
woff = 0;
|
||||
roff = 0;
|
||||
return bifurcatedBuf;
|
||||
}
|
||||
|
||||
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
|
||||
@Override
|
||||
public byte readByte() {
|
||||
|
@ -65,6 +65,16 @@ public class BufTest {
|
||||
return fixtures = fixtureCombinations().toArray(Fixture[]::new);
|
||||
}
|
||||
|
||||
static List<Fixture> initialAllocators() {
|
||||
return List.of(
|
||||
new Fixture("heap", Allocator::heap, HEAP),
|
||||
new Fixture("direct", Allocator::direct, DIRECT),
|
||||
new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER),
|
||||
new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP),
|
||||
new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT),
|
||||
new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER));
|
||||
}
|
||||
|
||||
static Stream<Fixture> nonSliceAllocators() {
|
||||
return fixtureCombinations().filter(f -> !f.isSlice());
|
||||
}
|
||||
@ -94,13 +104,7 @@ public class BufTest {
|
||||
if (fxs != null) {
|
||||
return Arrays.stream(fxs);
|
||||
}
|
||||
List<Fixture> initFixtures = List.of(
|
||||
new Fixture("heap", Allocator::heap, HEAP),
|
||||
new Fixture("direct", Allocator::direct, DIRECT),
|
||||
new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER),
|
||||
new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP),
|
||||
new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT),
|
||||
new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER));
|
||||
List<Fixture> initFixtures = initialAllocators();
|
||||
Builder<Fixture> builder = Stream.builder();
|
||||
initFixtures.forEach(builder);
|
||||
|
||||
@ -192,44 +196,68 @@ public class BufTest {
|
||||
}, COMPOSITE));
|
||||
}
|
||||
|
||||
return builder.build().flatMap(f -> {
|
||||
// Inject slice versions of everything
|
||||
Builder<Fixture> andSlices = Stream.builder();
|
||||
andSlices.add(f);
|
||||
andSlices.add(new Fixture(f + ".slice(0, capacity())", () -> {
|
||||
var allocatorBase = f.get();
|
||||
return new Allocator() {
|
||||
@Override
|
||||
public Buf allocate(int size) {
|
||||
try (Buf base = allocatorBase.allocate(size)) {
|
||||
return base.slice(0, base.capacity()).writerOffset(0);
|
||||
}
|
||||
}
|
||||
return builder.build().flatMap(BufTest::injectBifurcations).flatMap(BufTest::injectSlices);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
allocatorBase.close();
|
||||
}
|
||||
};
|
||||
}, Properties.SLICE));
|
||||
andSlices.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> {
|
||||
var allocatorBase = f.get();
|
||||
return new Allocator() {
|
||||
@Override
|
||||
public Buf allocate(int size) {
|
||||
try (Buf base = allocatorBase.allocate(size + 2)) {
|
||||
return base.slice(1, size).writerOffset(0);
|
||||
}
|
||||
private static Stream<Fixture> injectBifurcations(Fixture f) {
|
||||
Builder<Fixture> builder = Stream.builder();
|
||||
builder.add(f);
|
||||
builder.add(new Fixture(f + ".bifurcate", () -> {
|
||||
var allocatorBase = f.get();
|
||||
return new Allocator() {
|
||||
@Override
|
||||
public Buf allocate(int size) {
|
||||
try (Buf buf = allocatorBase.allocate(size + 1)) {
|
||||
buf.writerOffset(size);
|
||||
return buf.bifurcate().writerOffset(0);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
allocatorBase.close();
|
||||
@Override
|
||||
public void close() {
|
||||
allocatorBase.close();
|
||||
}
|
||||
};
|
||||
}, f.getProperties()));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private static Stream<Fixture> injectSlices(Fixture f) {
|
||||
Builder<Fixture> builder = Stream.builder();
|
||||
builder.add(f);
|
||||
builder.add(new Fixture(f + ".slice(0, capacity())", () -> {
|
||||
var allocatorBase = f.get();
|
||||
return new Allocator() {
|
||||
@Override
|
||||
public Buf allocate(int size) {
|
||||
try (Buf base = allocatorBase.allocate(size)) {
|
||||
return base.slice(0, base.capacity()).writerOffset(0);
|
||||
}
|
||||
};
|
||||
}, Properties.SLICE));
|
||||
return andSlices.build();
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
allocatorBase.close();
|
||||
}
|
||||
};
|
||||
}, Properties.SLICE));
|
||||
builder.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> {
|
||||
var allocatorBase = f.get();
|
||||
return new Allocator() {
|
||||
@Override
|
||||
public Buf allocate(int size) {
|
||||
try (Buf base = allocatorBase.allocate(size + 2)) {
|
||||
return base.slice(1, size).writerOffset(0);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
allocatorBase.close();
|
||||
}
|
||||
};
|
||||
}, Properties.SLICE));
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
@ -342,7 +370,7 @@ public class BufTest {
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonSliceAllocators")
|
||||
@MethodSource("initialAllocators")
|
||||
void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator()) {
|
||||
assertThrows(IllegalArgumentException.class, () -> allocator.allocate(0));
|
||||
@ -1814,6 +1842,198 @@ public class BufTest {
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonSliceAllocators")
|
||||
public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
buf.writeInt(1);
|
||||
try (Buf acquired = buf.acquire()) {
|
||||
var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate());
|
||||
assertThat(exc).hasMessageContaining("owned");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonSliceAllocators")
|
||||
public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
assertThat(buf.readByte()).isEqualTo((byte) 0x01);
|
||||
try (Buf bif = buf.bifurcate()) {
|
||||
// Original buffer:
|
||||
assertThat(buf.capacity()).isEqualTo(8);
|
||||
assertThat(buf.readerOffset()).isZero();
|
||||
assertThat(buf.writerOffset()).isZero();
|
||||
assertThat(buf.readableBytes()).isZero();
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte());
|
||||
|
||||
// Bifurcated part:
|
||||
assertThat(bif.capacity()).isEqualTo(8);
|
||||
assertThat(bif.readerOffset()).isOne();
|
||||
assertThat(bif.writerOffset()).isEqualTo(8);
|
||||
assertThat(bif.readableBytes()).isEqualTo(7);
|
||||
assertThat(bif.readByte()).isEqualTo((byte) 0x02);
|
||||
assertThat(bif.readInt()).isEqualTo(0x03040506);
|
||||
assertThat(bif.readByte()).isEqualTo((byte) 0x07);
|
||||
assertThat(bif.readByte()).isEqualTo((byte) 0x08);
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> bif.readByte());
|
||||
}
|
||||
|
||||
// Bifurcated part does NOT return when closed:
|
||||
assertThat(buf.capacity()).isEqualTo(8);
|
||||
assertThat(buf.readerOffset()).isZero();
|
||||
assertThat(buf.writerOffset()).isZero();
|
||||
assertThat(buf.readableBytes()).isZero();
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte());
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonSliceAllocators")
|
||||
public void bifurcatedPartsMustBeIndividuallySendable(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
assertThat(buf.readByte()).isEqualTo((byte) 0x01);
|
||||
try (Buf sentBif = buf.bifurcate().send().receive()) {
|
||||
try (Buf sentBuf = buf.send().receive()) {
|
||||
assertThat(sentBuf.capacity()).isEqualTo(8);
|
||||
assertThat(sentBuf.readerOffset()).isZero();
|
||||
assertThat(sentBuf.writerOffset()).isZero();
|
||||
assertThat(sentBuf.readableBytes()).isZero();
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> sentBuf.readByte());
|
||||
}
|
||||
|
||||
assertThat(sentBif.capacity()).isEqualTo(8);
|
||||
assertThat(sentBif.readerOffset()).isOne();
|
||||
assertThat(sentBif.writerOffset()).isEqualTo(8);
|
||||
assertThat(sentBif.readableBytes()).isEqualTo(7);
|
||||
assertThat(sentBif.readByte()).isEqualTo((byte) 0x02);
|
||||
assertThat(sentBif.readInt()).isEqualTo(0x03040506);
|
||||
assertThat(sentBif.readByte()).isEqualTo((byte) 0x07);
|
||||
assertThat(sentBif.readByte()).isEqualTo((byte) 0x08);
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> sentBif.readByte());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonSliceAllocators")
|
||||
public void mustBePossibleToBifurcateMoreThanOnce(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
try (Buf a = buf.bifurcate()) {
|
||||
a.writerOffset(4);
|
||||
try (Buf b = a.bifurcate()) {
|
||||
assertEquals(0x01020304, b.readInt());
|
||||
a.writerOffset(4);
|
||||
assertEquals(0x05060708, a.readInt());
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> b.readByte());
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> a.readByte());
|
||||
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
|
||||
buf.writerOffset(4);
|
||||
try (Buf c = buf.bifurcate()) {
|
||||
assertEquals(0xA1A2A3A4, c.readInt());
|
||||
buf.writerOffset(4);
|
||||
assertEquals(0xA5A6A7A8, buf.readInt());
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> c.readByte());
|
||||
assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonSliceAllocators")
|
||||
public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8).order(ByteOrder.BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
try (Buf a = buf.bifurcate()) {
|
||||
assertThat(a.order()).isEqualTo(ByteOrder.BIG_ENDIAN);
|
||||
a.order(ByteOrder.LITTLE_ENDIAN);
|
||||
a.writerOffset(4);
|
||||
try (Buf b = a.bifurcate()) {
|
||||
assertThat(b.order()).isEqualTo(ByteOrder.LITTLE_ENDIAN);
|
||||
assertThat(buf.order()).isEqualTo(ByteOrder.BIG_ENDIAN);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonSliceAllocators")
|
||||
public void ensureWritableOnBifurcatedBuffers(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
try (Buf a = buf.bifurcate()) {
|
||||
assertEquals(0x0102030405060708L, a.readLong());
|
||||
a.ensureWritable(8);
|
||||
a.writeLong(0xA1A2A3A4A5A6A7A8L);
|
||||
assertEquals(0xA1A2A3A4A5A6A7A8L, a.readLong());
|
||||
|
||||
buf.ensureWritable(8);
|
||||
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
|
||||
assertEquals(0xA1A2A3A4A5A6A7A8L, buf.readLong());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonSliceAllocators")
|
||||
public void ensureWritableOnBifurcatedBuffersWithOddOffsets(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(10).order(ByteOrder.BIG_ENDIAN)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
buf.writeByte((byte) 0x09);
|
||||
buf.readByte();
|
||||
try (Buf a = buf.bifurcate()) {
|
||||
assertEquals(0x0203040506070809L, a.readLong());
|
||||
a.ensureWritable(8);
|
||||
a.writeLong(0xA1A2A3A4A5A6A7A8L);
|
||||
assertEquals(0xA1A2A3A4A5A6A7A8L, a.readLong());
|
||||
|
||||
buf.ensureWritable(8);
|
||||
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
|
||||
assertEquals(0xA1A2A3A4A5A6A7A8L, buf.readLong());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bifurcateOnEmptyBigEndianCompositeBuffer() {
|
||||
try (Allocator allocator = Allocator.heap();
|
||||
Buf buf = allocator.compose().order(ByteOrder.BIG_ENDIAN)) {
|
||||
verifyBifurcateEmptyCompositeBuffer(buf);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void bifurcateOnEmptyLittleEndianCompositeBuffer() {
|
||||
try (Allocator allocator = Allocator.heap();
|
||||
Buf buf = allocator.compose().order(ByteOrder.LITTLE_ENDIAN)) {
|
||||
verifyBifurcateEmptyCompositeBuffer(buf);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
|
||||
try (Buf a = buf.bifurcate()) {
|
||||
a.ensureWritable(4);
|
||||
buf.ensureWritable(4);
|
||||
a.writeInt(1);
|
||||
buf.writeInt(2);
|
||||
assertEquals(1, a.readInt());
|
||||
assertEquals(2, buf.readInt());
|
||||
assertThat(a.order()).isEqualTo(buf.order());
|
||||
}
|
||||
}
|
||||
|
||||
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
|
Loading…
Reference in New Issue
Block a user