Update code to support bleeding edge MemorySegment APIs after the latest refactoring

This commit is contained in:
Chris Vest 2021-04-07 14:28:05 +02:00
parent 2e162c9ab0
commit 513cef1c1e
9 changed files with 25 additions and 72 deletions

View File

@ -70,7 +70,7 @@
<javaModuleName>io.netty.incubator.buffer</javaModuleName> <javaModuleName>io.netty.incubator.buffer</javaModuleName>
<netty.version>5.0.0.Final-SNAPSHOT</netty.version> <netty.version>5.0.0.Final-SNAPSHOT</netty.version>
<netty.build.version>29</netty.build.version> <netty.build.version>29</netty.build.version>
<java.version>16</java.version> <java.version>17</java.version>
<junit.version>5.7.0</junit.version> <junit.version>5.7.0</junit.version>
<surefire.version>3.0.0-M5</surefire.version> <surefire.version>3.0.0-M5</surefire.version>
<skipTests>false</skipTests> <skipTests>false</skipTests>

View File

@ -22,7 +22,6 @@ import java.lang.ref.Cleaner;
public interface MemoryManager { public interface MemoryManager {
boolean isNative(); boolean isNative();
Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner);
Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner); Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner);
Drop<Buffer> drop(); Drop<Buffer> drop();
Object unwrapRecoverableMemory(Buffer buf); Object unwrapRecoverableMemory(Buffer buf);

View File

@ -39,11 +39,6 @@ public class ByteBufferMemoryManager implements MemoryManager {
return direct; return direct;
} }
@Override
public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
return allocateShared(allocatorControl, size, drop, cleaner);
}
@Override @Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) { public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
int capacity = Math.toIntExact(size); int capacity = Math.toIntExact(size);

View File

@ -30,25 +30,13 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
@Override @Override
public abstract boolean isNative(); public abstract boolean isNative();
@Override
public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
var segment = createSegment(size);
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
}
@Override @Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) { public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
var segment = createSegment(size).share(); var segment = createSegment(size, cleaner);
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuffer(segment, segment, convert(drop), allocatorControl); return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
} }
protected abstract MemorySegment createSegment(long size); protected abstract MemorySegment createSegment(long size, Cleaner cleaner);
@Override @Override
public Drop<Buffer> drop() { public Drop<Buffer> drop() {

View File

@ -17,6 +17,8 @@ package io.netty.buffer.api.memseg;
import jdk.incubator.foreign.MemorySegment; import jdk.incubator.foreign.MemorySegment;
import java.lang.ref.Cleaner;
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager { public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
@Override @Override
public boolean isNative() { public boolean isNative() {
@ -24,7 +26,7 @@ public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
} }
@Override @Override
protected MemorySegment createSegment(long size) { protected MemorySegment createSegment(long size, Cleaner cleaner) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]); return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
} }
} }

View File

