Flesh out the Buf interface and simplify the generics

The `Buf` type no longer needs any other qualifying type annotations to specify.
It is itself a non-parameterised type now, which makes it a lot simpler for people to use.
It's wealth of accessor methods have also been expanded, and tests have been added.
There is, however, still a fair number of methods to add before it resembles a `ByteBuf` replacement.
This commit is contained in:
Chris Vest 2020-08-28 12:17:41 +02:00
parent 35bde75d52
commit 09f9b5a158
13 changed files with 555 additions and 204 deletions

View File

@ -27,7 +27,7 @@ public interface Allocator extends AutoCloseable {
* @param size The size of {@link Buf} to allocate.
* @return The newly allocated {@link Buf}.
*/
<T extends Buf<T>> T allocate(long size);
Buf allocate(long size);
/**
* Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be

View File

@ -1,13 +1,11 @@
package io.netty.buffer.b2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import jdk.incubator.foreign.MemoryAccess;
import jdk.incubator.foreign.MemorySegment;
import static io.netty.buffer.b2.Statics.*;
public class BBuf extends RcSupport<BBuf> implements Buf<BBuf> {
public class BBuf extends RcSupport<Buf,BBuf> implements Buf {
static final Drop<BBuf> NO_DROP = buf -> {};
static final Drop<BBuf> SEGMENT_CLOSE = buf -> buf.segment.close();
static final Drop<BBuf> SEGMENT_CLOSE_NATIVE = buf -> {
@ -23,6 +21,11 @@ public class BBuf extends RcSupport<BBuf> implements Buf<BBuf> {
this.segment = segment;
}
@Override
public int capacity() {
return (int) segment.byteSize();
}
@Override
public int readerIndex() {
return read;
@ -47,28 +50,29 @@ public class BBuf extends RcSupport<BBuf> implements Buf<BBuf> {
return this;
}
public byte readByte() {
return MemoryAccess.getByteAtOffset(segment, read++);
@Override
public int readableBytes() {
return writerIndex() - readerIndex();
}
public void writeByte(byte value) {
MemoryAccess.setByteAtOffset(segment, write++, value);
@Override
public int writableBytes() {
return capacity() - writerIndex();
}
public BBuf setLong(int offset, long value) {
MemoryAccess.setLongAtOffset(segment, offset, value);
@Override
public Buf fill(byte value) {
segment.fill(value);
return this;
}
public long getLong(int offset) {
return MemoryAccess.getLongAtOffset(segment, offset);
@Override
public byte[] copy() {
return segment.toByteArray();
}
public void fill(byte value) {
segment.fill(value);
}
long getNativeAddress() {
@Override
public long getNativeAddress() {
try {
return segment.address().toRawLongValue();
} catch (UnsupportedOperationException e) {
@ -76,16 +80,104 @@ public class BBuf extends RcSupport<BBuf> implements Buf<BBuf> {
}
}
public long size() {
return segment.byteSize();
@Override
public byte readByte() {
byte value = MemoryAccess.getByteAtOffset(segment, read);
read += 1;
return value;
}
public byte[] debugAsByteArray() {
return segment.toByteArray();
@Override
public byte readByte(int index) {
return MemoryAccess.getByteAtOffset(segment, index);
}
public ByteBuf view() {
return Unpooled.wrappedBuffer(getNativeAddress(), Math.toIntExact(size()), false);
@Override
public Buf writeByte(byte value) {
MemoryAccess.setByteAtOffset(segment, write, value);
write += 1;
return this;
}
@Override
public Buf writeByte(int index, byte value) {
MemoryAccess.setByteAtOffset(segment, index, value);
return this;
}
@Override
public long readLong() {
long value = MemoryAccess.getLongAtOffset(segment, read);
read += Long.BYTES;
return value;
}
@Override
public long readLong(int offset) {
return MemoryAccess.getLongAtOffset(segment, offset);
}
@Override
public Buf writeLong(long value) {
MemoryAccess.setLongAtOffset(segment, write, value);
write += Long.BYTES;
return this;
}
@Override
public Buf writeLong(int offset, long value) {
MemoryAccess.setLongAtOffset(segment, offset, value);
return this;
}
@Override
public int readInt() {
int value = MemoryAccess.getIntAtOffset(segment, read);
read += Integer.BYTES;
return value;
}
@Override
public int readInt(int offset) {
return MemoryAccess.getIntAtOffset(segment, offset);
}
@Override
public Buf writeInt(int value) {
MemoryAccess.setIntAtOffset(segment, write, value);
write += Integer.BYTES;
return this;
}
@Override
public Buf writeInt(int offset, int value) {
MemoryAccess.setIntAtOffset(segment, offset, value);
return this;
}
@Override
public short readShort() {
short value = MemoryAccess.getShortAtOffset(segment, read);
read += Short.BYTES;
return value;
}
@Override
public short readShort(int offset) {
return MemoryAccess.getShortAtOffset(segment, offset);
}
@Override
public Buf writeShort(short value) {
MemoryAccess.setShortAtOffset(segment, write, value);
write += Short.BYTES;
return this;
}
@Override
public Buf writeShort(int offset, short value) {
MemoryAccess.setShortAtOffset(segment, offset, value);
return this;
}
@Override

View File

@ -2,9 +2,15 @@ package io.netty.buffer.b2;
/**
* A reference counted buffer API with separate reader and writer indexes.
* @param <T> The concrete runtime buffer type.
*/
public interface Buf<T extends Buf<T>> extends Rc<T> {
public interface Buf extends Rc<Buf> {
/**
* The capacity of this buffer, that is, the maximum number of bytes it can contain.
*
* @return The capacity in bytes.
*/
int capacity();
/**
* Get the current reader index. The next read will happen from this byte index into the buffer.
*
@ -17,8 +23,10 @@ public interface Buf<T extends Buf<T>> extends Rc<T> {
*
* @param index The reader index to set.
* @return This Buf.
* @throws IndexOutOfBoundsException if the specified {@code index} is less than zero or greater than the current
* {@link #writerIndex()}.
*/
T readerIndex(int index);
Buf readerIndex(int index);
/**
* Get the current writer index. The next write will happen at this byte index into the byffer.
@ -32,6 +40,191 @@ public interface Buf<T extends Buf<T>> extends Rc<T> {
*
* @param index The writer index to set.
* @return This Buf.
* @throws IndexOutOfBoundsException if the specified {@code index} is less than the current {@link #readerIndex()}
* or greater than {@link #capacity()}.
*/
T writerIndex(int index);
Buf writerIndex(int index);
/**
* Returns the number of readable bytes which is equal to {@code (writerIndex() - readerIndex())}.
*/
int readableBytes();
/**
* Returns the number of writable bytes which is equal to {@code (capacity() - writerIndex())}.
*/
int writableBytes();
/**
* Fill the buffer with the given byte value. This method does not respect the {@link #readerIndex()} or {@link
* #writerIndex()}, but copies the full capacity of the buffer. The {@link #readerIndex()} and {@link
* #writerIndex()} are not modified.
*
* @param value The byte value to write at every index in the buffer.
* @return This Buf.
*/
Buf fill(byte value);
/**
* Create a byte array, and fill it with the complete contents of this buffer. This method does not respect the
* {@link #readerIndex()} or {@link #writerIndex()}, but copies the full capacity of the buffer. The {@link
* #readerIndex()} and {@link #writerIndex()} are not modified.
*
* @return A byte array that contains a copy of the contents of this buffer.
*/
byte[] copy();
/**
* Give the native memory address backing this buffer, or return 0 if this is a heap backed buffer.
* @return The native memory address, if any, otherwise 0.
*/
long getNativeAddress();
/**
* Get the byte value at the current {@link #readerIndex()} and increases the index by 1.
*
* @return The byte value at the current reader index.
* @throws IndexOutOfBoundsException If {@link #readableBytes} is less than 1.
*/
byte readByte();
/**
* Get the byte value at the given index. The {@link #readerIndex()} is not modified.
*
* @param index The absolute index into this buffer to read from.
* @return The byte value at the given index.
* @throws IndexOutOfBoundsException if the given index is out of bounds of the buffer, that is, less than 0 or
* greater than or equal to {@link #capacity()}.
*/
byte readByte(int index);
/**
* Set the given byte value at the current {@link #writerIndex()} and increases the index by 1.
*
* @param value The byte value to write.
* @return This Buf.
*/
Buf writeByte(byte value);
/**
* Set the given byte value at the given index. The {@link #writerIndex()} is not modified.
*
* @param index The byte value to write.
* @param value The absolute index into this buffer to write to.
* @return This Buf.
* @throws IndexOutOfBoundsException if the given index is out of bounds of the buffer, that is, less than 0 or
* greater than or equal to {@link #capacity()}.
*/
Buf writeByte(int index, byte value);
/**
* Get the long value at the current {@link #readerIndex()} and increases the index by {@link Long#BYTES}.
*
* @return The long value at the current reader index.
* @throws IndexOutOfBoundsException If {@link #readableBytes} is less than {@link Long#BYTES}.
*/
long readLong();
/**
* Get the long value at the given index. The {@link #readerIndex()} is not modified.
*
* @param index The absolute index into this buffer to read from.
* @return The long value at the given index.
* @throws IndexOutOfBoundsException if the given index is out of bounds of the buffer, that is, less than 0 or
* greater than or equal to {@link #capacity()}.
*/
long readLong(int index);
/**
* Set the given long value at the current {@link #writerIndex()} and increases the index by {@link Long#BYTES}.
*
* @param value The long value to write.
* @return This Buf.
*/
Buf writeLong(long value);
/**
* Set the given long value at the given index. The {@link #writerIndex()} is not modified.
*
* @param index The long value to write.
* @param value The absolute index into this buffer to write to.
* @return This Buf.
* @throws IndexOutOfBoundsException if the given index is out of bounds of the buffer, that is, less than 0 or
* greater than or equal to {@link #capacity()}.
*/
Buf writeLong(int index, long value);
/**
* Get the int value at the current {@link #readerIndex()} and increases the index by {@link Integer#BYTES}.
*
* @return The int value at the current reader index.
* @throws IndexOutOfBoundsException If {@link #readableBytes} is less than {@link Integer#BYTES}.
*/
int readInt();
/**
* Get the int value at the given index. The {@link #readerIndex()} is not modified.
*
* @param index The absolute index into this buffer to read from.
* @return The int value at the given index.
* @throws IndexOutOfBoundsException if the given index is out of bounds of the buffer, that is, less than 0 or
* greater than or equal to {@link #capacity()}.
*/
int readInt(int index);
/**
* Set the given int value at the current {@link #writerIndex()} and increases the index by {@link Integer#BYTES}.
*
* @param value The int value to write.
* @return This Buf.
*/
Buf writeInt(int value);
/**
* Set the given int value at the given index. The {@link #writerIndex()} is not modified.
*
* @param index The int value to write.
* @param value The absolute index into this buffer to write to.
* @return This Buf.
* @throws IndexOutOfBoundsException if the given index is out of bounds of the buffer, that is, less than 0 or
* greater than or equal to {@link #capacity()}.
*/
Buf writeInt(int index, int value);
/**
* Get the short value at the current {@link #readerIndex()} and increases the index by {@link Short#BYTES}.
*
* @return The short value at the current reader index.
* @throws IndexOutOfBoundsException If {@link #readableBytes} is less than {@link Short#BYTES}.
*/
short readShort();
/**
* Get the short value at the given index. The {@link #readerIndex()} is not modified.
*
* @param index The absolute index into this buffer to read from.
* @return The short value at the given index.
* @throws IndexOutOfBoundsException if the given index is out of bounds of the buffer, that is, less than 0 or
* greater than or equal to {@link #capacity()}.
*/
short readShort(int index);
/**
* Set the given short value at the current {@link #writerIndex()} and increases the index by {@link Short#BYTES}.
*
* @param value The short value to write.
* @return This Buf.
*/
Buf writeShort(short value);
/**
* Set the given short value at the given index. The {@link #writerIndex()} is not modified.
*
* @param index The short value to write.
* @param value The absolute index into this buffer to write to.
* @return This Buf.
* @throws IndexOutOfBoundsException if the given index is out of bounds of the buffer, that is, less than 0 or
* greater than or equal to {@link #capacity()}.
*/
Buf writeShort(int index, short value);
}

View File

@ -4,12 +4,12 @@ import java.util.function.Consumer;
/**
* The Drop interface is used by {@link Rc} instances to implement their resource disposal mechanics. The {@link
* #drop(Rc)} method will be called by the Rc when their last reference is closed.
* #drop(Object)} method will be called by the Rc when their last reference is closed.
*
* @param <T>
*/
@FunctionalInterface
public interface Drop<T extends Rc<T>> extends Consumer<T> {
public interface Drop<T> extends Consumer<T> {
/**
* Dispose of the resources in the given Rc.
*

View File

@ -7,7 +7,7 @@ package io.netty.buffer.b2;
* @param <T> The concrete type of {@link Rc} that is owned.
*/
@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
public interface Owned<T extends Rc<T>> {
public interface Owned<T> {
/**
* Transfer the ownership of the owned Rc, to the given recipient thread. The owned Rc is invalidated but without
* disposing of its internal state. Then a new Rc with the given owner is produced in its stead.

View File

@ -11,9 +11,9 @@ import java.util.function.Consumer;
* ideally), then the resource is desposed of, or released, or returned to the pool it came from. The precise action is
* implemented by the {@link Drop} instance given as an argument to the Rc constructor.
*
* @param <T> The concrete subtype.
* @param <I> The concrete subtype.
*/
public interface Rc<T extends Rc<T>> extends AutoCloseable {
public interface Rc<I extends Rc<I>> extends AutoCloseable {
/**
* Increment the reference count.
* <p>
@ -21,7 +21,7 @@ public interface Rc<T extends Rc<T>> extends AutoCloseable {
*
* @return This Rc instance.
*/
T acquire();
I acquire();
/**
* Decrement the reference count, and despose of the resource if the last reference is closed.
@ -41,7 +41,7 @@ public interface Rc<T extends Rc<T>> extends AutoCloseable {
* @param consumer The consumer encodes the mechanism by which the recipient recieves the Rc instance.
* @throws InterruptedException If this thread was interrupted
*/
void sendTo(Consumer<Send<T>> consumer) throws InterruptedException;
void sendTo(Consumer<Send<I>> consumer) throws InterruptedException;
/**
* Send this Rc instance to another Thread, transferring the ownership to the recipient. This method can be used
@ -53,5 +53,5 @@ public interface Rc<T extends Rc<T>> extends AutoCloseable {
* @implNote Not possible without hacks because we need the receiving thread in order to set the new owner in the
* currently owning thread.
*/
Send<T> send();
Send<I> send();
}

View File

@ -2,7 +2,7 @@ package io.netty.buffer.b2;
import java.util.function.Consumer;
public abstract class RcSupport<T extends RcSupport<T> & Owned<T>> implements Rc<T>, Owned<T> {
public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> implements Rc<I>, Owned<T> {
private int acquires; // Closed if negative.
private final Drop<T> drop;
@ -18,7 +18,7 @@ public abstract class RcSupport<T extends RcSupport<T> & Owned<T>> implements Rc
* @return This Rc instance.
*/
@Override
public T acquire() {
public I acquire() {
if (acquires < 0) {
throw new IllegalStateException("Resource is closed.");
}
@ -39,7 +39,7 @@ public abstract class RcSupport<T extends RcSupport<T> & Owned<T>> implements Rc
throw new IllegalStateException("Double-free: Already closed and dropped.");
}
if (acquires == 0) {
drop.drop(self());
drop.drop(impl());
}
acquires--;
}
@ -53,8 +53,8 @@ public abstract class RcSupport<T extends RcSupport<T> & Owned<T>> implements Rc
* @throws InterruptedException If this thread was interrupted
*/
@Override
public void sendTo(Consumer<Send<T>> consumer) throws InterruptedException {
var send = new RendezvousSend<>(self(), drop);
public void sendTo(Consumer<Send<I>> consumer) throws InterruptedException {
var send = new RendezvousSend<I,T>(impl(), drop);
consumer.accept(send);
send.finish();
acquires = -2; // close without dropping (also ignore future double-free attempts)
@ -71,9 +71,9 @@ public abstract class RcSupport<T extends RcSupport<T> & Owned<T>> implements Rc
* currently owning thread.
*/
@Override
public Send<T> send() {
public Send<I> send() {
acquires = -2; // close without dropping (also ignore future double-free attempts)
return new TransferSend<>(prepareSend(), drop);
return new TransferSend<I,T>(prepareSend(), drop);
}
/**
@ -85,11 +85,16 @@ public abstract class RcSupport<T extends RcSupport<T> & Owned<T>> implements Rc
* @return This Rc instance in a deactivated state.
*/
protected T prepareSend() {
return self();
return impl();
}
@SuppressWarnings("unchecked")
private T self() {
private I self() {
return (I) this;
}
@SuppressWarnings("unchecked")
private T impl() {
return (T) this;
}
}

View File

@ -6,7 +6,7 @@ import java.util.concurrent.CountDownLatch;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
class RendezvousSend<T extends Rc<T> & Owned<T>> implements Send<T> {
class RendezvousSend<I extends Rc<I>, T extends Rc<I> & Owned<T>> implements Send<I> {
private static final VarHandle RECEIVED = findVarHandle(lookup(), RendezvousSend.class, "received", boolean.class);
private final CountDownLatch recipientLatch;
private final CountDownLatch sentLatch;
@ -15,7 +15,7 @@ class RendezvousSend<T extends Rc<T> & Owned<T>> implements Send<T> {
@SuppressWarnings("unused")
private volatile boolean received; // Accessed via VarHandle
private volatile Thread recipient;
private volatile T incoming;
private volatile I incoming;
RendezvousSend(T outgoing, Drop<T> drop) {
this.outgoing = outgoing;
@ -25,7 +25,7 @@ class RendezvousSend<T extends Rc<T> & Owned<T>> implements Send<T> {
}
@Override
public T receive() {
public I receive() {
if (!RECEIVED.compareAndSet(this, false, true)) {
throw new IllegalStateException("This object has already been received.");
}
@ -44,8 +44,9 @@ class RendezvousSend<T extends Rc<T> & Owned<T>> implements Send<T> {
throw new IllegalStateException("Already sent.");
}
recipientLatch.await();
incoming = outgoing.transferOwnership(recipient, drop);
drop.accept(incoming);
var transferred = outgoing.transferOwnership(recipient, drop);
incoming = (I) transferred;
drop.accept(transferred);
sentLatch.countDown();
}
}

View File

@ -11,7 +11,7 @@ import static java.lang.invoke.MethodHandles.*;
abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
private static final VarHandle CLOSE = Statics.findVarHandle(lookup(), SizeClassedMemoryPool.class, "closed", boolean.class);
private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<BBuf>>> pool;
private final ConcurrentHashMap<Long, ConcurrentLinkedQueue<Send<Buf>>> pool;
private final Drop<BBuf> disposer;
@SuppressWarnings("unused")
private volatile boolean closed;
@ -22,9 +22,9 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
}
@Override
public BBuf allocate(long size) {
public Buf allocate(long size) {
var sizeClassPool = getSizeClassPool(size);
Send<BBuf> send = sizeClassPool.poll();
Send<Buf> send = sizeClassPool.poll();
if (send != null) {
return send.receive();
}
@ -48,7 +48,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
if (CLOSE.compareAndSet(this, false, true)) {
var capturedExceptions = new ArrayList<Exception>(4);
pool.forEach((k,v) -> {
Send<BBuf> send;
Send<Buf> send;
while ((send = v.poll()) != null) {
try {
dispose(send.receive());
@ -67,7 +67,7 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
@Override
public void drop(BBuf buf) {
var sizeClassPool = getSizeClassPool(buf.size());
var sizeClassPool = getSizeClassPool(buf.capacity());
sizeClassPool.offer(buf.send());
if (closed) {
var send = sizeClassPool.poll();
@ -77,11 +77,11 @@ abstract class SizeClassedMemoryPool implements Allocator, Drop<BBuf> {
}
}
private ConcurrentLinkedQueue<Send<BBuf>> getSizeClassPool(long size) {
private ConcurrentLinkedQueue<Send<Buf>> getSizeClassPool(long size) {
return pool.computeIfAbsent(size, k -> new ConcurrentLinkedQueue<>());
}
private void dispose(BBuf buf) {
disposer.drop(buf);
private void dispose(Buf buf) {
disposer.drop((BBuf) buf);
}
}

View File

@ -5,7 +5,7 @@ import java.lang.invoke.VarHandle;
import static io.netty.buffer.b2.Statics.*;
import static java.lang.invoke.MethodHandles.*;
class TransferSend<T extends Rc<T> & Owned<T>> implements Send<T> {
class TransferSend<I extends Rc<I>, T extends Rc<I> & Owned<T>> implements Send<I> {
private static final VarHandle RECEIVED = findVarHandle(lookup(), TransferSend.class, "received", boolean.class);
private final T outgoing;
private final Drop<T> drop;
@ -18,12 +18,12 @@ class TransferSend<T extends Rc<T> & Owned<T>> implements Send<T> {
}
@Override
public T receive() {
public I receive() {
if (!RECEIVED.compareAndSet(this, false, true)) {
throw new IllegalStateException("This object has already been received.");
}
var copy = outgoing.transferOwnership(Thread.currentThread(), drop);
drop.accept(copy);
return copy;
return (I) copy;
}
}

View File

@ -1,10 +1,11 @@
package io.netty.buffer.b2;
import org.junit.After;
import org.junit.AssumptionViolatedException;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.nio.BufferUnderflowException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -17,218 +18,277 @@ import static org.junit.Assert.*;
public abstract class BBufTest {
protected abstract Allocator createAllocator();
private Allocator allocator;
private Buf buf;
@Before
public void setUp() {
allocator = createAllocator();
buf = allocator.allocate(8);
}
@After
public void tearDown() {
buf.close();
allocator.close();
}
@Test
public void allocateAndAccessingBuffer() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
buf.writeByte((byte) 1);
buf.writeByte((byte) 2);
try (BBuf inner = buf.acquire()) {
inner.writeByte((byte) 3);
inner.writeByte((byte) 4);
inner.writeByte((byte) 5);
inner.writeByte((byte) 6);
inner.writeByte((byte) 7);
inner.writeByte((byte) 8);
try {
inner.writeByte((byte) 9);
fail("Expected to be out of bounds.");
} catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound"));
}
try {
buf.writeByte((byte) 9);
fail("Expected to be out of bounds.");
} catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound"));
}
}
assertEquals((byte) 1, buf.readByte());
assertEquals((byte) 2, buf.readByte());
assertEquals((byte) 3, buf.readByte());
assertEquals((byte) 4, buf.readByte());
assertEquals((byte) 5, buf.readByte());
assertEquals((byte) 6, buf.readByte());
assertEquals((byte) 7, buf.readByte());
assertEquals((byte) 8, buf.readByte());
buf.writeByte((byte) 1);
buf.writeByte((byte) 2);
try (Buf inner = buf.acquire()) {
inner.writeByte((byte) 3);
inner.writeByte((byte) 4);
inner.writeByte((byte) 5);
inner.writeByte((byte) 6);
inner.writeByte((byte) 7);
inner.writeByte((byte) 8);
try {
assertEquals((byte) 9, buf.readByte());
inner.writeByte((byte) 9);
fail("Expected to be out of bounds.");
} catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound"));
}
try {
buf.writeByte((byte) 9);
fail("Expected to be out of bounds.");
} catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound"));
}
assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, buf.debugAsByteArray());
}
assertEquals((byte) 1, buf.readByte());
assertEquals((byte) 2, buf.readByte());
assertEquals((byte) 3, buf.readByte());
assertEquals((byte) 4, buf.readByte());
assertEquals((byte) 5, buf.readByte());
assertEquals((byte) 6, buf.readByte());
assertEquals((byte) 7, buf.readByte());
assertEquals((byte) 8, buf.readByte());
try {
assertEquals((byte) 9, buf.readByte());
fail("Expected to be out of bounds.");
} catch (RuntimeException re) {
assertThat(re.getMessage(), containsString("bound"));
}
assertArrayEquals(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}, buf.copy());
}
@Test
public void allocateAndRendesvousWithThread() throws Exception {
try (Allocator allocator = createAllocator()) {
ArrayBlockingQueue<Send<BBuf>> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (BBuf byteBuf = queue.take().receive()) {
return byteBuf.readByte();
}
});
executor.shutdown();
try (BBuf buf = allocator.allocate(8)) {
buf.writeByte((byte) 42);
buf.sendTo(queue::offer);
ArrayBlockingQueue<Send<Buf>> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (Buf byteBuf = queue.take().receive()) {
return byteBuf.readByte();
}
});
executor.shutdown();
assertEquals((byte) 42, future.get().byteValue());
try (Buf buf = allocator.allocate(8)) {
buf.writeByte((byte) 42);
buf.sendTo(queue::offer);
}
assertEquals((byte) 42, future.get().byteValue());
}
@Test
public void allocateAndRendesvousWithThreadViaSyncQueue() throws Exception {
try (Allocator allocator = createAllocator()) {
SynchronousQueue<Send<BBuf>> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (BBuf byteBuf = queue.take().receive()) {
return byteBuf.readByte();
SynchronousQueue<Send<Buf>> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (Buf byteBuf = queue.take().receive()) {
return byteBuf.readByte();
}
});
executor.shutdown();
try (Buf buf = allocator.allocate(8)) {
buf.writeByte((byte) 42);
buf.sendTo(e -> {
try {
queue.put(e);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
});
executor.shutdown();
try (BBuf buf = allocator.allocate(8)) {
buf.writeByte((byte) 42);
buf.sendTo(e -> {
try {
queue.put(e);
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
});
}
assertEquals((byte) 42, future.get().byteValue());
}
assertEquals((byte) 42, future.get().byteValue());
}
@Test
public void allocateAndSendToThread() throws Exception {
try (Allocator allocator = createAllocator()) {
ArrayBlockingQueue<Send<BBuf>> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (BBuf byteBuf = queue.take().receive()) {
return byteBuf.readByte();
}
});
executor.shutdown();
try (BBuf buf = allocator.allocate(8)) {
buf.writeByte((byte) 42);
assertTrue(queue.offer(buf.send()));
ArrayBlockingQueue<Send<Buf>> queue = new ArrayBlockingQueue<>(10);
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (Buf byteBuf = queue.take().receive()) {
return byteBuf.readByte();
}
});
executor.shutdown();
assertEquals((byte) 42, future.get().byteValue());
try (Buf buf = allocator.allocate(8)) {
buf.writeByte((byte) 42);
assertTrue(queue.offer(buf.send()));
}
assertEquals((byte) 42, future.get().byteValue());
}
@Test
public void allocateAndSendToThreadViaSyncQueue() throws Exception {
try (Allocator allocator = createAllocator()) {
SynchronousQueue<Send<BBuf>> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (BBuf byteBuf = queue.take().receive()) {
return byteBuf.readByte();
}
});
executor.shutdown();
try (BBuf buf = allocator.allocate(8)) {
buf.writeByte((byte) 42);
queue.put(buf.send());
SynchronousQueue<Send<Buf>> queue = new SynchronousQueue<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Byte> future = executor.submit(() -> {
try (Buf byteBuf = queue.take().receive()) {
return byteBuf.readByte();
}
});
executor.shutdown();
assertEquals((byte) 42, future.get().byteValue());
try (Buf buf = allocator.allocate(8)) {
assertSame(buf, buf.writeByte((byte) 42));
queue.put(buf.send());
}
assertEquals((byte) 42, future.get().byteValue());
}
@Test
public void mustThrowWhenAllocatingZeroSizedBuffer() {
try (Allocator allocator = createAllocator()) {
try {
allocator.allocate(0);
fail("Expected to throw an IllegalArgumentException.");
} catch (IllegalArgumentException ignore) {
}
try {
allocator.allocate(0);
fail("Expected to throw an IllegalArgumentException.");
} catch (IllegalArgumentException ignore) {
}
}
@Test
public void mustThrowWhenAllocatingNegativeSizedBuffer() {
try (Allocator allocator = createAllocator()) {
try {
allocator.allocate(-1);
fail("Expected to throw an IllegalArgumentException.");
} catch (IllegalArgumentException ignore) {
}
try {
allocator.allocate(-1);
fail("Expected to throw an IllegalArgumentException.");
} catch (IllegalArgumentException ignore) {
}
}
@Test
public void mustThrowWhenAllocatingOverSizedBuffer() {
try (Allocator allocator = createAllocator()) {
try {
allocator.allocate(Integer.MAX_VALUE);
fail("Expected to throw an IllegalArgumentException.");
} catch (IllegalArgumentException ignore) {
}
try {
allocator.allocate(Integer.MAX_VALUE);
fail("Expected to throw an IllegalArgumentException.");
} catch (IllegalArgumentException ignore) {
}
}
@Ignore
@Test
public void mustAllowAllocatingMaxArraySizedBuffer() {
try (Allocator allocator = createAllocator()) {
try {
allocator.allocate(Integer.MAX_VALUE - 8).close();
} catch (OutOfMemoryError oome) {
// Mark test as ignored if this happens.
throw new AssumptionViolatedException("JVM does not have enough memory for this test.", oome);
}
try {
allocator.allocate(Integer.MAX_VALUE - 8).close();
} catch (OutOfMemoryError oome) {
// Mark test as ignored if this happens.
throw new AssumptionViolatedException("JVM does not have enough memory for this test.", oome);
}
}
@Test
public void setReaderIndexMustThrowOnNegativeIndex() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
try {
buf.readerIndex(-1);
fail("Expected an exception to be thrown.");
} catch (IndexOutOfBoundsException e) {
// Good.
}
try {
buf.readerIndex(-1);
fail("Expected an exception to be thrown.");
} catch (IndexOutOfBoundsException e) {
// Good.
}
}
@Test
public void setReaderIndexMustThrowOnOversizedIndex() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
try {
buf.readerIndex(8);
fail("Expected an exception to be thrown.");
} catch (IndexOutOfBoundsException e) {
// Good.
}
try {
buf.readerIndex(8);
fail("Expected an exception to be thrown.");
} catch (IndexOutOfBoundsException e) {
// Good.
}
}
@Test
public void setReaderIndexMustNotThrowWithinBounds() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
buf.readerIndex(0);
buf.readerIndex(7);
assertSame(buf, buf.readerIndex(0));
assertSame(buf, buf.readerIndex(7));
}
@Test
public void capacityMustBeAllocatedSize() {
assertEquals(8, buf.capacity());
try (Buf b = allocator.allocate(13)) {
assertEquals(13, b.capacity());
}
}
@Test
public void readerWriterIndexUpdates() {
try (Buf buf = allocator.allocate(42)) {
assertEquals(0, buf.writerIndex());
assertSame(buf, buf.writerIndex(1));
assertEquals(1, buf.writerIndex());
assertSame(buf, buf.writeByte((byte) 7));
assertEquals(2, buf.writerIndex());
assertSame(buf, buf.writeShort((short) 3003));
assertEquals(4, buf.writerIndex());
assertSame(buf, buf.writeInt(0x5A55_BA55));
assertEquals(8, buf.writerIndex());
assertSame(buf, buf.writeLong(0x123456789ABCDEF0L));
assertEquals(16, buf.writerIndex());
assertEquals(26, buf.writableBytes());
assertEquals(16, buf.readableBytes());
assertEquals(0, buf.readerIndex());
assertSame(buf, buf.readerIndex(1));
assertEquals(1, buf.readerIndex());
assertEquals((byte) 7, buf.readByte());
assertEquals(2, buf.readerIndex());
assertEquals((short) 3003, buf.readShort());
assertEquals(4, buf.readerIndex());
assertEquals(0x5A55_BA55, buf.readInt());
assertEquals(8, buf.readerIndex());
assertEquals(0x123456789ABCDEF0L, buf.readLong());
assertEquals(16, buf.readerIndex());
assertEquals(0, buf.readableBytes());
}
}
@Test
public void fill() {
try (Buf buf = allocator.allocate(16)) {
assertSame(buf, buf.fill((byte) 0xA5));
assertEquals(0xA5A5A5A5_A5A5A5A5L, buf.readLong());
assertEquals(0xA5A5A5A5_A5A5A5A5L, buf.readLong());
}
}
@Test
public void readAndWriteAtOffset() {
try (Buf buf = allocator.allocate(16)) {
buf.fill((byte) 0xA5);
assertSame(buf, buf.writeLong(3, 0xBEEF_CA4E_1234_5678L));
assertEquals(0xBEEF_CA4E_1234_5678L, buf.readLong(3));
assertEquals(0, buf.readerIndex());
assertEquals(0, buf.writerIndex());
assertSame(buf, buf.writeInt(3, 0x1234_5678));
assertEquals(0x1234_5678, buf.readInt(3));
assertEquals(0, buf.readerIndex());
assertEquals(0, buf.writerIndex());
assertSame(buf, buf.writeShort(3, (short) 0x5678));
assertEquals((short) 0x5678, buf.readShort(3));
assertEquals(0, buf.readerIndex());
assertEquals(0, buf.writerIndex());
assertSame(buf, buf.writeByte(3, (byte) 0x78));
assertEquals((byte) 0x78, buf.readByte(3));
assertEquals(0, buf.readerIndex());
assertEquals(0, buf.writerIndex());
}
}
}

View File

@ -13,7 +13,7 @@ public class DirectBBufTest extends BBufTest {
@Test
public void directBufferMustHaveNonZeroAddress() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
Buf buf = allocator.allocate(8)) {
assertNotEquals(0, buf.getNativeAddress());
}
}

View File

@ -13,7 +13,7 @@ public class HeapBBufTest extends BBufTest {
@Test
public void heapBufferMustHaveZeroAddress() {
try (Allocator allocator = createAllocator();
BBuf buf = allocator.allocate(8)) {
Buf buf = allocator.allocate(8)) {
assertEquals(0, buf.getNativeAddress());
}
}