First spike prototype of a ByteBuf implementation based on MemorySegment

Motivation:
Future versions of Java will introduce a new API for working with off-heap and on-heap memory alike.
This API _could_ potentially relieve us of many of our use cases for Unsafe.
We wish to explore how suitable these APIs are for this task.

Modification:
Add an entirely separate version of the Netty ByteBuf API, implemented in terms of MemorySegment.
No existing code is changed at this time.
The current prototype is only to prove the concept, and does not aim to be a full replacement.

Result:
We are able to build a fairly nice API, but with caveats.
Restrictions in the current (JDK 16 EA) MemorySegment API, around how ownership is transferred between threads, means we are currently still relying on Unsafe.
While our use of Unsafe could be reduced, it can not be eliminated in our ByteBuf API, because we are relying on it to work around the current ownership restrictions.
I believe it is _possible_ to create a safe ownership transfer API at the JDK level, so hopefully this restriction can be lifted in the future.
This commit is contained in:
Chris Vest 2020-07-24 19:38:48 +02:00
parent 535184b7e7
commit 57af0f0e26
14 changed files with 558 additions and 0 deletions

View File

@ -0,0 +1,51 @@
package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment;
import static io.netty.buffer.b2.ByteBuf.*;
public interface Allocator extends AutoCloseable {
ByteBuf allocate(long size);
@Override
default void close() {
}
static Allocator heap() {
return new Allocator() {
@Override
public ByteBuf allocate(long size) {
var segment = MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
return new ByteBuf(segment, SEGMENT_CLOSE);
}
};
}
static Allocator direct() {
return new Allocator() {
@Override
public ByteBuf allocate(long size) {
return new ByteBuf(MemorySegment.allocateNative(size), SEGMENT_CLOSE);
}
};
}
static Allocator pooledHeap() {
return new SizeClassedMemoryPool() {
@Override
protected MemorySegment createMemorySegment(long size) {
return MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
}
};
}
static Allocator pooledDirect() {
return new SizeClassedMemoryPool() {
@Override
protected MemorySegment createMemorySegment(long size) {
return MemorySegment.allocateNative(size);
}
};
}
}

View File

@ -0,0 +1,84 @@
package io.netty.buffer.b2;
import io.netty.util.internal.PlatformDependent;
import jdk.incubator.foreign.MemoryAccess;
import jdk.incubator.foreign.MemoryAddress;
import jdk.incubator.foreign.MemorySegment;
import java.lang.invoke.VarHandle;
public class ByteBuf extends Rc<ByteBuf> {
static final Drop<ByteBuf> NO_DROP = buf -> {};
static final Drop<ByteBuf> SEGMENT_CLOSE = buf -> buf.segment.close();
private final MemorySegment segment;
private final MemoryAddress address;
private long read;
private long write;
ByteBuf(MemorySegment segment, Drop<ByteBuf> drop) {
super(drop);
this.segment = segment;
address = segment.address();
}
public byte get() {
return MemoryAccess.getByteAtOffset(address, read++);
}
public void put(byte value) {
MemoryAccess.setByteAtOffset(address, write++, value);
}
public void fill(byte value) {
segment.fill(value);
}
public long getNativeAddress() {
try {
return segment.address().toRawLongValue();
} catch (UnsupportedOperationException e) {
return 0; // This is a heap segment. Probably.
}
}
public long size() {
return segment.byteSize();
}
public byte[] debugAsByteArray() {
return address.segment().toByteArray();
}
@Override
protected ByteBuf copy(Thread recipient, Drop<ByteBuf> drop) {
ByteBuf copy = new ByteBuf(segment.withOwnerThread(recipient), drop);
copy.read = read;
copy.write = write;
return copy;
}
@Override
protected ByteBuf prepareSend() {
ByteBuf outer = this;
MemorySegment transferSegment = segment.withOwnerThread(Lazy.TRANSFER_OWNER);
return new ByteBuf(transferSegment, NO_DROP) {
@Override
protected ByteBuf copy(Thread recipient, Drop<ByteBuf> drop) {
Object scope = PlatformDependent.getObject(transferSegment, Lazy.SCOPE);
PlatformDependent.putObject(scope, Lazy.OWNER, recipient);
VarHandle.fullFence();
ByteBuf copy = new ByteBuf(transferSegment, drop);
copy.read = outer.read;
copy.write = outer.write;
return copy;
}
};
}
private static class Lazy {
@SuppressWarnings("InstantiatingAThreadWithDefaultRunMethod")
private static final Thread TRANSFER_OWNER = new Thread("ByteBuf Transfer Owner");
private static final long SCOPE = Statics.fieldOffset("jdk.internal.foreign.AbstractMemorySegmentImpl", "scope");
private static final long OWNER = Statics.fieldOffset("jdk.internal.foreign.MemoryScope", "owner");
}
}

View File

@ -0,0 +1,6 @@
package io.netty.buffer.b2;
@FunctionalInterface
public interface Drop<T extends Rc<T>> {
void drop(T obj);
}

