Remove thread-confinement of Buffers

Motivation:
Thread-confinement ends up being too confusing to code for, and also prevents some legitimate use cases.
Additionally, thread-confinement exposed implementation specific behavioural differences of buffers, where we would ideally like all buffers to always behave the same, regardless of implementation.

Modification:
All MemorySegment based buffers now always use shared segments.
For heap-based segments, we avoid the overhead associated with the closing of shared segments, by just not closing them, and instead just leave the whole thing for the GC to deal with.

Result:
Buffers can now always be accessed from multiple different threads at the same time.
This commit is contained in:
Chris Vest 2020-12-18 14:45:16 +01:00
parent 9afad3a578
commit 41b3c02812
4 changed files with 52 additions and 6 deletions

View File

@ -31,13 +31,13 @@ class ManagedAllocator implements Allocator, AllocatorControl {
@Override
public Buf allocate(int size) {
Allocator.checkSize(size);
return manager.allocateConfined(this, size, manager.drop(), cleaner);
return manager.allocateShared(this, size, manager.drop(), cleaner);
}
@Override
public Object allocateUntethered(Buf originator, int size) {
Allocator.checkSize(size);
var buf = manager.allocateConfined(this, size, NO_OP_DROP, null);
var buf = manager.allocateShared(this, size, NO_OP_DROP, null);
return manager.unwrapRecoverableMemory(buf);
}

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer.api.memseg;
import io.netty.buffer.api.Buf;
import io.netty.buffer.api.Drop;
import jdk.incubator.foreign.MemorySegment;
public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
@ -27,4 +29,15 @@ public class HeapMemorySegmentManager extends AbstractMemorySegmentManager {
protected MemorySegment createSegment(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
@Override
public Drop<Buf> drop() {
return convert(buf -> buf.makeInaccessible());
}
@SuppressWarnings({ "unchecked", "UnnecessaryLocalVariable" })
private static Drop<Buf> convert(Drop<MemSegBuf> drop) {
Drop<?> tmp = drop;
return (Drop<Buf>) tmp;
}
}

View File

@ -33,6 +33,7 @@ import java.text.ParseException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -465,6 +466,22 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bufferMustNotBeThreadConfined(Fixture fixture) throws Exception {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
buf.writeInt(42);
Future<Integer> fut = executor.submit(() -> buf.readInt());
assertEquals(42, fut.get());
fut = executor.submit(() -> {
buf.writeInt(32);
return buf.readInt();
});
assertEquals(32, fut.get());
}
}
@ParameterizedTest
@MethodSource("initialAllocators")
void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) {

View File

@ -34,13 +34,15 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.CompletableFuture.completedFuture;
@Warmup(iterations = 30, time = 1)
@Measurement(iterations = 30, time = 1)
@Warmup(iterations = 15, time = 1)
@Measurement(iterations = 15, time = 1)
@Fork(value = 5, jvmArgsAppend = { "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints" })
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
public class MemorySegmentClosedByCleanerBenchmark {
private static final Allocator heap = Allocator.heap();
private static final Allocator heapPooled = Allocator.pooledHeap();
private static final Allocator direct = Allocator.direct();
private static final Allocator directPooled = Allocator.pooledDirect();
@ -60,14 +62,28 @@ public class MemorySegmentClosedByCleanerBenchmark {
}
@Benchmark
public Buf explicitClose() throws Exception {
public Buf explicitCloseHeap() throws Exception {
try (Buf buf = process(heap.allocate(256))) {
return buf;
}
}
@Benchmark
public Buf explicitPooledCloseHeap() throws Exception {
try (Buf buf = process(heapPooled.allocate(256))) {
return buf;
}
}
@Benchmark
public Buf explicitCloseDirect() throws Exception {
try (Buf buf = process(direct.allocate(256))) {
return buf;
}
}
@Benchmark
public Buf explicitPooledClose() throws Exception {
public Buf explicitPooledCloseDirect() throws Exception {
try (Buf buf = process(directPooled.allocate(256))) {
return buf;
}