Fix a bug in Buf.ensureWritable for pooled buffers

Motivation:
Resource lifetimes were not handled correctly when Buf.ensureWritable had to replace the backing memory of a pooled buffer.

Modification:
We cannot call drop(buf) from within ensureWritable to release the old memory of a pooled buffer, because the pool's drop will send() the buffer back to the pool, and that implies closing the buffer instance.
Instead, ensureWritable always has to work with untethered memory, so a new recoverMemory API is added to AllocatorControl for releasing untethered memory back to the allocator.
An implementation already existed in SizeClassedMemoryPool, where it was used by NativeMemoryCleanerDrop; it is now exposed through the new interface method.

Result:
Buf.ensureWritable no longer closes pooled buffers.
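
To make the new flow concrete, here is a simplified, annotated restatement of the MemSegBuf change shown in the diffs below. The method name is invented for this sketch; it leans on the fields and helpers (seg, alloc, recoverableMemory(), unsafeGetDrop()) that appear in that hunk.

```java
// Sketch only: grow a buffer in place without dropping it. Dropping a pooled
// buffer would send() it back to the pool, which closes this instance.
void ensureWritableSketch(int size) {
    if (writableBytes() >= size) {
        return; // already enough room
    }
    long newSize = capacity() + size - (long) writableBytes();
    Allocator.checkSize(newSize);
    // Untethered memory is not tied to any buffer lifetime yet.
    var newMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
    newMemory.segment.copyFrom(seg);           // copy the existing contents over
    alloc.recoverMemory(recoverableMemory());  // hand the old memory back, untethered
    seg = newMemory.segment;                   // adopt the new memory in place
    unsafeGetDrop().accept(this);              // re-attach the drop to the new memory
}
```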
Chris Vest 2020-11-16 19:27:21 +01:00
parent 9c54aa43b4
commit a535fb8cd8
8 changed files with 83 additions and 8 deletions

View File

@ -33,4 +33,13 @@ public interface AllocatorControl {
     * @return A "recoverable memory" object that is the requested allocation.
     */
    Object allocateUntethered(Buf originator, int size);

    /**
     * Return memory to the allocator, after it has been untethered from its lifetime.
     * This happens either when the memory has leaked and been re-captured, or when a buffer has stopped using it
     * after replacing its memory via {@link Buf#ensureWritable(int)}.
     *
     * @param memory The untethered memory to return to the allocator.
     */
    void recoverMemory(Object memory);
}
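
To illustrate how the two AllocatorControl methods pair up, here is a hypothetical, unpooled implementation backed by plain byte arrays. The class name and the byte[] memory representation are invented for this sketch (and it assumes these are the only abstract methods); the real implementations are ManagedAllocator and SizeClassedMemoryPool further down.

```java
// Hypothetical sketch of the allocateUntethered / recoverMemory pairing.
// Real implementations delegate to a MemoryManager; this toy version hands out byte arrays.
final class UnpooledControlSketch implements AllocatorControl {
    @Override
    public Object allocateUntethered(Buf originator, int size) {
        // Untethered memory is not yet tied to any buffer lifetime; nothing frees it
        // until recoverMemory() is called or a buffer adopts it as its new memory.
        return new byte[size];
    }

    @Override
    public void recoverMemory(Object memory) {
        // Heap arrays need no explicit freeing; a native implementation would
        // release the underlying memory segment here, or return it to a pool.
    }
}
```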

View File

@ -477,12 +477,11 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
        if (writableBytes() < size) {
            long newSize = capacity() + (long) size;
            Allocator.checkSize(newSize);
-            int minBumpSize = 256;
-            int growth = Math.min(Integer.MAX_VALUE - 8, Math.min(size - writableBytes(), minBumpSize));
+            int growth = size - writableBytes();
            if (bufs.length == 0) {
                bufs = new Buf[] { allocator.allocate(growth) };
-            } else if (bufs[bufs.length - 1].capacity() < minBumpSize) {
-                bufs[bufs.length - 1].ensureWritable(growth);
+            // } else if (bufs[bufs.length - 1].capacity() + growth < minBumpSize) {
+            //     bufs[bufs.length - 1].ensureWritable(growth);
            } else {
                bufs = Arrays.copyOf(bufs, bufs.length + 1);
                bufs[bufs.length - 1] = allocator.allocate(growth);

View File

@ -40,4 +40,10 @@ class ManagedAllocator implements Allocator, AllocatorControl {
        var buf = manager.allocateConfined(this, size, NO_OP_DROP, null);
        return manager.unwrapRecoverableMemory(buf);
    }

    @Override
    public void recoverMemory(Object memory) {
        // Free the recovered memory.
        manager.recoverMemory(memory, manager.drop()).close();
    }
}

View File

@ -310,12 +310,12 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
            throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.');
        }
        if (writableBytes() < size) {
-            long newSize = capacity() + (long) size;
+            long newSize = capacity() + size - (long) writableBytes();
            Allocator.checkSize(newSize);
            RecoverableMemory recoverableMemory = (RecoverableMemory) alloc.allocateUntethered(this, (int) newSize);
            var newSegment = recoverableMemory.segment;
            newSegment.copyFrom(seg);
-            unsafeGetDrop().drop(this); // Release old memory segment.
+            alloc.recoverMemory(recoverableMemory()); // Release old memory segment.
            seg = newSegment;
            unsafeGetDrop().accept(this);
        }
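
The newSize change is easiest to see with concrete numbers (again made up for illustration): the old formula grew by the full requested size even when some bytes were already writable, while the new one allocates exactly enough to leave size writable bytes after the copy.

```java
// Illustration only, with made-up numbers (not taken from the repository):
class NewSizeExample {
    public static void main(String[] args) {
        long capacity = 16;      // current capacity
        long writableBytes = 6;  // bytes already writable (writer offset at 10)
        int size = 10;           // ensureWritable(10)

        long oldNewSize = capacity + size;                 // 26: over-allocates by 6 bytes
        long newNewSize = capacity + size - writableBytes; // 20: leaves exactly 10 writable bytes

        System.out.println(oldNewSize + " vs " + newNewSize);
    }
}
```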

View File

@ -66,7 +66,7 @@ class NativeMemoryCleanerDrop implements Drop<Buf> {
            if (gate.compareAndSet(false, true)) {
                Buf b = ref.get();
                if (b == null) {
-                    pool.recoverLostMemory(mem);
+                    pool.recoverMemory(mem);
                } else {
                    delegate.drop(b);
                }

View File

@ -110,7 +110,8 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
        return manager.unwrapRecoverableMemory(untetheredBuf);
    }

-    void recoverLostMemory(Object memory) {
+    @Override
+    public void recoverMemory(Object memory) {
        var drop = getDrop();
        var buf = manager.recoverMemory(memory, drop);
        drop.accept(buf);
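
For contrast with ManagedAllocator above: both allocators now implement the new method, but they treat the recovered memory differently. A comment-annotated restatement of the two bodies shown in this commit:

```java
// ManagedAllocator (unpooled): re-wrap the untethered memory with the real drop
// and close it, so the memory is freed immediately.
manager.recoverMemory(memory, manager.drop()).close();

// SizeClassedMemoryPool (pooled): re-wrap the memory with the pool's own drop and
// accept it, so the memory goes back into the pool for reuse instead of being freed.
var drop = getDrop();
var buf = manager.recoverMemory(memory, drop);
drop.accept(buf);
```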

View File

@ -145,6 +145,48 @@ public class BufTest {
                }
            };
        }, COMPOSITE));
        for (Fixture fixture : initFixtures) {
            builder.add(new Fixture(fixture + ".ensureWritable", () -> {
                var allocator = fixture.createAllocator();
                return new Allocator() {
                    @Override
                    public Buf allocate(int size) {
                        if (size < 2) {
                            return allocator.allocate(size);
                        }
                        var buf = allocator.allocate(size - 1);
                        buf.ensureWritable(size);
                        return buf;
                    }

                    @Override
                    public void close() {
                        allocator.close();
                    }
                };
            }, fixture.getProperties()));
            builder.add(new Fixture(fixture + ".compose.ensureWritable", () -> {
                var allocator = fixture.createAllocator();
                return new Allocator() {
                    @Override
                    public Buf allocate(int size) {
                        if (size < 2) {
                            return allocator.allocate(size);
                        }
                        var buf = allocator.compose();
                        buf.ensureWritable(size);
                        return buf;
                    }

                    @Override
                    public void close() {
                        allocator.close();
                    }
                };
            }, COMPOSITE));
        }
        return builder.build().flatMap(f -> {
            // Inject slice versions of everything
            Builder<Fixture> andSlices = Stream.builder();
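
The fixtures injected in the loop above make every existing test exercise ensureWritable as well: whenever a test asks for an n-byte buffer, the wrapper hands out one that is a byte short (or an empty composite) and grows it before the test body ever sees it. A hypothetical example of what that means for a test requesting 8 bytes:

```java
// Hypothetical illustration of the "<name>.ensureWritable" wrapper behaviour:
try (Allocator allocator = fixture.createAllocator(); // the wrapping Allocator built above
     Buf buf = allocator.allocate(8)) {               // internally: allocate(7) + ensureWritable(8)
    assert buf.writableBytes() >= 8;                  // the test body just sees room for 8 bytes
}
```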
@ -1450,6 +1492,20 @@ public class BufTest {
        }
    }

    @ParameterizedTest
    @MethodSource("nonSliceAllocators")
    public void mustBeAbleToSliceAfterEnsureWritable(Fixture fixture) {
        try (Allocator allocator = fixture.createAllocator();
             Buf buf = allocator.allocate(4)) {
            buf.ensureWritable(8);
            assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);
            buf.writeLong(0x0102030405060708L);
            try (Buf slice = buf.slice()) {
                assertEquals(0x0102030405060708L, slice.readLong());
            }
        }
    }

    // <editor-fold defaultstate="collapsed" desc="Primitive accessors tests.">
    @ParameterizedTest
    @MethodSource("allocators")

View File

@ -44,6 +44,10 @@ public final class Fixture implements Supplier<Allocator> {
        return name;
    }

    public Properties[] getProperties() {
        return properties.toArray(Properties[]::new);
    }

    public boolean isHeap() {
        return properties.contains(Properties.HEAP);
    }