Allow slices to obtain ownership when parent is closed

Motivation:
It is kind of a weird internal and hidden state, that slices were special.
For instance, slices could not be sent, and they could never obtain ownership.
This means buffers from slices behaved differently from allocated buffers.
In doing so, they violated both the principle that magic should stay hidden, and the principle of consistent behaviour.

Modification:
- The special reference-counting drop implementation that was added to support bifurcation, has been renamed to ArcDrop (for atomic reference counting).
- The ArcDrop is then used throughout the MemSegBuffer implementation to account for every instance where multiple buffers reference the same memory, e.g. slices and the like.
- Borrows of a buffer is then the sum of borrows from the buffer itself, and its ArcDrop.
- Ownership is thus tied to both the buffer itself being owned, and the ArcDrop being in an owned state.
- SizeClassedMemoryPool is changed to pool recoverable memory instead of sends, because the sends could come from slices.
- We also take care to keep around a "base" memory segment, so that we don't return memory segment slices to the memory pool (doing so would leak the memory from the parent segment that is not part of the slice).
- CleanerPooledDrop now keeps a weak reference to itself, rather than the buffer, which is more correct anyway, but now also required because we cannot rely on the buffer reference the cleaner was created with.
- The CleanerPooledDrop now takes care to drop the buffer that is actually passed to it, rather than what it was referencing from some earlier point.
- MemoryManager can now disclose the size of recoverable memory, so that SizeClassedMemoryPool can pick the correct size pool to return memory to. It cannot rely on the passed down buffer instance for this, because that buffer might have been a slice.

Result:
It is now possible for slices to obtain ownership when their parent buffer is closed.
This commit is contained in:
Chris Vest 2021-03-15 16:42:56 +01:00
parent 374b0524d8
commit 253b6cb919
8 changed files with 208 additions and 99 deletions

View File

