Add BufHolder and BufRef helper classes

Motivation:
There are many use cases where other objects will have fields that are buffers.
Since buffers are reference counted, their life cycle needs to be managed carefully.

Modification:
Add the abstract BufHolder, and the concrete sub-class BufRef, as neat building blocks for building other classes that contain field references to buffers.

The behaviours of closed/sent buffers have also been specified in tests, and tightened up in the code.

Result:
It is now easier to create classes/objects that wrap buffers.
This commit is contained in:
Chris Vest 2020-12-14 14:07:11 +01:00
parent ab95abac25
commit f83e7fa618
10 changed files with 721 additions and 77 deletions

View File

@ -123,7 +123,7 @@ public interface Allocator extends AutoCloseable {
* @param extension The buffer to extend the composite buffer with. * @param extension The buffer to extend the composite buffer with.
*/ */
static void extend(Buf composite, Buf extension) { static void extend(Buf composite, Buf extension) {
if (composite.getClass() != CompositeBuf.class) { if (!isComposite(composite)) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"Expected the first buffer to be a composite buffer, " + "Expected the first buffer to be a composite buffer, " +
"but it is a " + composite.getClass() + " buffer: " + composite + '.'); "but it is a " + composite.getClass() + " buffer: " + composite + '.');
@ -132,6 +132,15 @@ public interface Allocator extends AutoCloseable {
buf.extendWith(extension); buf.extendWith(extension);
} }
/**
* Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise.
*/
static boolean isComposite(Buf composite) {
return composite.getClass() == CompositeBuf.class;
}
/** /**
* Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be * Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be
* used after this method has been called on it. * used after this method has been called on it.

View File

@ -0,0 +1,198 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.lang.invoke.VarHandle;
import java.util.Objects;
import static io.netty.buffer.api.Statics.findVarHandle;
import static java.lang.invoke.MethodHandles.lookup;
/**
* The {@link BufHolder} is an abstract class that simplifies the implementation of objects that themselves contain
* a {@link Buf} instance.
* <p>
* The {@link BufHolder} can only hold on to a single buffer, so objects and classes that need to hold on to multiple
* buffers will have to do their implementation from scratch, though they can use the code of the {@link BufHolder} as
* inspiration.
* <p>
* If you just want an object that is a reference to a buffer, then the {@link BufRef} can be used for that purpose.
* If you have an advanced use case where you wish to implement {@link Rc}, and tightly control lifetimes, then
* {@link RcSupport} can be of help.
*
* @param <T> The concrete {@link BufHolder} type.
*/
public abstract class BufHolder<T extends BufHolder<T>> implements Rc<T> {
private static final VarHandle BUF = findVarHandle(lookup(), BufHolder.class, "buf", Buf.class);
private Buf buf;
/**
* Create a new {@link BufHolder} to hold the given {@linkplain Buf buffer}.
* <p>
* <strong>Note:</strong> this increases the reference count of the given buffer.
*
* @param buf The {@linkplain Buf buffer} to be held by this holder.
*/
protected BufHolder(Buf buf) {
this.buf = Objects.requireNonNull(buf, "The buffer cannot be null.").acquire();
}
/**
* Create a new {@link BufHolder} to hold the {@linkplain Buf buffer} received from the given {@link Send}.
* <p>
* The {@link BufHolder} will then be holding exclusive ownership of the buffer.
*
* @param send The {@linkplain Buf buffer} to be held by this holder.
*/
protected BufHolder(Send<Buf> send) {
buf = Objects.requireNonNull(send, "The send cannot be null.").receive();
}
@SuppressWarnings("unchecked")
@Override
public T acquire() {
buf.acquire();
return (T) this;
}
@Override
public void close() {
buf.close();
}
@Override
public boolean isOwned() {
return buf.isOwned();
}
@Override
public int countBorrows() {
return buf.countBorrows();
}
@Override
public Send<T> send() {
var send = buf.send();
return new Send<T>() {
@Override
public T receive() {
return BufHolder.this.receive(send.receive());
}
};
}
/**
* Called when a {@linkplain #send() sent} {@link BufHolder} is received by the recipient.
* The {@link BufHolder} should return a new concrete instance, that wraps the given {@link Buf} object.
*
* @param buf The {@link Buf} that is {@linkplain Send#receive() received} by the recipient,
* and needs to be wrapped in a new {@link BufHolder} instance.
* @return A new {@linkplain T buffer holder} instance, containing the given {@linkplain Buf buffer}.
*/
protected abstract T receive(Buf buf);
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and increases the reference count of the new buffer.
* <p>
* The buffer assignment is performed using a plain store.
*
* @param newBuf The new {@link Buf} instance that is replacing the currently held buffer.
*/
protected void replace(Buf newBuf) {
try (var ignore = buf) {
buf = newBuf.acquire();
}
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and takes exclusive ownership of the sent buffer.
* <p>
* The buffer assignment is performed using a plain store.
*
* @param send The new {@link Buf} instance that is replacing the currently held buffer.
*/
protected void replace(Send<Buf> send) {
try (var ignore = buf) {
buf = send.receive();
}
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and increases the reference count of the new buffer.
* <p>
* The buffer assignment is performed using a volatile store.
*
* @param newBuf The new {@link Buf} instance that is replacing the currently held buffer.
*/
protected void replaceVolatile(Buf newBuf) {
var prev = (Buf) BUF.getAndSet(this, newBuf.acquire());
prev.close();
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and takes exclusive ownership of the sent buffer.
* <p>
* The buffer assignment is performed using a volatile store.
*
* @param send The new {@link Buf} instance that is replacing the currently held buffer.
*/
protected void replaceVolatile(Send<Buf> send) {
var prev = (Buf) BUF.getAndSet(this, send.receive());
prev.close();
}
/**
* Access the held {@link Buf} instance.
* <p>
* The access is performed using a plain load.
*
* @return The {@link Buf} instance being held by this {@linkplain T buffer holder}.
*/
protected Buf getBuf() {
return buf;
}
/**
* Access the held {@link Buf} instance.
* <p>
* The access is performed using a volatile load.
*
* @return The {@link Buf} instance being held by this {@linkplain T buffer holder}.
*/
protected Buf getBufVolatile() {
return (Buf) BUF.getVolatile(this);
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.lang.invoke.VarHandle;
/**
* A mutable reference to a buffer.
*/
public final class BufRef extends BufHolder<BufRef> {
/**
* Create a reference to the given {@linkplain Buf buffer}.
* This increments the reference count of the buffer.
*
* @param buf The buffer to reference.
*/
public BufRef(Buf buf) {
super(buf);
VarHandle.fullFence();
}
/**
* Create a reference that holds the exclusive ownership of the sent buffer.
*
* @param send The {@linkplain Send sent} buffer to take ownership of.
*/
public BufRef(Send<Buf> send) {
super(send);
VarHandle.fullFence();
}
@Override
protected BufRef receive(Buf buf) {
return new BufRef(buf);
}
@Override
public void replaceVolatile(Buf newBuf) {
super.replaceVolatile(newBuf);
}
@Override
public void replaceVolatile(Send<Buf> send) {
super.replaceVolatile(send);
}
/**
* Access the buffer in this reference.
*
* @return The buffer held by the reference.
*/
public Buf contents() {
return getBufVolatile();
}
}

View File

@ -31,6 +31,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
for (Buf b : buf.bufs) { for (Buf b : buf.bufs) {
b.close(); b.close();
} }
buf.makeInaccessible();
}; };
private final Allocator allocator; private final Allocator allocator;
@ -43,6 +44,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private int woff; private int woff;
private int subOffset; // The next offset *within* a consituent buffer to read from or write to. private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
private ByteOrder order; private ByteOrder order;
private boolean closed;
CompositeBuf(Allocator allocator, Buf[] bufs) { CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array. this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
@ -885,6 +887,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
} }
throw throwable; throw throwable;
} }
makeInaccessible();
return new Owned<CompositeBuf>() { return new Owned<CompositeBuf>() {
@Override @Override
public CompositeBuf transferOwnership(Drop<CompositeBuf> drop) { public CompositeBuf transferOwnership(Drop<CompositeBuf> drop) {
@ -899,6 +902,13 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}; };
} }
void makeInaccessible() {
capacity = 0;
roff = 0;
woff = 0;
closed = true;
}
@Override @Override
protected IllegalStateException notSendableException() { protected IllegalStateException notSendableException() {
if (!isSendable) { if (!isSendable) {
@ -979,7 +989,10 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
} }
} }
private IndexOutOfBoundsException indexOutOfBounds(int index) { private RuntimeException indexOutOfBounds(int index) {
if (closed) {
return new IllegalStateException("This buffer is closed.");
}
return new IndexOutOfBoundsException( return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " + "Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(capacity - 1) + "]."); (capacity - 1) + "].");

View File

@ -30,7 +30,7 @@ import java.util.ArrayDeque;
* <p> * <p>
* Note that scopes are not thread-safe. They are intended to be used from a single thread. * Note that scopes are not thread-safe. They are intended to be used from a single thread.
*/ */
public class Scope implements AutoCloseable { public final class Scope implements AutoCloseable {
private final ArrayDeque<Rc<?>> deque = new ArrayDeque<>(); private final ArrayDeque<Rc<?>> deque = new ArrayDeque<>();
/** /**

View File

@ -20,7 +20,7 @@ import io.netty.buffer.api.Drop;
import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle; import java.lang.invoke.VarHandle;
class BifurcatedDrop<T> implements Drop<T> { class BifurcatedDrop implements Drop<MemSegBuf> {
private static final VarHandle COUNT; private static final VarHandle COUNT;
static { static {
try { try {
@ -30,12 +30,12 @@ class BifurcatedDrop<T> implements Drop<T> {
} }
} }
private final T originalBuf; private final MemSegBuf originalBuf;
private final Drop<T> delegate; private final Drop<MemSegBuf> delegate;
@SuppressWarnings("FieldMayBeFinal") @SuppressWarnings("FieldMayBeFinal")
private volatile int count; private volatile int count;
BifurcatedDrop(T originalBuf, Drop<T> delegate) { BifurcatedDrop(MemSegBuf originalBuf, Drop<MemSegBuf> delegate) {
this.originalBuf = originalBuf; this.originalBuf = originalBuf;
this.delegate = delegate; this.delegate = delegate;
count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop.
@ -50,7 +50,7 @@ class BifurcatedDrop<T> implements Drop<T> {
} }
@Override @Override
public synchronized void drop(T obj) { public synchronized void drop(MemSegBuf buf) {
int c; int c;
int n; int n;
do { do {
@ -62,14 +62,15 @@ class BifurcatedDrop<T> implements Drop<T> {
delegate.attach(originalBuf); delegate.attach(originalBuf);
delegate.drop(originalBuf); delegate.drop(originalBuf);
} }
buf.makeInaccessible();
} }
@Override @Override
public void attach(T obj) { public void attach(MemSegBuf obj) {
delegate.attach(obj); delegate.attach(obj);
} }
Drop<T> unwrap() { Drop<MemSegBuf> unwrap() {
return delegate; return delegate;
} }

View File

@ -43,7 +43,19 @@ import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset; import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf { class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
static final Drop<MemSegBuf> SEGMENT_CLOSE = buf -> buf.seg.close(); private static final MemorySegment CLOSED_SEGMENT;
static final Drop<MemSegBuf> SEGMENT_CLOSE;
static {
CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]);
CLOSED_SEGMENT.close();
SEGMENT_CLOSE = buf -> {
try (var ignore = buf.seg) {
buf.makeInaccessible();
}
};
}
private final AllocatorControl alloc; private final AllocatorControl alloc;
private final boolean isSendable; private final boolean isSendable;
private MemorySegment seg; private MemorySegment seg;
@ -130,7 +142,10 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
} }
var slice = seg.asSlice(offset, length); var slice = seg.asSlice(offset, length);
acquire(); acquire();
Drop<MemSegBuf> drop = b -> close(); Drop<MemSegBuf> drop = b -> {
close();
b.makeInaccessible();
};
var sendable = false; // Sending implies ownership change, which we can't do for slices. var sendable = false; // Sending implies ownership change, which we can't do for slices.
return new MemSegBuf(slice, drop, alloc, sendable).writerOffset(length).order(order()); return new MemSegBuf(slice, drop, alloc, sendable).writerOffset(length).order(order());
} }
@ -177,6 +192,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public ByteCursor openCursor(int fromOffset, int length) { public ByteCursor openCursor(int fromOffset, int length) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (fromOffset < 0) { if (fromOffset < 0) {
throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.'); throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.');
} }
@ -238,6 +256,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public ByteCursor openReverseCursor(int fromOffset, int length) { public ByteCursor openReverseCursor(int fromOffset, int length) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (fromOffset < 0) { if (fromOffset < 0) {
throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.'); throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.');
} }
@ -320,9 +341,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
var drop = unsafeGetDrop(); var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) { if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this); drop.drop(this);
drop = ((BifurcatedDrop<MemSegBuf>) drop).unwrap(); drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop); unsafeSetDrop(drop);
this.roff = roff;
this.woff = woff;
} else { } else {
alloc.recoverMemory(recoverableMemory()); alloc.recoverMemory(recoverableMemory());
} }
@ -343,9 +368,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
drop.attach(this); drop.attach(this);
} }
if (drop instanceof BifurcatedDrop) { if (drop instanceof BifurcatedDrop) {
((BifurcatedDrop<?>) drop).increment(); ((BifurcatedDrop) drop).increment();
} else { } else {
drop = new BifurcatedDrop<MemSegBuf>(new MemSegBuf(seg, drop, alloc), drop); drop = new BifurcatedDrop(new MemSegBuf(seg, drop, alloc), drop);
unsafeSetDrop(drop); unsafeSetDrop(drop);
} }
var bifurcatedSeg = seg.asSlice(0, woff); var bifurcatedSeg = seg.asSlice(0, woff);
@ -390,28 +415,44 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public Buf writeByte(byte value) { public Buf writeByte(byte value) {
setByteAtOffset(seg, woff, value); try {
woff += Byte.BYTES; setByteAtOffset(seg, woff, value);
return this; woff += Byte.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setByte(int woff, byte value) { public Buf setByte(int woff, byte value) {
setByteAtOffset(seg, woff, value); try {
return this; setByteAtOffset(seg, woff, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf writeUnsignedByte(int value) { public Buf writeUnsignedByte(int value) {
setByteAtOffset(seg, woff, (byte) (value & 0xFF)); try {
woff += Byte.BYTES; setByteAtOffset(seg, woff, (byte) (value & 0xFF));
return this; woff += Byte.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setUnsignedByte(int woff, int value) { public Buf setUnsignedByte(int woff, int value) {
setByteAtOffset(seg, woff, (byte) (value & 0xFF)); try {
return this; setByteAtOffset(seg, woff, (byte) (value & 0xFF));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
@ -430,15 +471,23 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public Buf writeChar(char value) { public Buf writeChar(char value) {
setCharAtOffset(seg, woff, order, value); try {
woff += 2; setCharAtOffset(seg, woff, order, value);
return this; woff += 2;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setChar(int woff, char value) { public Buf setChar(int woff, char value) {
setCharAtOffset(seg, woff, order, value); try {
return this; setCharAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
@ -471,28 +520,44 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public Buf writeShort(short value) { public Buf writeShort(short value) {
setShortAtOffset(seg, woff, order, value); try {
woff += Short.BYTES; setShortAtOffset(seg, woff, order, value);
return this; woff += Short.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setShort(int woff, short value) { public Buf setShort(int woff, short value) {
setShortAtOffset(seg, woff, order, value); try {
return this; setShortAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf writeUnsignedShort(int value) { public Buf writeUnsignedShort(int value) {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF)); try {
woff += Short.BYTES; setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
return this; woff += Short.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setUnsignedShort(int woff, int value) { public Buf setUnsignedShort(int woff, int value) {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF)); try {
return this; setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
@ -639,28 +704,44 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public Buf writeInt(int value) { public Buf writeInt(int value) {
setIntAtOffset(seg, woff, order, value); try {
woff += Integer.BYTES; setIntAtOffset(seg, woff, order, value);
return this; woff += Integer.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setInt(int woff, int value) { public Buf setInt(int woff, int value) {
setIntAtOffset(seg, woff, order, value); try {
return this; setIntAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf writeUnsignedInt(long value) { public Buf writeUnsignedInt(long value) {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL)); try {
woff += Integer.BYTES; setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
return this; woff += Integer.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setUnsignedInt(int woff, long value) { public Buf setUnsignedInt(int woff, long value) {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL)); try {
return this; setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
@ -679,15 +760,23 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public Buf writeFloat(float value) { public Buf writeFloat(float value) {
setFloatAtOffset(seg, woff, order, value); try {
woff += Float.BYTES; setFloatAtOffset(seg, woff, order, value);
return this; woff += Float.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setFloat(int woff, float value) { public Buf setFloat(int woff, float value) {
setFloatAtOffset(seg, woff, order, value); try {
return this; setFloatAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
@ -706,15 +795,23 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public Buf writeLong(long value) { public Buf writeLong(long value) {
setLongAtOffset(seg, woff, order, value); try {
woff += Long.BYTES; setLongAtOffset(seg, woff, order, value);
return this; woff += Long.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setLong(int woff, long value) { public Buf setLong(int woff, long value) {
setLongAtOffset(seg, woff, order, value); try {
return this; setLongAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
@ -733,35 +830,52 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override @Override
public Buf writeDouble(double value) { public Buf writeDouble(double value) {
setDoubleAtOffset(seg, woff, order, value); try {
woff += Double.BYTES; setDoubleAtOffset(seg, woff, order, value);
return this; woff += Double.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
@Override @Override
public Buf setDouble(int woff, double value) { public Buf setDouble(int woff, double value) {
setDoubleAtOffset(seg, woff, order, value); try {
return this; setDoubleAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
} }
// </editor-fold> // </editor-fold>
@Override @Override
protected Owned<MemSegBuf> prepareSend() { protected Owned<MemSegBuf> prepareSend() {
MemSegBuf outer = this; var order = this.order;
var roff = this.roff;
var woff = this.woff;
boolean isConfined = seg.ownerThread() == null; boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.share(); MemorySegment transferSegment = isConfined? seg : seg.share();
makeInaccessible();
return new Owned<MemSegBuf>() { return new Owned<MemSegBuf>() {
@Override @Override
public MemSegBuf transferOwnership(Drop<MemSegBuf> drop) { public MemSegBuf transferOwnership(Drop<MemSegBuf> drop) {
MemSegBuf copy = new MemSegBuf(transferSegment, drop, alloc); MemSegBuf copy = new MemSegBuf(transferSegment, drop, alloc);
copy.order = outer.order; copy.order = order;
copy.roff = outer.roff; copy.roff = roff;
copy.woff = outer.woff; copy.woff = woff;
return copy; return copy;
} }
}; };
} }
void makeInaccessible() {
seg = CLOSED_SEGMENT;
roff = 0;
woff = 0;
}
@Override @Override
protected IllegalStateException notSendableException() { protected IllegalStateException notSendableException() {
if (!isSendable) { if (!isSendable) {
@ -778,22 +892,36 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
private void checkRead(int index, int size) { private void checkRead(int index, int size) {
if (index < 0 || woff < index + size) { if (index < 0 || woff < index + size) {
throw indexOutOfBounds(index); throw accessCheckException(index);
} }
} }
private void checkWrite(int index, int size) { private void checkWrite(int index, int size) {
if (index < 0 || seg.byteSize() < index + size) { if (index < 0 || seg.byteSize() < index + size) {
throw indexOutOfBounds(index); throw accessCheckException(index);
} }
} }
private IndexOutOfBoundsException indexOutOfBounds(int index) { private RuntimeException checkWriteState(IndexOutOfBoundsException ioobe) {
if (seg == CLOSED_SEGMENT) {
return bufferIsClosed();
}
return ioobe;
}
private RuntimeException accessCheckException(int index) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
return new IndexOutOfBoundsException( return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " + "Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(seg.byteSize() - 1) + "]."); (seg.byteSize() - 1) + "].");
} }
private IllegalStateException bufferIsClosed() {
return new IllegalStateException("This buffer is closed.");
}
Object recoverableMemory() { Object recoverableMemory() {
return new RecoverableMemory(seg, alloc); return new RecoverableMemory(seg, alloc);
} }

View File

@ -0,0 +1,110 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
class BufRefTest {
@Test
public void closingBufRefMustCloseOwnedBuf() {
try (Allocator allocator = Allocator.heap()) {
BufRef ref;
try (Buf b = allocator.allocate(8)) {
ref = new BufRef(b);
}
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32));
}
}
@Test
public void closingBufRefMustCloseOwnedBufFromSend() {
try (Allocator allocator = Allocator.heap();
Buf buf = allocator.allocate(8)) {
BufRef ref = new BufRef(buf.send());
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32));
}
}
@Test
public void mustCloseOwnedBufferWhenReplaced() {
try (Allocator allocator = Allocator.heap()) {
Buf orig;
BufRef ref;
try (Buf buf = allocator.allocate(8)) {
ref = new BufRef(orig = buf);
}
orig.writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
try (Buf buf = allocator.allocate(8)) {
ref.replaceVolatile(buf); // Pass replacement directly.
}
assertThrows(IllegalStateException.class, () -> orig.writeInt(32));
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32));
}
}
@Test
public void mustCloseOwnedBufferWhenReplacedFromSend() {
try (Allocator allocator = Allocator.heap()) {
Buf orig;
BufRef ref;
try (Buf buf = allocator.allocate(8)) {
ref = new BufRef(orig = buf);
}
orig.writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
try (Buf buf = allocator.allocate(8)) {
ref.replaceVolatile(buf.send()); // Pass replacement via send().
}
assertThrows(IllegalStateException.class, () -> orig.writeInt(32));
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32));
}
}
@Test
public void sendingRefMustSendBuffer() {
try (Allocator allocator = Allocator.heap();
BufRef refA = new BufRef(allocator.allocate(8).send())) {
refA.contents().writeInt(42);
var send = refA.send();
assertThrows(IllegalStateException.class, () -> refA.contents().readInt());
try (BufRef refB = send.receive()) {
assertThat(refB.contents().readInt()).isEqualTo(42);
}
}
}
}

View File

@ -369,6 +369,112 @@ public class BufTest {
} }
} }
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf orig = allocator.allocate(24)) {
orig.writeLong(42);
var send = orig.send();
verifyInaccessible(orig);
try (Buf receive = send.receive()) {
assertEquals(42, receive.readInt());
}
}
}
private void verifyInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.readByte());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
assertThrows(IllegalStateException.class, () -> buf.readChar());
assertThrows(IllegalStateException.class, () -> buf.readShort());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedShort());
assertThrows(IllegalStateException.class, () -> buf.readMedium());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedMedium());
assertThrows(IllegalStateException.class, () -> buf.readInt());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedInt());
assertThrows(IllegalStateException.class, () -> buf.readFloat());
assertThrows(IllegalStateException.class, () -> buf.readLong());
assertThrows(IllegalStateException.class, () -> buf.readDouble());
assertThrows(IllegalStateException.class, () -> buf.getByte(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedByte(0));
assertThrows(IllegalStateException.class, () -> buf.getChar(0));
assertThrows(IllegalStateException.class, () -> buf.getShort(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedShort(0));
assertThrows(IllegalStateException.class, () -> buf.getMedium(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedMedium(0));
assertThrows(IllegalStateException.class, () -> buf.getInt(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedInt(0));
assertThrows(IllegalStateException.class, () -> buf.getFloat(0));
assertThrows(IllegalStateException.class, () -> buf.getLong(0));
assertThrows(IllegalStateException.class, () -> buf.getDouble(0));
assertThrows(IllegalStateException.class, () -> buf.writeByte((byte) 32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedByte(32));
assertThrows(IllegalStateException.class, () -> buf.writeChar('3'));
assertThrows(IllegalStateException.class, () -> buf.writeShort((short) 32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedShort(32));
assertThrows(IllegalStateException.class, () -> buf.writeMedium(32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedMedium(32));
assertThrows(IllegalStateException.class, () -> buf.writeInt(32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedInt(32));
assertThrows(IllegalStateException.class, () -> buf.writeFloat(3.2f));
assertThrows(IllegalStateException.class, () -> buf.writeLong(32));
assertThrows(IllegalStateException.class, () -> buf.writeDouble(32));
assertThrows(IllegalStateException.class, () -> buf.setByte(0, (byte) 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedByte(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setChar(0, '3'));
assertThrows(IllegalStateException.class, () -> buf.setShort(0, (short) 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedShort(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setMedium(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedMedium(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setInt(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedInt(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setFloat(0, 3.2f));
assertThrows(IllegalStateException.class, () -> buf.setLong(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setDouble(0, 32));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
try (Allocator allocator = Allocator.heap();
Buf target = allocator.allocate(24)) {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1));
if (Allocator.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> Allocator.extend(buf, target));
}
}
assertThrows(IllegalStateException.class, () -> buf.bifurcate());
assertThrows(IllegalStateException.class, () -> buf.send());
assertThrows(IllegalStateException.class, () -> buf.acquire());
assertThrows(IllegalStateException.class, () -> buf.slice());
assertThrows(IllegalStateException.class, () -> buf.fill((byte) 0));
assertThrows(IllegalStateException.class, () -> buf.openCursor());
assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0));
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor());
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor(0, 0));
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void cannotSendMoreThanOnce(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
var send = buf.send();
var exc = assertThrows(IllegalStateException.class, () -> buf.send());
send.receive().close();
assertThat(exc).hasMessageContaining("Cannot send()");
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bufferShouldNotBeAccessibleAfterClose(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
Buf buf = allocator.allocate(24);
buf.writeLong(42);
buf.close();
verifyInaccessible(buf);
}
}
@ParameterizedTest @ParameterizedTest
@MethodSource("initialAllocators") @MethodSource("initialAllocators")
void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) { void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) {
@ -1509,6 +1615,7 @@ public class BufTest {
assertThat(buf.writableBytes()).isEqualTo(0); assertThat(buf.writableBytes()).isEqualTo(0);
buf.ensureWritable(8); buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8); assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);
assertThat(buf.capacity()).isGreaterThanOrEqualTo(16);
buf.writeLong(0xA1A2A3A4A5A6A7A8L); buf.writeLong(0xA1A2A3A4A5A6A7A8L);
assertThat(buf.readableBytes()).isEqualTo(16); assertThat(buf.readableBytes()).isEqualTo(16);
assertThat(buf.readLong()).isEqualTo(0x0102030405060708L); assertThat(buf.readLong()).isEqualTo(0x0102030405060708L);
@ -1540,6 +1647,7 @@ public class BufTest {
Buf buf = allocator.allocate(4)) { Buf buf = allocator.allocate(4)) {
buf.ensureWritable(8); buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8); assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);
assertThat(buf.capacity()).isGreaterThanOrEqualTo(8);
buf.writeLong(0x0102030405060708L); buf.writeLong(0x0102030405060708L);
try (Buf slice = buf.slice()) { try (Buf slice = buf.slice()) {
assertEquals(0x0102030405060708L, slice.readLong()); assertEquals(0x0102030405060708L, slice.readLong());
@ -1842,6 +1950,15 @@ public class BufTest {
} }
} }
@Test
public void composeMustThrowWhenBuffersHaveMismatchedByteOrder() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(4, ByteOrder.BIG_ENDIAN);
Buf b = allocator.allocate(4, ByteOrder.LITTLE_ENDIAN)) {
assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, b));
}
}
@ParameterizedTest @ParameterizedTest
@MethodSource("nonSliceAllocators") @MethodSource("nonSliceAllocators")
public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) { public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) {

View File

@ -15,17 +15,17 @@
*/ */
package io.netty.buffer.api; package io.netty.buffer.api;
import org.junit.Test; import org.junit.jupiter.api.Test;
import java.util.ArrayList; import java.util.ArrayList;
import static org.junit.Assert.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class ScopeTest { public class ScopeTest {
@Test @Test
public void scopeMustCloseContainedRcsInReverseInsertOrder() { void scopeMustCloseContainedRcsInReverseInsertOrder() {
ArrayList<Integer> closeOrder = new ArrayList<>(); ArrayList<Integer> closeOrder = new ArrayList<>();
try (Scope scope = new Scope()) { try (Scope scope = new Scope()) {
scope.add(new SomeRc(new OrderingDrop(1, closeOrder))); scope.add(new SomeRc(new OrderingDrop(1, closeOrder)));