Hide the memory segment buffer implementation behind the MemoryManager interface

Motivation:
We'd like to separate the API and the implementation, so we can make other implementations in the future.
This will allow us to deliver the API changes without the new MemorySegment implementation.

Modification:
The MemoryManager interface abstracts away the implementation details of the concrete buffer type, and how to allocate and deallocate them.

Result:
Knowledge of MemorySegments are now confined to just the MemSegBuf and MemoryManager implementation classes.
This commit is contained in:
Chris Vest 2020-10-07 11:07:34 +02:00
parent cfb45597f0
commit 0aed8daad4
7 changed files with 182 additions and 94 deletions

View File

@ -15,12 +15,8 @@
*/
package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteOrder;
import static io.netty.buffer.b2.BBuf.SEGMENT_CLOSE;
/**
* Interface for {@link Buf} allocators.
*/
@ -72,85 +68,54 @@ public interface Allocator extends AutoCloseable {
}
static Allocator heap() {
var man = MemoryManager.getHeapMemoryManager();
return new Allocator() {
@Override
public BBuf allocate(long size) {
public Buf allocate(long size) {
checkSize(size);
var segment = allocateHeap(size);
return new BBuf(segment, SEGMENT_CLOSE);
return man.allocateConfined(size, man.drop());
}
};
}
static Allocator direct() {
var man = MemoryManager.getNativeMemoryManager();
return new Allocator() {
@Override
public Buf allocate(long size) {
checkSize(size);
var segment = allocateNative(size);
return new BBuf(segment, SEGMENT_CLOSE);
return man.allocateConfined(size, man.drop());
}
};
}
static Allocator directWithCleaner() {
var man = MemoryManager.getNativeMemoryManager();
return new Allocator() {
@Override
public Buf allocate(long size) {
checkSize(size);
var segment = allocateNative(size);
segment.registerCleaner(Statics.CLEANER);
return new BBuf(segment, SEGMENT_CLOSE);
}
};
}
static Allocator pooledHeap() {
return new SizeClassedMemoryPool() {
@Override
protected MemorySegment createMemorySegment(long size) {
checkSize(size);
return allocateHeap(size).withOwnerThread(null);
}
};
}
static Allocator pooledDirect() {
return new SizeClassedMemoryPool() {
@Override
protected MemorySegment createMemorySegment(long size) {
checkSize(size);
return allocateNative(size).withOwnerThread(null);
}
};
}
static Allocator pooledDirectWithCleaner() {
return new SizeClassedMemoryPool() {
@Override
protected MemorySegment createMemorySegment(long size) {
checkSize(size);
return allocateNative(size).withOwnerThread(null);
}
@Override
protected BBuf createBBuf(MemorySegment segment) {
var drop = new NativeMemoryCleanerDrop(this, getDrop());
var buf = new BBuf(segment, drop);
drop.accept(buf);
var buf = man.allocateConfined(size, man.drop());
man.registerCleaner(buf, Statics.CLEANER);
return buf;
}
};
}
private static MemorySegment allocateHeap(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
static Allocator pooledHeap() {
return new SizeClassedMemoryPool(MemoryManager.getHeapMemoryManager());
}
private static MemorySegment allocateNative(long size) {
var segment = MemorySegment.allocateNative(size)
.withCleanupAction(Statics.getCleanupAction(size));
Statics.MEM_USAGE_NATIVE.add(size);
return segment;
static Allocator pooledDirect() {
return new SizeClassedMemoryPool(MemoryManager.getNativeMemoryManager());
}
static Allocator pooledDirectWithCleaner() {
return new SizeClassedMemoryPool(MemoryManager.getNativeMemoryManager()) {
@Override
protected Drop<Buf> getDrop() {
return new NativeMemoryCleanerDrop(this, getMemoryManager(), super.getDrop());
}
};
}
}

View File

@ -48,14 +48,14 @@ import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset_LE;
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset_BE;
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset_LE;
class BBuf extends RcSupport<Buf, BBuf> implements Buf {
static final Drop<BBuf> SEGMENT_CLOSE = buf -> buf.seg.close();
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
static final Drop<MemSegBuf> SEGMENT_CLOSE = buf -> buf.seg.close();
final MemorySegment seg;
private boolean isBigEndian;
private int roff;
private int woff;
BBuf(MemorySegment segment, Drop<BBuf> drop) {
MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop) {
super(drop);
seg = segment;
isBigEndian = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
@ -83,7 +83,7 @@ class BBuf extends RcSupport<Buf, BBuf> implements Buf {
}
@Override
public BBuf readerIndex(int index) {
public MemSegBuf readerIndex(int index) {
checkRead(index, 0);
roff = index;
return this;
@ -95,7 +95,7 @@ class BBuf extends RcSupport<Buf, BBuf> implements Buf {
}
@Override
public BBuf writerIndex(int index) {
public MemSegBuf writerIndex(int index) {
checkWrite(index, 0);
woff = index;
return this;
@ -1181,15 +1181,15 @@ getByteAtOffset_BE(seg, roff) & 0xFF |
// ### CODEGEN END primitive accessors implementation
@Override
protected Owned<BBuf> prepareSend() {
BBuf outer = this;
protected Owned<MemSegBuf> prepareSend() {
MemSegBuf outer = this;
boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.withOwnerThread(null);
return new Owned<BBuf>() {
return new Owned<MemSegBuf>() {
@Override
public BBuf transferOwnership(Thread recipient, Drop<BBuf> drop) {
public MemSegBuf transferOwnership(Thread recipient, Drop<MemSegBuf> drop) {
var newSegment = isConfined? transferSegment.withOwnerThread(recipient) : transferSegment;
BBuf copy = new BBuf(newSegment, drop);
MemSegBuf copy = new MemSegBuf(newSegment, drop);
copy.isBigEndian = outer.isBigEndian;
copy.roff = outer.roff;
copy.woff = outer.woff;

View File

@ -0,0 +1,112 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment;
import java.lang.ref.Cleaner;
public interface MemoryManager {
static MemoryManager getHeapMemoryManager() {
return new HeapMemorySegmentManager();
}
static MemoryManager getNativeMemoryManager() {
return new NativeMemorySegmentManager();
}
boolean isNative();
Buf allocateConfined(long size, Drop<Buf> drop);
Buf allocateShared(long size, Drop<Buf> drop);
Drop<Buf> drop();
void registerCleaner(Buf buf, Cleaner cleaner);
Object unwrapRecoverableMemory(Buf buf);
Buf recoverMemory(Object recoverableMemory, Drop<Buf> drop);
abstract class MemorySegmentManager implements MemoryManager {
@Override
public abstract boolean isNative();
@Override
public Buf allocateConfined(long size, Drop<Buf> drop) {
var segment = createSegment(size);
return new MemSegBuf(segment, convert(drop));
}
@Override
public Buf allocateShared(long size, Drop<Buf> drop) {
var segment = createSegment(size).withOwnerThread(null);
return new MemSegBuf(segment, convert(drop));
}
protected abstract MemorySegment createSegment(long size);
@Override
public Drop<Buf> drop() {
return convert(MemSegBuf.SEGMENT_CLOSE);
}
@Override
public void registerCleaner(Buf buf, Cleaner cleaner) {
var b = (MemSegBuf) buf;
b.seg.registerCleaner(cleaner);
}
@Override
public Object unwrapRecoverableMemory(Buf buf) {
var b = (MemSegBuf) buf;
return b.seg;
}
@Override
public Buf recoverMemory(Object recoverableMemory, Drop<Buf> drop) {
var segment = (MemorySegment) recoverableMemory;
return new MemSegBuf(segment, convert(drop));
}
@SuppressWarnings("unchecked")
private static <T,R> Drop<R> convert(Drop<T> drop) {
return (Drop<R>) drop;
}
}
class HeapMemorySegmentManager extends MemorySegmentManager {
@Override
public boolean isNative() {
return false;
}
@Override
protected MemorySegment createSegment(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
}
class NativeMemorySegmentManager extends MemorySegmentManager {
@Override
public boolean isNative() {
return true;
}
@Override
protected MemorySegment createSegment(long size) {
var segment = MemorySegment.allocateNative(size)
.withCleanupAction(Statics.getCleanupAction(size));
Statics.MEM_USAGE_NATIVE.add(size);
return segment;
}
}
}

View File

@ -20,24 +20,28 @@ import java.lang.ref.Cleaner.Cleanable;
import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
import static io.netty.buffer.b2.Statics.CLEANER;
import static io.netty.buffer.b2.Statics.findVarHandle;
import static java.lang.invoke.MethodHandles.lookup;
class NativeMemoryCleanerDrop implements Drop<BBuf> {
class NativeMemoryCleanerDrop implements Drop<Buf> {
private static final VarHandle CLEANABLE =
findVarHandle(lookup(), NativeMemoryCleanerDrop.class, "cleanable", GatedCleanable.class);
private final SizeClassedMemoryPool pool;
private final Drop<BBuf> delegate;
private final MemoryManager manager;
private final Drop<Buf> delegate;
@SuppressWarnings("unused")
private volatile GatedCleanable cleanable;
NativeMemoryCleanerDrop(SizeClassedMemoryPool pool, Drop<BBuf> delegate) {
NativeMemoryCleanerDrop(SizeClassedMemoryPool pool, MemoryManager manager,
Drop<Buf> delegate) {
this.pool = pool;
this.manager = manager;
this.delegate = delegate;
}
@Override
public void drop(BBuf buf) {
public void drop(Buf buf) {
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
if (c != null) {
c.clean();
@ -45,7 +49,7 @@ class NativeMemoryCleanerDrop implements Drop<BBuf> {
}
@Override
public void accept(BBuf buf) {
public void accept(Buf buf) {
// Unregister old cleanable, if any, to avoid uncontrolled build-up.
GatedCleanable c = (GatedCleanable) CLEANABLE.getAndSet(this, null);
if (c != null) {
@ -54,15 +58,15 @@ class NativeMemoryCleanerDrop implements Drop<BBuf> {
}
var pool = this.pool;
var seg = buf.seg;
var mem = manager.unwrapRecoverableMemory(buf);
var delegate = this.delegate;
WeakReference<BBuf> ref = new WeakReference<>(buf);
WeakReference<Buf> ref = new WeakReference<>(buf);
AtomicBoolean gate = new AtomicBoolean();
cleanable = new GatedCleanable(gate, CLEANER.register(this, () -> {
if (gate.compareAndSet(false, true)) {
BBuf b = ref.get();
Buf b = ref.get();
if (b == null) {
pool.recoverLostSegment(seg);
pool.recoverLostMemory(mem);
} else {
delegate.drop(b);
}

View File

@ -15,8 +15,6 @@
*/
package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment;
import java.lang.invoke.VarHandle;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
@ -24,38 +22,44 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import static java.lang.invoke.MethodHandles.lookup;
abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
class SizeClassedMemoryPool implements Allocator, Drop<Buf> {
private static final VarHandle CLOSE = Statics.findVarHandle(
lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
private final MemoryManager manager;
private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<Buf>>> pool;
@SuppressWarnings("unused")
private volatile boolean closed;
protected SizeClassedMemoryPool() {
protected SizeClassedMemoryPool(MemoryManager manager) {
this.manager = manager;
pool = new ConcurrentHashMap<>();
}
@Override
public Buf allocate(long size) {
Allocator.checkSize(size);
var sizeClassPool = getSizeClassPool(size);
Send<Buf> send = sizeClassPool.poll();
if (send != null) {
return send.receive();
}
var segment = createMemorySegment(size);
return createBBuf(segment);
return createBuf(size, getDrop());
}
protected BBuf createBBuf(MemorySegment segment) {
return new BBuf(segment, getDrop());
protected MemoryManager getMemoryManager() {
return manager;
}
protected SizeClassedMemoryPool getDrop() {
protected Buf createBuf(long size, Drop<Buf> drop) {
var buf = manager.allocateShared(size, drop);
drop.accept(buf);
return buf;
}
protected Drop<Buf> getDrop() {
return this;
}
protected abstract MemorySegment createMemorySegment(long size);
@Override
public void close() {
if (CLOSE.compareAndSet(this, false, true)) {
@ -79,7 +83,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
}
@Override
public void drop(BBuf buf) {
public void drop(Buf buf) {
var sizeClassPool = getSizeClassPool(buf.capacity());
sizeClassPool.offer(buf.send());
if (closed) {
@ -90,8 +94,11 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
}
}
void recoverLostSegment(MemorySegment segment) {
createBBuf(segment).close();
void recoverLostMemory(Object memory) {
var drop = getDrop();
var buf = manager.recoverMemory(memory, drop);
drop.accept(buf);
buf.close();
}
private ConcurrentLinkedQueue<Send<Buf>> getSizeClassPool(long size) {
@ -99,6 +106,6 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
}
private static void dispose(Buf buf) {
BBuf.SEGMENT_CLOSE.drop((BBuf) buf);
MemSegBuf.SEGMENT_CLOSE.drop((MemSegBuf) buf);
}
}

View File

@ -570,7 +570,7 @@ public final class Codegen {
public static void main(String[] args) throws Exception {
generateCodeInline(Path.of("buffer/src/main/java/io/netty/buffer/b2/Buf.java"));
generateCodeInline(Path.of("buffer/src/main/java/io/netty/buffer/b2/BBuf.java"));
generateCodeInline(Path.of("buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java"));
generateCodeInline(Path.of("buffer/src/test/java/io/netty/buffer/b2/BufTest.java"));
}

View File

@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
@Fork(1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class BBufAccessBenchmark extends AbstractMicrobenchmark {
public class MemSegBufAccessBenchmark extends AbstractMicrobenchmark {
public enum BBufType {
UNSAFE {
@Override