View File

@ -0,0 +1,58 @@
package io.netty.buffer.b2;
import java.util.function.Consumer;
public abstract class Rc<T extends Rc<T>> implements AutoCloseable {
private int acquires; // closed if negative
private final Drop<T> drop;
Rc(Drop<T> drop) {
this.drop = drop;
}
public T acquire() {
if (acquires < 0) {
throw new IllegalStateException("Resource is closed.");
}
acquires++;
return self();
}
@Override
public void close() {
if (acquires == -1) {
throw new IllegalStateException("Double-free: Already closed and dropped.");
}
if (acquires == 0) {
drop.drop(self());
}
acquires--;
}
public void sendTo(Consumer<Send<T>> consumer) throws InterruptedException {
var send = new RendezvousSend<>(self(), drop);
consumer.accept(send);
send.finish();
acquires = -2; // close without dropping (also ignore future double-free attempts)
}
/**
* @implNote Not possible without hacks because we need the receiving thread in order to set the new owner in the
* currently owning thread.
*/
public Send<T> send() {
acquires = -2; // close without dropping (also ignore future double-free attempts)
return new TransferSend<>(prepareSend(), drop);
}
protected abstract T copy(Thread recipient, Drop<T> drop);
protected T prepareSend() {
return self();
}
@SuppressWarnings("unchecked")
private T self() {
return (T) this;
}
}

View File

