Merge pull request #17 from netty/buf-holder

Add BufHolder and BufRef helper classes
This commit is contained in:
Chris Vest 2020-12-17 11:45:11 +01:00 committed by GitHub
commit cc685c0516
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 753 additions and 81 deletions

View File

@ -14,6 +14,7 @@ dbg:
clean:
docker rm -fv build-container-dbg
docker rm -fv build-container
build: image
docker create --name build-container netty-incubator-buffer:build

View File

@ -123,7 +123,7 @@ public interface Allocator extends AutoCloseable {
* @param extension The buffer to extend the composite buffer with.
*/
static void extend(Buf composite, Buf extension) {
if (composite.getClass() != CompositeBuf.class) {
if (!isComposite(composite)) {
throw new IllegalArgumentException(
"Expected the first buffer to be a composite buffer, " +
"but it is a " + composite.getClass() + " buffer: " + composite + '.');
@ -132,6 +132,15 @@ public interface Allocator extends AutoCloseable {
buf.extendWith(extension);
}
/**
* Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not.
* @param composite The buffer to check.
* @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise.
*/
static boolean isComposite(Buf composite) {
return composite.getClass() == CompositeBuf.class;
}
/**
* Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be
* used after this method has been called on it.

View File

@ -0,0 +1,193 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.lang.invoke.VarHandle;
import java.util.Objects;
import static io.netty.buffer.api.Statics.findVarHandle;
import static java.lang.invoke.MethodHandles.lookup;
/**
* The {@link BufHolder} is an abstract class that simplifies the implementation of objects that themselves contain
* a {@link Buf} instance.
* <p>
* The {@link BufHolder} can only hold on to a single buffer, so objects and classes that need to hold on to multiple
* buffers will have to do their implementation from scratch, though they can use the code of the {@link BufHolder} as
* inspiration.
* <p>
* If you just want an object that is a reference to a buffer, then the {@link BufRef} can be used for that purpose.
* If you have an advanced use case where you wish to implement {@link Rc}, and tightly control lifetimes, then
* {@link RcSupport} can be of help.
*
* @param <T> The concrete {@link BufHolder} type.
*/
public abstract class BufHolder<T extends BufHolder<T>> implements Rc<T> {
private static final VarHandle BUF = findVarHandle(lookup(), BufHolder.class, "buf", Buf.class);
private Buf buf;
/**
* Create a new {@link BufHolder} to hold the given {@linkplain Buf buffer}.
* <p>
* <strong>Note:</strong> this increases the reference count of the given buffer.
*
* @param buf The {@linkplain Buf buffer} to be held by this holder.
*/
protected BufHolder(Buf buf) {
this.buf = Objects.requireNonNull(buf, "The buffer cannot be null.").acquire();
}
/**
* Create a new {@link BufHolder} to hold the {@linkplain Buf buffer} received from the given {@link Send}.
* <p>
* The {@link BufHolder} will then be holding exclusive ownership of the buffer.
*
* @param send The {@linkplain Buf buffer} to be held by this holder.
*/
protected BufHolder(Send<Buf> send) {
buf = Objects.requireNonNull(send, "The send cannot be null.").receive();
}
@SuppressWarnings("unchecked")
@Override
public T acquire() {
buf.acquire();
return (T) this;
}
@Override
public void close() {
buf.close();
}
@Override
public boolean isOwned() {
return buf.isOwned();
}
@Override
public int countBorrows() {
return buf.countBorrows();
}
@Override
public Send<T> send() {
var send = buf.send();
return () -> receive(send.receive());
}
/**
* Called when a {@linkplain #send() sent} {@link BufHolder} is received by the recipient.
* The {@link BufHolder} should return a new concrete instance, that wraps the given {@link Buf} object.
*
* @param buf The {@link Buf} that is {@linkplain Send#receive() received} by the recipient,
* and needs to be wrapped in a new {@link BufHolder} instance.
* @return A new {@linkplain T buffer holder} instance, containing the given {@linkplain Buf buffer}.
*/
protected abstract T receive(Buf buf);
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and increases the reference count of the new buffer.
* <p>
* The buffer assignment is performed using a plain store.
*
* @param newBuf The new {@link Buf} instance that is replacing the currently held buffer.
*/
protected final void replaceBuf(Buf newBuf) {
try (var ignore = buf) {
buf = newBuf.acquire();
}
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and takes exclusive ownership of the sent buffer.
* <p>
* The buffer assignment is performed using a plain store.
*
* @param send The new {@link Buf} instance that is replacing the currently held buffer.
*/
protected final void replaceBuf(Send<Buf> send) {
try (var ignore = buf) {
buf = send.receive();
}
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and increases the reference count of the new buffer.
* <p>
* The buffer assignment is performed using a volatile store.
*
* @param newBuf The new {@link Buf} instance that is replacing the currently held buffer.
*/
protected final void replaceBufVolatile(Buf newBuf) {
var prev = (Buf) BUF.getAndSet(this, newBuf.acquire());
prev.close();
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and takes exclusive ownership of the sent buffer.
* <p>
* The buffer assignment is performed using a volatile store.
*
* @param send The {@link Send} with the new {@link Buf} instance that is replacing the currently held buffer.
*/
protected final void replaceBufVolatile(Send<Buf> send) {
var prev = (Buf) BUF.getAndSet(this, send.receive());
prev.close();
}
/**
* Access the held {@link Buf} instance.
* <p>
* The access is performed using a plain load.
*
* @return The {@link Buf} instance being held by this {@linkplain T buffer holder}.
*/
protected final Buf getBuf() {
return buf;
}
/**
* Access the held {@link Buf} instance.
* <p>
* The access is performed using a volatile load.
*
* @return The {@link Buf} instance being held by this {@linkplain T buffer holder}.
*/
protected final Buf getBufVolatile() {
return (Buf) BUF.getVolatile(this);
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.lang.invoke.VarHandle;
/**
* A mutable reference to a buffer.
*/
public final class BufRef extends BufHolder<BufRef> {
/**
* Create a reference to the given {@linkplain Buf buffer}.
* This increments the reference count of the buffer.
*
* @param buf The buffer to reference.
*/
public BufRef(Buf buf) {
super(buf);
// BufRef is meant to be atomic, so we need to add a fence to get the semantics of a volatile store.
VarHandle.fullFence();
}
/**
* Create a reference that holds the exclusive ownership of the sent buffer.
*
* @param send The {@linkplain Send sent} buffer to take ownership of.
*/
public BufRef(Send<Buf> send) {
super(send);
// BufRef is meant to be atomic, so we need to add a fence to get the semantics of a volatile store.
VarHandle.fullFence();
}
@Override
protected BufRef receive(Buf buf) {
return new BufRef(buf);
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and increases the reference count of the new buffer.
* <p>
* The buffer assignment is performed using a volatile store.
*
* @param newBuf The new {@link Buf} instance that is replacing the currently held buffer.
*/
public void replace(Buf newBuf) {
replaceBufVolatile(newBuf);
}
/**
* Replace the underlying referenced buffer with the given buffer.
* <p>
* <strong>Note:</strong> this method decreases the reference count of the current buffer,
* and takes exclusive ownership of the sent buffer.
* <p>
* The buffer assignment is performed using a volatile store.
*
* @param send The {@link Send} with the new {@link Buf} instance that is replacing the currently held buffer.
*/
public void replace(Send<Buf> send) {
replaceBufVolatile(send);
}
/**
* Access the buffer in this reference.
*
* @return The buffer held by the reference.
*/
public Buf contents() {
return getBufVolatile();
}
}

View File

@ -31,6 +31,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
for (Buf b : buf.bufs) {
b.close();
}
buf.makeInaccessible();
};
private final Allocator allocator;
@ -43,6 +44,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private int woff;
private int subOffset; // The next offset *within* a consituent buffer to read from or write to.
private ByteOrder order;
private boolean closed;
CompositeBuf(Allocator allocator, Buf[] bufs) {
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
@ -885,6 +887,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
throw throwable;
}
makeInaccessible();
return new Owned<CompositeBuf>() {
@Override
public CompositeBuf transferOwnership(Drop<CompositeBuf> drop) {
@ -899,6 +902,13 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
};
}
void makeInaccessible() {
capacity = 0;
roff = 0;
woff = 0;
closed = true;
}
@Override
protected IllegalStateException notSendableException() {
if (!isSendable) {
@ -979,7 +989,10 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
}
private IndexOutOfBoundsException indexOutOfBounds(int index) {
private RuntimeException indexOutOfBounds(int index) {
if (closed) {
return new IllegalStateException("This buffer is closed.");
}
return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(capacity - 1) + "].");

View File

@ -16,7 +16,6 @@
package io.netty.buffer.api;
import java.util.Objects;
import java.util.function.Function;
public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> implements Rc<I> {
private int acquires; // Closed if negative.
@ -83,6 +82,11 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
return new TransferSend<I, T>(owned, drop);
}
/**
* Create an {@link IllegalStateException} with a custom message, tailored to this particular {@link Rc} instance,
* for when the object cannot be sent for some reason.
* @return An {@link IllegalStateException} to be thrown when this object cannot be sent.
*/
protected IllegalStateException notSendableException() {
return new IllegalStateException(
"Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this + '.');
@ -111,15 +115,21 @@ public abstract class RcSupport<I extends Rc<I>, T extends RcSupport<I, T>> impl
/**
* Get access to the underlying {@link Drop} object.
* This method is unsafe because it open the possibility of bypassing and overriding resource lifetimes.
*
* @return The {@link Drop} object used by this reference counted object.
*/
protected Drop<T> unsafeGetDrop() {
return drop;
}
protected Drop<T> unsafeExchangeDrop(Drop<T> replacement) {
/**
* Replace the current underlying {@link Drop} object with the given one.
* This method is unsafe because it open the possibility of bypassing and overring resource lifetimes.
*
* @param replacement The new {@link Drop} object to use instead of the current one.
*/
protected void unsafeSetDrop(Drop<T> replacement) {
drop = Objects.requireNonNull(replacement, "Replacement drop cannot be null.");
return replacement;
}
@SuppressWarnings("unchecked")

View File

@ -30,7 +30,7 @@ import java.util.ArrayDeque;
* <p>
* Note that scopes are not thread-safe. They are intended to be used from a single thread.
*/
public class Scope implements AutoCloseable {
public final class Scope implements AutoCloseable {
private final ArrayDeque<Rc<?>> deque = new ArrayDeque<>();
/**

View File

@ -28,6 +28,7 @@ package io.netty.buffer.api;
*
* @param <T>
*/
@FunctionalInterface
public interface Send<T extends Rc<T>> {
/**
* Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the

View File

@ -20,7 +20,7 @@ import io.netty.buffer.api.Drop;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
class BifurcatedDrop<T> implements Drop<T> {
class BifurcatedDrop implements Drop<MemSegBuf> {
private static final VarHandle COUNT;
static {
try {
@ -30,12 +30,12 @@ class BifurcatedDrop<T> implements Drop<T> {
}
}
private final T originalBuf;
private final Drop<T> delegate;
private final MemSegBuf originalBuf;
private final Drop<MemSegBuf> delegate;
@SuppressWarnings("FieldMayBeFinal")
private volatile int count;
BifurcatedDrop(T originalBuf, Drop<T> delegate) {
BifurcatedDrop(MemSegBuf originalBuf, Drop<MemSegBuf> delegate) {
this.originalBuf = originalBuf;
this.delegate = delegate;
count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop.
@ -50,7 +50,7 @@ class BifurcatedDrop<T> implements Drop<T> {
}
@Override
public synchronized void drop(T obj) {
public void drop(MemSegBuf buf) {
int c;
int n;
do {
@ -62,14 +62,15 @@ class BifurcatedDrop<T> implements Drop<T> {
delegate.attach(originalBuf);
delegate.drop(originalBuf);
}
buf.makeInaccessible();
}
@Override
public void attach(T obj) {
public void attach(MemSegBuf obj) {
delegate.attach(obj);
}
Drop<T> unwrap() {
Drop<MemSegBuf> unwrap() {
return delegate;
}

View File

@ -43,7 +43,19 @@ import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
static final Drop<MemSegBuf> SEGMENT_CLOSE = buf -> buf.seg.close();
private static final MemorySegment CLOSED_SEGMENT;
static final Drop<MemSegBuf> SEGMENT_CLOSE;
static {
CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]);
CLOSED_SEGMENT.close();
SEGMENT_CLOSE = buf -> {
try (var ignore = buf.seg) {
buf.makeInaccessible();
}
};
}
private final AllocatorControl alloc;
private final boolean isSendable;
private MemorySegment seg;
@ -130,7 +142,10 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
var slice = seg.asSlice(offset, length);
acquire();
Drop<MemSegBuf> drop = b -> close();
Drop<MemSegBuf> drop = b -> {
close();
b.makeInaccessible();
};
var sendable = false; // Sending implies ownership change, which we can't do for slices.
return new MemSegBuf(slice, drop, alloc, sendable).writerOffset(length).order(order());
}
@ -177,6 +192,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public ByteCursor openCursor(int fromOffset, int length) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (fromOffset < 0) {
throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.');
}
@ -238,6 +256,9 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public ByteCursor openReverseCursor(int fromOffset, int length) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (fromOffset < 0) {
throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.');
}
@ -320,9 +341,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
var drop = unsafeGetDrop();
if (drop instanceof BifurcatedDrop) {
// Disconnect from the bifurcated drop, since we'll get our own fresh memory segment.
int roff = this.roff;
int woff = this.woff;
drop.drop(this);
drop = ((BifurcatedDrop<MemSegBuf>) drop).unwrap();
unsafeExchangeDrop(drop);
drop = ((BifurcatedDrop) drop).unwrap();
unsafeSetDrop(drop);
this.roff = roff;
this.woff = woff;
} else {
alloc.recoverMemory(recoverableMemory());
}
@ -343,9 +368,10 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
drop.attach(this);
}
if (drop instanceof BifurcatedDrop) {
((BifurcatedDrop<?>) drop).increment();
((BifurcatedDrop) drop).increment();
} else {
drop = unsafeExchangeDrop(new BifurcatedDrop<MemSegBuf>(new MemSegBuf(seg, drop, alloc), drop));
drop = new BifurcatedDrop(new MemSegBuf(seg, drop, alloc), drop);
unsafeSetDrop(drop);
}
var bifurcatedSeg = seg.asSlice(0, woff);
var bifurcatedBuf = new MemSegBuf(bifurcatedSeg, drop, alloc);
@ -389,28 +415,44 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeByte(byte value) {
setByteAtOffset(seg, woff, value);
woff += Byte.BYTES;
return this;
try {
setByteAtOffset(seg, woff, value);
woff += Byte.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setByte(int woff, byte value) {
setByteAtOffset(seg, woff, value);
return this;
try {
setByteAtOffset(seg, woff, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf writeUnsignedByte(int value) {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
woff += Byte.BYTES;
return this;
try {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
woff += Byte.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setUnsignedByte(int woff, int value) {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
return this;
try {
setByteAtOffset(seg, woff, (byte) (value & 0xFF));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
@ -429,15 +471,23 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeChar(char value) {
setCharAtOffset(seg, woff, order, value);
woff += 2;
return this;
try {
setCharAtOffset(seg, woff, order, value);
woff += 2;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setChar(int woff, char value) {
setCharAtOffset(seg, woff, order, value);
return this;
try {
setCharAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
@ -470,28 +520,44 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeShort(short value) {
setShortAtOffset(seg, woff, order, value);
woff += Short.BYTES;
return this;
try {
setShortAtOffset(seg, woff, order, value);
woff += Short.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setShort(int woff, short value) {
setShortAtOffset(seg, woff, order, value);
return this;
try {
setShortAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf writeUnsignedShort(int value) {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
woff += Short.BYTES;
return this;
try {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
woff += Short.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setUnsignedShort(int woff, int value) {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
return this;
try {
setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
@ -638,28 +704,44 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeInt(int value) {
setIntAtOffset(seg, woff, order, value);
woff += Integer.BYTES;
return this;
try {
setIntAtOffset(seg, woff, order, value);
woff += Integer.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setInt(int woff, int value) {
setIntAtOffset(seg, woff, order, value);
return this;
try {
setIntAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf writeUnsignedInt(long value) {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
woff += Integer.BYTES;
return this;
try {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
woff += Integer.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setUnsignedInt(int woff, long value) {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
return this;
try {
setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL));
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
@ -678,15 +760,23 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeFloat(float value) {
setFloatAtOffset(seg, woff, order, value);
woff += Float.BYTES;
return this;
try {
setFloatAtOffset(seg, woff, order, value);
woff += Float.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setFloat(int woff, float value) {
setFloatAtOffset(seg, woff, order, value);
return this;
try {
setFloatAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
@ -705,15 +795,23 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeLong(long value) {
setLongAtOffset(seg, woff, order, value);
woff += Long.BYTES;
return this;
try {
setLongAtOffset(seg, woff, order, value);
woff += Long.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setLong(int woff, long value) {
setLongAtOffset(seg, woff, order, value);
return this;
try {
setLongAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
@ -732,35 +830,52 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
@Override
public Buf writeDouble(double value) {
setDoubleAtOffset(seg, woff, order, value);
woff += Double.BYTES;
return this;
try {
setDoubleAtOffset(seg, woff, order, value);
woff += Double.BYTES;
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
@Override
public Buf setDouble(int woff, double value) {
setDoubleAtOffset(seg, woff, order, value);
return this;
try {
setDoubleAtOffset(seg, woff, order, value);
return this;
} catch (IndexOutOfBoundsException e) {
throw checkWriteState(e);
}
}
// </editor-fold>
@Override
protected Owned<MemSegBuf> prepareSend() {
MemSegBuf outer = this;
var order = this.order;
var roff = this.roff;
var woff = this.woff;
boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.share();
makeInaccessible();
return new Owned<MemSegBuf>() {
@Override
public MemSegBuf transferOwnership(Drop<MemSegBuf> drop) {
MemSegBuf copy = new MemSegBuf(transferSegment, drop, alloc);
copy.order = outer.order;
copy.roff = outer.roff;
copy.woff = outer.woff;
copy.order = order;
copy.roff = roff;
copy.woff = woff;
return copy;
}
};
}
void makeInaccessible() {
seg = CLOSED_SEGMENT;
roff = 0;
woff = 0;
}
@Override
protected IllegalStateException notSendableException() {
if (!isSendable) {
@ -777,22 +892,36 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
private void checkRead(int index, int size) {
if (index < 0 || woff < index + size) {
throw indexOutOfBounds(index);
throw accessCheckException(index);
}
}
private void checkWrite(int index, int size) {
if (index < 0 || seg.byteSize() < index + size) {
throw indexOutOfBounds(index);
throw accessCheckException(index);
}
}
private IndexOutOfBoundsException indexOutOfBounds(int index) {
private RuntimeException checkWriteState(IndexOutOfBoundsException ioobe) {
if (seg == CLOSED_SEGMENT) {
return bufferIsClosed();
}
return ioobe;
}
private RuntimeException accessCheckException(int index) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(seg.byteSize() - 1) + "].");
}
private IllegalStateException bufferIsClosed() {
return new IllegalStateException("This buffer is closed.");
}
Object recoverableMemory() {
return new RecoverableMemory(seg, alloc);
}

View File

@ -0,0 +1,110 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
class BufRefTest {
@Test
public void closingBufRefMustCloseOwnedBuf() {
try (Allocator allocator = Allocator.heap()) {
BufRef ref;
try (Buf b = allocator.allocate(8)) {
ref = new BufRef(b);
}
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32));
}
}
@Test
public void closingBufRefMustCloseOwnedBufFromSend() {
try (Allocator allocator = Allocator.heap();
Buf buf = allocator.allocate(8)) {
BufRef ref = new BufRef(buf.send());
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32));
}
}
@Test
public void mustCloseOwnedBufferWhenReplaced() {
try (Allocator allocator = Allocator.heap()) {
Buf orig;
BufRef ref;
try (Buf buf = allocator.allocate(8)) {
ref = new BufRef(orig = buf);
}
orig.writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
try (Buf buf = allocator.allocate(8)) {
ref.replace(buf); // Pass replacement directly.
}
assertThrows(IllegalStateException.class, () -> orig.writeInt(32));
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32));
}
}
@Test
public void mustCloseOwnedBufferWhenReplacedFromSend() {
try (Allocator allocator = Allocator.heap()) {
Buf orig;
BufRef ref;
try (Buf buf = allocator.allocate(8)) {
ref = new BufRef(orig = buf);
}
orig.writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
try (Buf buf = allocator.allocate(8)) {
ref.replace(buf.send()); // Pass replacement via send().
}
assertThrows(IllegalStateException.class, () -> orig.writeInt(32));
ref.contents().writeInt(42);
assertThat(ref.contents().readInt()).isEqualTo(42);
ref.close();
assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32));
}
}
@Test
public void sendingRefMustSendBuffer() {
try (Allocator allocator = Allocator.heap();
BufRef refA = new BufRef(allocator.allocate(8).send())) {
refA.contents().writeInt(42);
var send = refA.send();
assertThrows(IllegalStateException.class, () -> refA.contents().readInt());
try (BufRef refB = send.receive()) {
assertThat(refB.contents().readInt()).isEqualTo(42);
}
}
}
}

View File

@ -369,6 +369,112 @@ public class BufTest {
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf orig = allocator.allocate(24)) {
orig.writeLong(42);
var send = orig.send();
verifyInaccessible(orig);
try (Buf receive = send.receive()) {
assertEquals(42, receive.readInt());
}
}
}
private void verifyInaccessible(Buf buf) {
assertThrows(IllegalStateException.class, () -> buf.readByte());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte());
assertThrows(IllegalStateException.class, () -> buf.readChar());
assertThrows(IllegalStateException.class, () -> buf.readShort());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedShort());
assertThrows(IllegalStateException.class, () -> buf.readMedium());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedMedium());
assertThrows(IllegalStateException.class, () -> buf.readInt());
assertThrows(IllegalStateException.class, () -> buf.readUnsignedInt());
assertThrows(IllegalStateException.class, () -> buf.readFloat());
assertThrows(IllegalStateException.class, () -> buf.readLong());
assertThrows(IllegalStateException.class, () -> buf.readDouble());
assertThrows(IllegalStateException.class, () -> buf.getByte(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedByte(0));
assertThrows(IllegalStateException.class, () -> buf.getChar(0));
assertThrows(IllegalStateException.class, () -> buf.getShort(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedShort(0));
assertThrows(IllegalStateException.class, () -> buf.getMedium(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedMedium(0));
assertThrows(IllegalStateException.class, () -> buf.getInt(0));
assertThrows(IllegalStateException.class, () -> buf.getUnsignedInt(0));
assertThrows(IllegalStateException.class, () -> buf.getFloat(0));
assertThrows(IllegalStateException.class, () -> buf.getLong(0));
assertThrows(IllegalStateException.class, () -> buf.getDouble(0));
assertThrows(IllegalStateException.class, () -> buf.writeByte((byte) 32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedByte(32));
assertThrows(IllegalStateException.class, () -> buf.writeChar('3'));
assertThrows(IllegalStateException.class, () -> buf.writeShort((short) 32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedShort(32));
assertThrows(IllegalStateException.class, () -> buf.writeMedium(32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedMedium(32));
assertThrows(IllegalStateException.class, () -> buf.writeInt(32));
assertThrows(IllegalStateException.class, () -> buf.writeUnsignedInt(32));
assertThrows(IllegalStateException.class, () -> buf.writeFloat(3.2f));
assertThrows(IllegalStateException.class, () -> buf.writeLong(32));
assertThrows(IllegalStateException.class, () -> buf.writeDouble(32));
assertThrows(IllegalStateException.class, () -> buf.setByte(0, (byte) 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedByte(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setChar(0, '3'));
assertThrows(IllegalStateException.class, () -> buf.setShort(0, (short) 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedShort(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setMedium(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedMedium(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setInt(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setUnsignedInt(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setFloat(0, 3.2f));
assertThrows(IllegalStateException.class, () -> buf.setLong(0, 32));
assertThrows(IllegalStateException.class, () -> buf.setDouble(0, 32));
assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1));
try (Allocator allocator = Allocator.heap();
Buf target = allocator.allocate(24)) {
assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1));
if (Allocator.isComposite(buf)) {
assertThrows(IllegalStateException.class, () -> Allocator.extend(buf, target));
}
}
assertThrows(IllegalStateException.class, () -> buf.bifurcate());
assertThrows(IllegalStateException.class, () -> buf.send());
assertThrows(IllegalStateException.class, () -> buf.acquire());
assertThrows(IllegalStateException.class, () -> buf.slice());
assertThrows(IllegalStateException.class, () -> buf.fill((byte) 0));
assertThrows(IllegalStateException.class, () -> buf.openCursor());
assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0));
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor());
assertThrows(IllegalStateException.class, () -> buf.openReverseCursor(0, 0));
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void cannotSendMoreThanOnce(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator();
Buf buf = allocator.allocate(8)) {
var send = buf.send();
var exc = assertThrows(IllegalStateException.class, () -> buf.send());
send.receive().close();
assertThat(exc).hasMessageContaining("Cannot send()");
}
}
@ParameterizedTest
@MethodSource("allocators")
public void bufferShouldNotBeAccessibleAfterClose(Fixture fixture) {
try (Allocator allocator = fixture.createAllocator()) {
Buf buf = allocator.allocate(24);
buf.writeLong(42);
buf.close();
verifyInaccessible(buf);
}
}
@ParameterizedTest
@MethodSource("initialAllocators")
void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) {
@ -1509,6 +1615,7 @@ public class BufTest {
assertThat(buf.writableBytes()).isEqualTo(0);
buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);
assertThat(buf.capacity()).isGreaterThanOrEqualTo(16);
buf.writeLong(0xA1A2A3A4A5A6A7A8L);
assertThat(buf.readableBytes()).isEqualTo(16);
assertThat(buf.readLong()).isEqualTo(0x0102030405060708L);
@ -1540,6 +1647,7 @@ public class BufTest {
Buf buf = allocator.allocate(4)) {
buf.ensureWritable(8);
assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8);
assertThat(buf.capacity()).isGreaterThanOrEqualTo(8);
buf.writeLong(0x0102030405060708L);
try (Buf slice = buf.slice()) {
assertEquals(0x0102030405060708L, slice.readLong());
@ -1842,6 +1950,15 @@ public class BufTest {
}
}
@Test
public void composeMustThrowWhenBuffersHaveMismatchedByteOrder() {
try (Allocator allocator = Allocator.heap();
Buf a = allocator.allocate(4, ByteOrder.BIG_ENDIAN);
Buf b = allocator.allocate(4, ByteOrder.LITTLE_ENDIAN)) {
assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, b));
}
}
@ParameterizedTest
@MethodSource("nonSliceAllocators")
public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) {

View File

@ -15,17 +15,17 @@
*/
package io.netty.buffer.api;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ScopeTest {
@Test
public void scopeMustCloseContainedRcsInReverseInsertOrder() {
void scopeMustCloseContainedRcsInReverseInsertOrder() {
ArrayList<Integer> closeOrder = new ArrayList<>();
try (Scope scope = new Scope()) {
scope.add(new SomeRc(new OrderingDrop(1, closeOrder)));