More efficient const buffer implementations

The const buffers of the various implementations are now able to share the underlying memory, at least until they are forced not to.
Const buffers behave just like normal buffers, except they start out as read-only.
When they are made writable or sliced, they allocate their own independent copy of the memory.
That way, const buffers can still have their contents changed and behave just like normal buffers.
The const-ness is purely an optimisation and should have no externally observable effect.
Chris Vest 2021-05-03 15:00:49 +02:00
parent d247ddeae3
commit 51cc1e7cf4
16 changed files with 386 additions and 71 deletions
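
As a usage illustration (not part of the commit itself): the copy-on-write behaviour described in the commit message can be observed through the public BufferAllocator API changed below. The following is a minimal sketch, assuming the incubator io.netty.buffer.api API exactly as it appears in this diff; the class name ConstBufferSketch is invented for the example.

import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;

import java.util.function.Supplier;

// Minimal sketch of the const-buffer copy-on-write behaviour described in the commit message.
public final class ConstBufferSketch {
    public static void main(String[] args) {
        try (BufferAllocator allocator = BufferAllocator.heap()) {
            // Every buffer from the supplier initially shares one read-only memory region.
            Supplier<Buffer> supplier = allocator.constBufferSupplier(new byte[] {1, 2, 3, 4});
            try (Buffer a = supplier.get();
                 Buffer b = supplier.get()) {
                // Making 'a' writable forces it to allocate its own private copy ("deconstify"),
                // so the modification is not visible through 'b'.
                a.readOnly(false);
                a.setByte(0, (byte) 42);
                System.out.println(a.getByte(0)); // 42
                System.out.println(b.getByte(0)); // 1
            }
        }
    }
}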

View File

@ -83,7 +83,10 @@ public interface BufferAllocator extends AutoCloseable {
* any other buffer produced by the supplier.
* <p>
* It can generally be expected, but is not guaranteed, that the returned supplier is more resource efficient than
* allocating and copying memory with other available APIs.
* allocating and copying memory with other available APIs. In such optimised implementations, the underlying memory
* backing the buffers will be shared among all the buffers produced by the supplier. Each buffer will then allocate
* its own independent copy of the data only when needed, such as when making the buffer writable, or when slicing
* the buffer.
* <p>
* The primary use case for this API is when you need to repeatedly produce buffers with the same contents, and
* you perhaps wish to keep a {@code static final} field with these contents. This use case has previously been
@ -110,11 +113,11 @@ public interface BufferAllocator extends AutoCloseable {
}
static BufferAllocator heap() {
return new ManagedBufferAllocator(MemoryManagers.getManagers().getHeapMemoryManager(), Statics.CLEANER);
return new ManagedBufferAllocator(MemoryManagers.getManagers().getHeapMemoryManager());
}
static BufferAllocator direct() {
return new ManagedBufferAllocator(MemoryManagers.getManagers().getNativeMemoryManager(), Statics.CLEANER);
return new ManagedBufferAllocator(MemoryManagers.getManagers().getNativeMemoryManager());
}
static BufferAllocator pooledHeap() {
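
The Javadoc above points to the primary use case: keeping a {@code static final} field with canned contents and producing cheap buffers from it on demand. A hedged sketch of that pattern, assuming the API in this diff; the response text and class name are invented for illustration:

import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.BufferAllocator;

import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Illustration of the "static final" use case from the Javadoc above (names and data are examples).
final class CannedResponses {
    private static final Supplier<Buffer> NOT_FOUND = BufferAllocator.heap()
            .constBufferSupplier("HTTP/1.1 404 Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII));

    static Buffer notFound() {
        // Each call returns a fresh read-only buffer that shares the canned bytes,
        // until a caller makes it writable or slices it.
        return NOT_FOUND.get();
    }
}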

View File

@ -15,29 +15,37 @@
*/
package io.netty.buffer.api;
import io.netty.buffer.api.internal.Statics;
import java.lang.ref.Cleaner;
import java.util.function.Supplier;
import static io.netty.buffer.api.internal.Statics.NO_OP_DROP;
class ManagedBufferAllocator implements BufferAllocator, AllocatorControl {
private final MemoryManager manager;
private final Cleaner cleaner;
ManagedBufferAllocator(MemoryManager manager, Cleaner cleaner) {
ManagedBufferAllocator(MemoryManager manager) {
this.manager = manager;
this.cleaner = cleaner;
}
@Override
public Buffer allocate(int size) {
BufferAllocator.checkSize(size);
return manager.allocateShared(this, size, manager.drop(), cleaner);
return manager.allocateShared(this, size, manager.drop(), Statics.CLEANER);
}
@Override
public Supplier<Buffer> constBufferSupplier(byte[] bytes) {
Buffer constantBuffer = manager.allocateShared(this, bytes.length, manager.drop(), Statics.CLEANER);
constantBuffer.writeBytes(bytes).readOnly(true);
return () -> manager.allocateCopyOnWritable(constantBuffer);
}
@Override
public Object allocateUntethered(Buffer originator, int size) {
BufferAllocator.checkSize(size);
var buf = manager.allocateShared(this, size, NO_OP_DROP, cleaner);
var buf = manager.allocateShared(this, size, NO_OP_DROP, Statics.CLEANER);
return manager.unwrapRecoverableMemory(buf);
}

View File

@ -18,8 +18,8 @@ package io.netty.buffer.api;
import java.lang.ref.Cleaner;
public interface MemoryManager {
boolean isNative();
Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner);
Buffer allocateCopyOnWritable(Buffer ownedReadOnlyBuffer);
Drop<Buffer> drop();
Object unwrapRecoverableMemory(Buffer buf);
int capacityOfRecoverableMemory(Object memory);

View File

@ -22,6 +22,7 @@ import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;
import static io.netty.buffer.api.internal.Statics.NO_OP_DROP;
import static java.lang.invoke.MethodHandles.lookup;
@ -54,6 +55,13 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
return createBuf(size, getDrop());
}
@Override
public Supplier<Buffer> constBufferSupplier(byte[] bytes) {
Buffer constantBuffer = manager.allocateShared(this, bytes.length, manager.drop(), Statics.CLEANER);
constantBuffer.writeBytes(bytes).readOnly(true);
return () -> manager.allocateCopyOnWritable(constantBuffer);
}
protected MemoryManager getMemoryManager() {
return manager;
}

View File

@ -34,11 +34,6 @@ public class ByteBufferMemoryManager implements MemoryManager {
this.direct = direct;
}
@Override
public boolean isNative() {
return direct;
}
@Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
int capacity = Math.toIntExact(size);
@ -47,6 +42,13 @@ public class ByteBufferMemoryManager implements MemoryManager {
return new NioBuffer(buffer, buffer, allocatorControl, convert(drop));
}
@Override
public Buffer allocateCopyOnWritable(Buffer ownedReadOnlyBuffer) {
assert ownedReadOnlyBuffer.isOwned() && ownedReadOnlyBuffer.readOnly();
NioBuffer buf = (NioBuffer) ownedReadOnlyBuffer;
return new NioBuffer(buf);
}
@Override
public Drop<Buffer> drop() {
return Statics.NO_OP_DROP;

View File

@ -47,6 +47,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
private int roff;
private int woff;
private boolean constBuffer;
NioBuffer(ByteBuffer base, ByteBuffer memory, AllocatorControl control, Drop<NioBuffer> drop) {
super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop)));
@ -56,6 +57,22 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
this.control = control;
}
/**
* Constructor for {@linkplain BufferAllocator#constBufferSupplier(byte[]) const buffers}.
*/
NioBuffer(NioBuffer parent) {
super(new MakeInaccisbleOnDrop(new ArcDrop<>(((ArcDrop<NioBuffer>) parent.unsafeGetDrop()).unwrap())));
control = parent.control;
base = parent.base;
rmem = parent.rmem.slice(0, parent.rmem.capacity()); // Need to slice to get independent byte orders.
assert parent.wmem == CLOSED_BUFFER;
wmem = CLOSED_BUFFER;
roff = parent.roff;
woff = parent.woff;
order(parent.order());
constBuffer = true;
}
private static final class MakeInaccisbleOnDrop implements Drop<NioBuffer> {
final Drop<NioBuffer> delegate;
@ -159,6 +176,9 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
@Override
public Buffer readOnly(boolean readOnly) {
if (!readOnly) {
deconstify();
}
if (readOnly && wmem == rmem) {
wmem = CLOSED_BUFFER;
} else if (!readOnly && wmem != rmem) {
@ -180,6 +200,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
if (!isAccessible()) {
throw new IllegalStateException("This buffer is closed: " + this + '.');
}
deconstify(); // Slice or parent could later be made writable, and if so, changes must be visible in both.
ByteBuffer slice = rmem.slice(offset, length);
ArcDrop<NioBuffer> drop = (ArcDrop<NioBuffer>) unsafeGetDrop();
drop.increment();
@ -398,21 +419,28 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
// Copy contents.
copyInto(0, buffer, 0, capacity());
// Release old memory:
// Release the old memory and install the new:
Drop<NioBuffer> drop = disconnectDrop();
attachNewBuffer(buffer, drop);
}
private Drop<NioBuffer> disconnectDrop() {
var drop = (Drop<NioBuffer>) unsafeGetDrop();
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
while (drop instanceof ArcDrop) {
drop = ((ArcDrop<NioBuffer>) drop).unwrap();
}
drop = ArcDrop.unwrapAllArcs(drop);
unsafeSetDrop(new ArcDrop<>(drop));
this.roff = roff;
this.woff = woff;
return drop;
}
private void attachNewBuffer(ByteBuffer buffer, Drop<NioBuffer> drop) {
base = buffer;
rmem = buffer;
wmem = buffer;
constBuffer = false;
drop.attach(this);
}
@ -438,6 +466,9 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
splitBuffer.order(order());
boolean readOnly = readOnly();
splitBuffer.readOnly(readOnly);
// Note that split, unlike slice, does not deconstify, because data changes in either buffer are not visible
// in the other. The split buffers can later deconstify independently if needed.
splitBuffer.constBuffer = constBuffer;
rmem = rmem.slice(splitOffset, rmem.capacity() - splitOffset);
if (!readOnly) {
wmem = rmem;
@ -554,6 +585,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
return processor.process(initialIndex, this)? 1 : -1;
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
@Override
public byte readByte() {
checkRead(roff, Byte.BYTES);
@ -1053,6 +1085,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
throw bufferIsReadOnly();
}
}
// </editor-fold>
@Override
protected Owned<NioBuffer> prepareSend() {
@ -1060,6 +1093,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
var roff = this.roff;
var woff = this.woff;
var readOnly = readOnly();
var isConst = constBuffer;
ByteBuffer base = this.base;
ByteBuffer rmem = this.rmem;
makeInaccessible();
@ -1071,6 +1105,7 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
copy.roff = roff;
copy.woff = woff;
copy.readOnly(readOnly);
copy.constBuffer = isConst;
return copy;
}
};
@ -1084,6 +1119,24 @@ class NioBuffer extends RcSupport<Buffer, NioBuffer> implements Buffer, Readable
woff = 0;
}
/**
* If this buffer is a {@linkplain BufferAllocator#constBufferSupplier(byte[]) const buffer}, turn it into a normal
* buffer, in order to protect the const parent.
* A const buffer is sharing its memory with some parent, whose contents cannot be allowed to change.
* To ensure this, we must allocate our own memory and copy the contents, because we will soon not be able to
* guarantee that the contents of <em>this</em> buffer won't change.
*/
private void deconstify() {
if (constBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) control.allocateUntethered(this, capacity());
byteBuffer.put(rmem);
byteBuffer.order(rmem.order());
Drop<NioBuffer> drop = ArcDrop.wrap(ArcDrop.unwrapAllArcs(unsafeGetDrop()));
unsafeSetDrop(drop);
attachNewBuffer(byteBuffer, drop);
}
}
@Override
public boolean isOwned() {
return super.isOwned() && ((ArcDrop<NioBuffer>) unsafeGetDrop()).isOwned();
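
One detail worth calling out in the const constructor above: parent.rmem is sliced rather than reused directly, because a sliced ByteBuffer shares the underlying bytes while keeping its own position, limit, and byte order, which is what lets sibling const buffers be given different byte orders without an upfront copy. A JDK-only sketch of that property (plain java.nio, independent of this commit; ByteBuffer.slice(int, int) needs JDK 13+, as already assumed by the diff):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Demonstrates why NioBuffer's const constructor slices the parent's ByteBuffer:
// a slice shares the underlying bytes, but position, limit and byte order are per-view state.
public final class SliceSharingSketch {
    public static void main(String[] args) {
        ByteBuffer parent = ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN);
        ByteBuffer view = parent.slice(0, parent.capacity()).order(ByteOrder.LITTLE_ENDIAN);

        view.putInt(0, 0x01020304); // write through the little-endian view...
        System.out.println(Integer.toHexString(parent.getInt(0))); // ...the parent sees the same bytes: 4030201
        System.out.println(parent.order()); // BIG_ENDIAN -- the view's order does not leak into the parent
    }
}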

View File

@ -46,6 +46,13 @@ public final class ArcDrop<T> implements Drop<T> {
return new ArcDrop<X>(drop);
}
public static <X> Drop<X> unwrapAllArcs(Drop<X> drop) {
while (drop instanceof ArcDrop) {
drop = ((ArcDrop<X>) drop).unwrap();
}
return drop;
}
public static <X> Drop<X> acquire(Drop<X> drop) {
if (drop.getClass() == ArcDrop.class) {
((ArcDrop<X>) drop).increment();

View File

@ -27,15 +27,19 @@ import java.lang.ref.Cleaner;
import static io.netty.buffer.api.internal.Statics.convert;
public abstract class AbstractMemorySegmentManager implements MemoryManager {
@Override
public abstract boolean isNative();
@Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
var segment = createSegment(size, cleaner);
return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
}
@Override
public Buffer allocateCopyOnWritable(Buffer ownedReadOnlyBuffer) {
assert ownedReadOnlyBuffer.isOwned() && ownedReadOnlyBuffer.readOnly();
MemSegBuffer buf = (MemSegBuffer) ownedReadOnlyBuffer;
return new MemSegBuffer(buf);
}
protected abstract MemorySegment createSegment(long size, Cleaner cleaner);
@Override

View File

@ -20,11 +20,6 @@ import jdk.incubator.foreign.MemorySegment;
import java.lang.ref.Cleaner;
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
@Override
public boolean isNative() {
return false;
}
@Override
protected MemorySegment createSegment(long size, Cleaner cleaner) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);

View File

@ -84,23 +84,40 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
};
}
private final AllocatorControl alloc;
private final AllocatorControl control;
private MemorySegment base;
private MemorySegment seg;
private MemorySegment wseg;
private ByteOrder order;
private int roff;
private int woff;
private boolean constBuffer;
MemSegBuffer(MemorySegment base, MemorySegment view, Drop<MemSegBuffer> drop, AllocatorControl alloc) {
MemSegBuffer(MemorySegment base, MemorySegment view, Drop<MemSegBuffer> drop, AllocatorControl control) {
super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop)));
this.alloc = alloc;
this.control = control;
this.base = base;
seg = view;
wseg = view;
order = ByteOrder.nativeOrder();
}
/**
* Constructor for {@linkplain BufferAllocator#constBufferSupplier(byte[]) const buffers}.
*/
MemSegBuffer(MemSegBuffer parent) {
super(new MakeInaccisbleOnDrop(new ArcDrop<>(((ArcDrop<MemSegBuffer>) parent.unsafeGetDrop()).unwrap())));
control = parent.control;
base = parent.base;
seg = parent.seg;
wseg = parent.wseg;
order = parent.order;
roff = parent.roff;
woff = parent.woff;
adaptor = null; // The adaptor must be independent, because we can de-constify.
constBuffer = true;
}
private static final class MakeInaccisbleOnDrop implements Drop<MemSegBuffer> {
final Drop<MemSegBuffer> delegate;
@ -282,6 +299,9 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
@Override
public Buffer readOnly(boolean readOnly) {
if (!readOnly) {
deconstify();
}
wseg = readOnly? CLOSED_SEGMENT : seg;
return this;
}
@ -299,9 +319,10 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
if (!isAccessible()) {
throw new IllegalStateException("This buffer is closed: " + this + '.');
}
deconstify(); // Slice or parent could later be made writable, and if so, changes must be visible in both.
var slice = seg.asSlice(offset, length);
Drop<MemSegBuffer> drop = ArcDrop.acquire(unsafeGetDrop());
return new MemSegBuffer(base, slice, drop, alloc)
return new MemSegBuffer(base, slice, drop, control)
.writerOffset(length)
.order(order())
.readOnly(readOnly());
@ -518,32 +539,39 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
// Allocate a bigger buffer.
long newSize = capacity() + (long) Math.max(size - writableBytes(), minimumGrowth);
BufferAllocator.checkSize(newSize);
MemorySegment newSegment = (MemorySegment) alloc.allocateUntethered(this, (int) newSize);
MemorySegment newSegment = (MemorySegment) control.allocateUntethered(this, (int) newSize);
// Copy contents.
newSegment.copyFrom(seg);
// Release old memory segment:
// Release the old memory segment and install the new one:
Drop<MemSegBuffer> drop = disconnectDrop();
attachNewMemorySegment(newSegment, drop);
}
private Drop<MemSegBuffer> disconnectDrop() {
var drop = unsafeGetDrop();
if (drop instanceof ArcDrop) {
// Disconnect from the current arc drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
while (drop instanceof ArcDrop) {
drop = ((ArcDrop<MemSegBuffer>) drop).unwrap();
}
drop = ArcDrop.unwrapAllArcs(drop);
unsafeSetDrop(new ArcDrop<>(drop));
this.roff = roff;
this.woff = woff;
} else {
// TODO would we ever get here?
alloc.recoverMemory(recoverableMemory());
control.recoverMemory(recoverableMemory());
}
return drop;
}
private void attachNewMemorySegment(MemorySegment newSegment, Drop<MemSegBuffer> drop) {
base = newSegment;
seg = newSegment;
wseg = newSegment;
constBuffer = false;
drop.attach(this);
}
@ -562,12 +590,15 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
var drop = (ArcDrop<MemSegBuffer>) unsafeGetDrop();
unsafeSetDrop(new ArcDrop<>(drop));
var splitSegment = seg.asSlice(0, splitOffset);
var splitBuffer = new MemSegBuffer(base, splitSegment, new ArcDrop<>(drop.increment()), alloc);
var splitBuffer = new MemSegBuffer(base, splitSegment, new ArcDrop<>(drop.increment()), control);
splitBuffer.woff = Math.min(woff, splitOffset);
splitBuffer.roff = Math.min(roff, splitOffset);
splitBuffer.order(order);
boolean readOnly = readOnly();
splitBuffer.readOnly(readOnly);
// Note that split, unlike slice, does not deconstify, because data changes in either buffer are not visible
// in the other. The split buffers can later deconstify independently if needed.
splitBuffer.constBuffer = constBuffer;
seg = seg.asSlice(splitOffset, seg.byteSize() - splitOffset);
if (!readOnly) {
wseg = seg;
@ -1095,17 +1126,19 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
var roff = this.roff;
var woff = this.woff;
var readOnly = readOnly();
var isConst = constBuffer;
MemorySegment transferSegment = seg;
MemorySegment base = this.base;
makeInaccessible();
return new Owned<MemSegBuffer>() {
@Override
public MemSegBuffer transferOwnership(Drop<MemSegBuffer> drop) {
MemSegBuffer copy = new MemSegBuffer(base, transferSegment, drop, alloc);
MemSegBuffer copy = new MemSegBuffer(base, transferSegment, drop, control);
copy.order = order;
copy.roff = roff;
copy.woff = woff;
copy.readOnly(readOnly);
copy.constBuffer = isConst;
return copy;
}
};
@ -1119,6 +1152,23 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
woff = 0;
}
/**
* If this buffer is a {@linkplain BufferAllocator#constBufferSupplier(byte[]) const buffer}, turn it into a normal
* buffer, in order to protect the const parent.
* A const buffer is sharing its memory with some parent, whose contents cannot be allowed to change.
* To ensure this, we must allocate our own memory and copy the contents, because we will soon not be able to
* guarantee that the contents of <em>this</em> buffer won't change.
*/
private void deconstify() {
if (constBuffer) {
MemorySegment newSegment = (MemorySegment) control.allocateUntethered(this, capacity());
newSegment.copyFrom(seg);
Drop<MemSegBuffer> drop = ArcDrop.wrap(ArcDrop.unwrapAllArcs(unsafeGetDrop()));
unsafeSetDrop(drop);
attachNewMemorySegment(newSegment, drop);
}
}
@Override
public boolean isOwned() {
return super.isOwned() && ((ArcDrop<MemSegBuffer>) unsafeGetDrop()).isOwned();

View File

@ -52,11 +52,6 @@ public class NativeMemorySegmentManager extends AbstractMemorySegmentManager {
}
}
@Override
public boolean isNative() {
return true;
}
@Override
protected MemorySegment createSegment(long size, Cleaner cleaner) {
final ResourceScope scope = cleaner == null ? newSharedScope() : newSharedScope(cleaner);

View File

@ -38,7 +38,7 @@ import static io.netty.buffer.api.internal.Statics.bufferIsClosed;
import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly;
import static io.netty.util.internal.PlatformDependent.BIG_ENDIAN_NATIVE_ORDER;
public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buffer, ReadableComponent,
class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buffer, ReadableComponent,
WritableComponent {
private static final int CLOSED_SIZE = -1;
private static final boolean ACCESS_UNALIGNED = PlatformDependent.isUnaligned();
@ -54,8 +54,9 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
private boolean readOnly;
private int roff;
private int woff;
private boolean constBuffer;
public UnsafeBuffer(UnsafeMemory memory, long offset, int size, AllocatorControl allocatorControl,
UnsafeBuffer(UnsafeMemory memory, long offset, int size, AllocatorControl allocatorControl,
Drop<UnsafeBuffer> drop) {
super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop)));
this.memory = memory;
@ -68,6 +69,23 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
order = ByteOrder.nativeOrder();
}
UnsafeBuffer(UnsafeBuffer parent) {
super(new MakeInaccisbleOnDrop(new ArcDrop<>(((ArcDrop<UnsafeBuffer>) parent.unsafeGetDrop()).unwrap())));
control = parent.control;
memory = parent.memory;
base = parent.base;
baseOffset = parent.baseOffset;
address = parent.address;
rsize = parent.rsize;
wsize = parent.wsize;
order = parent.order;
flipBytes = parent.flipBytes;
readOnly = parent.readOnly;
roff = parent.roff;
woff = parent.woff;
constBuffer = true;
}
@Override
public String toString() {
return "Buffer[roff:" + roff + ", woff:" + woff + ", cap:" + rsize + ']';
@ -135,6 +153,9 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
@Override
public Buffer readOnly(boolean readOnly) {
if (!readOnly) {
deconstify();
}
this.readOnly = readOnly;
wsize = readOnly? CLOSED_SIZE : rsize;
return this;
@ -151,6 +172,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
throw new IllegalArgumentException("Length cannot be negative: " + length + '.');
}
checkGet(offset, length);
deconstify(); // Slice or parent could later be made writable, and if so, changes must be visible in both.
ArcDrop<UnsafeBuffer> drop = (ArcDrop<UnsafeBuffer>) unsafeGetDrop();
drop.increment();
return new UnsafeBuffer(memory, baseOffset + offset, length, control, drop)
@ -436,24 +458,31 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
Reference.reachabilityFence(memory);
}
// Release old memory:
// Release the old memory, and install the new memory:
Drop<UnsafeBuffer> drop = disconnectDrop();
attachNewMemory(memory, drop);
}
private Drop<UnsafeBuffer> disconnectDrop() {
var drop = (Drop<UnsafeBuffer>) unsafeGetDrop();
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
while (drop instanceof ArcDrop) {
drop = ((ArcDrop<UnsafeBuffer>) drop).unwrap();
}
drop = ArcDrop.unwrapAllArcs(drop);
unsafeSetDrop(new ArcDrop<>(drop));
this.roff = roff;
this.woff = woff;
return drop;
}
private void attachNewMemory(UnsafeMemory memory, Drop<UnsafeBuffer> drop) {
this.memory = memory;
base = memory.base;
baseOffset = 0;
address = memory.address;
rsize = memory.size;
wsize = memory.size;
constBuffer = false;
drop.attach(this);
}
@ -478,6 +507,9 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
splitBuffer.order(order());
boolean readOnly = readOnly();
splitBuffer.readOnly(readOnly);
// Note that split, unlike slice, does not deconstify, because data changes in either buffer are not visible
// in the other. The split buffers can later deconstify independently if needed.
splitBuffer.constBuffer = constBuffer;
rsize -= splitOffset;
baseOffset += splitOffset;
address += splitOffset;
@ -628,6 +660,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
return processor.process(initialIndex, this)? 1 : -1;
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
@Override
public byte readByte() {
checkRead(roff, Byte.BYTES);
@ -1189,6 +1222,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
}
return this;
}
// </editor-fold>
@Override
protected Owned<UnsafeBuffer> prepareSend() {
@ -1196,6 +1230,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
var roff = this.roff;
var woff = this.woff;
var readOnly = readOnly();
var isConst = constBuffer;
UnsafeMemory memory = this.memory;
AllocatorControl control = this.control;
long baseOffset = this.baseOffset;
@ -1209,6 +1244,7 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
copy.roff = roff;
copy.woff = woff;
copy.readOnly(readOnly);
copy.constBuffer = isConst;
return copy;
}
};
@ -1260,6 +1296,29 @@ public class UnsafeBuffer extends RcSupport<Buffer, UnsafeBuffer> implements Buf
readOnly = false;
}
/**
* If this buffer is a {@linkplain BufferAllocator#constBufferSupplier(byte[]) const buffer}, turn it into a normal
* buffer, in order to protect the const parent.
* A const buffer is sharing its memory with some parent, whose contents cannot be allowed to change.
* To ensure this, we must allocate our own memory and copy the contents, because we will soon not be able to
* guarantee that the contents of <em>this</em> buffer won't change.
*/
private void deconstify() {
if (constBuffer) {
UnsafeMemory newMemory = (UnsafeMemory) control.allocateUntethered(this, capacity());
UnsafeMemory oldMemory = memory;
try {
PlatformDependent.copyMemory(base, address, newMemory.base, newMemory.address, newMemory.size);
Drop<UnsafeBuffer> drop = ArcDrop.wrap(ArcDrop.unwrapAllArcs(unsafeGetDrop()));
unsafeSetDrop(drop);
attachNewMemory(newMemory, drop);
} finally {
Reference.reachabilityFence(newMemory);
Reference.reachabilityFence(oldMemory);
}
}
}
@Override
public boolean isOwned() {
return super.isOwned() && ((ArcDrop<UnsafeBuffer>) unsafeGetDrop()).isOwned();
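
A note on the reachability fences in deconstify() above: the raw copy reads and writes native memory whose lifetime is tied to the UnsafeMemory objects via a Cleaner, so both the old and the new memory objects must stay reachable until the copy completes. A generic sketch of that pattern follows; NativeBlock and rawCopy are placeholders invented for illustration, and only Reference.reachabilityFence is a real JDK API:

import java.lang.ref.Reference;

// Generic sketch of the reachability-fence pattern used in deconstify() above:
// keep an object (and thus its cleaner-managed native memory) alive until the
// raw memory access that depends on it has finished.
final class FencedCopy {
    interface NativeBlock { long address(); long size(); } // placeholder type for illustration

    static void copy(NativeBlock source, NativeBlock target) {
        try {
            rawCopy(source.address(), target.address(), Math.min(source.size(), target.size()));
        } finally {
            // Without the fences, 'source' or 'target' could become unreachable during rawCopy(),
            // allowing a Cleaner to free the backing memory while it is still being accessed.
            Reference.reachabilityFence(source);
            Reference.reachabilityFence(target);
        }
    }

    private static void rawCopy(long fromAddress, long toAddress, long length) {
        // Elided: some unsafe/native copy between raw addresses.
    }
}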

View File

@ -33,11 +33,6 @@ public class UnsafeMemoryManager implements MemoryManager {
this.offheap = offheap;
}
@Override
public boolean isNative() {
return offheap;
}
@Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
final Object base;
@ -61,6 +56,13 @@ public class UnsafeMemoryManager implements MemoryManager {
return new UnsafeBuffer(memory, 0, size32, allocatorControl, convert(drop));
}
@Override
public Buffer allocateCopyOnWritable(Buffer ownedReadOnlyBuffer) {
assert ownedReadOnlyBuffer.isOwned() && ownedReadOnlyBuffer.readOnly();
UnsafeBuffer buf = (UnsafeBuffer) ownedReadOnlyBuffer;
return new UnsafeBuffer(buf);
}
@Override
public Drop<Buffer> drop() {
// We cannot reliably drop unsafe memory. We have to rely on the cleaner to do that.

View File

@ -19,6 +19,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.Supplier;
import static java.nio.ByteOrder.BIG_ENDIAN;
@ -150,19 +152,129 @@ public class BufferReadOnlyTest extends BufferTestSupport {
}
}
@Test
public void modifyingConstBufferDoesNotImpactSiblings() {
Supplier<Buffer> supplier = BufferAllocator.heap().constBufferSupplier(new byte[] {1, 2, 3, 4});
try (Buffer a = supplier.get();
Buffer b = supplier.get().order(LITTLE_ENDIAN)) {
a.order(BIG_ENDIAN).readOnly(false).setInt(0, 0xA1A2A3A4);
a.readerOffset(2);
assertThat(toByteArray(a)).containsExactly(0xA1, 0xA2, 0xA3, 0xA4);
assertThat(toByteArray(b)).containsExactly(1, 2, 3, 4);
assertThat(b.readerOffset()).isZero();
assertThat(b.order()).isEqualTo(LITTLE_ENDIAN);
assertThat(a.writerOffset()).isEqualTo(4);
assertThat(b.writerOffset()).isEqualTo(4);
@ParameterizedTest
@MethodSource("initialNoConstAllocators")
public void constBufferInitialState(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.constBufferSupplier(new byte[] {1, 2, 3, 4}).get()) {
assertTrue(buf.readOnly());
assertThat(buf.order()).isEqualTo(ByteOrder.nativeOrder());
assertThat(buf.readerOffset()).isZero();
assertThat(buf.capacity()).isEqualTo(4);
assertThat(buf.writerOffset()).isEqualTo(4);
assertTrue(buf.isOwned());
assertTrue(buf.isAccessible());
assertThat(buf.countComponents()).isOne();
assertThat(buf.countBorrows()).isZero();
assertEquals((byte) 1, buf.readByte());
assertEquals((byte) 2, buf.readByte());
assertEquals((byte) 3, buf.readByte());
assertEquals((byte) 4, buf.readByte());
}
}
@ParameterizedTest
@MethodSource("initialNoConstAllocators")
public void modifyingConstBufferDoesNotImpactSiblings(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Supplier<Buffer> supplier = allocator.constBufferSupplier(new byte[] {1, 2, 3, 4});
try (Buffer a = supplier.get().order(BIG_ENDIAN);
Buffer b = supplier.get().order(LITTLE_ENDIAN)) {
assertThat(a.order()).isEqualTo(BIG_ENDIAN);
assertThat(b.order()).isEqualTo(LITTLE_ENDIAN);
a.readOnly(false);
a.setInt(0, 0xA1A2A3A4);
a.readerOffset(2);
assertThat(toByteArray(a)).containsExactly(0xA1, 0xA2, 0xA3, 0xA4);
assertThat(toByteArray(b)).containsExactly(1, 2, 3, 4);
assertThat(b.readerOffset()).isZero();
assertThat(a.order()).isEqualTo(BIG_ENDIAN);
assertThat(b.order()).isEqualTo(LITTLE_ENDIAN);
assertThat(a.writerOffset()).isEqualTo(4);
assertThat(b.writerOffset()).isEqualTo(4);
}
}
}
@ParameterizedTest
@MethodSource("initialNoConstAllocators")
public void sliceOfConstBufferMustObserveChangesInParent(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer parent = allocator.constBufferSupplier(new byte[] {1, 2, 3, 4}).get();
Buffer slice = parent.slice(0, 4)) {
parent.readOnly(false);
parent.setByte(0, (byte) 42);
assertThat(slice.getByte(0)).isEqualTo((byte) 42);
}
}
@ParameterizedTest
@MethodSource("initialNoConstAllocators")
public void parentOfConstBufferSliceMustObserveChangesInSlice(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer parent = allocator.constBufferSupplier(new byte[] {1, 2, 3, 4}).get();
Buffer slice = parent.slice(0, 4)) {
slice.readOnly(false);
slice.setByte(0, (byte) 42);
assertThat(parent.getByte(0)).isEqualTo((byte) 42);
}
}
@ParameterizedTest
@MethodSource("initialNoConstAllocators")
public void splitsOfConstBuffersCanBecomeWritable(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer a = allocator.constBufferSupplier(new byte[16]).get();
Buffer b = a.split(8)) {
assertTrue(a.readOnly());
assertTrue(b.readOnly());
assertThat(a.capacity()).isEqualTo(8);
assertThat(b.capacity()).isEqualTo(8);
a.readOnly(false);
b.readOnly(false);
a.setInt(0, 1);
b.setInt(0, 2);
assertEquals(1, a.getInt(0));
assertEquals(2, b.getInt(0));
}
}
@ParameterizedTest
@MethodSource("initialNoConstAllocators")
public void compactOnConstBufferMustNotImpactSiblings(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Supplier<Buffer> supplier = allocator.constBufferSupplier(new byte[] {1, 2, 3, 4});
try (Buffer a = supplier.get();
Buffer b = supplier.get()) {
a.readShort();
assertThrows(IllegalStateException.class, () -> a.compact()); // Can't compact read-only buffer.
a.readOnly(false).compact(); // Setting read-only to false will deconstify the buffer.
assertEquals(3, a.readByte());
assertEquals(4, a.readByte());
assertEquals(0, a.readableBytes());
assertEquals(1, b.readByte());
assertEquals(2, b.readByte());
assertEquals(3, b.readByte());
assertEquals(4, b.readByte());
}
}
}
@ParameterizedTest
@MethodSource("initialNoConstAllocators")
public void constBuffersMustBeSendable(Fixture fixture) throws Exception {
try (BufferAllocator allocator = fixture.createAllocator()) {
Supplier<Buffer> supplier = allocator.constBufferSupplier(new byte[] {1, 2, 3, 4});
try (Buffer buffer = supplier.get()) {
Send<Buffer> send = buffer.send();
var future = executor.submit(() -> {
try (Buffer receive = send.receive()) {
return receive.order(BIG_ENDIAN).readInt();
}
});
assertEquals(0x01020304, future.get());
}
}
}
}

View File

@ -40,6 +40,7 @@ import java.util.stream.Stream.Builder;
import static io.netty.buffer.api.Fixture.Properties.CLEANER;
import static io.netty.buffer.api.Fixture.Properties.COMPOSITE;
import static io.netty.buffer.api.Fixture.Properties.CONST;
import static io.netty.buffer.api.Fixture.Properties.DIRECT;
import static io.netty.buffer.api.Fixture.Properties.HEAP;
import static io.netty.buffer.api.Fixture.Properties.POOLED;
@ -55,6 +56,8 @@ import static org.junit.jupiter.api.Assertions.fail;
public abstract class BufferTestSupport {
public static ExecutorService executor;
private static final Memoize<Fixture[]> INITIAL_NO_CONST = new Memoize<>(
() -> initialFixturesForEachImplementation().stream().filter(f -> !f.isConst()).toArray(Fixture[]::new));
private static final Memoize<Fixture[]> ALL_COMBINATIONS = new Memoize<>(
() -> fixtureCombinations().toArray(Fixture[]::new));
private static final Memoize<Fixture[]> ALL_ALLOCATORS = new Memoize<>(
@ -127,12 +130,16 @@ public abstract class BufferTestSupport {
return POOLED_DIRECT_ALLOCS.get();
}
static Fixture[] initialNoConstAllocators() {
return INITIAL_NO_CONST.get();
}
static List<Fixture> initialAllocators() {
return List.of(
new Fixture("heap", BufferAllocator::heap, HEAP),
new Fixture("constHeap", () -> constantBufferBasedAllocator(BufferAllocator.heap()), HEAP),
new Fixture("constHeap", () -> constantBufferBasedAllocator(BufferAllocator.heap()), HEAP, CONST),
new Fixture("constDirect", () -> constantBufferBasedAllocator(BufferAllocator.direct()),
DIRECT, CLEANER),
DIRECT, CONST, CLEANER),
new Fixture("direct", BufferAllocator::direct, DIRECT, CLEANER),
new Fixture("pooledHeap", BufferAllocator::pooledHeap, POOLED, HEAP),
new Fixture("pooledDirect", BufferAllocator::pooledDirect, POOLED, DIRECT, CLEANER));
@ -147,7 +154,7 @@ public abstract class BufferTestSupport {
};
}
private static Stream<Fixture> fixtureCombinations() {
static List<Fixture> initialFixturesForEachImplementation() {
List<Fixture> initFixtures = initialAllocators();
// Multiply by all MemoryManagers.
@ -156,12 +163,17 @@ public abstract class BufferTestSupport {
loadableManagers.add(provider.get());
});
initFixtures = initFixtures.stream().flatMap(f -> {
Stream.Builder<Fixture> builder = Stream.builder();
Builder<Fixture> builder = Stream.builder();
for (MemoryManagers managers : loadableManagers) {
builder.add(new Fixture(f + "/" + managers, () -> using(managers, f), f.getProperties()));
}
return builder.build();
}).toList();
return initFixtures;
}
private static Stream<Fixture> fixtureCombinations() {
List<Fixture> initFixtures = initialFixturesForEachImplementation();
Builder<Fixture> builder = Stream.builder();
initFixtures.forEach(builder);

View File

@ -72,9 +72,14 @@ public final class Fixture implements Supplier<BufferAllocator> {
return properties.contains(Properties.SLICE);
}
public boolean isConst() {
return properties.contains(Properties.CONST);
}
public enum Properties {
HEAP,
DIRECT,
CONST,
COMPOSITE,
CLEANER,
POOLED,