@ -45,6 +45,7 @@ class CleanerPooledDrop implements Drop<Buffer> {
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
if (c != null) {
c.clean();
delegate.drop(buf);
}
}
@ -57,24 +58,38 @@ class CleanerPooledDrop implements Drop<Buffer> {
c.clean();
}
var pool = this.pool;
var mem = manager.unwrapRecoverableMemory(buf);
var delegate = this.delegate;
WeakReference<Buffer> ref = new WeakReference<>(buf);
WeakReference<CleanerPooledDrop> ref = new WeakReference<>(this);
AtomicBoolean gate = new AtomicBoolean(true);
cleanable = new GatedCleanable(gate, CLEANER.register(this, () -> {
if (gate.getAndSet(false)) {
Buffer b = ref.get();
if (b == null) {
pool.recoverMemory(mem);
} else {
delegate.drop(b);
}
}
}));
cleanable = new GatedCleanable(gate, CLEANER.register(this, new CleanAction(pool, mem, ref, gate)));
}
private static class GatedCleanable implements Cleanable {
private static final class CleanAction implements Runnable {
private final SizeClassedMemoryPool pool;
private final Object mem;
private final WeakReference<CleanerPooledDrop> ref;
private final AtomicBoolean gate;
private CleanAction(SizeClassedMemoryPool pool, Object mem, WeakReference<CleanerPooledDrop> ref,
AtomicBoolean gate) {
this.pool = pool;
this.mem = mem;
this.ref = ref;
this.gate = gate;
}
@Override
public void run() {
if (gate.getAndSet(false)) {
var monitored = ref.get();
if (monitored == null) {
pool.recoverMemory(mem);
}
}
}
}
private static final class GatedCleanable implements Cleanable {
private final AtomicBoolean gate;
private final Cleanable cleanable;

View File

@ -34,5 +34,6 @@ public interface MemoryManager {
Buffer allocateShared(AllocatorControl allo, long size, Drop<Buffer> drop, Cleaner cleaner);
Drop<Buffer> drop();
Object unwrapRecoverableMemory(Buffer buf);
int capacityOfRecoverableMemory(Object memory);
Buffer recoverMemory(Object recoverableMemory, Drop<Buffer> drop);
}

View File

@ -37,7 +37,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
@Override
public final I acquire() {
if (acquires < 0) {
throw attachTrace(new IllegalStateException("Resource is closed."));
throw attachTrace(new IllegalStateException("This resource is closed: " + this + '.'));
}
if (acquires == Integer.MAX_VALUE) {
throw new IllegalStateException("Cannot acquire more references; counter would overflow.");

View File

@ -28,7 +28,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
private static final VarHandle CLOSE = Statics.findVarHandle(
lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
private final MemoryManager manager;
private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<Send<Buffer>>> pool;
private final ConcurrentHashMap<Integer, ConcurrentLinkedQueue<Object>> pool;
@SuppressWarnings("unused")
private volatile boolean closed;
@ -41,9 +41,9 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
public Buffer allocate(int size) {
BufferAllocator.checkSize(size);
var sizeClassPool = getSizeClassPool(size);
Send<Buffer> send = sizeClassPool.poll();
if (send != null) {
return send.receive()
Object memory = sizeClassPool.poll();
if (memory != null) {
return recoverMemoryIntoBuffer(memory)
.reset()
.readOnly(false)
.fill((byte) 0)
@ -71,10 +71,10 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
if (CLOSE.compareAndSet(this, false, true)) {
var capturedExceptions = new ArrayList<Exception>(4);
pool.forEach((k, v) -> {
Send<Buffer> send;
while ((send = v.poll()) != null) {
Object memory;
while ((memory = v.poll()) != null) {
try {
send.receive().close();
dispose(recoverMemoryIntoBuffer(memory));
} catch (Exception e) {
capturedExceptions.add(e);
}
@ -94,12 +94,13 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
dispose(buf);
return;
}
var sizeClassPool = getSizeClassPool(buf.capacity());
sizeClassPool.offer(buf.send());
Object mem = manager.unwrapRecoverableMemory(buf);
var sizeClassPool = getSizeClassPool(manager.capacityOfRecoverableMemory(mem));
sizeClassPool.offer(mem);
if (closed) {
Send<Buffer> send;
while ((send = sizeClassPool.poll()) != null) {
send.receive().close();
Object memory;
while ((memory = sizeClassPool.poll()) != null) {
dispose(recoverMemoryIntoBuffer(memory));
}
}
}
@ -107,27 +108,28 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
@Override
public Object allocateUntethered(Buffer originator, int size) {
var sizeClassPool = getSizeClassPool(size);
Send<Buffer> send = sizeClassPool.poll();
Buffer untetheredBuf;
if (send != null) {
var transfer = (TransferSend<Buffer, Buffer>) send;
var owned = transfer.unsafeUnwrapOwned();
untetheredBuf = owned.transferOwnership(NO_OP_DROP);
} else {
untetheredBuf = createBuf(size, NO_OP_DROP);
Object memory = sizeClassPool.poll();
if (memory == null) {
Buffer untetheredBuf = createBuf(size, NO_OP_DROP);
memory = manager.unwrapRecoverableMemory(untetheredBuf);
}
return manager.unwrapRecoverableMemory(untetheredBuf);
return memory;
}
@Override
public void recoverMemory(Object memory) {
var drop = getDrop();
var buf = manager.recoverMemory(memory, drop);
drop.attach(buf);
Buffer buf = recoverMemoryIntoBuffer(memory);
buf.close();
}
private ConcurrentLinkedQueue<Send<Buffer>> getSizeClassPool(int size) {
private Buffer recoverMemoryIntoBuffer(Object memory) {
var drop = getDrop();
var buf = manager.recoverMemory(memory, drop);
drop.attach(buf);
return buf;
}
private ConcurrentLinkedQueue<Object> getSizeClassPool(int size) {
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
}

View File

@ -34,7 +34,7 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuffer(segment, convert(drop), alloc);
return new MemSegBuffer(segment, segment, convert(drop), alloc);
}
@Override
@ -43,7 +43,7 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuffer(segment, convert(drop), alloc);
return new MemSegBuffer(segment, segment, convert(drop), alloc);
}
protected abstract MemorySegment createSegment(long size);
@ -59,6 +59,11 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
return b.recoverableMemory();
}
@Override
public int capacityOfRecoverableMemory(Object memory) {
return ((RecoverableMemory) memory).capacity();
}
@Override
public Buffer recoverMemory(Object recoverableMemory, Drop<Buffer> drop) {
var recovery = (RecoverableMemory) recoverableMemory;

View File

@ -20,33 +20,47 @@ import io.netty.buffer.api.Drop;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
class BifurcatedDrop implements Drop<MemSegBuffer> {
final class ArcDrop implements Drop<MemSegBuffer> {
private static final VarHandle COUNT;
static {
try {
COUNT = MethodHandles.lookup().findVarHandle(BifurcatedDrop.class, "count", int.class);
COUNT = MethodHandles.lookup().findVarHandle(ArcDrop.class, "count", int.class);
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
private final MemSegBuffer originalBuf;
private final Drop<MemSegBuffer> delegate;
@SuppressWarnings("FieldMayBeFinal")
private volatile int count;
BifurcatedDrop(MemSegBuffer originalBuf, Drop<MemSegBuffer> delegate) {
this.originalBuf = originalBuf;
ArcDrop(Drop<MemSegBuffer> delegate) {
this.delegate = delegate;
count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop.
count = 1;
}
void increment() {
static Drop<MemSegBuffer> wrap(Drop<MemSegBuffer> drop) {
if (drop.getClass() == ArcDrop.class) {
return drop;
}
return new ArcDrop(drop);
}
static Drop<MemSegBuffer> acquire(Drop<MemSegBuffer> drop) {
if (drop.getClass() == ArcDrop.class) {
((ArcDrop) drop).increment();
return drop;
}
return new ArcDrop(drop);
}
ArcDrop increment() {
int c;
do {
c = count;
checkValidState(c);
} while (!COUNT.compareAndSet(this, c, c + 1));
return this;
}
@Override
@ -59,10 +73,8 @@ class BifurcatedDrop implements Drop<MemSegBuffer> {
checkValidState(c);
} while (!COUNT.compareAndSet(this, c, n));
if (n == 0) {
delegate.attach(originalBuf);
delegate.drop(originalBuf);
delegate.drop(buf);
}
buf.makeInaccessible();
}
@Override
@ -70,6 +82,14 @@ class BifurcatedDrop implements Drop<MemSegBuffer> {
delegate.attach(obj);
}
boolean isOwned() {
return count <= 1;
}
int countBorrows() {
return count - 1;
}
Drop<MemSegBuffer> unwrap() {
return delegate;
}

View File

@ -59,31 +59,58 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]);
CLOSED_SEGMENT.close();
SEGMENT_CLOSE = buf -> {
try (var ignore = buf.seg) {
buf.makeInaccessible();
}
buf.base.close();
};
}
private final AllocatorControl alloc;
private final boolean isSendable;
private MemorySegment base;
private MemorySegment seg;
private MemorySegment wseg;
private ByteOrder order;
private int roff;
private int woff;
MemSegBuffer(MemorySegment segmet, Drop<MemSegBuffer> drop, AllocatorControl alloc) {
this(segmet, drop, alloc, true);
MemSegBuffer(MemorySegment baseSegment, MemorySegment viewSegment, Drop<MemSegBuffer> drop, AllocatorControl alloc) {
super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop)));
this.alloc = alloc;
base = baseSegment;
seg = viewSegment;
wseg = viewSegment;
order = ByteOrder.nativeOrder();
}
private MemSegBuffer(MemorySegment segment, Drop<MemSegBuffer> drop, AllocatorControl alloc, boolean isSendable) {
super(drop);
this.alloc = alloc;
seg = segment;
wseg = segment;
this.isSendable = isSendable;
order = ByteOrder.nativeOrder();
private static final class MakeInaccisbleOnDrop implements Drop<MemSegBuffer> {
final Drop<MemSegBuffer> delegate;
private MakeInaccisbleOnDrop(Drop<MemSegBuffer> delegate) {
this.delegate = delegate;
}
@Override
public void drop(MemSegBuffer buf) {
try {
delegate.drop(buf);
} finally {
buf.makeInaccessible();
}
}
@Override
public void attach(MemSegBuffer buf) {
delegate.attach(buf);
}
}
@Override
protected Drop<MemSegBuffer> unsafeGetDrop() {
MakeInaccisbleOnDrop drop = (MakeInaccisbleOnDrop) super.unsafeGetDrop();
return drop.delegate;
}
@Override
protected void unsafeSetDrop(Drop<MemSegBuffer> replacement) {
super.unsafeSetDrop(new MakeInaccisbleOnDrop(replacement));
}
@Override
@ -233,14 +260,12 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
if (length < 0) {
throw new IllegalArgumentException("Length cannot be negative: " + length + '.');
}
if (!isAccessible()) {
throw new IllegalStateException("This buffer is closed: " + this + '.');
}
var slice = seg.asSlice(offset, length);
acquire();
Drop<MemSegBuffer> drop = b -> {
close();
b.makeInaccessible();
};
var sendable = false; // Sending implies ownership change, which we can't do for slices.
return new MemSegBuffer(slice, drop, alloc, sendable)
Drop<MemSegBuffer> drop = ArcDrop.acquire(unsafeGetDrop());
return new MemSegBuffer(base, slice, drop, alloc)
.writerOffset(length)
.order(order())
.readOnly(readOnly());
@ -482,19 +507,22 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
// Release old memory segment:
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
if (drop instanceof ArcDrop) {
// Disconnect from the current arc drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop);
while (drop instanceof ArcDrop) {
drop = ((ArcDrop) drop).unwrap();
}
unsafeSetDrop(new ArcDrop(drop));
this.roff = roff;
this.woff = woff;
} else {
alloc.recoverMemory(recoverableMemory());
}
base = newSegment;
seg = newSegment;
wseg = newSegment;
drop.attach(this);
@ -505,19 +533,10 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
if (!isOwned()) {
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
}
var drop = unsafeGetDrop();
if (seg.ownerThread() != null) {
seg = seg.share();
drop.attach(this);
}
if (drop instanceof BifurcatedDrop) {
((BifurcatedDrop) drop).increment();
} else {
drop = new BifurcatedDrop(new MemSegBuffer(seg, drop, alloc), drop);
unsafeSetDrop(drop);
}
var drop = (ArcDrop) unsafeGetDrop();
unsafeSetDrop(new ArcDrop(drop));
var bifurcatedSeg = seg.asSlice(0, woff);
var bifurcatedBuf = new MemSegBuffer(bifurcatedSeg, drop, alloc);
var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop(drop.increment()), alloc);
bifurcatedBuf.woff = woff;
bifurcatedBuf.roff = roff;
bifurcatedBuf.order(order);
@ -1052,11 +1071,12 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
var readOnly = readOnly();
boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.share();
MemorySegment base = this.base;
makeInaccessible();
return new Owned<MemSegBuffer>() {
@Override
public MemSegBuffer transferOwnership(Drop<MemSegBuffer> drop) {
MemSegBuffer copy = new MemSegBuffer(transferSegment, drop, alloc);
MemSegBuffer copy = new MemSegBuffer(base, transferSegment, drop, alloc);
copy.order = order;
copy.roff = roff;
copy.woff = woff;
@ -1067,6 +1087,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
void makeInaccessible() {
base = CLOSED_SEGMENT;
seg = CLOSED_SEGMENT;
wseg = CLOSED_SEGMENT;
roff = 0;
@ -1074,17 +1095,13 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
@Override
protected IllegalStateException notSendableException() {
if (!isSendable) {
return new IllegalStateException(
"Cannot send() this buffer. This buffer might be a slice of another buffer.");
}
return super.notSendableException();
public boolean isOwned() {
return super.isOwned() && ((ArcDrop) unsafeGetDrop()).isOwned();
}
@Override
public boolean isOwned() {
return isSendable && super.isOwned();
public int countBorrows() {
return super.countBorrows() + ((ArcDrop) unsafeGetDrop()).countBorrows();
}
private void checkRead(int index, int size) {
@ -1153,7 +1170,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
Object recoverableMemory() {
return new RecoverableMemory(seg, alloc);
return new RecoverableMemory(base, alloc);
}
// <editor-fold name="BufferIntegratable methods">
@ -1226,7 +1243,11 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
Buffer recover(Drop<MemSegBuffer> drop) {
return new MemSegBuffer(segment, drop, alloc);
return new MemSegBuffer(segment, segment, ArcDrop.acquire(drop), alloc);
}
int capacity() {
return (int) segment.byteSize();
}
}
}

View File

@ -926,6 +926,51 @@ public class BufferTest {
}
}
@ParameterizedTest
@MethodSource("nonCompositeAllocators")
public void acquireComposingAndSlicingMustIncrementBorrowsWithData(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator();
Buffer buf = allocator.allocate(8)) {
buf.writeByte((byte) 1);
int borrows = buf.countBorrows();
try (Buffer ignored = buf.acquire()) {
assertEquals(borrows + 1, buf.countBorrows());
try (Buffer slice = buf.slice()) {
assertEquals(1, slice.capacity());
int sliceBorrows = slice.countBorrows();
assertEquals(borrows + 2, buf.countBorrows());
try (Buffer ignored1 = Buffer.compose(allocator, buf, slice)) {
assertEquals(borrows + 3, buf.countBorrows());
assertEquals(sliceBorrows + 1, slice.countBorrows());
}
assertEquals(sliceBorrows, slice.countBorrows());
assertEquals(borrows + 2, buf.countBorrows());
}
assertEquals(borrows + 1, buf.countBorrows());
}
assertEquals(borrows, buf.countBorrows());
}
}
@Disabled
@ParameterizedTest
@MethodSource("allocators")
public void sliceMustBecomeOwnedOnSourceBufferClose(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer buf = allocator.allocate(8);
buf.writeInt(42);
try (Buffer slice = buf.slice()) {
buf.close();
assertFalse(buf.isAccessible());
assertTrue(slice.isOwned());
try (Buffer receive = slice.send().receive()) {
assertTrue(receive.isOwned());
assertFalse(slice.isAccessible());
}
}
}
}
@ParameterizedTest
@MethodSource("allocators")
void copyIntoByteArray(Fixture fixture) {