From ddde3a42d98c1f61b9a2a208858abb525a14e103 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 29 Jul 2020 10:30:03 +0200 Subject: [PATCH] Rename ...b2.ByteBuf -> ...b2.BBuf Motivation: Make it easier to use the existing ByteBuf and the new BBuf in the same source files. To help transitioning between, and comparing, the two APIs. Modification: The new ...b2.ByteBuf class has been renamed to BBuf to avoid clashing with the existing ByteBuf name. Result: It's easier to make use of both classes in the same source files, since both can be imported independently. --- .../java/io/netty/buffer/b2/Allocator.java | 12 +-- .../buffer/b2/{ByteBuf.java => BBuf.java} | 50 ++++++++--- .../buffer/b2/SizeClassedMemoryPool.java | 20 ++--- .../b2/{ByteBufTest.java => BBufTest.java} | 84 +++++++++---------- ...ctByteBufTest.java => DirectBBufTest.java} | 2 +- ...HeapByteBufTest.java => HeapBBufTest.java} | 2 +- ...BufTest.java => PooledDirectBBufTest.java} | 2 +- ...teBufTest.java => PooledHeapBBufTest.java} | 2 +- 8 files changed, 99 insertions(+), 75 deletions(-) rename buffer/src/main/java/io/netty/buffer/b2/{ByteBuf.java => BBuf.java} (62%) rename buffer/src/test/java/io/netty/buffer/b2/{ByteBufTest.java => BBufTest.java} (61%) rename buffer/src/test/java/io/netty/buffer/b2/{DirectByteBufTest.java => DirectBBufTest.java} (71%) rename buffer/src/test/java/io/netty/buffer/b2/{HeapByteBufTest.java => HeapBBufTest.java} (71%) rename buffer/src/test/java/io/netty/buffer/b2/{PooledDirectByteBufTest.java => PooledDirectBBufTest.java} (69%) rename buffer/src/test/java/io/netty/buffer/b2/{PooledHeapByteBufTest.java => PooledHeapBBufTest.java} (70%) diff --git a/buffer/src/main/java/io/netty/buffer/b2/Allocator.java b/buffer/src/main/java/io/netty/buffer/b2/Allocator.java index c896181..34a7c53 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Allocator.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Allocator.java @@ -2,11 +2,11 @@ package io.netty.buffer.b2; import jdk.incubator.foreign.MemorySegment; -import static io.netty.buffer.b2.ByteBuf.*; +import static io.netty.buffer.b2.BBuf.*; public interface Allocator extends AutoCloseable { - ByteBuf allocate(long size); + BBuf allocate(long size); @Override default void close() { @@ -15,9 +15,9 @@ public interface Allocator extends AutoCloseable { static Allocator heap() { return new Allocator() { @Override - public ByteBuf allocate(long size) { + public BBuf allocate(long size) { var segment = MemorySegment.ofArray(new byte[Math.toIntExact(size)]); - return new ByteBuf(segment, SEGMENT_CLOSE); + return new BBuf(segment, SEGMENT_CLOSE); } }; } @@ -25,8 +25,8 @@ public interface Allocator extends AutoCloseable { static Allocator direct() { return new Allocator() { @Override - public ByteBuf allocate(long size) { - return new ByteBuf(MemorySegment.allocateNative(size), SEGMENT_CLOSE); + public BBuf allocate(long size) { + return new BBuf(MemorySegment.allocateNative(size), SEGMENT_CLOSE); } }; } diff --git a/buffer/src/main/java/io/netty/buffer/b2/ByteBuf.java b/buffer/src/main/java/io/netty/buffer/b2/BBuf.java similarity index 62% rename from buffer/src/main/java/io/netty/buffer/b2/ByteBuf.java rename to buffer/src/main/java/io/netty/buffer/b2/BBuf.java index aac6ed3..61d1dbc 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/ByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/BBuf.java @@ -1,5 +1,7 @@ package io.netty.buffer.b2; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.util.internal.PlatformDependent; import jdk.incubator.foreign.MemoryAccess; import jdk.incubator.foreign.MemoryAddress; @@ -7,28 +9,46 @@ import jdk.incubator.foreign.MemorySegment; import java.lang.invoke.VarHandle; -public class ByteBuf extends Rc { - static final Drop NO_DROP = buf -> {}; - static final Drop SEGMENT_CLOSE = buf -> buf.segment.close(); +public class BBuf extends Rc { + static final Drop NO_DROP = buf -> {}; + static final Drop SEGMENT_CLOSE = buf -> buf.segment.close(); private final MemorySegment segment; private final MemoryAddress address; private long read; private long write; - ByteBuf(MemorySegment segment, Drop drop) { + BBuf(MemorySegment segment, Drop drop) { super(drop); this.segment = segment; address = segment.address(); } - public byte get() { + public BBuf readerIndex(long index) { + read = index; + return this; + } + + public BBuf touch() { + return this; + } + + public byte readByte() { return MemoryAccess.getByteAtOffset(address, read++); } - public void put(byte value) { + public void writeByte(byte value) { MemoryAccess.setByteAtOffset(address, write++, value); } + public BBuf setLong(long offset, long value) { + MemoryAccess.setLongAtOffset(address, offset, value); + return this; + } + + public long getLong(long offset) { + return MemoryAccess.getLongAtOffset(address, offset); + } + public void fill(byte value) { segment.fill(value); } @@ -49,25 +69,29 @@ public class ByteBuf extends Rc { return address.segment().toByteArray(); } + public ByteBuf view() { + return Unpooled.wrappedBuffer(getNativeAddress(), Math.toIntExact(size()), false); + } + @Override - protected ByteBuf copy(Thread recipient, Drop drop) { - ByteBuf copy = new ByteBuf(segment.withOwnerThread(recipient), drop); + protected BBuf copy(Thread recipient, Drop drop) { + BBuf copy = new BBuf(segment.withOwnerThread(recipient), drop); copy.read = read; copy.write = write; return copy; } @Override - protected ByteBuf prepareSend() { - ByteBuf outer = this; + protected BBuf prepareSend() { + BBuf outer = this; MemorySegment transferSegment = segment.withOwnerThread(Lazy.TRANSFER_OWNER); - return new ByteBuf(transferSegment, NO_DROP) { + return new BBuf(transferSegment, NO_DROP) { @Override - protected ByteBuf copy(Thread recipient, Drop drop) { + protected BBuf copy(Thread recipient, Drop drop) { Object scope = PlatformDependent.getObject(transferSegment, Lazy.SCOPE); PlatformDependent.putObject(scope, Lazy.OWNER, recipient); VarHandle.fullFence(); - ByteBuf copy = new ByteBuf(transferSegment, drop); + BBuf copy = new BBuf(transferSegment, drop); copy.read = outer.read; copy.write = outer.write; return copy; diff --git a/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java b/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java index 75d1e08..70852b1 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java +++ b/buffer/src/main/java/io/netty/buffer/b2/SizeClassedMemoryPool.java @@ -8,9 +8,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; import static java.lang.invoke.MethodHandles.*; -abstract class SizeClassedMemoryPool implements Allocator, Drop { +abstract class SizeClassedMemoryPool implements Allocator, Drop { private static final VarHandle CLOSE = Statics.findVarHandle(lookup(), SizeClassedMemoryPool.class, "closed", boolean.class); - private final ConcurrentHashMap>> pool; + private final ConcurrentHashMap>> pool; @SuppressWarnings("unused") private volatile boolean closed; @@ -19,13 +19,13 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop { } @Override - public ByteBuf allocate(long size) { + public BBuf allocate(long size) { var sizeClassPool = getSizeClassPool(size); - Send send = sizeClassPool.poll(); + Send send = sizeClassPool.poll(); if (send != null) { return send.receive(); } - return new ByteBuf(createMemorySegment(size), this); + return new BBuf(createMemorySegment(size), this); } protected abstract MemorySegment createMemorySegment(long size); @@ -34,7 +34,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop { public void close() { if (CLOSE.compareAndSet(this, false, true)) { pool.forEach((k,v) -> { - Send send; + Send send; while ((send = v.poll()) != null) { dispose(send.receive()); } @@ -43,7 +43,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop { } @Override - public void drop(ByteBuf buf) { + public void drop(BBuf buf) { var sizeClassPool = getSizeClassPool(buf.size()); sizeClassPool.offer(buf.send()); if (closed) { @@ -54,11 +54,11 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop { } } - private ConcurrentLinkedQueue> getSizeClassPool(long size) { + private ConcurrentLinkedQueue> getSizeClassPool(long size) { return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>()); } - private static void dispose(ByteBuf buf) { - ByteBuf.SEGMENT_CLOSE.drop(buf); + private static void dispose(BBuf buf) { + BBuf.SEGMENT_CLOSE.drop(buf); } } diff --git a/buffer/src/test/java/io/netty/buffer/b2/ByteBufTest.java b/buffer/src/test/java/io/netty/buffer/b2/BBufTest.java similarity index 61% rename from buffer/src/test/java/io/netty/buffer/b2/ByteBufTest.java rename to buffer/src/test/java/io/netty/buffer/b2/BBufTest.java index e00426b..fb2310f 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/ByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/BBufTest.java @@ -11,45 +11,45 @@ import java.util.concurrent.SynchronousQueue; import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; -public abstract class ByteBufTest { +public abstract class BBufTest { protected abstract Allocator createAllocator(); @Test public void allocateAndAccessingBuffer() { try (Allocator allocator = createAllocator(); - ByteBuf buf = allocator.allocate(8)) { - buf.put((byte) 1); - buf.put((byte) 2); - try (ByteBuf inner = buf.acquire()) { - inner.put((byte) 3); - inner.put((byte) 4); - inner.put((byte) 5); - inner.put((byte) 6); - inner.put((byte) 7); - inner.put((byte) 8); + BBuf buf = allocator.allocate(8)) { + buf.writeByte((byte) 1); + buf.writeByte((byte) 2); + try (BBuf inner = buf.acquire()) { + inner.writeByte((byte) 3); + inner.writeByte((byte) 4); + inner.writeByte((byte) 5); + inner.writeByte((byte) 6); + inner.writeByte((byte) 7); + inner.writeByte((byte) 8); try { - inner.put((byte) 9); + inner.writeByte((byte) 9); fail("Expected to be out of bounds."); } catch (RuntimeException re) { assertThat(re.getMessage(), containsString("bound")); } try { - buf.put((byte) 9); + buf.writeByte((byte) 9); fail("Expected to be out of bounds."); } catch (RuntimeException re) { assertThat(re.getMessage(), containsString("bound")); } } - assertEquals((byte) 1, buf.get()); - assertEquals((byte) 2, buf.get()); - assertEquals((byte) 3, buf.get()); - assertEquals((byte) 4, buf.get()); - assertEquals((byte) 5, buf.get()); - assertEquals((byte) 6, buf.get()); - assertEquals((byte) 7, buf.get()); - assertEquals((byte) 8, buf.get()); + assertEquals((byte) 1, buf.readByte()); + assertEquals((byte) 2, buf.readByte()); + assertEquals((byte) 3, buf.readByte()); + assertEquals((byte) 4, buf.readByte()); + assertEquals((byte) 5, buf.readByte()); + assertEquals((byte) 6, buf.readByte()); + assertEquals((byte) 7, buf.readByte()); + assertEquals((byte) 8, buf.readByte()); try { - assertEquals((byte) 9, buf.get()); + assertEquals((byte) 9, buf.readByte()); fail("Expected to be out of bounds."); } catch (RuntimeException re) { assertThat(re.getMessage(), containsString("bound")); @@ -61,17 +61,17 @@ public abstract class ByteBufTest { @Test public void allocateAndRendesvousWithThread() throws Exception { try (Allocator allocator = createAllocator()) { - ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10); + ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(() -> { - try (ByteBuf byteBuf = queue.take().receive()) { - return byteBuf.get(); + try (BBuf byteBuf = queue.take().receive()) { + return byteBuf.readByte(); } }); executor.shutdown(); - try (ByteBuf buf = allocator.allocate(8)) { - buf.put((byte) 42); + try (BBuf buf = allocator.allocate(8)) { + buf.writeByte((byte) 42); buf.sendTo(queue::offer); } @@ -82,17 +82,17 @@ public abstract class ByteBufTest { @Test public void allocateAndRendesvousWithThreadViaSyncQueue() throws Exception { try (Allocator allocator = createAllocator()) { - SynchronousQueue> queue = new SynchronousQueue<>(); + SynchronousQueue> queue = new SynchronousQueue<>(); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(() -> { - try (ByteBuf byteBuf = queue.take().receive()) { - return byteBuf.get(); + try (BBuf byteBuf = queue.take().receive()) { + return byteBuf.readByte(); } }); executor.shutdown(); - try (ByteBuf buf = allocator.allocate(8)) { - buf.put((byte) 42); + try (BBuf buf = allocator.allocate(8)) { + buf.writeByte((byte) 42); buf.sendTo(e -> { try { queue.put(e); @@ -109,17 +109,17 @@ public abstract class ByteBufTest { @Test public void allocateAndSendToThread() throws Exception { try (Allocator allocator = createAllocator()) { - ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10); + ArrayBlockingQueue> queue = new ArrayBlockingQueue<>(10); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(() -> { - try (ByteBuf byteBuf = queue.take().receive()) { - return byteBuf.get(); + try (BBuf byteBuf = queue.take().receive()) { + return byteBuf.readByte(); } }); executor.shutdown(); - try (ByteBuf buf = allocator.allocate(8)) { - buf.put((byte) 42); + try (BBuf buf = allocator.allocate(8)) { + buf.writeByte((byte) 42); assertTrue(queue.offer(buf.send())); } @@ -130,17 +130,17 @@ public abstract class ByteBufTest { @Test public void allocateAndSendToThreadViaSyncQueue() throws Exception { try (Allocator allocator = createAllocator()) { - SynchronousQueue> queue = new SynchronousQueue<>(); + SynchronousQueue> queue = new SynchronousQueue<>(); ExecutorService executor = Executors.newSingleThreadExecutor(); Future future = executor.submit(() -> { - try (ByteBuf byteBuf = queue.take().receive()) { - return byteBuf.get(); + try (BBuf byteBuf = queue.take().receive()) { + return byteBuf.readByte(); } }); executor.shutdown(); - try (ByteBuf buf = allocator.allocate(8)) { - buf.put((byte) 42); + try (BBuf buf = allocator.allocate(8)) { + buf.writeByte((byte) 42); queue.put(buf.send()); } diff --git a/buffer/src/test/java/io/netty/buffer/b2/DirectByteBufTest.java b/buffer/src/test/java/io/netty/buffer/b2/DirectBBufTest.java similarity index 71% rename from buffer/src/test/java/io/netty/buffer/b2/DirectByteBufTest.java rename to buffer/src/test/java/io/netty/buffer/b2/DirectBBufTest.java index 5a2ff6c..8567ccb 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/DirectByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/DirectBBufTest.java @@ -1,6 +1,6 @@ package io.netty.buffer.b2; -public class DirectByteBufTest extends ByteBufTest { +public class DirectBBufTest extends BBufTest { @Override protected Allocator createAllocator() { return Allocator.direct(); diff --git a/buffer/src/test/java/io/netty/buffer/b2/HeapByteBufTest.java b/buffer/src/test/java/io/netty/buffer/b2/HeapBBufTest.java similarity index 71% rename from buffer/src/test/java/io/netty/buffer/b2/HeapByteBufTest.java rename to buffer/src/test/java/io/netty/buffer/b2/HeapBBufTest.java index 134adbc..92ae927 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/HeapByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/HeapBBufTest.java @@ -1,6 +1,6 @@ package io.netty.buffer.b2; -public class HeapByteBufTest extends ByteBufTest { +public class HeapBBufTest extends BBufTest { @Override protected Allocator createAllocator() { return Allocator.heap(); diff --git a/buffer/src/test/java/io/netty/buffer/b2/PooledDirectByteBufTest.java b/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufTest.java similarity index 69% rename from buffer/src/test/java/io/netty/buffer/b2/PooledDirectByteBufTest.java rename to buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufTest.java index 814ddf4..6c642bd 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/PooledDirectByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/PooledDirectBBufTest.java @@ -1,6 +1,6 @@ package io.netty.buffer.b2; -public class PooledDirectByteBufTest extends ByteBufTest { +public class PooledDirectBBufTest extends BBufTest { @Override protected Allocator createAllocator() { return Allocator.pooledDirect(); diff --git a/buffer/src/test/java/io/netty/buffer/b2/PooledHeapByteBufTest.java b/buffer/src/test/java/io/netty/buffer/b2/PooledHeapBBufTest.java similarity index 70% rename from buffer/src/test/java/io/netty/buffer/b2/PooledHeapByteBufTest.java rename to buffer/src/test/java/io/netty/buffer/b2/PooledHeapBBufTest.java index 5e45aaf..672421c 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/PooledHeapByteBufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/PooledHeapBBufTest.java @@ -1,6 +1,6 @@ package io.netty.buffer.b2; -public class PooledHeapByteBufTest extends ByteBufTest { +public class PooledHeapBBufTest extends BBufTest { @Override protected Allocator createAllocator() { return Allocator.pooledHeap();