Merge pull request #16 from netty/bifurcate

Add a Buf.bifurcate method
This commit is contained in:
Chris Vest 2020-12-10 16:07:09 +01:00 committed by GitHub
commit 3cddd2b0b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 533 additions and 65 deletions

View File

@ -28,7 +28,7 @@ public interface AllocatorControl {
* This allows a buffer to implement {@link Buf#ensureWritable(int)} by having new memory allocated to it,
* without that memory being attached to some other lifetime.
*
* @param originator The buffer that originated the request for an untethered memory allocated.
* @param originator The buffer that originated the request for an untethered memory allocated.
* @param size The size of the requested memory allocation, in bytes.
* @return A "recoverable memory" object that is the requested allocation.
*/

View File

@ -373,4 +373,48 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* That is, if {@link #countBorrows()} is not {@code 0}.
*/
void ensureWritable(int size);
/**
* Split the buffer into two, at the {@linkplain #writerOffset() write offset} position.
* <p>
* The region of this buffer that contain the read and readable bytes, will be captured and returned in a new
* buffer, that will hold its own ownership of that region. This allows the returned buffer to be indepentently
* {@linkplain #send() sent} to other threads.
* <p>
* The returned buffer will adopt the {@link #readerOffset()} of this buffer, and have its {@link #writerOffset()}
* and {@link #capacity()} both set to the equal to the write offset of this buffer.
* <p>
* The memory region in the returned buffer will become inaccessible through this buffer. This buffer will have its
* capacity reduced by the capacity of the returned buffer, and the read and write offsets of this buffer will both
* become zero, even though their position in memory remain unchanged.
* <p>
* Effectively, the following transformation takes place:
* <pre>{@code
* This buffer:
* +------------------------------------------+
* 0| |r/o |w/o |cap
* +---+---------------------+----------------+
* / / / \ \
* / / / \ \
* / / / \ \
* / / / \ \
* / / / \ \
* +---+---------------------+ +---------------+
* | |r/o |w/o & cap |r/o & w/o |cap
* +---+---------------------+ +---------------+
* Returned buffer. This buffer.
* }</pre>
* When the buffers are in this state, both of the bifurcated parts retain an atomic reference count on the
* underlying memory. This means that shared underlying memory will not be deallocated or returned to a pool, until
* all of the bifurcated parts have been closed.
* <p>
* Composite buffers have it a little easier, in that at most only one of the constituent buffers will actually be
* bifurcated. If the split point lands perfectly between two constituent buffers, then a composite buffer can
* simply split its internal array in two.
* <p>
* Bifurcated buffers support all operations that normal buffers do, including {@link #ensureWritable(int)}.
*
* @return A new buffer with independent and exclusive ownership over the read and readable bytes from this buffer.
*/
Buf bifurcate();
}

View File

@ -27,12 +27,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
* non-composite copy of the buffer.
*/
private static final int MAX_CAPACITY = Integer.MAX_VALUE - 8;
private static final Drop<CompositeBuf> COMPOSITE_DROP = new Drop<CompositeBuf>() {
@Override
public void drop(CompositeBuf obj) {
for (Buf buf : obj.bufs) {
buf.close();
}
private static final Drop<CompositeBuf> COMPOSITE_DROP = buf -> {
for (Buf b : buf.bufs) {
b.close();
}
};
@ -45,6 +42,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private int roff;
private int woff;
private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
private ByteOrder order;
CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
@ -64,6 +62,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
throw new IllegalArgumentException("Constituent buffers have inconsistent byte order.");
}
}
order = bufs[0].order();
} else {
order = ByteOrder.nativeOrder();
}
this.bufs = bufs;
computeBufferOffsets();
@ -129,15 +130,18 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
@Override
public Buf order(ByteOrder order) {
for (Buf buf : bufs) {
buf.order(order);
if (this.order != order) {
this.order = order;
for (Buf buf : bufs) {
buf.order(order);
}
}
return this;
}
@Override
public ByteOrder order() {
return bufs.length > 0? bufs[0].order() : ByteOrder.nativeOrder();
return order;
}
@Override
@ -543,6 +547,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
"This buffer uses " + order() + " byte order, and cannot be extended with " +
"a buffer that uses " + extension.order() + " byte order.");
}
if (bufs.length == 0) {
order = extension.order();
}
long newSize = capacity() + (long) extension.capacity();
Allocator.checkSize(newSize);
@ -561,6 +568,36 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
computeBufferOffsets();
}
@Override
public Buf bifurcate() {
if (!isOwned()) {
throw new IllegalStateException("Cannot bifurcate a buffer that is not owned.");
}
if (bufs.length == 0) {
// Bifurcating a zero-length buffer is trivial.
return new CompositeBuf(allocator, true, bufs, unsafeGetDrop()).order(order);
}
int i = searchOffsets(woff);
int off = woff - offsets[i];
Buf[] bifs = Arrays.copyOf(bufs, off == 0? i : 1 + i);
bufs = Arrays.copyOfRange(bufs, off == bufs[i].capacity()? 1 + i : i, bufs.length);
if (off > 0 && bifs.length > 0 && off < bifs[bifs.length - 1].capacity()) {
bifs[bifs.length - 1] = bufs[0].bifurcate();
}
computeBufferOffsets();
try {
var compositeBuf = new CompositeBuf(allocator, true, bifs, unsafeGetDrop());
compositeBuf.order = order; // Preserve byte order even if bifs array is empty.
return compositeBuf;
} finally {
// Drop our references to the buffers in the bifs array. They belong to the new composite buffer now.
for (Buf bif : bifs) {
bif.close();
}
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
@Override
public byte readByte() {
@ -856,7 +893,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(allocator, true, received, drop);
drop.accept(composite);
drop.attach(composite);
return composite;
}
};

View File

@ -15,8 +15,6 @@
*/
package io.netty.buffer.api;
import java.util.function.Consumer;
/**
* The Drop interface is used by {@link Rc} instances to implement their resource disposal mechanics. The {@link
* #drop(Object)} method will be called by the Rc when their last reference is closed.
@ -24,7 +22,7 @@ import java.util.function.Consumer;
* @param <T>
*/
@FunctionalInterface
public interface Drop<T> extends Consumer<T> {
public interface Drop<T> {
/**
* Dispose of the resources in the given Rc.
*
@ -37,7 +35,6 @@ public interface Drop<T> extends Consumer<T> {
*
* @param obj The new Rc instance with the new owner.
*/
@Override
default void accept(T obj) {
default void attach(T obj) {
}
}

View File

@ -49,7 +49,7 @@ class NativeMemoryCleanerDrop implements Drop<Buf> {
}
@Override
public void accept(Buf buf) {
public void attach(Buf buf) {
// Unregister old cleanable, if any, to avoid uncontrolled build-up.
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
if (c != null) {

View File

@ -15,9 +15,12 @@
*/
package io.netty.buffer.api;
import java.util.Objects;
import java.util.function.Function;
public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> implements Rc<I> {
private int acquires; // Closed if negative.
private final Drop<T> drop;
private Drop<T> drop;
protected RcSupport(Drop<T> drop) {
this.drop = drop;
@ -114,6 +117,11 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
return drop;
}
protected Drop<T> unsafeExchangeDrop(Drop<T> replacement) {
drop = Objects.requireNonNull(replacement, "Replacement drop cannot be null.");
return replacement;
}
@SuppressWarnings("unchecked")
private I self() {
return (I) this;

View File

@ -54,7 +54,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
protected Buf createBuf(int size, Drop<Buf> drop) {
var buf = manager.allocateShared(this, size, drop, null);
drop.accept(buf);
drop.attach(buf);
return buf;
}
@ -119,7 +119,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
public void recoverMemory(Object memory) {
var drop = getDrop();
var buf = manager.recoverMemory(memory, drop);
drop.accept(buf);
drop.attach(buf);
buf.close();
}

View File

@ -37,7 +37,7 @@ class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {
public I receive() {
gateReception();
var copy = outgoing.transferOwnership(drop);
drop.accept(copy);
drop.attach(copy);
return (I) copy;
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.memseg;
import io.netty.buffer.api.Drop;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
class BifurcatedDrop<T> implements Drop<T> {
private static final VarHandle COUNT;
static {
try {
COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class);
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
private final T originalBuf;
private final Drop<T> delegate;
@SuppressWarnings("FieldMayBeFinal")
private volatile int count;
BifurcatedDrop(T originalBuf, Drop<T> delegate) {
this.originalBuf = originalBuf;
this.delegate = delegate;
count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop.
}
void increment() {
int c;
do {
c = count;
checkValidState(c);
} while (!COUNT.compareAndSet(this, c, c + 1));
}
@Override
public synchronized void drop(T obj) {
int c;
int n;
do {
c = count;
n = c - 1;
checkValidState(c);
} while (!COUNT.compareAndSet(this, c, n));
if (n == 0) {
delegate.attach(originalBuf);
delegate.drop(originalBuf);
}
}
@Override
public void attach(T obj) {
delegate.attach(obj);
}
Drop<T> unwrap() {
return delegate;
}
private static void checkValidState(int count) {
if (count == 0) {
throw new IllegalStateException("Underlying resources have already been freed.");
}
}
}

View File

@ -315,12 +315,49 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
var newSegment = recoverableMemory.segment;
newSegment.copyFrom(seg);
alloc.recoverMemory(recoverableMemory()); // Release old memory segment.
// Release old memory segment:
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
drop.drop(this);
drop = ((BifurcatedDrop<MemSegBuf>) drop).unwrap();
unsafeExchangeDrop(drop);
} else {
alloc.recoverMemory(recoverableMemory());
}
seg = newSegment;
unsafeGetDrop().accept(this);
drop.attach(this);
}
}
@Override
public Buf bifurcate() {
if (!isOwned()) {
throw new IllegalStateException("Cannot bifurcate a buffer that is not owned.");
}
var drop = unsafeGetDrop();
if (seg.ownerThread() != null) {
seg = seg.share();
drop.attach(this);
}
if (drop instanceof BifurcatedDrop) {
((BifurcatedDrop<?>) drop).increment();
} else {
drop = unsafeExchangeDrop(new BifurcatedDrop<MemSegBuf>(new MemSegBuf(seg, drop, alloc), drop));
}
var bifurcatedSeg = seg.asSlice(0, woff);
var bifurcatedBuf = new MemSegBuf(bifurcatedSeg, drop, alloc);
bifurcatedBuf.woff = woff;
bifurcatedBuf.roff = roff;
bifurcatedBuf.order(order);
seg = seg.asSlice(woff, seg.byteSize() - woff);
woff = 0;
roff = 0;
return bifurcatedBuf;
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
@Override
public byte readByte() {

View File

@ -65,6 +65,16 @@ public class BufTest {
return fixtures = fixtureCombinations().toArray(Fixture[]::new);
}
static List<Fixture> initialAllocators() {
return List.of(
new Fixture("heap", Allocator::heap, HEAP),
new Fixture("direct", Allocator::direct, DIRECT),
new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER),
new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP),
new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT),
new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER));
}
static Stream<Fixture> nonSliceAllocators() {
return fixtureCombinations().filter(f -> !f.isSlice());
}
@ -94,13 +104,7 @@ public class BufTest {
if (fxs != null) {
return Arrays.stream(fxs);
}
List<Fixture> initFixtures = List.of(
new Fixture("heap", Allocator::heap, HEAP),
new Fixture("direct", Allocator::direct, DIRECT),
new Fixture("directWithCleaner", Allocator::directWithCleaner, DIRECT, CLEANER),
new Fixture("pooledHeap", Allocator::pooledHeap, POOLED, HEAP),
new Fixture("pooledDirect", Allocator::pooledDirect, POOLED, DIRECT),
new Fixture("pooledDirectWithCleaner", Allocator::pooledDirectWithCleaner, POOLED, DIRECT, CLEANER));
List<Fixture> initFixtures = initialAllocators();
Builder<Fixture> builder = Stream.builder();
initFixtures.forEach(builder);
@ -192,44 +196,68 @@ public class BufTest {
}, COMPOSITE));
}
return builder.build().flatMap(f -> {
// Inject slice versions of everything
Builder<Fixture> andSlices = Stream.builder();
andSlices.add(f);
andSlices.add(new Fixture(f + ".slice(0, capacity())", () -> {
var allocatorBase = f.get();
return new Allocator() {
@Override
public Buf allocate(int size) {
try (Buf base = allocatorBase.allocate(size)) {
return base.slice(0, base.capacity()).writerOffset(0);
}
}
return builder.build().flatMap(BufTest::injectBifurcations).flatMap(BufTest::injectSlices);
}
@Override
public void close() {
allocatorBase.close();
}
};
}, Properties.SLICE));
andSlices.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> {
var allocatorBase = f.get();
return new Allocator() {
@Override
public Buf allocate(int size) {
try (Buf base = allocatorBase.allocate(size + 2)) {
return base.slice(1, size).writerOffset(0);
}
private static Stream<Fixture> injectBifurcations(Fixture f) {
Builder<Fixture> builder = Stream.builder();
builder.add(f);
builder.add(new Fixture(f + ".bifurcate", () -> {
var allocatorBase = f.get();
return new Allocator() {
@Override
public Buf allocate(int size) {
try (Buf buf = allocatorBase.allocate(size + 1)) {
buf.writerOffset(size);
return buf.bifurcate().writerOffset(0);
}
}
@Override
public void close() {
allocatorBase.close();
@Override
public void close() {
allocatorBase.close();
}
};
}, f.getProperties()));
return builder.build();
}
private static Stream<Fixture> injectSlices(Fixture f) {
Builder<Fixture> builder = Stream.builder();
builder.add(f);
builder.add(new Fixture(f + ".slice(0, capacity())", () -> {
var allocatorBase = f.get();
return new Allocator() {
@Override
public Buf allocate(int size) {
try (Buf base = allocatorBase.allocate(size)) {
return base.slice(0, base.capacity()).writerOffset(0);
}
};
}, Properties.SLICE));
return andSlices.build();
});
}
@Override
public void close() {
allocatorBase.close();
}
};
}, Properties.SLICE));
builder.add(new Fixture(f + ".slice(1, capacity() - 2)", () -> {
var allocatorBase = f.get();
return new Allocator() {
@Override
public Buf allocate(int size) {
try (Buf base = allocatorBase.allocate(size + 2)) {
return base.slice(1, size).writerOffset(0);
}
}
@Override
public void close() {
allocatorBase.close();
}
};
}, Properties.SLICE));
return builder.build();
}
@BeforeAll
@ -342,7 +370,7 @@ public class BufTest {
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
@MethodSource("initialAllocators")
void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
assertThrows(IllegalArgumentException.class, () -> allocator.allocate(0));
@ -1814,6 +1842,242 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.writeInt(1);
try (Buf acquired = buf.acquire()) {
var exc = assertThrows(IllegalStateException.class, () -> acquired.bifurcate());
assertThat(exc).hasMessageContaining("owned");
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void bifurcatedPartMustContainFirstHalfOfBuffer(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L);
assertThat(buf.readByte()).isEqualTo((byte) 0x01);
try (Buf bif = buf.bifurcate()) {
// Original buffer:
assertThat(buf.capacity()).isEqualTo(8);
assertThat(buf.readerOffset()).isZero();
assertThat(buf.writerOffset()).isZero();
assertThat(buf.readableBytes()).isZero();
assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte());
// Bifurcated part:
assertThat(bif.capacity()).isEqualTo(8);
assertThat(bif.readerOffset()).isOne();
assertThat(bif.writerOffset()).isEqualTo(8);
assertThat(bif.readableBytes()).isEqualTo(7);
assertThat(bif.readByte()).isEqualTo((byte) 0x02);
assertThat(bif.readInt()).isEqualTo(0x03040506);
assertThat(bif.readByte()).isEqualTo((byte) 0x07);
assertThat(bif.readByte()).isEqualTo((byte) 0x08);
assertThrows(IndexOutOfBoundsException.class, () -> bif.readByte());
}
// Bifurcated part does NOT return when closed:
assertThat(buf.capacity()).isEqualTo(8);
assertThat(buf.readerOffset()).isZero();
assertThat(buf.writerOffset()).isZero();
assertThat(buf.readableBytes()).isZero();
assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte());
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void bifurcatedPartsMustBeIndividuallySendable(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L);
assertThat(buf.readByte()).isEqualTo((byte) 0x01);
try (Buf sentBif = buf.bifurcate().send().receive()) {
try (Buf sentBuf = buf.send().receive()) {
assertThat(sentBuf.capacity()).isEqualTo(8);
assertThat(sentBuf.readerOffset()).isZero();
assertThat(sentBuf.writerOffset()).isZero();
assertThat(sentBuf.readableBytes()).isZero();
assertThrows(IndexOutOfBoundsException.class, () -> sentBuf.readByte());
}
assertThat(sentBif.capacity()).isEqualTo(8);
assertThat(sentBif.readerOffset()).isOne();
assertThat(sentBif.writerOffset()).isEqualTo(8);
assertThat(sentBif.readableBytes()).isEqualTo(7);
assertThat(sentBif.readByte()).isEqualTo((byte) 0x02);
assertThat(sentBif.readInt()).isEqualTo(0x03040506);
assertThat(sentBif.readByte()).isEqualTo((byte) 0x07);
assertThat(sentBif.readByte()).isEqualTo((byte) 0x08);
assertThrows(IndexOutOfBoundsException.class, () -> sentBif.readByte());
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void mustBePossibleToBifurcateMoreThanOnce(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(16).order(ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L);
try (Buf a = buf.bifurcate()) {
a.writerOffset(4);
try (Buf b = a.bifurcate()) {
assertEquals(0x01020304, b.readInt());
a.writerOffset(4);
assertEquals(0x05060708, a.readInt());
assertThrows(IndexOutOfBoundsException.class, () -> b.readByte());
assertThrows(IndexOutOfBoundsException.class, () -> a.readByte());
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
buf.writerOffset(4);
try (Buf c = buf.bifurcate()) {
assertEquals(0xA1A2A3A4, c.readInt());
buf.writerOffset(4);
assertEquals(0xA5A6A7A8, buf.readInt());
assertThrows(IndexOutOfBoundsException.class, () -> c.readByte());
assertThrows(IndexOutOfBoundsException.class, () -> buf.readByte());
}
}
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void bifurcatedBufferMustHaveSameByteOrderAsParent(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8).order(ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L);
try (Buf a = buf.bifurcate()) {
assertThat(a.order()).isEqualTo(ByteOrder.BIG_ENDIAN);
a.order(ByteOrder.LITTLE_ENDIAN);
a.writerOffset(4);
try (Buf b = a.bifurcate()) {
assertThat(b.order()).isEqualTo(ByteOrder.LITTLE_ENDIAN);
assertThat(buf.order()).isEqualTo(ByteOrder.BIG_ENDIAN);
}
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableOnBifurcatedBuffers(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.writeLong(0x0102030405060708L);
try (Buf a = buf.bifurcate()) {
assertEquals(0x0102030405060708L, a.readLong());
a.ensureWritable(8);
a.writeLong(0xA1A2A3A4A5A6A7A8L);
assertEquals(0xA1A2A3A4A5A6A7A8L, a.readLong());
buf.ensureWritable(8);
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
assertEquals(0xA1A2A3A4A5A6A7A8L, buf.readLong());
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableOnBifurcatedBuffersWithOddOffsets(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(10).order(ByteOrder.BIG_ENDIAN)) {
buf.writeLong(0x0102030405060708L);
buf.writeByte((byte) 0x09);
buf.readByte();
try (Buf a = buf.bifurcate()) {
assertEquals(0x0203040506070809L, a.readLong());
a.ensureWritable(8);
a.writeLong(0xA1A2A3A4A5A6A7A8L);
assertEquals(0xA1A2A3A4A5A6A7A8L, a.readLong());
buf.ensureWritable(8);
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
assertEquals(0xA1A2A3A4A5A6A7A8L, buf.readLong());
}
}
}
@Test
public void bifurcateOnEmptyBigEndianCompositeBuffer() {
try (Allocator allocator = Allocator.heap();
Buf buf = allocator.compose().order(ByteOrder.BIG_ENDIAN)) {
verifyBifurcateEmptyCompositeBuffer(buf);
}
}
@Test
public void bifurcateOnEmptyLittleEndianCompositeBuffer() {
try (Allocator allocator = Allocator.heap();
Buf buf = allocator.compose().order(ByteOrder.LITTLE_ENDIAN)) {
verifyBifurcateEmptyCompositeBuffer(buf);
}
}
private void verifyBifurcateEmptyCompositeBuffer(Buf buf) {
try (Buf a = buf.bifurcate()) {
a.ensureWritable(4);
buf.ensureWritable(4);
a.writeInt(1);
buf.writeInt(2);
assertEquals(1, a.readInt());
assertEquals(2, buf.readInt());
assertThat(a.order()).isEqualTo(buf.order());
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void bifurcatedBuffersMustBeAccessibleInOtherThreads(Fixture fixture) throws Exception {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.writeInt(42);
var send = buf.bifurcate().send();
var fut = executor.submit(() -> {
try (Buf receive = send.receive()) {
assertEquals(42, receive.readInt());
receive.readerOffset(0).writerOffset(0).writeInt(24);
assertEquals(24, receive.readInt());
}
});
fut.get();
buf.writeInt(32);
assertEquals(32, buf.readInt());
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void sendMustNotMakeBifurcatedBuffersInaccessible(Fixture fixture) throws Exception {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(16)) {
buf.writeInt(64);
var bifA = buf.bifurcate();
buf.writeInt(42);
var send = buf.bifurcate().send();
buf.writeInt(72);
var bifB = buf.bifurcate();
var fut = executor.submit(() -> {
try (Buf receive = send.receive()) {
assertEquals(42, receive.readInt());
}
});
fut.get();
buf.writeInt(32);
assertEquals(32, buf.readInt());
assertEquals(64, bifA.readInt());
assertEquals(72, bifB.readInt());
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
@ParameterizedTest
@MethodSource("allocators")