Move the MemorySegment based Buf implementation to its own package, and break the remaining bits of tight coupling.

This commit is contained in:
Chris Vest 2020-11-17 15:53:40 +01:00
parent 3efa93841e
commit a1785e8161
10 changed files with 163 additions and 96 deletions

View File

@ -15,8 +15,8 @@
*/
package io.netty.buffer.api;
import io.netty.buffer.api.MemSegBuf.RecoverableMemory;
import jdk.incubator.foreign.MemorySegment;
import io.netty.buffer.api.memseg.HeapMemorySegmentManager;
import io.netty.buffer.api.memseg.NativeMemorySegmentManager;
import java.lang.ref.Cleaner;
@ -35,78 +35,4 @@ public interface MemoryManager {
Drop<Buf> drop();
Object unwrapRecoverableMemory(Buf buf);
Buf recoverMemory(Object recoverableMemory, Drop<Buf> drop);
abstract class MemorySegmentManager implements MemoryManager {
@Override
public abstract boolean isNative();
@Override
public Buf allocateConfined(AllocatorControl alloc, long size, Drop<Buf> drop, Cleaner cleaner) {
var segment = createSegment(size);
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuf(segment, convert(drop), alloc);
}
@Override
public Buf allocateShared(AllocatorControl alloc, long size, Drop<Buf> drop, Cleaner cleaner) {
var segment = createSegment(size).share();
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuf(segment, convert(drop), alloc);
}
protected abstract MemorySegment createSegment(long size);
@Override
public Drop<Buf> drop() {
return convert(MemSegBuf.SEGMENT_CLOSE);
}
@Override
public Object unwrapRecoverableMemory(Buf buf) {
var b = (MemSegBuf) buf;
return b.recoverableMemory();
}
@Override
public Buf recoverMemory(Object recoverableMemory, Drop<Buf> drop) {
var recovery = (RecoverableMemory) recoverableMemory;
return recovery.recover(convert(drop));
}
@SuppressWarnings("unchecked")
private static <T, R> Drop<R> convert(Drop<T> drop) {
return (Drop<R>) drop;
}
}
class HeapMemorySegmentManager extends MemorySegmentManager {
@Override
public boolean isNative() {
return false;
}
@Override
protected MemorySegment createSegment(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
}
class NativeMemorySegmentManager extends MemorySegmentManager {
@Override
public boolean isNative() {
return true;
}
@Override
protected MemorySegment createSegment(long size) {
var segment = MemorySegment.allocateNative(size);
// .withCleanupAction(Statics.getCleanupAction(size));
Statics.MEM_USAGE_NATIVE.add(size);
return segment;
}
}
}

View File

@ -19,7 +19,7 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
private int acquires; // Closed if negative.
private final Drop<T> drop;
RcSupport(Drop<T> drop) {
protected RcSupport(Drop<T> drop) {
this.drop = drop;
}

View File

@ -122,7 +122,7 @@ class SizeClassedMemoryPool implements Allocator, AllocatorControl, Drop<Buf> {
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
}
private static void dispose(Buf buf) {
MemSegBuf.SEGMENT_CLOSE.drop((MemSegBuf) buf);
private void dispose(Buf buf) {
manager.drop().drop(buf);
}
}

View File

@ -18,13 +18,9 @@ package io.netty.buffer.api;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
interface Statics {
Cleaner CLEANER = Cleaner.create();
LongAdder MEM_USAGE_NATIVE = new LongAdder();
ConcurrentHashMap<Long, Runnable> CLEANUP_ACTIONS = new ConcurrentHashMap<>();
Drop<Buf> NO_OP_DROP = buf -> {
};
@ -35,8 +31,4 @@ interface Statics {
throw new ExceptionInInitializerError(e);
}
}
static Runnable getCleanupAction(long size) {
return CLEANUP_ACTIONS.computeIfAbsent(size, s -> () -> MEM_USAGE_NATIVE.add(-s));
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.memseg;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buf;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.memseg.MemSegBuf.RecoverableMemory;
import jdk.incubator.foreign.MemorySegment;
import java.lang.ref.Cleaner;
public abstract class AbstractMemorySegmentManager implements MemoryManager {
@Override
public abstract boolean isNative();
@Override
public Buf allocateConfined(AllocatorControl alloc, long size, Drop<Buf> drop, Cleaner cleaner) {
var segment = createSegment(size);
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuf(segment, convert(drop), alloc);
}
@Override
public Buf allocateShared(AllocatorControl alloc, long size, Drop<Buf> drop, Cleaner cleaner) {
var segment = createSegment(size).share();
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuf(segment, convert(drop), alloc);
}
protected abstract MemorySegment createSegment(long size);
@Override
public Drop<Buf> drop() {
return convert(MemSegBuf.SEGMENT_CLOSE);
}
@Override
public Object unwrapRecoverableMemory(Buf buf) {
var b = (MemSegBuf) buf;
return b.recoverableMemory();
}
@Override
public Buf recoverMemory(Object recoverableMemory, Drop<Buf> drop) {
var recovery = (RecoverableMemory) recoverableMemory;
return recovery.recover(convert(drop));
}
@SuppressWarnings("unchecked")
private static <T, R> Drop<R> convert(Drop<T> drop) {
return (Drop<R>) drop;
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.memseg;
import jdk.incubator.foreign.MemorySegment;
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
@Override
public boolean isNative() {
return false;
}
@Override
protected MemorySegment createSegment(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
}

View File

@ -13,8 +13,15 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
package io.netty.buffer.api.memseg;
import io.netty.buffer.api.Allocator;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buf;
import io.netty.buffer.api.ByteIterator;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.RcSupport;
import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteBuffer;

View File

@ -0,0 +1,43 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.memseg;
import jdk.incubator.foreign.MemorySegment;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
public class NativeMemorySegmentManager extends AbstractMemorySegmentManager {
public static final LongAdder MEM_USAGE_NATIVE = new LongAdder();
private static final ConcurrentHashMap<Long, Runnable> CLEANUP_ACTIONS = new ConcurrentHashMap<>();
static Runnable getCleanupAction(long size) {
return CLEANUP_ACTIONS.computeIfAbsent(size, s -> () -> MEM_USAGE_NATIVE.add(-s));
}
@Override
public boolean isNative() {
return true;
}
@Override
protected MemorySegment createSegment(long size) {
var segment = MemorySegment.allocateNative(size);
// .withCleanupAction(Statics.getCleanupAction(size));
MEM_USAGE_NATIVE.add(size);
return segment;
}
}

View File

@ -16,6 +16,7 @@
package io.netty.buffer.api;
import io.netty.buffer.api.Fixture.Properties;
import io.netty.buffer.api.memseg.NativeMemorySegmentManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
@ -1344,7 +1345,7 @@ public class BufTest {
class CleanerTests {
@Disabled("Precise native memory accounting does not work since recent panama-foreign changes.")
@ParameterizedTest
@MethodSource("io.netty.buffer.b2.BufTest#directWithCleanerAllocators")
@MethodSource("io.netty.buffer.api.BufTest#directWithCleanerAllocators")
public void bufferMustBeClosedByCleaner(Fixture fixture) throws InterruptedException {
var allocator = fixture.createAllocator();
allocator.close();
@ -1355,7 +1356,7 @@ public class BufTest {
System.gc();
System.runFinalization();
}
var sum = Statics.MEM_USAGE_NATIVE.sum();
var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
var totalAllocated = (long) allocationSize * iterations;
assertThat(sum).isLessThan(totalAllocated);
}
@ -1366,7 +1367,7 @@ public class BufTest {
@Disabled("Precise native memory accounting does not work since recent panama-foreign changes.")
@ParameterizedTest
@MethodSource("io.netty.buffer.b2.BufTest#directPooledWithCleanerAllocators")
@MethodSource("io.netty.buffer.api.BufTest#directPooledWithCleanerAllocators")
public void buffersMustBeReusedByPoolingAllocatorEvenWhenDroppedByCleanerInsteadOfExplicitly(Fixture fixture)
throws InterruptedException {
try (var allocator = fixture.createAllocator()) {
@ -1377,7 +1378,7 @@ public class BufTest {
System.gc();
System.runFinalization();
}
var sum = Statics.MEM_USAGE_NATIVE.sum();
var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
var totalAllocated = (long) allocationSize * iterations;
assertThat(sum).isLessThan(totalAllocated);
}

View File

@ -56,10 +56,6 @@ public final class Fixture implements Supplier<Allocator> {
return properties.contains(Properties.DIRECT);
}
public boolean isComposite() {
return properties.contains(Properties.COMPOSITE);
}
public boolean isPooled() {
return properties.contains(Properties.POOLED);
}