Merge pull request #23 from netty/read-only

Add support for read-only buffers
This commit is contained in:
Chris Vest 2021-01-05 17:27:10 +01:00 committed by GitHub
commit 8cdcfd53c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 524 additions and 90 deletions

View File

@ -70,7 +70,7 @@ public interface Allocator extends AutoCloseable {
* <pre>{@code
* try (Buf a = allocator.allocate(size);
* Buf b = allocator.allocate(size)) {
* return Buf.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* return allocator.compose(a, b); // Reference counts for 'a' and 'b' incremented here.
* } // Reference count for 'a' and 'b' decremented here; composite buffer now holds the last references.
* }</pre>
* <p>

View File

@ -154,6 +154,7 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* @return This Buf.
* @throws IndexOutOfBoundsException if the specified {@code offset} is less than the current
* {@link #readerOffset()} or greater than {@link #capacity()}.
* @throws IllegalStateException if this buffer is {@linkplain #readOnly() read-only}.
*/
Buf writerOffset(int offset);
@ -178,6 +179,7 @@ public interface Buf extends Rc<Buf>, BufAccessors {
*
* @param value The byte value to write at every position in the buffer.
* @return This Buf.
* @throws IllegalStateException if this buffer is {@linkplain #readOnly() read-only}.
*/
Buf fill(byte value);
@ -187,6 +189,20 @@ public interface Buf extends Rc<Buf>, BufAccessors {
*/
long getNativeAddress();
/**
* Set the read-only state of this buffer.
*
* @return this buffer.
*/
Buf readOnly(boolean readOnly);
/**
* Query if this buffer is read-only or not.
*
* @return {@code true} if this buffer is read-only, {@code false} otherwise.
*/
boolean readOnly();
/**
* Returns a slice of this buffer's readable bytes.
* Modifying the content of the returned buffer or this buffer affects each other's content,
@ -371,8 +387,8 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* {@code false}.
*
* @param size The requested number of bytes of space that should be available for writing.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #isOwned()} is {@code false}.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* or is {@linkplain #readOnly() read-only}.
*/
default void ensureWritable(int size) {
ensureWritable(size, true);
@ -410,8 +426,8 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* @param allowCompaction {@code true} if the method is allowed to modify the
* {@linkplain #readerOffset() reader offset} and
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
* @throws IllegalStateException if this buffer is not in an owned state.
* That is, if {@link #isOwned()} is {@code false}.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* * or is {@linkplain #readOnly() read-only}.
*/
void ensureWritable(int size, boolean allowCompaction);
@ -462,7 +478,8 @@ public interface Buf extends Rc<Buf>, BufAccessors {
/**
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
*
* The buffer must be {@linkplain #isOwned() owned}, or an exception will be thrown.
* @throws IllegalStateException if this buffer is not in an {@linkplain #isOwned() owned} state,
* or is {@linkplain #readOnly() read-only}.
*/
void compact();
}

View File

@ -20,9 +20,6 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import static jdk.incubator.foreign.MemoryAccess.setByteAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
* The max array size is JVM implementation dependant, but most seem to settle on {@code Integer.MAX_VALUE - 8}.
@ -48,6 +45,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
private ByteOrder order;
private boolean closed;
private boolean readOnly;
CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
@ -68,6 +66,14 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
}
order = bufs[0].order();
boolean targetReadOnly = bufs[0].readOnly();
for (Buf buf : bufs) {
if (buf.readOnly() != targetReadOnly) {
throw new IllegalArgumentException("Constituent buffers have inconsistent read-only state.");
}
}
readOnly = targetReadOnly;
} else {
order = ByteOrder.nativeOrder();
}
@ -201,6 +207,20 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
return 0;
}
@Override
public Buf readOnly(boolean readOnly) {
for (Buf buf : bufs) {
buf.readOnly(readOnly);
}
this.readOnly = readOnly;
return this;
}
@Override
public boolean readOnly() {
return readOnly;
}
@Override
public Buf slice(int offset, int length) {
checkWriteBounds(offset, length);
@ -236,7 +256,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
slices = new Buf[] { choice.slice(subOffset, 0) };
}
return new CompositeBuf(allocator, false, slices, drop).writerOffset(length);
return new CompositeBuf(allocator, false, slices, drop);
} catch (Throwable throwable) {
// We called acquire prior to the try-clause. We need to undo that if we're not creating a composite buffer:
close();
@ -267,10 +287,10 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.');
}
if (srcPos < 0) {
throw indexOutOfBounds(srcPos);
throw indexOutOfBounds(srcPos, false);
}
if (srcPos + length > capacity) {
throw indexOutOfBounds(srcPos + length);
throw indexOutOfBounds(srcPos + length, false);
}
while (length > 0) {
var buf = (Buf) chooseBuffer(srcPos, 0);
@ -293,10 +313,10 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.');
}
if (srcPos < 0) {
throw indexOutOfBounds(srcPos);
throw indexOutOfBounds(srcPos, false);
}
if (srcPos + length > capacity) {
throw indexOutOfBounds(srcPos + length);
throw indexOutOfBounds(srcPos + length, false);
}
// Iterate in reverse to account for src and dest buffer overlap.
@ -530,6 +550,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (readOnly) {
throw bufferIsReadOnly();
}
if (writableBytes() >= size) {
// We already have enough space.
return;
@ -586,15 +609,23 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
"This buffer uses " + order() + " byte order, and cannot be extended with " +
"a buffer that uses " + extension.order() + " byte order.");
}
if (bufs.length == 0) {
order = extension.order();
if (bufs.length > 0 && extension.readOnly() != readOnly()) {
throw new IllegalArgumentException(
"This buffer is " + (readOnly? "read-only" : "writable") + ", " +
"and cannot be extended with a buffer that is " +
(extension.readOnly()? "read-only." : "writable."));
}
long newSize = capacity() + (long) extension.capacity();
Allocator.checkSize(newSize);
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
try {
unsafeExtendWith(extension.acquire());
if (restoreTemp.length == 0) {
order = extension.order();
readOnly = extension.readOnly();
}
} catch (Exception e) {
bufs = restoreTemp;
throw e;
@ -642,6 +673,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
if (!isOwned()) {
throw new IllegalStateException("Buffer must be owned in order to compact.");
}
if (readOnly()) {
throw new IllegalStateException("Buffer must be writable in order to compact, but was read-only.");
}
int distance = roff;
if (distance == 0) {
return;
@ -962,6 +996,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
received[i] = sends[i].receive();
}
var composite = new CompositeBuf(allocator, true, received, drop);
composite.readOnly = readOnly;
drop.attach(composite);
return composite;
}
@ -1034,7 +1069,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private void checkReadBounds(int index, int size) {
if (index < 0 || woff < index + size) {
throw indexOutOfBounds(index);
throw indexOutOfBounds(index, false);
}
}
@ -1051,19 +1086,30 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private void checkWriteBounds(int index, int size) {
if (index < 0 || capacity < index + size) {
throw indexOutOfBounds(index);
throw indexOutOfBounds(index, true);
}
}
private RuntimeException indexOutOfBounds(int index) {
private RuntimeException indexOutOfBounds(int index, boolean write) {
if (closed) {
return new IllegalStateException("This buffer is closed.");
return bufferIsClosed();
}
if (write && readOnly) {
return bufferIsReadOnly();
}
return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(capacity - 1) + "].");
}
private static IllegalStateException bufferIsClosed() {
return new IllegalStateException("This buffer is closed.");
}
private static IllegalStateException bufferIsReadOnly() {
return new IllegalStateException("This buffer is read-only.");
}
private BufAccessors chooseBuffer(int index, int size) {
int i = searchOffsets(index);
if (i == bufs.length) {

View File

@ -43,7 +43,11 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
var sizeClassPool = getSizeClassPool(size);
Send<Buf> send = sizeClassPool.poll();
if (send != null) {
return send.receive().reset().fill((byte) 0).order(ByteOrder.nativeOrder());
return send.receive()
.reset()
.readOnly(false)
.fill((byte) 0)
.order(ByteOrder.nativeOrder());
}
return createBuf(size, getDrop());
}

View File

@ -59,6 +59,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
private final AllocatorControl alloc;
private final boolean isSendable;
private MemorySegment seg;
private MemorySegment wseg;
private ByteOrder order;
private int roff;
private int woff;
@ -71,6 +72,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
super(drop);
this.alloc = alloc;
seg = segment;
wseg = segment;
this.isSendable = isSendable;
order = ByteOrder.nativeOrder();
}
@ -122,6 +124,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf fill(byte value) {
checkWrite(0, capacity());
seg.fill(value);
return this;
}
@ -135,6 +138,17 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
}
@Override
public Buf readOnly(boolean readOnly) {
wseg = readOnly? CLOSED_SEGMENT : seg;
return this;
}
@Override
public boolean readOnly() {
return wseg == CLOSED_SEGMENT && seg != CLOSED_SEGMENT;
}
@Override
public Buf slice(int offset, int length) {
if (length < 0) {
@ -147,7 +161,10 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
b.makeInaccessible();
};
var sendable = false; // Sending implies ownership change, which we can't do for slices.
return new MemSegBuf(slice, drop, alloc, sendable).writerOffset(length).order(order());
return new MemSegBuf(slice, drop, alloc, sendable)
.writerOffset(length)
.order(order())
.readOnly(readOnly());
}
@Override
@ -165,12 +182,31 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (srcPos < 0) {
throw new IllegalArgumentException("The srcPos cannot be negative: " + srcPos + '.');
}
if (length < 0) {
throw new IllegalArgumentException("The length cannot be negative: " + length + '.');
}
if (seg.byteSize() < srcPos + length) {
throw new IllegalArgumentException("The srcPos + length is beyond the end of the buffer: " +
"srcPos = " + srcPos + ", length = " + length + '.');
}
dest.asSlice(destPos, length).copyFrom(seg.asSlice(srcPos, length));
}
@Override
public void copyInto(int srcPos, Buf dest, int destPos, int length) {
// todo optimise: specialise for MemSegBuf.
if (dest instanceof MemSegBuf) {
var memSegBuf = (MemSegBuf) dest;
memSegBuf.checkWrite(destPos, length);
copyInto(srcPos, memSegBuf.seg, destPos, length);
return;
}
// Iterate in reverse to account for src and dest buffer overlap.
var itr = openReverseCursor(srcPos + length - 1, length);
ByteOrder prevOrder = dest.order();
@ -202,8 +238,8 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
throw new IllegalArgumentException("The length cannot be negative: " + length + '.');
}
if (seg.byteSize() < fromOffset + length) {
throw new IllegalArgumentException("The fromOffset+length is beyond the end of the buffer: " +
"fromOffset=" + fromOffset + ", length=" + length + '.');
throw new IllegalArgumentException("The fromOffset + length is beyond the end of the buffer: " +
"fromOffset = " + fromOffset + ", length = " + length + '.');
}
return new ByteCursor() {
final MemorySegment segment = seg;
@ -269,8 +305,8 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
throw new IllegalArgumentException("The fromOffset is beyond the end of the buffer: " + fromOffset + '.');
}
if (fromOffset - length < -1) {
throw new IllegalArgumentException("The fromOffset-length would underflow the buffer: " +
"fromOffset=" + fromOffset + ", length=" + length + '.');
throw new IllegalArgumentException("The fromOffset - length would underflow the buffer: " +
"fromOffset = " + fromOffset + ", length = " + length + '.');
}
return new ByteCursor() {
final MemorySegment segment = seg;
@ -330,6 +366,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
if (size < 0) {
throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
}
if (seg != wseg) {
throw bufferIsReadOnly();
}
if (writableBytes() >= size) {
// We already have enough space.
return;
@ -366,6 +405,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
seg = newSegment;
wseg = newSegment;
drop.attach(this);
}
@ -390,7 +430,12 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
bifurcatedBuf.woff = woff;
bifurcatedBuf.roff = roff;
bifurcatedBuf.order(order);
boolean readOnly = readOnly();
bifurcatedBuf.readOnly(readOnly);
seg = seg.asSlice(woff, seg.byteSize() - woff);
if (!readOnly) {
wseg = seg;
}
woff = 0;
roff = 0;
return bifurcatedBuf;
@ -401,6 +446,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
if (!isOwned()) {
throw new IllegalStateException("Buffer must be owned in order to compact.");
}
if (readOnly()) {
throw new IllegalStateException("Buffer must be writable in order to compact, but was read-only.");
}
int distance = roff;
if (distance == 0) {
return;
@ -442,7 +490,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeByte(byte value) {
try {
setByteAtOffset(seg, woff, value);
setByteAtOffset(wseg, woff, value);
woff += Byte.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -453,7 +501,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setByte(int woff, byte value) {
try {
setByteAtOffset(seg, woff, value);
setByteAtOffset(wseg, woff, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -463,7 +511,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeUnsignedByte(int value) {
try {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
woff += Byte.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -474,7 +522,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setUnsignedByte(int woff, int value) {
try {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -498,7 +546,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeChar(char value) {
try {
setCharAtOffset(seg, woff, order, value);
setCharAtOffset(wseg, woff, order, value);
woff += 2;
return this;
} catch (IndexOutOfBoundsException e) {
@ -509,7 +557,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setChar(int woff, char value) {
try {
setCharAtOffset(seg, woff, order, value);
setCharAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -547,7 +595,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeShort(short value) {
try {
setShortAtOffset(seg, woff, order, value);
setShortAtOffset(wseg, woff, order, value);
woff += Short.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -558,7 +606,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setShort(int woff, short value) {
try {
setShortAtOffset(seg, woff, order, value);
setShortAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -568,7 +616,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeUnsignedShort(int value) {
try {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
setShortAtOffset(wseg, woff, order, (short) (value & 0xFFFF));
woff += Short.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -579,7 +627,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setUnsignedShort(int woff, int value) {
try {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
setShortAtOffset(wseg, woff, order, (short) (value & 0xFFFF));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -642,13 +690,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
public Buf writeMedium(int value) {
checkWrite(woff, 3);
if (order == ByteOrder.BIG_ENDIAN) {
setByteAtOffset(seg, woff, (byte) (value >> 16));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value >> 16));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value & 0xFF));
} else {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value >> 16 & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value >> 16 & 0xFF));
}
woff += 3;
return this;
@ -658,13 +706,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
public Buf setMedium(int woff, int value) {
checkWrite(woff, 3);
if (order == ByteOrder.BIG_ENDIAN) {
setByteAtOffset(seg, woff, (byte) (value >> 16));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value >> 16));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value & 0xFF));
} else {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value >> 16 & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value >> 16 & 0xFF));
}
return this;
}
@ -673,13 +721,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
public Buf writeUnsignedMedium(int value) {
checkWrite(woff, 3);
if (order == ByteOrder.BIG_ENDIAN) {
setByteAtOffset(seg, woff, (byte) (value >> 16));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value >> 16));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value & 0xFF));
} else {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value >> 16 & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value >> 16 & 0xFF));
}
woff += 3;
return this;
@ -689,13 +737,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
public Buf setUnsignedMedium(int woff, int value) {
checkWrite(woff, 3);
if (order == ByteOrder.BIG_ENDIAN) {
setByteAtOffset(seg, woff, (byte) (value >> 16));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value >> 16));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value & 0xFF));
} else {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
setByteAtOffset(seg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(seg, woff + 2, (byte) (value >> 16 & 0xFF));
setByteAtOffset(wseg, woff, (byte) (value & 0xFF));
setByteAtOffset(wseg, woff + 1, (byte) (value >> 8 & 0xFF));
setByteAtOffset(wseg, woff + 2, (byte) (value >> 16 & 0xFF));
}
return this;
}
@ -731,7 +779,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeInt(int value) {
try {
setIntAtOffset(seg, woff, order, value);
setIntAtOffset(wseg, woff, order, value);
woff += Integer.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -742,7 +790,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setInt(int woff, int value) {
try {
setIntAtOffset(seg, woff, order, value);
setIntAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -752,7 +800,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeUnsignedInt(long value) {
try {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
setIntAtOffset(wseg, woff, order, (int) (value & 0xFFFFFFFFL));
woff += Integer.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -763,7 +811,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setUnsignedInt(int woff, long value) {
try {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
setIntAtOffset(wseg, woff, order, (int) (value & 0xFFFFFFFFL));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -787,7 +835,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeFloat(float value) {
try {
setFloatAtOffset(seg, woff, order, value);
setFloatAtOffset(wseg, woff, order, value);
woff += Float.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -798,7 +846,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setFloat(int woff, float value) {
try {
setFloatAtOffset(seg, woff, order, value);
setFloatAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -822,7 +870,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeLong(long value) {
try {
setLongAtOffset(seg, woff, order, value);
setLongAtOffset(wseg, woff, order, value);
woff += Long.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -833,7 +881,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setLong(int woff, long value) {
try {
setLongAtOffset(seg, woff, order, value);
setLongAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -857,7 +905,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeDouble(double value) {
try {
setDoubleAtOffset(seg, woff, order, value);
setDoubleAtOffset(wseg, woff, order, value);
woff += Double.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
@ -868,7 +916,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf setDouble(int woff, double value) {
try {
setDoubleAtOffset(seg, woff, order, value);
setDoubleAtOffset(wseg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
@ -881,6 +929,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
var order = this.order;
var roff = this.roff;
var woff = this.woff;
var readOnly = readOnly();
boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.share();
makeInaccessible();
@ -891,6 +940,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
copy.order = order;
copy.roff = roff;
copy.woff = woff;
copy.readOnly(readOnly);
return copy;
}
};
@ -898,6 +948,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
void makeInaccessible() {
seg = CLOSED_SEGMENT;
wseg = CLOSED_SEGMENT;
roff = 0;
woff = 0;
}
@ -923,7 +974,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
private void checkWrite(int index, int size) {
if (index < 0 || seg.byteSize() < index + size) {
if (index < 0 || wseg.byteSize() < index + size) {
throw accessCheckException(index);
}
}
@ -932,6 +983,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
if (seg == CLOSED_SEGMENT) {
return bufferIsClosed();
}
if (wseg != seg) {
return bufferIsReadOnly();
}
return ioobe;
}
@ -939,15 +993,22 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (wseg != seg) {
return bufferIsReadOnly();
}
return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(seg.byteSize() - 1) + "].");
}
private IllegalStateException bufferIsClosed() {
private static IllegalStateException bufferIsClosed() {
return new IllegalStateException("This buffer is closed.");
}
private static IllegalStateException bufferIsReadOnly() {
return new IllegalStateException("This buffer is read-only.");
}
Object recoverableMemory() {
return new RecoverableMemory(seg, alloc);
}

View File

@ -33,7 +33,6 @@ import java.text.ParseException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -90,6 +89,10 @@ public class BufTest {
return fixtureCombinations().filter(f -> f.isDirect() && f.isCleaner() && f.isPooled());
}
static Stream<Fixture> pooledAllocators() {
return fixtureCombinations().filter(Fixture::isPooled);
}
private static Stream<Fixture> fixtureCombinations() {
Fixture[] fxs = fixtures;
if (fxs != null) {
@ -187,7 +190,10 @@ public class BufTest {
}, COMPOSITE));
}
return builder.build().flatMap(BufTest::injectBifurcations).flatMap(BufTest::injectSlices);
var stream = builder.build();
return stream.flatMap(BufTest::injectBifurcations)
.flatMap(BufTest::injectSlices)
.flatMap(BufTest::injectReadOnlyToggling);
}
private static Stream<Fixture> injectBifurcations(Fixture f) {
@ -251,6 +257,26 @@ public class BufTest {
return builder.build();
}
private static Stream<Fixture> injectReadOnlyToggling(Fixture f) {
Builder<Fixture> builder = Stream.builder();
builder.add(f);
builder.add(new Fixture(f + ".readOnly(true/false)", () -> {
var allocatorBase = f.get();
return new Allocator() {
@Override
public Buf allocate(int size) {
return allocatorBase.allocate(size).readOnly(true).readOnly(false);
}
@Override
public void close() {
allocatorBase.close();
}
};
}, f.getProperties()));
return builder.build();
}
@BeforeAll
static void startExecutor() throws IOException, ParseException {
executor = Executors.newSingleThreadExecutor();
@ -375,6 +401,31 @@ public class BufTest {
}
private static void verifyInaccessible(Buf buf) {
verifyReadInaccessible(buf);
verifyWriteInaccessible(buf);
try (Allocator allocator = Allocator.heap();
Buf target = allocator.allocate(24)) {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, new byte[1], 0, 1));
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, ByteBuffer.allocate(1), 0, 1));
if (Allocator.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> Allocator.extend(buf, target));
}
}
assertThrows(IllegalStateException.class, () -> buf.bifurcate());
assertThrows(IllegalStateException.class, () -> buf.send());
assertThrows(IllegalStateException.class, () -> buf.acquire());
assertThrows(IllegalStateException.class, () -> buf.slice());
assertThrows(IllegalStateException.class, () -> buf.openCursor());
assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0));
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor());
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor(0, 0));
}
private static void verifyReadInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.readByte());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
assertThrows(IllegalStateException.class, () -> buf.readChar());
@ -387,6 +438,7 @@ public class BufTest {
assertThrows(IllegalStateException.class, () -> buf.readFloat());
assertThrows(IllegalStateException.class, () -> buf.readLong());
assertThrows(IllegalStateException.class, () -> buf.readDouble());
assertThrows(IllegalStateException.class, () -> buf.getByte(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedByte(0));
assertThrows(IllegalStateException.class, () -> buf.getChar(0));
@ -399,6 +451,9 @@ public class BufTest {
assertThrows(IllegalStateException.class, () -> buf.getFloat(0));
assertThrows(IllegalStateException.class, () -> buf.getLong(0));
assertThrows(IllegalStateException.class, () -> buf.getDouble(0));
}
private static void verifyWriteInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.writeByte((byte) 32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedByte(32));
assertThrows(IllegalStateException.class, () -> buf.writeChar('3'));
@ -411,6 +466,7 @@ public class BufTest {
assertThrows(IllegalStateException.class, () -> buf.writeFloat(3.2f));
assertThrows(IllegalStateException.class, () -> buf.writeLong(32));
assertThrows(IllegalStateException.class, () -> buf.writeDouble(32));
assertThrows(IllegalStateException.class, () -> buf.setByte(0, (byte) 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedByte(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setChar(0, '3'));
@ -425,22 +481,11 @@ public class BufTest {
assertThrows(IllegalStateException.class, () -> buf.setDouble(0, 32));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
try (Allocator allocator = Allocator.heap();
Buf target = allocator.allocate(24)) {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1));
if (Allocator.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> Allocator.extend(buf, target));
}
}
assertThrows(IllegalStateException.class, () -> buf.bifurcate());
assertThrows(IllegalStateException.class, () -> buf.send());
assertThrows(IllegalStateException.class, () -> buf.acquire());
assertThrows(IllegalStateException.class, () -> buf.slice());
assertThrows(IllegalStateException.class, () -> buf.fill((byte) 0));
assertThrows(IllegalStateException.class, () -> buf.openCursor());
assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0));
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor());
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor(0, 0));
try (Allocator allocator = Allocator.heap();
Buf source = allocator.allocate(8)) {
assertThrows(IllegalStateException.class, () -> source.copyInto(0, buf, 0, 1));
}
}
@ParameterizedTest
@ -1893,6 +1938,18 @@ public class BufTest {
}
}
@Test
public void emptyCompositeBufferMustAllowExtendingWithReadOnlyBuffer() {
try (Allocator allocator = Allocator.heap()) {
try (Buf composite = allocator.compose()) {
try (Buf b = allocator.allocate(8).readOnly(true)) {
Allocator.extend(composite, b);
assertTrue(composite.readOnly());
}
}
}
}
@Test
public void whenExtendingCompositeBufferWithWriteOffsetAtCapacityExtensionWriteOffsetCanBeNonZero() {
try (Allocator allocator = Allocator.heap()) {
@ -2262,6 +2319,255 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("allocators")
public void readOnlyBufferMustPreventWriteAccess(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
var b = buf.readOnly(true);
assertThat(b).isSameAs(buf);
verifyWriteInaccessible(buf);
}
}
@ParameterizedTest
@MethodSource("allocators")
public void readOnlyBufferMustBecomeWritableAgainAfterTogglingReadOnlyOff(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
assertFalse(buf.readOnly());
buf.readOnly(true);
assertTrue(buf.readOnly());
verifyWriteInaccessible(buf);
buf.readOnly(false);
assertFalse(buf.readOnly());
verifyWriteAccessible(buf);
}
}
private static void verifyWriteAccessible(Buf buf) {
buf.writerOffset(0).writeByte((byte) 32);
assertThat(buf.readerOffset(0).readByte()).isEqualTo((byte) 32);
buf.writerOffset(0).writeUnsignedByte(32);
assertThat(buf.readerOffset(0).readUnsignedByte()).isEqualTo(32);
buf.writerOffset(0).writeChar('3');
assertThat(buf.readerOffset(0).readChar()).isEqualTo('3');
buf.writerOffset(0).writeShort((short) 32);
assertThat(buf.readerOffset(0).readShort()).isEqualTo((short) 32);
buf.writerOffset(0).writeUnsignedShort(32);
assertThat(buf.readerOffset(0).readUnsignedShort()).isEqualTo(32);
buf.writerOffset(0).writeMedium(32);
assertThat(buf.readerOffset(0).readMedium()).isEqualTo(32);
buf.writerOffset(0).writeUnsignedMedium(32);
assertThat(buf.readerOffset(0).readUnsignedMedium()).isEqualTo(32);
buf.writerOffset(0).writeInt(32);
assertThat(buf.readerOffset(0).readInt()).isEqualTo(32);
buf.writerOffset(0).writeUnsignedInt(32);
assertThat(buf.readerOffset(0).readUnsignedInt()).isEqualTo(32L);
buf.writerOffset(0).writeFloat(3.2f);
assertThat(buf.readerOffset(0).readFloat()).isEqualTo(3.2f);
buf.writerOffset(0).writeLong(32);
assertThat(buf.readerOffset(0).readLong()).isEqualTo(32L);
buf.writerOffset(0).writeDouble(3.2);
assertThat(buf.readerOffset(0).readDouble()).isEqualTo(3.2);
buf.setByte(0, (byte) 32);
assertThat(buf.getByte(0)).isEqualTo((byte) 32);
buf.setUnsignedByte(0, 32);
assertThat(buf.getUnsignedByte(0)).isEqualTo(32);
buf.setChar(0, '3');
assertThat(buf.getChar(0)).isEqualTo('3');
buf.setShort(0, (short) 32);
assertThat(buf.getShort(0)).isEqualTo((short) 32);
buf.setUnsignedShort(0, 32);
assertThat(buf.getUnsignedShort(0)).isEqualTo(32);
buf.setMedium(0, 32);
assertThat(buf.getMedium(0)).isEqualTo(32);
buf.setUnsignedMedium(0, 32);
assertThat(buf.getUnsignedMedium(0)).isEqualTo(32);
buf.setInt(0, 32);
assertThat(buf.getInt(0)).isEqualTo(32);
buf.setUnsignedInt(0, 32);
assertThat(buf.getUnsignedInt(0)).isEqualTo(32L);
buf.setFloat(0, 3.2f);
assertThat(buf.getFloat(0)).isEqualTo(3.2f);
buf.setLong(0, 32);
assertThat(buf.getLong(0)).isEqualTo(32L);
buf.setDouble(0, 3.2);
assertThat(buf.getDouble(0)).isEqualTo(3.2);
if (buf.isOwned()) {
buf.ensureWritable(1);
}
buf.fill((byte) 0);
try (Allocator allocator = Allocator.heap();
Buf source = allocator.allocate(8)) {
source.copyInto(0, buf, 0, 1);
}
}
@Test
public void composingReadOnlyBuffersMustCreateReadOnlyCompositeBuffer() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(4).readOnly(true);
Buf b = allocator.allocate(4).readOnly(true);
Buf composite = allocator.compose(a, b)) {
assertTrue(composite.readOnly());
verifyWriteInaccessible(composite);
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void readOnlyBufferMustRemainReadOnlyAfterSend(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.readOnly(true);
var send = buf.send();
try (Buf receive = send.receive()) {
assertTrue(receive.readOnly());
verifyWriteInaccessible(receive);
}
}
}
@Test
public void readOnlyBufferMustRemainReadOnlyAfterSendForEmptyCompositeBuffer() {
try (Allocator allocator = Allocator.heap();
Buf buf = allocator.compose()) {
buf.readOnly(true);
var send = buf.send();
try (Buf receive = send.receive()) {
assertTrue(receive.readOnly());
}
}
}
@ParameterizedTest
@MethodSource("pooledAllocators")
public void readOnlyBufferMustNotBeReadOnlyAfterBeingReusedFromPool(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
for (int i = 0; i < 1000; i++) {
try (Buf buf = allocator.allocate(8)) {
assertFalse(buf.readOnly());
buf.readOnly(true);
assertTrue(buf.readOnly());
}
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void acquireOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.readOnly(true);
try (Buf acquire = buf.acquire()) {
assertTrue(acquire.readOnly());
}
}
}
@ParameterizedTest
@MethodSource("allocators")
public void sliceOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.writeLong(0x0102030405060708L);
buf.readOnly(true);
try (Buf slice = buf.slice()) {
assertTrue(slice.readOnly());
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void bifurcateOfReadOnlyBufferMustBeReadOnly(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(16)) {
buf.writeLong(0x0102030405060708L);
buf.readOnly(true);
try (Buf bifurcate = buf.bifurcate()) {
assertTrue(bifurcate.readOnly());
assertTrue(buf.readOnly());
}
}
}
@Test
public void composingReadOnlyAndWritableBuffersMustThrow() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(8).readOnly(true);
Buf b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, b));
assertThrows(IllegalArgumentException.class, () -> allocator.compose(b, a));
assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, b, a));
assertThrows(IllegalArgumentException.class, () -> allocator.compose(b, a, b));
}
}
@Test
public void compositeWritableBufferCannotBeExtendedWithReadOnlyBuffer() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8)) {
composite = allocator.compose(a);
}
try (composite; Buf b = allocator.allocate(8).readOnly(true)) {
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
}
}
}
@Test
public void compositeReadOnlyBufferCannotBeExtendedWithWritableBuffer() {
try (Allocator allocator = Allocator.heap()) {
Buf composite;
try (Buf a = allocator.allocate(8).readOnly(true)) {
composite = allocator.compose(a);
}
try (composite; Buf b = allocator.allocate(8)) {
assertThrows(IllegalArgumentException.class, () -> Allocator.extend(composite, b));
}
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void compactOnReadOnlyBufferMustThrow(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.readOnly(true);
assertThrows(IllegalStateException.class, () -> buf.compact());
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void ensureWritableOnReadOnlyBufferMustThrow(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.readOnly(true);
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void copyIntoOnReadOnlyBufferMustThrow(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf dest = allocator.allocate(8)) {
dest.readOnly(true);
try (Buf src = allocator.allocate(8)) {
assertThrows(IllegalStateException.class, () -> src.copyInto(0, dest, 0, 1));
}
}
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
@ParameterizedTest
@MethodSource("allocators")