Add features to MemoryManager
The ability to allocate a buffer on a sub-region of some recoverable memory will be useful when porting over the arena-based pooling allocator from Netty.
This commit is contained in:
parent
fc7ba4522f
commit
e6a238b14d
|
@ -23,6 +23,9 @@ public interface MemoryManager {
|
|||
Drop<Buffer> drop();
|
||||
Object unwrapRecoverableMemory(Buffer buf);
|
||||
int capacityOfRecoverableMemory(Object memory);
|
||||
void discardRecoverableMemory(Object recoverableMemory);
|
||||
// todo should recoverMemory re-attach a cleaner?
|
||||
Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop);
|
||||
Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase,
|
||||
int offset, int length, Drop<Buffer> drop);
|
||||
}
|
||||
|
|
|
@ -82,7 +82,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
|
|||
Object memory;
|
||||
while ((memory = v.poll()) != null) {
|
||||
try {
|
||||
dispose(recoverMemoryIntoBuffer(memory));
|
||||
manager.discardRecoverableMemory(memory);
|
||||
} catch (Exception e) {
|
||||
capturedExceptions.add(e);
|
||||
}
|
||||
|
@ -99,7 +99,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
|
|||
@Override
|
||||
public void drop(Buffer buf) {
|
||||
if (closed) {
|
||||
dispose(buf);
|
||||
manager.drop().drop(buf);
|
||||
return;
|
||||
}
|
||||
Object mem = manager.unwrapRecoverableMemory(buf);
|
||||
|
@ -108,7 +108,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
|
|||
if (closed) {
|
||||
Object memory;
|
||||
while ((memory = sizeClassPool.poll()) != null) {
|
||||
dispose(recoverMemoryIntoBuffer(memory));
|
||||
manager.discardRecoverableMemory(memory);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -145,8 +145,4 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
|
|||
private ConcurrentLinkedQueue<Object> getSizeClassPool(int size) {
|
||||
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
|
||||
}
|
||||
|
||||
private void dispose(Buffer buf) {
|
||||
manager.drop().drop(buf);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,9 +65,22 @@ public class ByteBufferMemoryManager implements MemoryManager {
|
|||
return ((ByteBuffer) memory).capacity();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardRecoverableMemory(Object recoverableMemory) {
|
||||
// ByteBuffers have their memory released by the GC, so there is nothing for us to do.
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop) {
|
||||
ByteBuffer memory = (ByteBuffer) recoverableMemory;
|
||||
return new NioBuffer(memory, memory, allocatorControl, convert(drop));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase,
|
||||
int offset, int length, Drop<Buffer> drop) {
|
||||
ByteBuffer memory = (ByteBuffer) recoverableMemoryBase;
|
||||
memory = memory.slice(offset, length);
|
||||
return new NioBuffer(memory, memory, allocatorControl, convert(drop));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import io.netty.buffer.api.Drop;
|
|||
import io.netty.buffer.api.MemoryManager;
|
||||
import io.netty.buffer.api.internal.ArcDrop;
|
||||
import jdk.incubator.foreign.MemorySegment;
|
||||
import jdk.incubator.foreign.ResourceScope;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
|
||||
|
@ -58,9 +59,26 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
|
|||
return (int) ((MemorySegment) memory).byteSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardRecoverableMemory(Object recoverableMemory) {
|
||||
var segment = (MemorySegment) recoverableMemory;
|
||||
ResourceScope scope = segment.scope();
|
||||
if (!scope.isImplicit()) {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop) {
|
||||
var segment = (MemorySegment) recoverableMemory;
|
||||
return new MemSegBuffer(segment, segment, convert(ArcDrop.acquire(drop)), allocatorControl);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase,
|
||||
int offset, int length, Drop<Buffer> drop) {
|
||||
var segment = (MemorySegment) recoverableMemoryBase;
|
||||
segment = segment.asSlice(offset, length);
|
||||
return new MemSegBuffer(segment, segment, convert(ArcDrop.acquire(drop)), allocatorControl);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,4 +25,8 @@ class UnsafeMemory {
|
|||
this.address = address;
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
public UnsafeMemory slice(int offset, int length) {
|
||||
return new UnsafeMemory(base, address + offset, length);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,9 +79,22 @@ public class UnsafeMemoryManager implements MemoryManager {
|
|||
return ((UnsafeMemory) memory).size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void discardRecoverableMemory(Object recoverableMemory) {
|
||||
// We cannot reliably drop unsafe memory. We have to rely on the cleaner to do that.
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop) {
|
||||
UnsafeMemory memory = (UnsafeMemory) recoverableMemory;
|
||||
return new UnsafeBuffer(memory, 0, memory.size, allocatorControl, convert(drop));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemoryBase,
|
||||
int offset, int length, Drop<Buffer> drop) {
|
||||
UnsafeMemory memory = (UnsafeMemory) recoverableMemoryBase;
|
||||
memory = memory.slice(offset, length);
|
||||
return new UnsafeBuffer(memory, 0, memory.size, allocatorControl, convert(drop));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue