diff --git a/pom.xml b/pom.xml
index cdfc15a..e5fb8c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,7 @@
io.netty.incubator.buffer
5.0.0.Final-SNAPSHOT
29
- 16
+ 17
5.7.0
3.0.0-M5
false
@@ -173,6 +173,9 @@
${argLine.common} ${argLine.printGC} --add-modules jdk.incubator.foreign
false
+
+ nosample
+
diff --git a/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java b/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java
index 8921014..1295c71 100644
--- a/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java
+++ b/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java
@@ -37,7 +37,7 @@ class ManagedBufferAllocator implements BufferAllocator, AllocatorControl {
@Override
public Object allocateUntethered(Buffer originator, int size) {
BufferAllocator.checkSize(size);
- var buf = manager.allocateShared(this, size, NO_OP_DROP, null);
+ var buf = manager.allocateShared(this, size, NO_OP_DROP, cleaner);
return manager.unwrapRecoverableMemory(buf);
}
diff --git a/src/main/java/io/netty/buffer/api/MemoryManager.java b/src/main/java/io/netty/buffer/api/MemoryManager.java
index 7f00cad..2486d5b 100644
--- a/src/main/java/io/netty/buffer/api/MemoryManager.java
+++ b/src/main/java/io/netty/buffer/api/MemoryManager.java
@@ -22,7 +22,6 @@ import java.lang.ref.Cleaner;
public interface MemoryManager {
boolean isNative();
- Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner);
Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner);
Drop drop();
Object unwrapRecoverableMemory(Buffer buf);
diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java
index 23be2a0..676de27 100644
--- a/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java
+++ b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java
@@ -39,11 +39,6 @@ public class ByteBufferMemoryManager implements MemoryManager {
return direct;
}
- @Override
- public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner) {
- return allocateShared(allocatorControl, size, drop, cleaner);
- }
-
@Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner) {
int capacity = Math.toIntExact(size);
diff --git a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java
index 0af29b9..4c2f62a 100644
--- a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java
+++ b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java
@@ -30,25 +30,13 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
@Override
public abstract boolean isNative();
- @Override
- public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner) {
- var segment = createSegment(size);
- if (cleaner != null) {
- segment = segment.registerCleaner(cleaner);
- }
- return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
- }
-
@Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner) {
- var segment = createSegment(size).share();
- if (cleaner != null) {
- segment = segment.registerCleaner(cleaner);
- }
+ var segment = createSegment(size, cleaner);
return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
}
- protected abstract MemorySegment createSegment(long size);
+ protected abstract MemorySegment createSegment(long size, Cleaner cleaner);
@Override
public Drop drop() {
diff --git a/src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java
index f62e0de..47d6bad 100644
--- a/src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java
+++ b/src/main/java/io/netty/buffer/api/memseg/HeapMemorySegmentManager.java
@@ -17,6 +17,8 @@ package io.netty.buffer.api.memseg;
import jdk.incubator.foreign.MemorySegment;
+import java.lang.ref.Cleaner;
+
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
@Override
public boolean isNative() {
@@ -24,7 +26,7 @@ public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
}
@Override
- protected MemorySegment createSegment(long size) {
+ protected MemorySegment createSegment(long size, Cleaner cleaner) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
}
diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java
index 7674ba2..842e48a 100644
--- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java
+++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java
@@ -33,6 +33,7 @@ import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.buffer.api.internal.ArcDrop;
import io.netty.buffer.api.internal.Statics;
import jdk.incubator.foreign.MemorySegment;
+import jdk.incubator.foreign.ResourceScope;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -58,12 +59,20 @@ class MemSegBuffer extends RcSupport implements Buffer, Re
static final Drop SEGMENT_CLOSE;
static {
- CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]);
- CLOSED_SEGMENT.close();
+ try (ResourceScope scope = ResourceScope.newSharedScope()) {
+ // We are not allowed to allocate a zero-sized native buffer, but we *can* take a zero-sized slice from it.
+ // We need the CLOSED_SEGMENT to have a size of zero, because we'll use its size for bounds checks after
+ // the buffer is closed.
+ MemorySegment segment = MemorySegment.allocateNative(1, scope);
+ CLOSED_SEGMENT = segment.asSlice(0, 0);
+ }
SEGMENT_CLOSE = new Drop() {
@Override
public void drop(MemSegBuffer buf) {
- buf.base.close();
+ ResourceScope scope = buf.base.scope();
+ if (!scope.isImplicit()) {
+ scope.close();
+ }
}
@Override
@@ -298,16 +307,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re
@Override
public void copyInto(int srcPos, byte[] dest, int destPos, int length) {
- try (var target = MemorySegment.ofArray(dest)) {
- copyInto(srcPos, target, destPos, length);
- }
+ copyInto(srcPos, MemorySegment.ofArray(dest), destPos, length);
}
@Override
public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) {
- try (var target = MemorySegment.ofByteBuffer(dest.duplicate().clear())) {
- copyInto(srcPos, target, destPos, length);
- }
+ copyInto(srcPos, MemorySegment.ofByteBuffer(dest.duplicate().clear()), destPos, length);
}
private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) {
@@ -1078,8 +1083,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re
var roff = this.roff;
var woff = this.woff;
var readOnly = readOnly();
- boolean isConfined = seg.ownerThread() == null;
- MemorySegment transferSegment = isConfined? seg : seg.share(); // TODO remove confimenent checks
+ MemorySegment transferSegment = seg;
MemorySegment base = this.base;
makeInaccessible();
return new Owned() {
diff --git a/src/main/java/io/netty/buffer/api/memseg/NativeMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/NativeMemorySegmentManager.java
index a240335..78d1872 100644
--- a/src/main/java/io/netty/buffer/api/memseg/NativeMemorySegmentManager.java
+++ b/src/main/java/io/netty/buffer/api/memseg/NativeMemorySegmentManager.java
@@ -16,16 +16,40 @@
package io.netty.buffer.api.memseg;
import jdk.incubator.foreign.MemorySegment;
+import jdk.incubator.foreign.ResourceScope;
+import java.lang.ref.Cleaner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Function;
+
+import static jdk.incubator.foreign.ResourceScope.newSharedScope;
public class NativeMemorySegmentManager extends AbstractMemorySegmentManager {
public static final LongAdder MEM_USAGE_NATIVE = new LongAdder();
private static final ConcurrentHashMap CLEANUP_ACTIONS = new ConcurrentHashMap<>();
+ private static final Function CLEANUP_ACTION_MAKER = s -> new ReduceNativeMemoryUsage(s);
static Runnable getCleanupAction(long size) {
- return CLEANUP_ACTIONS.computeIfAbsent(size, s -> () -> MEM_USAGE_NATIVE.add(-s));
+ return CLEANUP_ACTIONS.computeIfAbsent(size, CLEANUP_ACTION_MAKER);
+ }
+
+ private static final class ReduceNativeMemoryUsage implements Runnable {
+ private final long size;
+
+ private ReduceNativeMemoryUsage(long size) {
+ this.size = size;
+ }
+
+ @Override
+ public void run() {
+ MEM_USAGE_NATIVE.add(-size);
+ }
+
+ @Override
+ public String toString() {
+ return "ReduceNativeMemoryUsage(by " + size + " bytes)";
+ }
}
@Override
@@ -34,9 +58,10 @@ public class NativeMemorySegmentManager extends AbstractMemorySegmentManager {
}
@Override
- protected MemorySegment createSegment(long size) {
- var segment = MemorySegment.allocateNative(size);
-// .withCleanupAction(Statics.getCleanupAction(size));
+ protected MemorySegment createSegment(long size, Cleaner cleaner) {
+ final ResourceScope scope = cleaner == null ? newSharedScope() : newSharedScope(cleaner);
+ scope.addOnClose(getCleanupAction(size));
+ var segment = MemorySegment.allocateNative(size, scope);
MEM_USAGE_NATIVE.add(size);
return segment;
}
diff --git a/src/test/java/io/netty/buffer/api/BufferTest.java b/src/test/java/io/netty/buffer/api/BufferTest.java
index f2a9c43..4844e53 100644
--- a/src/test/java/io/netty/buffer/api/BufferTest.java
+++ b/src/test/java/io/netty/buffer/api/BufferTest.java
@@ -31,10 +31,14 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.ReadOnlyBufferException;
import java.text.ParseException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.util.SplittableRandom;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -43,6 +47,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;
@@ -65,17 +70,52 @@ public class BufferTest {
private static final Memoize ALL_COMBINATIONS = new Memoize<>(
() -> fixtureCombinations().toArray(Fixture[]::new));
+ private static final Memoize ALL_ALLOCATORS = new Memoize<>(
+ () -> Arrays.stream(ALL_COMBINATIONS.get())
+ .filter(sample())
+ .toArray(Fixture[]::new));
private static final Memoize NON_COMPOSITE = new Memoize<>(
- () -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> !f.isComposite()).toArray(Fixture[]::new));
+ () -> Arrays.stream(ALL_COMBINATIONS.get())
+ .filter(f -> !f.isComposite())
+ .filter(sample())
+ .toArray(Fixture[]::new));
private static final Memoize HEAP_ALLOCS = new Memoize<>(
- () -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> f.isHeap()).toArray(Fixture[]::new));
+ () -> Arrays.stream(ALL_COMBINATIONS.get())
+ .filter(f -> f.isHeap())
+ .filter(sample())
+ .toArray(Fixture[]::new));
private static final Memoize DIRECT_ALLOCS = new Memoize<>(
- () -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> f.isDirect()).toArray(Fixture[]::new));
+ () -> Arrays.stream(ALL_COMBINATIONS.get())
+ .filter(f -> f.isDirect())
+ .filter(sample())
+ .toArray(Fixture[]::new));
private static final Memoize POOLED_ALLOCS = new Memoize<>(
- () -> Arrays.stream(ALL_COMBINATIONS.get()).filter(f -> f.isPooled()).toArray(Fixture[]::new));
+ () -> Arrays.stream(ALL_COMBINATIONS.get())
+ .filter(f -> f.isPooled())
+ .filter(sample())
+ .toArray(Fixture[]::new));
+ private static final Memoize POOLED_DIRECT_ALLOCS = new Memoize<>(
+ () -> Arrays.stream(ALL_COMBINATIONS.get())
+ .filter(f -> f.isPooled() && f.isDirect())
+ .filter(sample())
+ .toArray(Fixture[]::new));
+
+ private static Predicate sample() {
+ String sampleSetting = System.getProperty("sample");
+ if ("nosample".equalsIgnoreCase(sampleSetting)) {
+ return fixture -> true;
+ }
+ Instant today = Instant.now().truncatedTo(ChronoUnit.DAYS);
+ SplittableRandom rng = new SplittableRandom(today.hashCode());
+ AtomicInteger counter = new AtomicInteger();
+ return fixture -> {
+ boolean res = counter.getAndIncrement() < 1 || rng.nextInt(0, 100) <= 2;
+ return res;
+ }; // Filter out 97% of tests.
+ }
static Fixture[] allocators() {
- return ALL_COMBINATIONS.get();
+ return ALL_ALLOCATORS.get();
}
static Fixture[] nonCompositeAllocators() {
@@ -94,6 +134,10 @@ public class BufferTest {
return POOLED_ALLOCS.get();
}
+ static Fixture[] pooledDirectAllocators() {
+ return POOLED_DIRECT_ALLOCS.get();
+ }
+
static List initialAllocators() {
return List.of(
new Fixture("heap", BufferAllocator::heap, HEAP),
@@ -962,7 +1006,7 @@ public class BufferTest {
}
}
- @Disabled
+ @Disabled // TODO
@ParameterizedTest
@MethodSource("allocators")
public void sliceMustBecomeOwnedOnSourceBufferClose(Fixture fixture) {
@@ -1650,42 +1694,44 @@ public class BufferTest {
@Nested
@Isolated
class CleanerTests {
- @Disabled("Precise native memory accounting does not work since recent panama-foreign changes.")
+ @Disabled("Too slow, for now")
@ParameterizedTest
- @MethodSource("io.netty.buffer.api.BufTest#directAllocators")
+ @MethodSource("io.netty.buffer.api.BufferTest#directAllocators")
public void bufferMustBeClosedByCleaner(Fixture fixture) throws InterruptedException {
+ var initial = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
var allocator = fixture.createAllocator();
allocator.close();
- int iterations = 100;
+ int iterations = 15;
int allocationSize = 1024;
for (int i = 0; i < iterations; i++) {
allocateAndForget(allocator, allocationSize);
System.gc();
- System.runFinalization();
}
- var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
+ System.runFinalization();
+ var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum() - initial;
var totalAllocated = (long) allocationSize * iterations;
assertThat(sum).isLessThan(totalAllocated);
}
- private void allocateAndForget(BufferAllocator allocator, int size) {
+ private static void allocateAndForget(BufferAllocator allocator, int size) {
allocator.allocate(size);
}
- @Disabled("Precise native memory accounting does not work since recent panama-foreign changes.")
+ @Disabled("Too slow, for now")
@ParameterizedTest
- @MethodSource("io.netty.buffer.api.BufTest#directPooledAllocators")
+ @MethodSource("io.netty.buffer.api.BufferTest#pooledDirectAllocators")
public void buffersMustBeReusedByPoolingAllocatorEvenWhenDroppedByCleanerInsteadOfExplicitly(Fixture fixture)
throws InterruptedException {
+ var initial = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
try (var allocator = fixture.createAllocator()) {
- int iterations = 100;
+ int iterations = 15;
int allocationSize = 1024;
for (int i = 0; i < iterations; i++) {
allocateAndForget(allocator, allocationSize);
System.gc();
- System.runFinalization();
}
- var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum();
+ System.runFinalization();
+ var sum = NativeMemorySegmentManager.MEM_USAGE_NATIVE.sum() - initial;
var totalAllocated = (long) allocationSize * iterations;
assertThat(sum).isLessThan(totalAllocated);
}
diff --git a/src/test/java/io/netty/buffer/api/benchmarks/MemorySegmentCloseBenchmark.java b/src/test/java/io/netty/buffer/api/benchmarks/MemorySegmentCloseBenchmark.java
index 7c7da97..083ff29 100644
--- a/src/test/java/io/netty/buffer/api/benchmarks/MemorySegmentCloseBenchmark.java
+++ b/src/test/java/io/netty/buffer/api/benchmarks/MemorySegmentCloseBenchmark.java
@@ -16,6 +16,7 @@
package io.netty.buffer.api.benchmarks;
import jdk.incubator.foreign.MemorySegment;
+import jdk.incubator.foreign.ResourceScope;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -64,31 +65,17 @@ public class MemorySegmentCloseBenchmark {
}
}
- @Benchmark
- public MemorySegment heapConfined() {
- try (MemorySegment segment = MemorySegment.ofArray(array)) {
- return segment;
- }
- }
-
- @Benchmark
- public MemorySegment heapShared() {
- try (MemorySegment segment = MemorySegment.ofArray(array).share()) {
- return segment;
- }
- }
-
@Benchmark
public MemorySegment nativeConfined() {
- try (MemorySegment segment = MemorySegment.allocateNative(size)) {
- return segment;
+ try (ResourceScope scope = ResourceScope.newConfinedScope()) {
+ return MemorySegment.allocateNative(size, scope);
}
}
@Benchmark
public MemorySegment nativeShared() {
- try (MemorySegment segment = MemorySegment.allocateNative(size).share()) {
- return segment;
+ try (ResourceScope scope = ResourceScope.newSharedScope()) {
+ return MemorySegment.allocateNative(size, scope);
}
}
}
diff --git a/src/test/resources/junit-platform.properties b/src/test/resources/junit-platform.properties
deleted file mode 100644
index 080b321..0000000
--- a/src/test/resources/junit-platform.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Copyright 2021 The Netty Project
-#
-# The Netty Project licenses this file to you under the Apache License,
-# version 2.0 (the "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at:
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-junit.jupiter.execution.parallel.enabled = true
-junit.jupiter.execution.parallel.mode.default = concurrent
-junit.jupiter.testinstance.lifecycle.default = per_class
-junit.jupiter.execution.parallel.config.strategy = fixed
-junit.jupiter.execution.parallel.config.fixed.parallelism = 16