@ -33,6 +33,7 @@ import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.buffer.api.internal.ArcDrop; import io.netty.buffer.api.internal.ArcDrop;
import io.netty.buffer.api.internal.Statics; import io.netty.buffer.api.internal.Statics;
import jdk.incubator.foreign.MemorySegment; import jdk.incubator.foreign.MemorySegment;
import jdk.incubator.foreign.ResourceScope;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
@ -58,12 +59,13 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
static final Drop<MemSegBuffer> SEGMENT_CLOSE; static final Drop<MemSegBuffer> SEGMENT_CLOSE;
static { static {
CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]); try (ResourceScope scope = ResourceScope.newSharedScope()) {
CLOSED_SEGMENT.close(); CLOSED_SEGMENT = MemorySegment.allocateNative(1, scope);
}
SEGMENT_CLOSE = new Drop<MemSegBuffer>() { SEGMENT_CLOSE = new Drop<MemSegBuffer>() {
@Override @Override
public void drop(MemSegBuffer buf) { public void drop(MemSegBuffer buf) {
buf.base.close(); buf.base.scope().close();
} }
@Override @Override
@ -298,16 +300,12 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
@Override @Override
public void copyInto(int srcPos, byte[] dest, int destPos, int length) { public void copyInto(int srcPos, byte[] dest, int destPos, int length) {
try (var target = MemorySegment.ofArray(dest)) { copyInto(srcPos, MemorySegment.ofArray(dest), destPos, length);
copyInto(srcPos, target, destPos, length);
}
} }
@Override @Override
public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) { public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) {
try (var target = MemorySegment.ofByteBuffer(dest.duplicate().clear())) { copyInto(srcPos, MemorySegment.ofByteBuffer(dest.duplicate().clear()), destPos, length);
copyInto(srcPos, target, destPos, length);
}
} }
private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) { private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) {
@ -1078,8 +1076,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
var roff = this.roff; var roff = this.roff;
var woff = this.woff; var woff = this.woff;
var readOnly = readOnly(); var readOnly = readOnly();
boolean isConfined = seg.ownerThread() == null; MemorySegment transferSegment = seg;
MemorySegment transferSegment = isConfined? seg : seg.share(); // TODO remove confimenent checks
MemorySegment base = this.base; MemorySegment base = this.base;
makeInaccessible(); makeInaccessible();
return new Owned<MemSegBuffer>() { return new Owned<MemSegBuffer>() {

View File

@ -16,7 +16,9 @@
package io.netty.buffer.api.memseg; package io.netty.buffer.api.memseg;
import jdk.incubator.foreign.MemorySegment; import jdk.incubator.foreign.MemorySegment;
import jdk.incubator.foreign.ResourceScope;
import java.lang.ref.Cleaner;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.atomic.LongAdder;
@ -34,9 +36,10 @@ public class NativeMemorySegmentManager extends AbstractMemorySegmentManager {
} }
@Override @Override
protected MemorySegment createSegment(long size) { protected MemorySegment createSegment(long size, Cleaner cleaner) {
var segment = MemorySegment.allocateNative(size); final ResourceScope scope = ResourceScope.newSharedScope(cleaner);
// .withCleanupAction(Statics.getCleanupAction(size)); scope.addOnClose(getCleanupAction(size));
var segment = MemorySegment.allocateNative(size, scope);
MEM_USAGE_NATIVE.add(size); MEM_USAGE_NATIVE.add(size);
return segment; return segment;
} }

View File

@ -16,6 +16,7 @@
package io.netty.buffer.api.benchmarks; package io.netty.buffer.api.benchmarks;
import jdk.incubator.foreign.MemorySegment; import jdk.incubator.foreign.MemorySegment;
import jdk.incubator.foreign.ResourceScope;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Fork;
@ -64,31 +65,17 @@ public class MemorySegmentCloseBenchmark {
} }
} }
@Benchmark
public MemorySegment heapConfined() {
try (MemorySegment segment = MemorySegment.ofArray(array)) {
return segment;
}
}
@Benchmark
public MemorySegment heapShared() {
try (MemorySegment segment = MemorySegment.ofArray(array).share()) {
return segment;
}
}
@Benchmark @Benchmark
public MemorySegment nativeConfined() { public MemorySegment nativeConfined() {
try (MemorySegment segment = MemorySegment.allocateNative(size)) { try (ResourceScope scope = ResourceScope.newConfinedScope()) {
return segment; return MemorySegment.allocateNative(size, scope);
} }
} }
@Benchmark @Benchmark
public MemorySegment nativeShared() { public MemorySegment nativeShared() {
try (MemorySegment segment = MemorySegment.allocateNative(size).share()) { try (ResourceScope scope = ResourceScope.newSharedScope()) {
return segment; return MemorySegment.allocateNative(size, scope);
} }
} }
} }

View File

@ -1,18 +0,0 @@
# Copyright 2021 The Netty Project
#
# The Netty Project licenses this file to you under the Apache License,
# version 2.0 (the "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
junit.jupiter.execution.parallel.enabled = true
junit.jupiter.execution.parallel.mode.default = concurrent
junit.jupiter.testinstance.lifecycle.default = per_class
junit.jupiter.execution.parallel.config.strategy = fixed
junit.jupiter.execution.parallel.config.fixed.parallelism = 16