Rename ...b2.ByteBuf -> ...b2.BBuf

Motivation:
Make it easier to use the existing ByteBuf and the new BBuf in the same source files.
To help transitioning between, and comparing, the two APIs.

Modification:
The new ...b2.ByteBuf class has been renamed to BBuf to avoid clashing with the existing ByteBuf name.

Result:
It's easier to make use of both classes in the same source files, since both can be imported independently.
This commit is contained in:
Chris Vest 2020-07-29 10:30:03 +02:00
parent 57af0f0e26
commit ddde3a42d9
8 changed files with 99 additions and 75 deletions

View File

@ -2,11 +2,11 @@ package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment; import jdk.incubator.foreign.MemorySegment;
import static io.netty.buffer.b2.ByteBuf.*; import static io.netty.buffer.b2.BBuf.*;
public interface Allocator extends AutoCloseable { public interface Allocator extends AutoCloseable {
ByteBuf allocate(long size); BBuf allocate(long size);
@Override @Override
default void close() { default void close() {
@ -15,9 +15,9 @@ public interface Allocator extends AutoCloseable {
static Allocator heap() { static Allocator heap() {
return new Allocator() { return new Allocator() {
@Override @Override
public ByteBuf allocate(long size) { public BBuf allocate(long size) {
var segment = MemorySegment.ofArray(new byte[Math.toIntExact(size)]); var segment = MemorySegment.ofArray(new byte[Math.toIntExact(size)]);
return new ByteBuf(segment, SEGMENT_CLOSE); return new BBuf(segment, SEGMENT_CLOSE);
} }
}; };
} }
@ -25,8 +25,8 @@ public interface Allocator extends AutoCloseable {
static Allocator direct() { static Allocator direct() {
return new Allocator() { return new Allocator() {
@Override @Override
public ByteBuf allocate(long size) { public BBuf allocate(long size) {
return new ByteBuf(MemorySegment.allocateNative(size), SEGMENT_CLOSE); return new BBuf(MemorySegment.allocateNative(size), SEGMENT_CLOSE);
} }
}; };
} }

View File

@ -1,5 +1,7 @@
package io.netty.buffer.b2; package io.netty.buffer.b2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import jdk.incubator.foreign.MemoryAccess; import jdk.incubator.foreign.MemoryAccess;
import jdk.incubator.foreign.MemoryAddress; import jdk.incubator.foreign.MemoryAddress;
@ -7,28 +9,46 @@ import jdk.incubator.foreign.MemorySegment;
import java.lang.invoke.VarHandle; import java.lang.invoke.VarHandle;
public class ByteBuf extends Rc<ByteBuf> { public class BBuf extends Rc<BBuf> {
static final Drop<ByteBuf> NO_DROP = buf -> {}; static final Drop<BBuf> NO_DROP = buf -> {};
static final Drop<ByteBuf> SEGMENT_CLOSE = buf -> buf.segment.close(); static final Drop<BBuf> SEGMENT_CLOSE = buf -> buf.segment.close();
private final MemorySegment segment; private final MemorySegment segment;
private final MemoryAddress address; private final MemoryAddress address;
private long read; private long read;
private long write; private long write;
ByteBuf(MemorySegment segment, Drop<ByteBuf> drop) { BBuf(MemorySegment segment, Drop<BBuf> drop) {
super(drop); super(drop);
this.segment = segment; this.segment = segment;
address = segment.address(); address = segment.address();
} }
public byte get() { public BBuf readerIndex(long index) {
read = index;
return this;
}
public BBuf touch() {
return this;
}
public byte readByte() {
return MemoryAccess.getByteAtOffset(address, read++); return MemoryAccess.getByteAtOffset(address, read++);
} }
public void put(byte value) { public void writeByte(byte value) {
MemoryAccess.setByteAtOffset(address, write++, value); MemoryAccess.setByteAtOffset(address, write++, value);
} }
public BBuf setLong(long offset, long value) {
MemoryAccess.setLongAtOffset(address, offset, value);
return this;
}
public long getLong(long offset) {
return MemoryAccess.getLongAtOffset(address, offset);
}
public void fill(byte value) { public void fill(byte value) {
segment.fill(value); segment.fill(value);
} }
@ -49,25 +69,29 @@ public class ByteBuf extends Rc<ByteBuf> {
return address.segment().toByteArray(); return address.segment().toByteArray();
} }
public ByteBuf view() {
return Unpooled.wrappedBuffer(getNativeAddress(), Math.toIntExact(size()), false);
}
@Override @Override
protected ByteBuf copy(Thread recipient, Drop<ByteBuf> drop) { protected BBuf copy(Thread recipient, Drop<BBuf> drop) {
ByteBuf copy = new ByteBuf(segment.withOwnerThread(recipient), drop); BBuf copy = new BBuf(segment.withOwnerThread(recipient), drop);
copy.read = read; copy.read = read;
copy.write = write; copy.write = write;
return copy; return copy;
} }
@Override @Override
protected ByteBuf prepareSend() { protected BBuf prepareSend() {
ByteBuf outer = this; BBuf outer = this;
MemorySegment transferSegment = segment.withOwnerThread(Lazy.TRANSFER_OWNER); MemorySegment transferSegment = segment.withOwnerThread(Lazy.TRANSFER_OWNER);
return new ByteBuf(transferSegment, NO_DROP) { return new BBuf(transferSegment, NO_DROP) {
@Override @Override
protected ByteBuf copy(Thread recipient, Drop<ByteBuf> drop) { protected BBuf copy(Thread recipient, Drop<BBuf> drop) {
Object scope = PlatformDependent.getObject(transferSegment, Lazy.SCOPE); Object scope = PlatformDependent.getObject(transferSegment, Lazy.SCOPE);
PlatformDependent.putObject(scope, Lazy.OWNER, recipient); PlatformDependent.putObject(scope, Lazy.OWNER, recipient);
VarHandle.fullFence(); VarHandle.fullFence();
ByteBuf copy = new ByteBuf(transferSegment, drop); BBuf copy = new BBuf(transferSegment, drop);
copy.read = outer.read; copy.read = outer.read;
copy.write = outer.write; copy.write = outer.write;
return copy; return copy;

View File

@ -8,9 +8,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import static java.lang.invoke.MethodHandles.*; import static java.lang.invoke.MethodHandles.*;
abstract class SizeClassedMemoryPool implements Allocator, Drop<ByteBuf> { abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
private static final VarHandle CLOSE = Statics.findVarHandle(lookup(), SizeClassedMemoryPool.class, "closed", boolean.class); private static final VarHandle CLOSE = Statics.findVarHandle(lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<ByteBuf>>> pool; private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<BBuf>>> pool;
@SuppressWarnings("unused") @SuppressWarnings("unused")
private volatile boolean closed; private volatile boolean closed;
@ -19,13 +19,13 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<ByteBuf> {
} }
@Override @Override
public ByteBuf allocate(long size) { public BBuf allocate(long size) {
var sizeClassPool = getSizeClassPool(size); var sizeClassPool = getSizeClassPool(size);
Send<ByteBuf> send = sizeClassPool.poll(); Send<BBuf> send = sizeClassPool.poll();
if (send != null) { if (send != null) {
return send.receive(); return send.receive();
} }
return new ByteBuf(createMemorySegment(size), this); return new BBuf(createMemorySegment(size), this);
} }
protected abstract MemorySegment createMemorySegment(long size); protected abstract MemorySegment createMemorySegment(long size);
@ -34,7 +34,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<ByteBuf> {
public void close() { public void close() {
if (CLOSE.compareAndSet(this, false, true)) { if (CLOSE.compareAndSet(this, false, true)) {
pool.forEach((k,v) -> { pool.forEach((k,v) -> {
Send<ByteBuf> send; Send<BBuf> send;
while ((send = v.poll()) != null) { while ((send = v.poll()) != null) {
dispose(send.receive()); dispose(send.receive());
} }
@ -43,7 +43,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<ByteBuf> {
} }
@Override @Override
public void drop(ByteBuf buf) { public void drop(BBuf buf) {
var sizeClassPool = getSizeClassPool(buf.size()); var sizeClassPool = getSizeClassPool(buf.size());
sizeClassPool.offer(buf.send()); sizeClassPool.offer(buf.send());
if (closed) { if (closed) {
@ -54,11 +54,11 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<ByteBuf> {
} }
} }
private ConcurrentLinkedQueue<Send<ByteBuf>> getSizeClassPool(long size) { private ConcurrentLinkedQueue<Send<BBuf>> getSizeClassPool(long size) {
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>()); return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
} }
private static void dispose(ByteBuf buf) { private static void dispose(BBuf buf) {
ByteBuf.SEGMENT_CLOSE.drop(buf); BBuf.SEGMENT_CLOSE.drop(buf);
} }
} }

View File

@ -11,45 +11,45 @@ import java.util.concurrent.SynchronousQueue;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
public abstract class ByteBufTest { public abstract class BBufTest {
protected abstract Allocator createAllocator(); protected abstract Allocator createAllocator();
@Test @Test
public void allocateAndAccessingBuffer() { public void allocateAndAccessingBuffer() {
try (Allocator allocator = createAllocator(); try (Allocator allocator = createAllocator();
ByteBuf buf = allocator.allocate(8)) { BBuf buf = allocator.allocate(8)) {
buf.put((byte) 1); buf.writeByte((byte) 1);
buf.put((byte) 2); buf.writeByte((byte) 2);
try (ByteBuf inner = buf.acquire()) { try (BBuf inner = buf.acquire()) {
inner.put((byte) 3); inner.writeByte((byte) 3);
inner.put((byte) 4); inner.writeByte((byte) 4);
inner.put((byte) 5); inner.writeByte((byte) 5);
inner.put((byte) 6); inner.writeByte((byte) 6);
inner.put((byte) 7); inner.writeByte((byte) 7);
inner.put((byte) 8); inner.writeByte((byte) 8);
try { try {
inner.put((byte) 9); inner.writeByte((byte) 9);
fail("Expected to be out of bounds."); fail("Expected to be out of bounds.");
} catch (RuntimeException re) { } catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound")); assertThat(re.getMessage(), containsString("bound"));
} }
try { try {
buf.put((byte) 9); buf.writeByte((byte) 9);
fail("Expected to be out of bounds."); fail("Expected to be out of bounds.");
} catch (RuntimeException re) { } catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound")); assertThat(re.getMessage(), containsString("bound"));
} }
} }
assertEquals((byte) 1, buf.get()); assertEquals((byte) 1, buf.readByte());
assertEquals((byte) 2, buf.get()); assertEquals((byte) 2, buf.readByte());
assertEquals((byte) 3, buf.get()); assertEquals((byte) 3, buf.readByte());
assertEquals((byte) 4, buf.get()); assertEquals((byte) 4, buf.readByte());
assertEquals((byte) 5, buf.get()); assertEquals((byte) 5, buf.readByte());
assertEquals((byte) 6, buf.get()); assertEquals((byte) 6, buf.readByte());
assertEquals((byte) 7, buf.get()); assertEquals((byte) 7, buf.readByte());
assertEquals((byte) 8, buf.get()); assertEquals((byte) 8, buf.readByte());
try { try {
assertEquals((byte) 9, buf.get()); assertEquals((byte) 9, buf.readByte());
fail("Expected to be out of bounds."); fail("Expected to be out of bounds.");
} catch (RuntimeException re) { } catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound")); assertThat(re.getMessage(), containsString("bound"));
@ -61,17 +61,17 @@ public abstract class ByteBufTest {
@Test @Test
public void allocateAndRendesvousWithThread() throws Exception { public void allocateAndRendesvousWithThread() throws Exception {
try (Allocator allocator = createAllocator()) { try (Allocator allocator = createAllocator()) {
ArrayBlockingQueue<Send<ByteBuf>> queue = new ArrayBlockingQueue<>(10); ArrayBlockingQueue<Send<BBuf>> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> { Future<Byte> future = executor.submit(() -> {
try (ByteBuf byteBuf = queue.take().receive()) { try (BBuf byteBuf = queue.take().receive()) {
return byteBuf.get(); return byteBuf.readByte();
} }
}); });
executor.shutdown(); executor.shutdown();
try (ByteBuf buf = allocator.allocate(8)) { try (BBuf buf = allocator.allocate(8)) {
buf.put((byte) 42); buf.writeByte((byte) 42);
buf.sendTo(queue::offer); buf.sendTo(queue::offer);
} }
@ -82,17 +82,17 @@ public abstract class ByteBufTest {
@Test @Test
public void allocateAndRendesvousWithThreadViaSyncQueue() throws Exception { public void allocateAndRendesvousWithThreadViaSyncQueue() throws Exception {
try (Allocator allocator = createAllocator()) { try (Allocator allocator = createAllocator()) {
SynchronousQueue<Send<ByteBuf>> queue = new SynchronousQueue<>(); SynchronousQueue<Send<BBuf>> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> { Future<Byte> future = executor.submit(() -> {
try (ByteBuf byteBuf = queue.take().receive()) { try (BBuf byteBuf = queue.take().receive()) {
return byteBuf.get(); return byteBuf.readByte();
} }
}); });
executor.shutdown(); executor.shutdown();
try (ByteBuf buf = allocator.allocate(8)) { try (BBuf buf = allocator.allocate(8)) {
buf.put((byte) 42); buf.writeByte((byte) 42);
buf.sendTo(e -> { buf.sendTo(e -> {
try { try {
queue.put(e); queue.put(e);
@ -109,17 +109,17 @@ public abstract class ByteBufTest {
@Test @Test
public void allocateAndSendToThread() throws Exception { public void allocateAndSendToThread() throws Exception {
try (Allocator allocator = createAllocator()) { try (Allocator allocator = createAllocator()) {
ArrayBlockingQueue<Send<ByteBuf>> queue = new ArrayBlockingQueue<>(10); ArrayBlockingQueue<Send<BBuf>> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> { Future<Byte> future = executor.submit(() -> {
try (ByteBuf byteBuf = queue.take().receive()) { try (BBuf byteBuf = queue.take().receive()) {
return byteBuf.get(); return byteBuf.readByte();
} }
}); });
executor.shutdown(); executor.shutdown();
try (ByteBuf buf = allocator.allocate(8)) { try (BBuf buf = allocator.allocate(8)) {
buf.put((byte) 42); buf.writeByte((byte) 42);
assertTrue(queue.offer(buf.send())); assertTrue(queue.offer(buf.send()));
} }
@ -130,17 +130,17 @@ public abstract class ByteBufTest {
@Test @Test
public void allocateAndSendToThreadViaSyncQueue() throws Exception { public void allocateAndSendToThreadViaSyncQueue() throws Exception {
try (Allocator allocator = createAllocator()) { try (Allocator allocator = createAllocator()) {
SynchronousQueue<Send<ByteBuf>> queue = new SynchronousQueue<>(); SynchronousQueue<Send<BBuf>> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newSingleThreadExecutor(); ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> { Future<Byte> future = executor.submit(() -> {
try (ByteBuf byteBuf = queue.take().receive()) { try (BBuf byteBuf = queue.take().receive()) {
return byteBuf.get(); return byteBuf.readByte();
} }
}); });
executor.shutdown(); executor.shutdown();
try (ByteBuf buf = allocator.allocate(8)) { try (BBuf buf = allocator.allocate(8)) {
buf.put((byte) 42); buf.writeByte((byte) 42);
queue.put(buf.send()); queue.put(buf.send());
} }

View File

@ -1,6 +1,6 @@
package io.netty.buffer.b2; package io.netty.buffer.b2;
public class DirectByteBufTest extends ByteBufTest { public class DirectBBufTest extends BBufTest {
@Override @Override
protected Allocator createAllocator() { protected Allocator createAllocator() {
return Allocator.direct(); return Allocator.direct();

View File

@ -1,6 +1,6 @@
package io.netty.buffer.b2; package io.netty.buffer.b2;
public class HeapByteBufTest extends ByteBufTest { public class HeapBBufTest extends BBufTest {
@Override @Override
protected Allocator createAllocator() { protected Allocator createAllocator() {
return Allocator.heap(); return Allocator.heap();

View File

@ -1,6 +1,6 @@
package io.netty.buffer.b2; package io.netty.buffer.b2;
public class PooledDirectByteBufTest extends ByteBufTest { public class PooledDirectBBufTest extends BBufTest {
@Override @Override
protected Allocator createAllocator() { protected Allocator createAllocator() {
return Allocator.pooledDirect(); return Allocator.pooledDirect();

View File

@ -1,6 +1,6 @@
package io.netty.buffer.b2; package io.netty.buffer.b2;
public class PooledHeapByteBufTest extends ByteBufTest { public class PooledHeapBBufTest extends BBufTest {
@Override @Override
protected Allocator createAllocator() { protected Allocator createAllocator() {
return Allocator.pooledHeap(); return Allocator.pooledHeap();