@ -0,0 +1,51 @@
package io.netty.buffer.b2;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.CountDownLatch;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
class RendezvousSend<T extends Rc<T>> implements Send<T> {
private static final VarHandle RECEIVED = findVarHandle(lookup(), RendezvousSend.class, "received", boolean.class);
private final CountDownLatch recipientLatch;
private final CountDownLatch sentLatch;
private final Drop<T> drop;
private final T outgoing;
@SuppressWarnings("unused")
private volatile boolean received; // Accessed via VarHandle
private volatile Thread recipient;
private volatile T incoming;
RendezvousSend(T outgoing, Drop<T> drop) {
this.outgoing = outgoing;
this.drop = drop;
recipientLatch = new CountDownLatch(1);
sentLatch = new CountDownLatch(1);
}
@Override
public T receive() {
if (!RECEIVED.compareAndSet(this, false, true)) {
throw new IllegalStateException("This object has already been received.");
}
recipient = Thread.currentThread();
recipientLatch.countDown();
try {
sentLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return incoming;
}
void finish() throws InterruptedException {
if (incoming != null) {
throw new IllegalStateException("Already sent.");
}
recipientLatch.await();
incoming = outgoing.copy(recipient, drop);
sentLatch.countDown();
}
}

View File

@ -0,0 +1,6 @@
package io.netty.buffer.b2;
@FunctionalInterface
public interface Send<T extends Rc<T>> {
T receive();
}

View File

@ -0,0 +1,64 @@
package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment;
import java.lang.invoke.VarHandle;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import static java.lang.invoke.MethodHandles.*;
abstract class SizeClassedMemoryPool implements Allocator, Drop<ByteBuf> {
private static final VarHandle CLOSE = Statics.findVarHandle(lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<ByteBuf>>> pool;
@SuppressWarnings("unused")
private volatile boolean closed;
protected SizeClassedMemoryPool() {
pool = new ConcurrentHashMap<>();
}
@Override
public ByteBuf allocate(long size) {
var sizeClassPool = getSizeClassPool(size);
Send<ByteBuf> send = sizeClassPool.poll();
if (send != null) {
return send.receive();
}
return new ByteBuf(createMemorySegment(size), this);
}
protected abstract MemorySegment createMemorySegment(long size);
@Override
public void close() {
if (CLOSE.compareAndSet(this, false, true)) {
pool.forEach((k,v) -> {
Send<ByteBuf> send;
while ((send = v.poll()) != null) {
dispose(send.receive());
}
});
}
}
@Override
public void drop(ByteBuf buf) {
var sizeClassPool = getSizeClassPool(buf.size());
sizeClassPool.offer(buf.send());
if (closed) {
var send = sizeClassPool.poll();
if (send != null) {
dispose(send.receive());
}
}
}
private ConcurrentLinkedQueue<Send<ByteBuf>> getSizeClassPool(long size) {
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
}
private static void dispose(ByteBuf buf) {
ByteBuf.SEGMENT_CLOSE.drop(buf);
}
}

View File

@ -0,0 +1,27 @@
package io.netty.buffer.b2;
import io.netty.util.internal.PlatformDependent;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.VarHandle;
import java.lang.reflect.Field;
interface Statics {
static VarHandle findVarHandle(Lookup lookup, Class<?> recv, String name, Class<?> type) {
try {
return lookup.findVarHandle(recv, name, type);
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
static long fieldOffset(String className, String fieldName) {
try {
Class<?> cls = Class.forName(className);
Field field = cls.getDeclaredField(fieldName);
return PlatformDependent.objectFieldOffset(field);
} catch (Exception e) {
throw new ExceptionInInitializerError(e);
}
}
}

View File

@ -0,0 +1,28 @@
package io.netty.buffer.b2;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
class TransferSend<T extends Rc<T>> implements Send<T> {
private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class);
private final T outgoing;
private final Drop<T> drop;
@SuppressWarnings("unused")
private volatile boolean received; // Accessed via VarHandle
TransferSend(T outgoing, Drop<T> drop) {
this.outgoing = outgoing;
this.drop = drop;
}
@Override
public T receive() {
if (!RECEIVED.compareAndSet(this, false, true)) {
throw new IllegalStateException("This object has already been received.");
}
return outgoing.copy(Thread.currentThread(), drop);
}
}

View File

@ -0,0 +1,151 @@
package io.netty.buffer.b2;
import org.junit.Test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
public abstract class ByteBufTest {
protected abstract Allocator createAllocator();
@Test
public void allocateAndAccessingBuffer() {
try (Allocator allocator = createAllocator();
ByteBuf buf = allocator.allocate(8)) {
buf.put((byte) 1);
buf.put((byte) 2);
try (ByteBuf inner = buf.acquire()) {
inner.put((byte) 3);
inner.put((byte) 4);
inner.put((byte) 5);
inner.put((byte) 6);
inner.put((byte) 7);
inner.put((byte) 8);
try {
inner.put((byte) 9);
fail("Expected to be out of bounds.");
} catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound"));
}
try {
buf.put((byte) 9);
fail("Expected to be out of bounds.");
} catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound"));
}
}
assertEquals((byte) 1, buf.get());
assertEquals((byte) 2, buf.get());
assertEquals((byte) 3, buf.get());
assertEquals((byte) 4, buf.get());
assertEquals((byte) 5, buf.get());
assertEquals((byte) 6, buf.get());
assertEquals((byte) 7, buf.get());
assertEquals((byte) 8, buf.get());
try {
assertEquals((byte) 9, buf.get());
fail("Expected to be out of bounds.");
} catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound"));
}
assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, buf.debugAsByteArray());
}
}
@Test
public void allocateAndRendesvousWithThread() throws Exception {
try (Allocator allocator = createAllocator()) {
ArrayBlockingQueue<Send<ByteBuf>> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (ByteBuf byteBuf = queue.take().receive()) {
return byteBuf.get();
}
});
executor.shutdown();
try (ByteBuf buf = allocator.allocate(8)) {
buf.put((byte) 42);
buf.sendTo(queue::offer);
}
assertEquals((byte) 42, future.get().byteValue());
}
}
@Test
public void allocateAndRendesvousWithThreadViaSyncQueue() throws Exception {
try (Allocator allocator = createAllocator()) {
SynchronousQueue<Send<ByteBuf>> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (ByteBuf byteBuf = queue.take().receive()) {
return byteBuf.get();
}
});
executor.shutdown();
try (ByteBuf buf = allocator.allocate(8)) {
buf.put((byte) 42);
buf.sendTo(e -> {
try {
queue.put(e);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
});
}
assertEquals((byte) 42, future.get().byteValue());
}
}
@Test
public void allocateAndSendToThread() throws Exception {
try (Allocator allocator = createAllocator()) {
ArrayBlockingQueue<Send<ByteBuf>> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (ByteBuf byteBuf = queue.take().receive()) {
return byteBuf.get();
}
});
executor.shutdown();
try (ByteBuf buf = allocator.allocate(8)) {
buf.put((byte) 42);
assertTrue(queue.offer(buf.send()));
}
assertEquals((byte) 42, future.get().byteValue());
}
}
@Test
public void allocateAndSendToThreadViaSyncQueue() throws Exception {
try (Allocator allocator = createAllocator()) {
SynchronousQueue<Send<ByteBuf>> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (ByteBuf byteBuf = queue.take().receive()) {
return byteBuf.get();
}
});
executor.shutdown();
try (ByteBuf buf = allocator.allocate(8)) {
buf.put((byte) 42);
queue.put(buf.send());
}
assertEquals((byte) 42, future.get().byteValue());
}
}
}

View File

@ -0,0 +1,8 @@
package io.netty.buffer.b2;
public class DirectByteBufTest extends ByteBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.direct();
}
}

View File

@ -0,0 +1,8 @@
package io.netty.buffer.b2;
public class HeapByteBufTest extends ByteBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.heap();
}
}

View File

@ -0,0 +1,8 @@
package io.netty.buffer.b2;
public class PooledDirectByteBufTest extends ByteBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.pooledDirect();
}
}

View File

@ -0,0 +1,8 @@
package io.netty.buffer.b2;
public class PooledHeapByteBufTest extends ByteBufTest {
@Override
protected Allocator createAllocator() {
return Allocator.pooledHeap();
}
}