Merge pull request #42 from netty/fix-after-refactoring
Update code to support bleeding edge MemorySegment APIs after the latest refactoring
This commit is contained in:
commit
e7f7335804
5
pom.xml
5
pom.xml
@ -70,7 +70,7 @@
|
||||
<javaModuleName>io.netty.incubator.buffer</javaModuleName>
|
||||
<netty.version>5.0.0.Final-SNAPSHOT</netty.version>
|
||||
<netty.build.version>29</netty.build.version>
|
||||
<java.version>16</java.version>
|
||||
<java.version>17</java.version>
|
||||
<junit.version>5.7.0</junit.version>
|
||||
<surefire.version>3.0.0-M5</surefire.version>
|
||||
<skipTests>false</skipTests>
|
||||
@ -173,6 +173,9 @@
|
||||
<argLine>${argLine.common} ${argLine.printGC} --add-modules jdk.incubator.foreign</argLine>
|
||||
<!-- Ensure the whole stacktrace is preserved when an exception is thrown. See https://issues.apache.org/jira/browse/SUREFIRE-1457 -->
|
||||
<trimStackTrace>false</trimStackTrace>
|
||||
<systemProperties>
|
||||
<sample>nosample</sample>
|
||||
</systemProperties>
|
||||
</configuration>
|
||||
<dependencies>
|
||||
<!-- Declare the surefire dynamic dependencies explicitly, to speed up the docker build. -->
|
||||
|
@ -37,7 +37,7 @@ class ManagedBufferAllocator implements BufferAllocator, AllocatorControl {
|
||||
@Override
|
||||
public Object allocateUntethered(Buffer originator, int size) {
|
||||
BufferAllocator.checkSize(size);
|
||||
var buf = manager.allocateShared(this, size, NO_OP_DROP, null);
|
||||
var buf = manager.allocateShared(this, size, NO_OP_DROP, cleaner);
|
||||
return manager.unwrapRecoverableMemory(buf);
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,6 @@ import java.lang.ref.Cleaner;
|
||||
|
||||
public interface MemoryManager {
|
||||
boolean isNative();
|
||||
Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner);
|
||||
Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner);
|
||||
Drop<Buffer> drop();
|
||||
Object unwrapRecoverableMemory(Buffer buf);
|
||||
|
@ -39,11 +39,6 @@ public class ByteBufferMemoryManager implements MemoryManager {
|
||||
return direct;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
|
||||
return allocateShared(allocatorControl, size, drop, cleaner);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
|
||||
int capacity = Math.toIntExact(size);
|
||||
|
@ -30,25 +30,13 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
|
||||
@Override
|
||||
public abstract boolean isNative();
|
||||
|
||||
@Override
|
||||
public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
|
||||
var segment = createSegment(size);
|
||||
if (cleaner != null) {
|
||||
segment = segment.registerCleaner(cleaner);
|
||||
}
|
||||
return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
|
||||
var segment = createSegment(size).share();
|
||||
if (cleaner != null) {
|
||||
segment = segment.registerCleaner(cleaner);
|
||||
}
|
||||
var segment = createSegment(size, cleaner);
|
||||
return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
|
||||
}
|
||||
|
||||
protected abstract MemorySegment createSegment(long size);
|
||||
protected abstract MemorySegment createSegment(long size, Cleaner cleaner);
|
||||
|
||||
@Override
|
||||
public Drop<Buffer> drop() {
|
||||
|
@ -17,6 +17,8 @@ package io.netty.buffer.api.memseg;
|
||||
|
||||
import jdk.incubator.foreign.MemorySegment;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
|
||||
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
|
||||
@Override
|
||||
public boolean isNative() {
|
||||
@ -24,7 +26,7 @@ public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MemorySegment createSegment(long size) {
|
||||
protected MemorySegment createSegment(long size, Cleaner cleaner) {
|
||||
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
|
||||
}
|
||||
}
|
||||
|
@ -33,6 +33,7 @@ import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
|
||||
import io.netty.buffer.api.internal.ArcDrop;
|
||||
import io.netty.buffer.api.internal.Statics;
|
||||
import jdk.incubator.foreign.MemorySegment;
|
||||
import jdk.incubator.foreign.ResourceScope;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
@ -58,12 +59,20 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
|
||||
static final Drop<MemSegBuffer> SEGMENT_CLOSE;
|
||||
|
||||
static {
|
||||
CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]);
|
||||
CLOSED_SEGMENT.close();
|
||||
try (ResourceScope scope = ResourceScope.newSharedScope()) {
|
||||
// We are not allowed to allocate a zero-sized native buffer, but we *can* take a zero-sized slice from it.
|
||||
// We need the CLOSED_SEGMENT to have a size of zero, because we'll use its size for bounds checks after
|
||||
// the buffer is closed.
|
||||
MemorySegment segment = MemorySegment.allocateNative(1, scope);
|
||||
CLOSED_SEGMENT = segment.asSlice(0, 0);
|
||||
}
|
||||
SEGMENT_CLOSE = new Drop<MemSegBuffer>() {
|
||||
@Override
|
||||
public void drop(MemSegBuffer buf) {
|
||||
buf.base.close();
|
||||
ResourceScope scope = buf.base.scope();
|
||||
if (!scope.isImplicit()) {
|
||||
scope.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -298,16 +307,12 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
|
||||
|
||||
@Override
|
||||
public void copyInto(int srcPos, byte[] dest, int destPos, int length) {
|
||||
try (var target = MemorySegment.ofArray(dest)) {
|
||||
copyInto(srcPos, target, destPos, length);
|
||||
}
|
||||
copyInto(srcPos, MemorySegment.ofArray(dest), destPos, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) {
|
||||
try (var target = MemorySegment.ofByteBuffer(dest.duplicate().clear())) {
|
||||
copyInto(srcPos, target, destPos, length);
|
||||
}
|
||||
copyInto(srcPos, MemorySegment.ofByteBuffer(dest.duplicate().clear()), destPos, length);
|
||||
}
|
||||
|
||||
private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) {
|
||||
@ -1078,8 +1083,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
|
||||
var roff = this.roff;
|
||||
var woff = this.woff;
|
||||
var readOnly = readOnly();
|
||||
boolean isConfined = seg.ownerThread() == null;
|
||||
MemorySegment transferSegment = isConfined? seg : seg.share(); // TODO remove confimenent checks
|
||||
MemorySegment transferSegment = seg;
|
||||
MemorySegment base = this.base;
|
||||
makeInaccessible();
|
||||
return new Owned<MemSegBuffer>() {
|
||||
|
@ -16,16 +16,40 @@
|
||||
package io.netty.buffer.api.memseg;
|
||||
|
||||
import jdk.incubator.foreign.MemorySegment;
|
||||
import jdk.incubator.foreign.ResourceScope;
|
||||
|
||||
import java.lang.ref.Cleaner;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static jdk.incubator.foreign.ResourceScope.newSharedScope;
|
||||
|
||||
public class NativeMemorySegmentManager extends AbstractMemorySegmentManager {
|
||||
public static final LongAdder MEM_USAGE_NATIVE = new LongAdder();
|
||||
private static final ConcurrentHashMap<Long, Runnable> CLEANUP_ACTIONS = new ConcurrentHashMap<>();
|
||||
private static final Function<Long, Runnable> CLEANUP_ACTION_MAKER = s -> new ReduceNativeMemoryUsage(s);
|
||||
|
||||
static Runnable getCleanupAction(long size) {
|
||||
return CLEANUP_ACTIONS.computeIfAbsent(size, s -> () -> MEM_USAGE_NATIVE.add(-s));
|
||||
return CLEANUP_ACTIONS.computeIfAbsent(size, CLEANUP_ACTION_MAKER);
|
||||
}
|
||||
|
||||
private static final class ReduceNativeMemoryUsage implements Runnable {
|
||||
private final long size;
|
||||
|
||||
private ReduceNativeMemoryUsage(long size) {
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
MEM_USAGE_NATIVE.add(-size);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ReduceNativeMemoryUsage(by " + size + " bytes)";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -34,9 +58,10 @@ public class NativeMemorySegmentManager extends AbstractMemorySegmentManager {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MemorySegment createSegment(long size) {
|
||||
var segment = MemorySegment.allocateNative(size);
|
||||
// .withCleanupAction(Statics.getCleanupAction(size));
|
||||
protected MemorySegment createSegment(long size, Cleaner cleaner) {
|
||||
final ResourceScope scope = cleaner == null ? newSharedScope() : newSharedScope(cleaner);
|
||||
scope.addOnClose(getCleanupAction(size));
|
||||
var segment = MemorySegment.allocateNative(size, scope);
|
||||
MEM_USAGE_NATIVE.add(size);
|
||||
return segment;
|
||||
}
|
||||
|
@ -31,10 +31,14 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.nio.ReadOnlyBufferException;
|
||||
import java.text.ParseException;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.time.temporal.TemporalUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.SplittableRandom;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -43,6 +47,7 @@ import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.Stream.Builder;
|
||||
|
||||
@ -65,17 +70,52 @@ public class BufferTest {
|
||||
|
||||
private static final Memoize<Fixture[]> ALL_COMBINATIONS = new Memoize<>(
|
||||
() -> fixtureCombinations().toArray(Fixture[]::new));
|
||||
private static final Memoize<Fixture[]> ALL_ALLOCATORS = new Memoize<>(
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get())
|
||||
.filter(sample())
|
||||
.toArray(Fixture[]::new));
|
||||
private static final Memoize<Fixture[]> NON_COMPOSITE = new Memoize<>(
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> !f.isComposite()).toArray(Fixture[]::new));
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get())
|
||||
.filter(f -> !f.isComposite())
|
||||
.filter(sample())
|
||||
.toArray(Fixture[]::new));
|
||||
private static final Memoize<Fixture[]> HEAP_ALLOCS = new Memoize<>(
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> f.isHeap()).toArray(Fixture[]::new));
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get())
|
||||
.filter(f -> f.isHeap())
|
||||
.filter(sample())
|
||||
.toArray(Fixture[]::new));
|
||||
private static final Memoize<Fixture[]> DIRECT_ALLOCS = new Memoize<>(
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> f.isDirect()).toArray(Fixture[]::new));
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get())
|
||||
.filter(f -> f.isDirect())
|
||||
.filter(sample())
|
||||
.toArray(Fixture[]::new));
|
||||
private static final Memoize<Fixture[]> POOLED_ALLOCS = new Memoize<>(
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> f.isPooled()).toArray(Fixture[]::new));
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get())
|
||||
.filter(f -> f.isPooled())
|
||||
.filter(sample())
|
||||
.toArray(Fixture[]::new));
|
||||
private static final Memoize<Fixture[]> POOLED_DIRECT_ALLOCS = new Memoize<>(
|
||||
() -> Arrays.stream(ALL_COMBINATIONS.get())
|
||||
.filter(f -> f.isPooled() && f.isDirect())
|
||||
.filter(sample())
|
||||
.toArray(Fixture[]::new));
|
||||
|
||||
private static Predicate<Fixture> sample() {
|
||||
String sampleSetting = System.getProperty("sample");
|
||||
if ("nosample".equalsIgnoreCase(sampleSetting)) {
|
||||
return fixture -> true;
|
||||
}
|
||||
Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS);
|
||||
SplittableRandom rng = new SplittableRandom(today.hashCode());
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
return fixture -> {
|
||||
boolean res = counter.getAndIncrement() < 1 || rng.nextInt(0, 100) <= 2;
|
||||
return res;
|
||||
}; // Filter out 97% of tests.
|
||||
}
|
||||
|
||||
static Fixture[] allocators() {
|
||||
return ALL_COMBINATIONS.get();
|
||||
return ALL_ALLOCATORS.get();
|
||||
}
|
||||
|
||||
static Fixture[] nonCompositeAllocators() {
|
||||
@ -94,6 +134,10 @@ public class BufferTest {
|
||||
return POOLED_ALLOCS.get();
|
||||
}
|
||||
|
||||
static Fixture[] pooledDirectAllocators() {
|
||||
return POOLED_DIRECT_ALLOCS.get();
|
||||
}
|
||||
|
||||
static List<Fixture> initialAllocators() {
|
||||
return List.of(
|
||||
new Fixture("heap", BufferAllocator::heap, HEAP),
|
||||
@ -962,7 +1006,7 @@ public class BufferTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Disabled
|
||||
@Disabled // TODO
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void sliceMustBecomeOwnedOnSourceBufferClose(Fixture fixture) {
|
||||
@ -1650,42 +1694,44 @@ public class BufferTest {
|
||||
@Nested
|
||||
@Isolated
|
||||
class CleanerTests {
|
||||
@Disabled("Precise native memory accounting does not work since recent panama-foreign changes.")
|
||||
@Disabled("Too slow, for now")
|
||||
@ParameterizedTest
|
||||
@MethodSource("io.netty.buffer.api.BufTest#directAllocators")
|
||||
@MethodSource("io.netty.buffer.api.BufferTest#directAllocators")
|
||||
public void bufferMustBeClosedByCleaner(Fixture fixture) throws InterruptedException {
|
||||
var initial = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
|
||||
var allocator = fixture.createAllocator();
|
||||
allocator.close();
|
||||
int iterations = 100;
|
||||
int iterations = 15;
|
||||
int allocationSize = 1024;
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
allocateAndForget(allocator, allocationSize);
|
||||
System.gc();
|
||||
System.runFinalization();
|
||||
}
|
||||
var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
|
||||
System.runFinalization();
|
||||
var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum() - initial;
|
||||
var totalAllocated = (long) allocationSize * iterations;
|
||||
assertThat(sum).isLessThan(totalAllocated);
|
||||
}
|
||||
|
||||
private void allocateAndForget(BufferAllocator allocator, int size) {
|
||||
private static void allocateAndForget(BufferAllocator allocator, int size) {
|
||||
allocator.allocate(size);
|
||||
}
|
||||
|
||||
@Disabled("Precise native memory accounting does not work since recent panama-foreign changes.")
|
||||
@Disabled("Too slow, for now")
|
||||
@ParameterizedTest
|
||||
@MethodSource("io.netty.buffer.api.BufTest#directPooledAllocators")
|
||||
@MethodSource("io.netty.buffer.api.BufferTest#pooledDirectAllocators")
|
||||
public void buffersMustBeReusedByPoolingAllocatorEvenWhenDroppedByCleanerInsteadOfExplicitly(Fixture fixture)
|
||||
throws InterruptedException {
|
||||
var initial = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
|
||||
try (var allocator = fixture.createAllocator()) {
|
||||
int iterations = 100;
|
||||
int iterations = 15;
|
||||
int allocationSize = 1024;
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
allocateAndForget(allocator, allocationSize);
|
||||
System.gc();
|
||||
System.runFinalization();
|
||||
}
|
||||
var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
|
||||
System.runFinalization();
|
||||
var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum() - initial;
|
||||
var totalAllocated = (long) allocationSize * iterations;
|
||||
assertThat(sum).isLessThan(totalAllocated);
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
package io.netty.buffer.api.benchmarks;
|
||||
|
||||
import jdk.incubator.foreign.MemorySegment;
|
||||
import jdk.incubator.foreign.ResourceScope;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
@ -64,31 +65,17 @@ public class MemorySegmentCloseBenchmark {
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public MemorySegment heapConfined() {
|
||||
try (MemorySegment segment = MemorySegment.ofArray(array)) {
|
||||
return segment;
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public MemorySegment heapShared() {
|
||||
try (MemorySegment segment = MemorySegment.ofArray(array).share()) {
|
||||
return segment;
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public MemorySegment nativeConfined() {
|
||||
try (MemorySegment segment = MemorySegment.allocateNative(size)) {
|
||||
return segment;
|
||||
try (ResourceScope scope = ResourceScope.newConfinedScope()) {
|
||||
return MemorySegment.allocateNative(size, scope);
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public MemorySegment nativeShared() {
|
||||
try (MemorySegment segment = MemorySegment.allocateNative(size).share()) {
|
||||
return segment;
|
||||
try (ResourceScope scope = ResourceScope.newSharedScope()) {
|
||||
return MemorySegment.allocateNative(size, scope);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,18 +0,0 @@
|
||||
# Copyright 2021 The Netty Project
|
||||
#
|
||||
# The Netty Project licenses this file to you under the Apache License,
|
||||
# version 2.0 (the "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at:
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
junit.jupiter.execution.parallel.enabled = true
|
||||
junit.jupiter.execution.parallel.mode.default = concurrent
|
||||
junit.jupiter.testinstance.lifecycle.default = per_class
|
||||
junit.jupiter.execution.parallel.config.strategy = fixed
|
||||
junit.jupiter.execution.parallel.config.fixed.parallelism = 16
|
Loading…
Reference in New Issue
Block a user