Merge pull request #21 from netty/no-confinement

Remove thread-confinement of Buffers
This commit is contained in:
Chris Vest 2021-01-05 12:18:31 +01:00 committed by GitHub
commit d72bdb7fc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 6 deletions

View File

@ -31,13 +31,13 @@ class ManagedAllocator implements Allocator, AllocatorControl {
@Override
public Buf allocate(int size) {
Allocator.checkSize(size);
return manager.allocateConfined(this, size, manager.drop(), cleaner);
return manager.allocateShared(this, size, manager.drop(), cleaner);
}
@Override
public Object allocateUntethered(Buf originator, int size) {
Allocator.checkSize(size);
var buf = manager.allocateConfined(this, size, NO_OP_DROP, null);
var buf = manager.allocateShared(this, size, NO_OP_DROP, null);
return manager.unwrapRecoverableMemory(buf);
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer.api.memseg;
import io.netty.buffer.api.Buf;
import io.netty.buffer.api.Drop;
import jdk.incubator.foreign.MemorySegment;
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
@ -27,4 +29,15 @@ public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
protected MemorySegment createSegment(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
@Override
public Drop<Buf> drop() {
return convert(buf -> buf.makeInaccessible());
}
@SuppressWarnings({ "unchecked", "UnnecessaryLocalVariable" })
private static Drop<Buf> convert(Drop<MemSegBuf> drop) {
Drop<?> tmp = drop;
return (Drop<Buf>) tmp;
}
}

View File

@ -33,6 +33,7 @@ import java.text.ParseException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -465,6 +466,22 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bufferMustNotBeThreadConfined(Fixture fixture) throws Exception {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.writeInt(42);
Future<Integer> fut = executor.submit(() -> buf.readInt());
assertEquals(42, fut.get());
fut = executor.submit(() -> {
buf.writeInt(32);
return buf.readInt();
});
assertEquals(32, fut.get());
}
}
@ParameterizedTest
@MethodSource("initialAllocators")
void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) {

View File

@ -34,13 +34,15 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.CompletableFuture.completedFuture;
@Warmup(iterations = 30, time = 1)
@Measurement(iterations = 30, time = 1)
@Warmup(iterations = 15, time = 1)
@Measurement(iterations = 15, time = 1)
@Fork(value = 5, jvmArgsAppend = { "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints" })
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class MemorySegmentClosedByCleanerBenchmark {
private static final Allocator heap = Allocator.heap();
private static final Allocator heapPooled = Allocator.pooledHeap();
private static final Allocator direct = Allocator.direct();
private static final Allocator directPooled = Allocator.pooledDirect();
@ -60,14 +62,28 @@ public class MemorySegmentClosedByCleanerBenchmark {
}
@Benchmark
public Buf explicitClose() throws Exception {
public Buf explicitCloseHeap() throws Exception {
try (Buf buf = process(heap.allocate(256))) {
return buf;
}
}
@Benchmark
public Buf explicitPooledCloseHeap() throws Exception {
try (Buf buf = process(heapPooled.allocate(256))) {
return buf;
}
}
@Benchmark
public Buf explicitCloseDirect() throws Exception {
try (Buf buf = process(direct.allocate(256))) {
return buf;
}
}
@Benchmark
public Buf explicitPooledClose() throws Exception {
public Buf explicitPooledCloseDirect() throws Exception {
try (Buf buf = process(directPooled.allocate(256))) {
return buf;
}