Add support for read-only buffers

Motivation:
There are cases where you want a buffer to be "constant."
Buffers are inherently mutable, but it's possible to block off write access to the buffer contents.
This doesn't make it completely safe to share the buffer across multiple threads, but it does catch most races that could occur.

Modification:
Add a method to Buf for toggling read-only mode.
When a buffer is read-only, the write accessors throw exceptions when called.
In the MemSegBuf, this is implemented by having separate read and write references to the underlying memory segment.
In a read-only buffer, the write reference is redirected to point to a closed memory segment, thus preventing all writes to the memory backing the buffer.

Result:
It is now possible to make buffers read-only.
Note, however, that it is also possible to toggle a read-only buffer back to writable.
We need that in order for buffer pools to be able to fully reset the state of a buffer, regardless of the buffer implementation.
This commit is contained in:
Chris Vest 2021-01-05 16:53:21 +01:00
parent f862220c4a
commit 617d9ccef1
6 changed files with 524 additions and 90 deletions

View File

@ -70,7 +70,7 @@ public interface Allocator extends AutoCloseable {
* <pre>{@code
* try (Buf a = allocator.allocate(size);
* Buf b = allocator.allocate(size)) {
* return Buf.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>

View File

@ -154,6 +154,7 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* @return This Buf.
* @throws IndexOutOfBoundsException if the specified {@code offset} is less than the current
* {@link #readerOffset()} or greater than {@link #capacity()}.
* @throws IllegalStateException if this buffer is {@linkplain #readOnly() read-only}.
*/
Buf writerOffset(int offset);
@ -178,6 +179,7 @@ public interface Buf extends Rc<Buf>, BufAccessors {
*
* @param value The byte value to write at every position in the buffer.
* @return This Buf.
* @throws IllegalStateException if this buffer is {@linkplain #readOnly() read-only}.
*/
Buf fill(byte value);
@ -187,6 +189,20 @@ public interface Buf extends Rc<Buf>, BufAccessors {
*/
long getNativeAddress();
/**
* Set the read-only state of this buffer.
*
* @return this buffer.
*/
Buf readOnly(boolean readOnly);
/**
* Query if this buffer is read-only or not.
*
* @return {@code true} if this buffer is read-only, {@code false} otherwise.
*/
boolean readOnly();
/**
* Returns a slice of this buffer's readable bytes.
* Modifying the content of the returned buffer or this buffer affects each other's content,
@ -371,8 +387,8 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* {@code false}.
*
* @param size The requested number of bytes of space that should be available for writing.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #isOwned()} is {@code false}.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* or is {@linkplain #readOnly() read-only}.
*/
default void ensureWritable(int size) {
ensureWritable(size, true);
@ -410,8 +426,8 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* @param allowCompaction {@code true} if the method is allowed to modify the
* {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #isOwned()} is {@code false}.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* * or is {@linkplain #readOnly() read-only}.
*/
void ensureWritable(int size, boolean allowCompaction);
@ -462,7 +478,8 @@ public interface Buf extends Rc<Buf>, BufAccessors {
/**
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
*
* The buffer must be {@linkplain #isOwned() owned}, or an exception will be thrown.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* or is {@linkplain #readOnly() read-only}.
*/
void compact();
}

View File

@ -20,9 +20,6 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import static jdk.incubator.foreign.MemoryAccess.setByteAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
* The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}.
@ -48,6 +45,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
private ByteOrder order;
private boolean closed;
private boolean readOnly;
CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
@ -68,6 +66,14 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
}
order = bufs[0].order();
boolean targetReadOnly = bufs[0].readOnly();
for (Buf buf : bufs) {
if (buf.readOnly() != targetReadOnly) {
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
}
}
readOnly = targetReadOnly;
} else {
order = ByteOrder.nativeOrder();
}
@ -201,6 +207,20 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
return 0;
}
@Override
public Buf readOnly(boolean readOnly) {
for (Buf buf : bufs) {
buf.readOnly(readOnly);
}
this.readOnly = readOnly;
return this;
}
@Override
public boolean readOnly() {
return readOnly;
}
@Override
public Buf slice(int offset, int length) {
checkWriteBounds(offset, length);
@ -236,7 +256,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
slices = new Buf[] { choice.slice(subOffset, 0) };
}
return new CompositeBuf(allocator, false, slices, drop).writerOffset(length);
return new CompositeBuf(allocator, false, slices, drop);
} catch (Throwable throwable) {
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
close();
@ -267,10 +287,10 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.');
}
if (srcPos < 0) {
throw indexOutOfBounds(srcPos);
throw indexOutOfBounds(srcPos, false);
}
if (srcPos + length > capacity) {
throw indexOutOfBounds(srcPos + length);
throw indexOutOfBounds(srcPos + length, false);
}
while (length > 0) {
var buf = (Buf) chooseBuffer(srcPos, 0);
@ -293,10 +313,10 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.');
}
if (srcPos < 0) {
throw indexOutOfBounds(srcPos);
throw indexOutOfBounds(srcPos, false);
}
if (srcPos + length > capacity) {
throw indexOutOfBounds(srcPos + length);
throw indexOutOfBounds(srcPos + length, false);
}
// Iterate in reverse to account for src and dest buffer overlap.
@ -530,6 +550,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (readOnly) {
throw bufferIsReadOnly();
}
if (writableBytes() >= size) {
// We already have enough space.
return;
@ -586,15 +609,23 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
"This buffer uses " + order() + " byte order, and cannot be extended with " +
"a buffer that uses " + extension.order() + " byte order.");
}
if (bufs.length == 0) {
order = extension.order();
if (bufs.length > 0 && extension.readOnly() != readOnly()) {
throw new IllegalArgumentException(
"This buffer is " + (readOnly? "read-only" : "writable") + ", " +
"and cannot be extended with a buffer that is " +
(extension.readOnly()? "read-only." : "writable."));
}
long newSize = capacity() + (long) extension.capacity();
Allocator.checkSize(newSize);
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
unsafeExtendWith(extension.acquire());
if (restoreTemp.length == 0) {
order = extension.order();
readOnly = extension.readOnly();
}
} catch (Exception e) {
bufs = restoreTemp;
throw e;
@ -642,6 +673,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
if (!isOwned()) {
throw new IllegalStateException("Buffer must be owned in order to compact.");
}
if (readOnly()) {
throw new IllegalStateException("Buffer must be writable in order to compact, but was read-only.");
}
int distance = roff;
if (distance == 0) {
return;
@ -962,6 +996,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(allocator, true, received, drop);
composite.readOnly = readOnly;
drop.attach(composite);
return composite;
}
@ -1034,7 +1069,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private void checkReadBounds(int index, int size) {
if (index < 0 || woff < index + size) {
throw indexOutOfBounds(index);
throw indexOutOfBounds(index, false);
}
}
@ -1051,19 +1086,30 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private void checkWriteBounds(int index, int size) {
if (index < 0 || capacity < index + size) {
throw indexOutOfBounds(index);
throw indexOutOfBounds(index, true);
}
}
private RuntimeException indexOutOfBounds(int index) {
private RuntimeException indexOutOfBounds(int index, boolean write) {
if (closed) {
return new IllegalStateException("This buffer is closed.");
return bufferIsClosed();
}
if (write && readOnly) {
return bufferIsReadOnly();
}
return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(capacity - 1) + "].");
}
private static IllegalStateException bufferIsClosed() {
return new IllegalStateException("This buffer is closed.");
}
private static IllegalStateException bufferIsReadOnly() {
return new IllegalStateException("This buffer is read-only.");
}
private BufAccessors chooseBuffer(int index, int size) {
int i = searchOffsets(index);
if (i == bufs.length) {

View File

@ -43,7 +43,11 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
var sizeClassPool = getSizeClassPool(size);
Send<Buf> send = sizeClassPool.poll();
if (send != null) {
return send.receive().reset().fill((byte) 0).order(ByteOrder.nativeOrder());
return send.receive()
.reset()
.readOnly(false)
.fill((byte) 0)
.order(ByteOrder.nativeOrder());
}
return createBuf(size, getDrop());
}

View File

@ -59,6 +59,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
private final AllocatorControl alloc;
private final boolean isSendable;
private MemorySegment seg;
private MemorySegment wseg;
private ByteOrder order;
private int roff;
private int woff;
@ -71,6 +72,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
super(drop);
this.alloc = alloc;
seg = segment;
wseg = segment;
this.isSendable = isSendable;
order = ByteOrder.nativeOrder();
}
@ -122,6 +124,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf fill(byte value) {
checkWrite(0, capacity());
seg.fill(value);
return this;
}
@ -135,6 +138,17 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
}
@Override
public Buf readOnly(boolean readOnly) {
wseg = readOnly? CLOSED_SEGMENT : seg;
return this;
}
@Override
public boolean readOnly() {
return wseg == CLOSED_SEGMENT && seg != CLOSED_SEGMENT;
}
@Override
public Buf slice(int offset, int length) {
if (length < 0) {
@ -147,7 +161,10 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
b.makeInaccessible();
};
var sendable = false; // Sending implies ownership change, which we can't do for slices.
return new MemSegBuf(slice, drop, alloc, sendable).writerOffset(length).order(order());
return new MemSegBuf(slice, drop, alloc, sendable)
.writerOffset(length)
.order(order())
.readOnly(readOnly());
}
@Override
@ -165,12 +182,31 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (srcPos < 0) {
throw new IllegalArgumentException("The srcPos cannot be negative: " + srcPos + '.');
}
if (length < 0) {
throw new IllegalArgumentException("The length cannot be negative: " + length + '.');
}
if (seg.byteSize() < srcPos + length) {
throw new IllegalArgumentException("The srcPos + length is beyond the end of the buffer: " +
"srcPos = " + srcPos + ", length = " + length + '.');
}
dest.asSlice(destPos, length).copyFrom(seg.asSlice(srcPos, length));
}
@Override
public void copyInto(int srcPos, Buf dest, int destPos, int length) {
// todo optimise: specialise for MemSegBuf.
if (dest instanceof MemSegBuf) {
var memSegBuf = (MemSegBuf) dest;
memSegBuf.checkWrite(destPos, length);
copyInto(srcPos, memSegBuf.seg, destPos, length);
return;
}
// Iterate in reverse to account for src and dest buffer overlap.
var itr = openReverseCursor(srcPos + length - 1, length);
ByteOrder prevOrder = dest.order();
@ -202,8 +238,8 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
throw new IllegalArgumentException("The length cannot be negative: " + length + '.');
}
if (seg.byteSize() < fromOffset + length) {
throw new IllegalArgumentException("The fromOffset+length is beyond the end of the buffer: " +
"fromOffset=" + fromOffset + ", length=" + length + '.');
throw new IllegalArgumentException("The fromOffset + length is beyond the end of the buffer: " +
"fromOffset = " + fromOffset + ", length = " + length + '.');
}
return new ByteCursor() {
final MemorySegment segment = seg;
@ -269,8 +305,8 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
throw new IllegalArgumentException("The fromOffset is beyond the end of the buffer: " + fromOffset + '.');
}
if (fromOffset - length < -1) {
throw new IllegalArgumentException("The fromOffset-length would underflow the buffer: " +
"fromOffset=" + fromOffset + ", length=" + length + '.');
throw new IllegalArgumentException("The fromOffset - length would underflow the buffer: " +
"fromOffset = " + fromOffset + ", length = " + length + '.');
}
return new ByteCursor() {
final MemorySegment segment = seg;
@ -330,6 +366,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (seg != wseg) {
throw bufferIsReadOnly();
}
if (writableBytes() >= size) {
// We already have enough space.
return;
@ -366,6 +405,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
seg = newSegment;
wseg = newSegment;
drop.attach(this);
}
@ -390,7 +430,12 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
bifurcatedBuf.woff = woff;
bifurcatedBuf.roff = roff;
bifurcatedBuf.order(order);
boolean readOnly = readOnly();
bifurcatedBuf.readOnly(readOnly);
seg = seg.asSlice(woff, seg.byteSize() - woff);
if (!readOnly) {
wseg = seg;
}
woff = 0;
roff = 0;
return bifurcatedBuf;
@ -401,6 +446,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
if (!isOwned()) {
throw new IllegalStateException("Buffer must be owned in order to compact.");
}
if (readOnly()) {
throw new IllegalStateException("Buffer must be writable in order to compact, but was read-only.");
}
int distance = roff;
if (distance == 0) {
return;
@ -442,7 +490,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeByte(byte value) {
try {
setByteAtOffset(seg, woff, value);
setByteAtOffset(wseg, woff, value);
woff += Byte.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -453,7 +501,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setByte(int woff, byte value) {
try {
setByteAtOffset(seg, woff, value);
setByteAtOffset(wseg, woff, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -463,7 +511,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeUnsignedByte(int value) {
try {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
woff += Byte.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -474,7 +522,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setUnsignedByte(int woff, int value) {
try {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -498,7 +546,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeChar(char value) {
try {
setCharAtOffset(seg, woff, order, value);
setCharAtOffset(wseg, woff, order, value);
woff += 2;
return this;
} catch (IndexOutOfBoundsException e) {
@ -509,7 +557,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setChar(int woff, char value) {
try {
setCharAtOffset(seg, woff, order, value);
setCharAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -547,7 +595,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeShort(short value) {
try {
setShortAtOffset(seg, woff, order, value);
setShortAtOffset(wseg, woff, order, value);
woff += Short.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -558,7 +606,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setShort(int woff, short value) {
try {
setShortAtOffset(seg, woff, order, value);
setShortAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -568,7 +616,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeUnsignedShort(int value) {
try {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
setShortAtOffset(wseg, woff, order, (short) (value & 0xFFFF));
woff += Short.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -579,7 +627,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setUnsignedShort(int woff, int value) {
try {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
setShortAtOffset(wseg, woff, order, (short) (value & 0xFFFF));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -642,13 +690,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
public Buf writeMedium(int value) {
checkWrite(woff, 3);
if (order == ByteOrder.BIG_ENDIAN) {
setByteAtOffset(seg, woff, (byte) (value >> 16));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value >> 16));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value & 0xFF));
} else {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value >> 16 & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value >> 16 & 0xFF));
}
woff += 3;
return this;
@ -658,13 +706,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
public Buf setMedium(int woff, int value) {
checkWrite(woff, 3);
if (order == ByteOrder.BIG_ENDIAN) {
setByteAtOffset(seg, woff, (byte) (value >> 16));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value >> 16));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value & 0xFF));
} else {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value >> 16 & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value >> 16 & 0xFF));
}
return this;
}
@ -673,13 +721,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
public Buf writeUnsignedMedium(int value) {
checkWrite(woff, 3);
if (order == ByteOrder.BIG_ENDIAN) {
setByteAtOffset(seg, woff, (byte) (value >> 16));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value >> 16));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value & 0xFF));
} else {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value >> 16 & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value >> 16 & 0xFF));
}
woff += 3;
return this;
@ -689,13 +737,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
public Buf setUnsignedMedium(int woff, int value) {
checkWrite(woff, 3);
if (order == ByteOrder.BIG_ENDIAN) {
setByteAtOffset(seg, woff, (byte) (value >> 16));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value >> 16));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value & 0xFF));
} else {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value >> 16 & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value >> 16 & 0xFF));
}
return this;
}
@ -731,7 +779,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeInt(int value) {
try {
setIntAtOffset(seg, woff, order, value);
setIntAtOffset(wseg, woff, order, value);
woff += Integer.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -742,7 +790,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setInt(int woff, int value) {
try {
setIntAtOffset(seg, woff, order, value);
setIntAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -752,7 +800,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeUnsignedInt(long value) {
try {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
setIntAtOffset(wseg, woff, order, (int) (value & 0xFFFFFFFFL));
woff += Integer.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -763,7 +811,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setUnsignedInt(int woff, long value) {
try {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
setIntAtOffset(wseg, woff, order, (int) (value & 0xFFFFFFFFL));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -787,7 +835,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeFloat(float value) {
try {
setFloatAtOffset(seg, woff, order, value);
setFloatAtOffset(wseg, woff, order, value);
woff += Float.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -798,7 +846,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setFloat(int woff, float value) {
try {
setFloatAtOffset(seg, woff, order, value);
setFloatAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -822,7 +870,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeLong(long value) {
try {
setLongAtOffset(seg, woff, order, value);
setLongAtOffset(wseg, woff, order, value);
woff += Long.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -833,7 +881,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setLong(int woff, long value) {
try {
setLongAtOffset(seg, woff, order, value);
setLongAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -857,7 +905,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeDouble(double value) {
try {
setDoubleAtOffset(seg, woff, order, value);
setDoubleAtOffset(wseg, woff, order, value);
woff += Double.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -868,7 +916,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setDouble(int woff, double value) {
try {
setDoubleAtOffset(seg, woff, order, value);
setDoubleAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -881,6 +929,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
var order = this.order;
var roff = this.roff;
var woff = this.woff;
var readOnly = readOnly();
boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.share();
makeInaccessible();
@ -891,6 +940,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
copy.order = order;
copy.roff = roff;
copy.woff = woff;
copy.readOnly(readOnly);
return copy;
}
};
@ -898,6 +948,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
void makeInaccessible() {
seg = CLOSED_SEGMENT;
wseg = CLOSED_SEGMENT;
roff = 0;
woff = 0;
}
@ -923,7 +974,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
private void checkWrite(int index, int size) {
if (index < 0 || seg.byteSize() < index + size) {
if (index < 0 || wseg.byteSize() < index + size) {
throw accessCheckException(index);
}
}
@ -932,6 +983,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
if (seg == CLOSED_SEGMENT) {
return bufferIsClosed();
}
if (wseg != seg) {
return bufferIsReadOnly();
}
return ioobe;
}
@ -939,15 +993,22 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (wseg != seg) {
return bufferIsReadOnly();
}
return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(seg.byteSize() - 1) + "].");
}
private IllegalStateException bufferIsClosed() {
private static IllegalStateException bufferIsClosed() {
return new IllegalStateException("This buffer is closed.");
}
private static IllegalStateException bufferIsReadOnly() {
return new IllegalStateException("This buffer is read-only.");
}
Object recoverableMemory() {
return new RecoverableMemory(seg, alloc);
}

View File

@ -33,7 +33,6 @@ import java.text.ParseException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -90,6 +89,10 @@ public class BufTest {
return fixtureCombinations().filter(f -> f.isDirect() && f.isCleaner() && f.isPooled());
}
static Stream<Fixture> pooledAllocators() {
return fixtureCombinations().filter(Fixture::isPooled);
}
private static Stream<Fixture> fixtureCombinations() {
Fixture[] fxs = fixtures;
if (fxs != null) {
@ -187,7 +190,10 @@ public class BufTest {
}, COMPOSITE));
}
return builder.build().flatMap(BufTest::injectBifurcations).flatMap(BufTest::injectSlices);
var stream = builder.build();
return stream.flatMap(BufTest::injectBifurcations)
.flatMap(BufTest::injectSlices)
.flatMap(BufTest::injectReadOnlyToggling);
}
private static Stream<Fixture> injectBifurcations(Fixture f) {
@ -251,6 +257,26 @@ public class BufTest {
return builder.build();
}
private static Stream<Fixture> injectReadOnlyToggling(Fixture f) {
Builder<Fixture> builder = Stream.builder();
builder.add(f);
builder.add(new Fixture(f + ".readOnly(true/false)", () -> {
var allocatorBase = f.get();
return new Allocator() {
@Override
public Buf allocate(int size) {
return allocatorBase.allocate(size).readOnly(true).readOnly(false);
}
@Override
public void close() {
allocatorBase.close();
}
};
}, f.getProperties()));
return builder.build();
}
@BeforeAll
static void startExecutor() throws IOException, ParseException {
executor = Executors.newSingleThreadExecutor();
@ -375,6 +401,31 @@ public class BufTest {
}
private static void verifyInaccessible(Buf buf) {
verifyReadInaccessible(buf);
verifyWriteInaccessible(buf);
try (Allocator allocator = Allocator.heap();
Buf target = allocator.allocate(24)) {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, new byte[1], 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, ByteBuffer.allocate(1), 0, 1));
if (Allocator.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> Allocator.extend(buf, target));
}
}
assertThrows(IllegalStateException.class, () -> buf.bifurcate());
assertThrows(IllegalStateException.class, () -> buf.send());
assertThrows(IllegalStateException.class, () -> buf.acquire());
assertThrows(IllegalStateException.class, () -> buf.slice());
assertThrows(IllegalStateException.class, () -> buf.openCursor());
assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0));
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor());
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor(0, 0));
}
private static void verifyReadInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.readByte());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
assertThrows(IllegalStateException.class, () -> buf.readChar());
@ -387,6 +438,7 @@ public class BufTest {
assertThrows(IllegalStateException.class, () -> buf.readFloat());
assertThrows(IllegalStateException.class, () -> buf.readLong());
assertThrows(IllegalStateException.class, () -> buf.readDouble());
assertThrows(IllegalStateException.class, () -> buf.getByte(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedByte(0));
assertThrows(IllegalStateException.class, () -> buf.getChar(0));
@ -399,6 +451,9 @@ public class BufTest {
assertThrows(IllegalStateException.class, () -> buf.getFloat(0));
assertThrows(IllegalStateException.class, () -> buf.getLong(0));
assertThrows(IllegalStateException.class, () -> buf.getDouble(0));
}
private static void verifyWriteInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.writeByte((byte) 32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedByte(32));
assertThrows(IllegalStateException.class, () -> buf.writeChar('3'));
@ -411,6 +466,7 @@ public class BufTest {
assertThrows(IllegalStateException.class, () -> buf.writeFloat(3.2f));
assertThrows(IllegalStateException.class, () -> buf.writeLong(32));
assertThrows(IllegalStateException.class, () -> buf.writeDouble(32));
assertThrows(IllegalStateException.class, () -> buf.setByte(0, (byte) 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedByte(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setChar(0, '3'));
@ -425,22 +481,11 @@ public class BufTest {
assertThrows(IllegalStateException.class, () -> buf.setDouble(0, 32));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
try (Allocator allocator = Allocator.heap();
Buf target = allocator.allocate(24)) {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1));
if (Allocator.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> Allocator.extend(buf, target));
}
}
assertThrows(IllegalStateException.class, () -> buf.bifurcate());
assertThrows(IllegalStateException.class, () -> buf.send());
assertThrows(IllegalStateException.class, () -> buf.acquire());
assertThrows(IllegalStateException.class, () -> buf.slice());
assertThrows(IllegalStateException.class, () -> buf.fill((byte) 0));
assertThrows(IllegalStateException.class, () -> buf.openCursor());
assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0));
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor());
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor(0, 0));
try (Allocator allocator = Allocator.heap();
Buf source = allocator.allocate(8)) {
assertThrows(IllegalStateException.class, () -> source.copyInto(0, buf, 0, 1));
}
}
@ParameterizedTest
@ -1893,6 +1938,18 @@ public class BufTest {
}
}
@Test
public void emptyCompositeBufferMustAllowExtendingWithReadOnlyBuffer() {
try (Allocator allocator = Allocator.heap()) {
try (Buf composite = allocator.compose()) {
try (Buf b = allocator.allocate(8).readOnly(true)) {
Allocator.extend(composite, b);
assertTrue(composite.readOnly());
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithWriteOffsetAtCapacityExtensionWriteOffsetCanBeNonZero() {
try (Allocator allocator = Allocator.heap()) {
@ -2262,6 +2319,255 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("allocators")
public void readOnlyBufferMustPreventWriteAccess(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
var b = buf.readOnly(true);
assertThat(b).isSameAs(buf);
verifyWriteInaccessible(buf);
}
}
@ParameterizedTest
@MethodSource("allocators")
public void readOnlyBufferMustBecomeWritableAgainAfterTogglingReadOnlyOff(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
assertFalse(buf.readOnly());
buf.readOnly(true);
assertTrue(buf.readOnly());
verifyWriteInaccessible(buf);
buf.readOnly(false);
assertFalse(buf.readOnly());
verifyWriteAccessible(buf);
}
}
private static void verifyWriteAccessible(Buf buf) {
buf.writerOffset(0).writeByte((byte) 32);
assertThat(buf.readerOffset(0).readByte()).isEqualTo((byte) 32);
buf.writerOffset(0).writeUnsignedByte(32);
assertThat(buf.readerOffset(0).readUnsignedByte()).isEqualTo(32);
buf.writerOffset(0).writeChar('3');
assertThat(buf.readerOffset(0).readChar()).isEqualTo('3');
buf.writerOffset(0).writeShort((short) 32);
assertThat(buf.readerOffset(0).readShort()).isEqualTo((short) 32);
buf.writerOffset(0).writeUnsignedShort(32);
assertThat(buf.readerOffset(0).readUnsignedShort()).isEqualTo(32);
buf.writerOffset(0).writeMedium(32);
assertThat(buf.readerOffset(0).readMedium()).isEqualTo(32);
buf.writerOffset(0).writeUnsignedMedium(32);
assertThat(buf.readerOffset(0).readUnsignedMedium()).isEqualTo(32);
buf.writerOffset(0).writeInt(32);
assertThat(buf.readerOffset(0).readInt()).isEqualTo(32);
buf.writerOffset(0).writeUnsignedInt(32);
assertThat(buf.readerOffset(0).readUnsignedInt()).isEqualTo(32L);
buf.writerOffset(0).writeFloat(3.2f);
assertThat(buf.readerOffset(0).readFloat()).isEqualTo(3.2f);
buf.writerOffset(0).writeLong(32);
assertThat(buf.readerOffset(0).readLong()).isEqualTo(32L);
buf.writerOffset(0).writeDouble(3.2);
assertThat(buf.readerOffset(0).readDouble()).isEqualTo(3.2);
buf.setByte(0, (byte) 32);
assertThat(buf.getByte(0)).isEqualTo((byte) 32);
buf.setUnsignedByte(0, 32);
assertThat(buf.getUnsignedByte(0)).isEqualTo(32);
buf.setChar(0, '3');
assertThat(buf.getChar(0)).isEqualTo('3');
buf.setShort(0, (short) 32);
assertThat(buf.getShort(0)).isEqualTo((short) 32);
buf.setUnsignedShort(0, 32);
assertThat(buf.getUnsignedShort(0)).isEqualTo(32);
buf.setMedium(0, 32);
assertThat(buf.getMedium(0)).isEqualTo(32);
buf.setUnsignedMedium(0, 32);
assertThat(buf.getUnsignedMedium(0)).isEqualTo(32);
buf.setInt(0, 32);
assertThat(buf.getInt(0)).isEqualTo(32);
buf.setUnsignedInt(0, 32);
assertThat(buf.getUnsignedInt(0)).isEqualTo(32L);
buf.setFloat(0, 3.2f);
assertThat(buf.getFloat(0)).isEqualTo(3.2f);
buf.setLong(0, 32);
assertThat(buf.getLong(0)).isEqualTo(32L);
buf.setDouble(0, 3.2);
assertThat(buf.getDouble(0)).isEqualTo(3.2);
if (buf.isOwned()) {
buf.ensureWritable(1);
}
buf.fill((byte) 0);
try (Allocator allocator = Allocator.heap();
Buf source = allocator.allocate(8)) {
source.copyInto(0, buf, 0, 1);
}
}
@Test
public void composingReadOnlyBuffersMustCreateReadOnlyCompositeBuffer() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(4).readOnly(true);
Buf b = allocator.allocate(4).readOnly(true);
Buf composite = allocator.compose(a, b)) {
assertTrue(composite.readOnly());
verifyWriteInaccessible(composite);
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void readOnlyBufferMustRemainReadOnlyAfterSend(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.readOnly(true);
var send = buf.send();
try (Buf receive = send.receive()) {
assertTrue(receive.readOnly());
verifyWriteInaccessible(receive);
}
}
}
@Test
public void readOnlyBufferMustRemainReadOnlyAfterSendForEmptyCompositeBuffer() {
try (Allocator allocator = Allocator.heap();
Buf buf = allocator.compose()) {
buf.readOnly(true);
var send = buf.send();
try (Buf receive = send.receive()) {
assertTrue(receive.readOnly());
}
}
}
@ParameterizedTest
@MethodSource("pooledAllocators")
public void readOnlyBufferMustNotBeReadOnlyAfterBeingReusedFromPool(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
for (int i = 0; i < 1000; i++) {
try (Buf buf = allocator.allocate(8)) {
assertFalse(buf.readOnly());
buf.readOnly(true);
assertTrue(buf.readOnly());
}
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void acquireOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.readOnly(true);
try (Buf acquire = buf.acquire()) {
assertTrue(acquire.readOnly());
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void sliceOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.writeLong(0x0102030405060708L);
buf.readOnly(true);
try (Buf slice = buf.slice()) {
assertTrue(slice.readOnly());
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void bifurcateOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(16)) {
buf.writeLong(0x0102030405060708L);
buf.readOnly(true);
try (Buf bifurcate = buf.bifurcate()) {
assertTrue(bifurcate.readOnly());
assertTrue(buf.readOnly());
}
}
}
@Test
public void composingReadOnlyAndWritableBuffersMustThrow() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(8).readOnly(true);
Buf b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, b));
assertThrows(IllegalArgumentException.class, () -> allocator.compose(b, a));
assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, b, a));
assertThrows(IllegalArgumentException.class, () -> allocator.compose(b, a, b));
}
}
@Test
public void compositeWritableBufferCannotBeExtendedWithReadOnlyBuffer() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite; Buf b = allocator.allocate(8).readOnly(true)) {
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
}
}
}
@Test
public void compositeReadOnlyBufferCannotBeExtendedWithWritableBuffer() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8).readOnly(true)) {
composite = allocator.compose(a);
}
try (composite; Buf b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void compactOnReadOnlyBufferMustThrow(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.readOnly(true);
assertThrows(IllegalStateException.class, () -> buf.compact());
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableOnReadOnlyBufferMustThrow(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.readOnly(true);
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void copyIntoOnReadOnlyBufferMustThrow(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf dest = allocator.allocate(8)) {
dest.readOnly(true);
try (Buf src = allocator.allocate(8)) {
assertThrows(IllegalStateException.class, () -> src.copyInto(0, dest, 0, 1));
}
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
@ParameterizedTest
@MethodSource("allocators")