Make Buf.send() faster, again, for confined buffers

Motivation:
As has been mentioned before, Buf.send() is likely to be an important operation, so it is worth optimizing further.

Modification:
Use a dedicated transfer thread to facilitate the handoff of thread-confined memory segments from one thread to another.
Because a separate thread performs the handoff, we no longer need to create and close a shared segment, which spares us the thread-local handshake costs associated with doing so.

Result:
Sending a thread-confined buffer from one thread to another is now about twice as fast.
This commit is contained in:
Chris Vest 2020-12-04 16:42:09 +01:00
parent 2f99ee64a4
commit e2ce5c1f06
2 changed files with 52 additions and 14 deletions

View File

@ -27,6 +27,9 @@ import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.SynchronousQueue;
import static jdk.incubator.foreign.MemoryAccess.getByteAtOffset;
import static jdk.incubator.foreign.MemoryAccess.getCharAtOffset;
@ -45,6 +48,22 @@ import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
/** Drop implementation that closes the {@code seg} memory segment of the buffer it is given. */
static final Drop<MemSegBuf> SEGMENT_CLOSE = buf -> buf.seg.close();
/**
 * Hand-over point for tasks that must run on the {@link #TRANSFER_THREAD}.
 * A {@link SynchronousQueue} has no capacity, so {@code put} only returns once the transfer thread has taken the task.
 */
private static final BlockingQueue<Runnable> QUEUE = new SynchronousQueue<>();
/**
 * Thread that takes tasks from {@link #QUEUE} and runs them, one at a time, in an endless loop.
 * It is named so that it can be recognised in thread dumps and stack traces.
 */
private static final Thread TRANSFER_THREAD = new Thread(() -> {
    try {
        for (;;) {
            QUEUE.take().run();
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status so it is not lost, then fail loudly: once this thread has
        // stopped, nothing takes from QUEUE any more and the failure must not go unnoticed.
        Thread.currentThread().interrupt();
        throw new RuntimeException("MemSegBuf transfer thread was interrupted and has stopped processing tasks", e);
    }
}, "MemSegBuf-transfer-thread");
static {
    // Daemon, so that this never-ending thread does not keep the JVM alive on its own.
    TRANSFER_THREAD.setDaemon(true);
    TRANSFER_THREAD.start();
}
private final AllocatorControl alloc;
private final boolean isSendable;
private MemorySegment seg;
@ -707,18 +726,37 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
/**
 * Prepares this buffer for being sent, by returning an {@link Owned} whose
 * {@code transferOwnership} creates a new {@code MemSegBuf} that carries over the byte order and
 * the read and write offsets of this buffer.
 *
 * NOTE(review): this text appears to be rendered from a diff without its +/- markers. The statements from
 * {@code boolean isConfined} up to the first {@code };} look like the previous implementation, and the
 * {@code if (seg.ownerThread() != null)} statement that follows looks like its replacement. Confirm against
 * the actual file before relying on either part.
 */
@Override
protected Owned<MemSegBuf> prepareSend() {
MemSegBuf outer = this;
// NOTE(review): this flag is true when the segment has NO owner thread, which reads as the opposite of
// what the name "isConfined" suggests; verify.
boolean isConfined = seg.ownerThread() == null;
// Use the segment as-is when it has no owner thread; otherwise obtain a shared version of it.
MemorySegment transferSegment = isConfined? seg : seg.share();
return new Owned<MemSegBuf>() {
@Override
public MemSegBuf transferOwnership(Drop<MemSegBuf> drop) {
MemSegBuf copy = new MemSegBuf(transferSegment, drop, alloc);
copy.order = outer.order;
copy.roff = outer.roff;
copy.woff = outer.woff;
return copy;
}
};
if (seg.ownerThread() != null) {
// The segment has an owner thread: hand it off to the transfer thread now, on the sending side.
var transfer = seg.handoff(TRANSFER_THREAD);
return new Owned<MemSegBuf>() {
@Override
public MemSegBuf transferOwnership(Drop<MemSegBuf> drop) {
try {
// The thread that calls transferOwnership is the one that will receive the segment.
Thread recipient = Thread.currentThread();
// The second handoff, from the transfer thread to the recipient, is wrapped in a task
// that is passed to the transfer thread through QUEUE.
FutureTask<MemorySegment> task = new FutureTask<>(() -> transfer.handoff(recipient));
QUEUE.put(task);
// task.get() waits for the task to complete and yields the segment returned by the handoff.
MemSegBuf copy = new MemSegBuf(task.get(), drop, alloc);
copy.order = outer.order;
copy.roff = outer.roff;
copy.woff = outer.woff;
return copy;
} catch (Exception e) {
// Every failure, including an interrupt while waiting in put() or get(), is rethrown unchecked.
// NOTE(review): the interrupt status is not restored here when e is an InterruptedException.
throw new RuntimeException(e);
}
}
};
} else {
// No owner thread: the segment is passed to the new buffer directly, without any handoff.
return new Owned<MemSegBuf>() {
@Override
public MemSegBuf transferOwnership(Drop<MemSegBuf> drop) {
MemSegBuf copy = new MemSegBuf(seg, drop, alloc);
copy.order = outer.order;
copy.roff = outer.roff;
copy.woff = outer.woff;
return copy;
}
};
}
}
@Override

View File

@ -34,8 +34,8 @@ import java.util.concurrent.TimeUnit;
import static java.util.concurrent.CompletableFuture.completedFuture;
@Warmup(iterations = 30, time = 1)
@Measurement(iterations = 30, time = 1)
@Warmup(iterations = 10, time = 1)
@Measurement(iterations = 10, time = 1)
@Fork(value = 5, jvmArgsAppend = { "-XX:+UnlockDiagnosticVMOptions", "-XX:+DebugNonSafepoints" })
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)