From 02eb4286fac523639d57f083ddcc77a485d243ef Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 11 Dec 2020 12:09:32 +0100 Subject: [PATCH 1/5] Better method names and javadocs in RcSupport --- src/main/java/io/netty/buffer/api/RcSupport.java | 16 +++++++++++++--- .../io/netty/buffer/api/memseg/MemSegBuf.java | 5 +++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/RcSupport.java b/src/main/java/io/netty/buffer/api/RcSupport.java index 4e59120..eaf8ca5 100644 --- a/src/main/java/io/netty/buffer/api/RcSupport.java +++ b/src/main/java/io/netty/buffer/api/RcSupport.java @@ -16,7 +16,6 @@ package io.netty.buffer.api; import java.util.Objects; -import java.util.function.Function; public abstract class RcSupport, T extends RcSupport> implements Rc { private int acquires; // Closed if negative. @@ -83,6 +82,11 @@ public abstract class RcSupport, T extends RcSupport> impl return new TransferSend(owned, drop); } + /** + * Create an {@link IllegalStateException} with a custom message, tailored to this particular {@link Rc} instance, + * for when the object cannot be sent for some reason. + * @return An {@link IllegalStateException} to be thrown when this object cannot be sent. + */ protected IllegalStateException notSendableException() { return new IllegalStateException( "Cannot send() a reference counted object with " + acquires + " outstanding acquires: " + this + '.'); @@ -111,15 +115,21 @@ public abstract class RcSupport, T extends RcSupport> impl /** * Get access to the underlying {@link Drop} object. * This method is unsafe because it open the possibility of bypassing and overriding resource lifetimes. + * * @return The {@link Drop} object used by this reference counted object. */ protected Drop unsafeGetDrop() { return drop; } - protected Drop unsafeExchangeDrop(Drop replacement) { + /** + * Replace the current underlying {@link Drop} object with the given one. + * This method is unsafe because it open the possibility of bypassing and overring resource lifetimes. + * + * @param replacement The new {@link Drop} object to use instead of the current one. + */ + protected void unsafeSetDrop(Drop replacement) { drop = Objects.requireNonNull(replacement, "Replacement drop cannot be null."); - return replacement; } @SuppressWarnings("unchecked") diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java index 7a11636..7c429a1 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java @@ -322,7 +322,7 @@ class MemSegBuf extends RcSupport implements Buf { // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. drop.drop(this); drop = ((BifurcatedDrop) drop).unwrap(); - unsafeExchangeDrop(drop); + unsafeSetDrop(drop); } else { alloc.recoverMemory(recoverableMemory()); } @@ -345,7 +345,8 @@ class MemSegBuf extends RcSupport implements Buf { if (drop instanceof BifurcatedDrop) { ((BifurcatedDrop) drop).increment(); } else { - drop = unsafeExchangeDrop(new BifurcatedDrop(new MemSegBuf(seg, drop, alloc), drop)); + drop = new BifurcatedDrop(new MemSegBuf(seg, drop, alloc), drop); + unsafeSetDrop(drop); } var bifurcatedSeg = seg.asSlice(0, woff); var bifurcatedBuf = new MemSegBuf(bifurcatedSeg, drop, alloc); From ab95abac25abcaa9b5396e914dba628398b75a94 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 11 Dec 2020 12:10:04 +0100 Subject: [PATCH 2/5] The `make clean` command now also cleans up after failed `build` commands --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index bcc0f01..a2e1695 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ dbg: clean: docker rm -fv build-container-dbg + docker rm -fv build-container build: image docker create --name build-container netty-incubator-buffer:build From f83e7fa618b48cce3c0c25222542489cefdaffbe Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 14 Dec 2020 14:07:11 +0100 Subject: [PATCH 3/5] Add BufHolder and BufRef helper classes Motivation: There are many use cases where other objects will have fields that are buffers. Since buffers are reference counted, their life cycle needs to be managed carefully. Modification: Add the abstract BufHolder, and the concrete sub-class BufRef, as neat building blocks for building other classes that contain field references to buffers. The behaviours of closed/sent buffers have also been specified in tests, and tightened up in the code. Result: It is now easier to create classes/objects that wrap buffers. --- .../java/io/netty/buffer/api/Allocator.java | 11 +- .../java/io/netty/buffer/api/BufHolder.java | 198 ++++++++++++++ src/main/java/io/netty/buffer/api/BufRef.java | 68 +++++ .../io/netty/buffer/api/CompositeBuf.java | 15 +- src/main/java/io/netty/buffer/api/Scope.java | 2 +- .../buffer/api/memseg/BifurcatedDrop.java | 15 +- .../io/netty/buffer/api/memseg/MemSegBuf.java | 252 +++++++++++++----- .../java/io/netty/buffer/api/BufRefTest.java | 110 ++++++++ .../java/io/netty/buffer/api/BufTest.java | 117 ++++++++ .../java/io/netty/buffer/api/ScopeTest.java | 10 +- 10 files changed, 721 insertions(+), 77 deletions(-) create mode 100644 src/main/java/io/netty/buffer/api/BufHolder.java create mode 100644 src/main/java/io/netty/buffer/api/BufRef.java create mode 100644 src/test/java/io/netty/buffer/api/BufRefTest.java diff --git a/src/main/java/io/netty/buffer/api/Allocator.java b/src/main/java/io/netty/buffer/api/Allocator.java index ef70175..962891c 100644 --- a/src/main/java/io/netty/buffer/api/Allocator.java +++ b/src/main/java/io/netty/buffer/api/Allocator.java @@ -123,7 +123,7 @@ public interface Allocator extends AutoCloseable { * @param extension The buffer to extend the composite buffer with. */ static void extend(Buf composite, Buf extension) { - if (composite.getClass() != CompositeBuf.class) { + if (!isComposite(composite)) { throw new IllegalArgumentException( "Expected the first buffer to be a composite buffer, " + "but it is a " + composite.getClass() + " buffer: " + composite + '.'); @@ -132,6 +132,15 @@ public interface Allocator extends AutoCloseable { buf.extendWith(extension); } + /** + * Check if the given buffer is a {@linkplain #compose(Buf...) composite} buffer or not. + * @param composite The buffer to check. + * @return {@code true} if the given buffer was created with {@link #compose(Buf...)}, {@code false} otherwise. + */ + static boolean isComposite(Buf composite) { + return composite.getClass() == CompositeBuf.class; + } + /** * Close this allocator, freeing all of its internal resources. It is not specified if the allocator can still be * used after this method has been called on it. diff --git a/src/main/java/io/netty/buffer/api/BufHolder.java b/src/main/java/io/netty/buffer/api/BufHolder.java new file mode 100644 index 0000000..3d691d7 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/BufHolder.java @@ -0,0 +1,198 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import java.lang.invoke.VarHandle; +import java.util.Objects; + +import static io.netty.buffer.api.Statics.findVarHandle; +import static java.lang.invoke.MethodHandles.lookup; + +/** + * The {@link BufHolder} is an abstract class that simplifies the implementation of objects that themselves contain + * a {@link Buf} instance. + *

+ * The {@link BufHolder} can only hold on to a single buffer, so objects and classes that need to hold on to multiple + * buffers will have to do their implementation from scratch, though they can use the code of the {@link BufHolder} as + * inspiration. + *

+ * If you just want an object that is a reference to a buffer, then the {@link BufRef} can be used for that purpose. + * If you have an advanced use case where you wish to implement {@link Rc}, and tightly control lifetimes, then + * {@link RcSupport} can be of help. + * + * @param The concrete {@link BufHolder} type. + */ +public abstract class BufHolder> implements Rc { + private static final VarHandle BUF = findVarHandle(lookup(), BufHolder.class, "buf", Buf.class); + private Buf buf; + + /** + * Create a new {@link BufHolder} to hold the given {@linkplain Buf buffer}. + *

+ * Note: this increases the reference count of the given buffer. + * + * @param buf The {@linkplain Buf buffer} to be held by this holder. + */ + protected BufHolder(Buf buf) { + this.buf = Objects.requireNonNull(buf, "The buffer cannot be null.").acquire(); + } + + /** + * Create a new {@link BufHolder} to hold the {@linkplain Buf buffer} received from the given {@link Send}. + *

+ * The {@link BufHolder} will then be holding exclusive ownership of the buffer. + * + * @param send The {@linkplain Buf buffer} to be held by this holder. + */ + protected BufHolder(Send send) { + buf = Objects.requireNonNull(send, "The send cannot be null.").receive(); + } + + @SuppressWarnings("unchecked") + @Override + public T acquire() { + buf.acquire(); + return (T) this; + } + + @Override + public void close() { + buf.close(); + } + + @Override + public boolean isOwned() { + return buf.isOwned(); + } + + @Override + public int countBorrows() { + return buf.countBorrows(); + } + + @Override + public Send send() { + var send = buf.send(); + return new Send() { + @Override + public T receive() { + return BufHolder.this.receive(send.receive()); + } + }; + } + + /** + * Called when a {@linkplain #send() sent} {@link BufHolder} is received by the recipient. + * The {@link BufHolder} should return a new concrete instance, that wraps the given {@link Buf} object. + * + * @param buf The {@link Buf} that is {@linkplain Send#receive() received} by the recipient, + * and needs to be wrapped in a new {@link BufHolder} instance. + * @return A new {@linkplain T buffer holder} instance, containing the given {@linkplain Buf buffer}. + */ + protected abstract T receive(Buf buf); + + /** + * Replace the underlying referenced buffer with the given buffer. + *

+ * This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations. + *

+ * Note: this method decreases the reference count of the current buffer, + * and increases the reference count of the new buffer. + *

+ * The buffer assignment is performed using a plain store. + * + * @param newBuf The new {@link Buf} instance that is replacing the currently held buffer. + */ + protected void replace(Buf newBuf) { + try (var ignore = buf) { + buf = newBuf.acquire(); + } + } + + /** + * Replace the underlying referenced buffer with the given buffer. + *

+ * This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations. + *

+ * Note: this method decreases the reference count of the current buffer, + * and takes exclusive ownership of the sent buffer. + *

+ * The buffer assignment is performed using a plain store. + * + * @param send The new {@link Buf} instance that is replacing the currently held buffer. + */ + protected void replace(Send send) { + try (var ignore = buf) { + buf = send.receive(); + } + } + + /** + * Replace the underlying referenced buffer with the given buffer. + *

+ * This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations. + *

+ * Note: this method decreases the reference count of the current buffer, + * and increases the reference count of the new buffer. + *

+ * The buffer assignment is performed using a volatile store. + * + * @param newBuf The new {@link Buf} instance that is replacing the currently held buffer. + */ + protected void replaceVolatile(Buf newBuf) { + var prev = (Buf) BUF.getAndSet(this, newBuf.acquire()); + prev.close(); + } + + /** + * Replace the underlying referenced buffer with the given buffer. + *

+ * This method is protected to permit advanced use cases of {@link BufHolder} sub-class implementations. + *

+ * Note: this method decreases the reference count of the current buffer, + * and takes exclusive ownership of the sent buffer. + *

+ * The buffer assignment is performed using a volatile store. + * + * @param send The new {@link Buf} instance that is replacing the currently held buffer. + */ + protected void replaceVolatile(Send send) { + var prev = (Buf) BUF.getAndSet(this, send.receive()); + prev.close(); + } + + /** + * Access the held {@link Buf} instance. + *

+ * The access is performed using a plain load. + * + * @return The {@link Buf} instance being held by this {@linkplain T buffer holder}. + */ + protected Buf getBuf() { + return buf; + } + + /** + * Access the held {@link Buf} instance. + *

+ * The access is performed using a volatile load. + * + * @return The {@link Buf} instance being held by this {@linkplain T buffer holder}. + */ + protected Buf getBufVolatile() { + return (Buf) BUF.getVolatile(this); + } +} diff --git a/src/main/java/io/netty/buffer/api/BufRef.java b/src/main/java/io/netty/buffer/api/BufRef.java new file mode 100644 index 0000000..6e6d56a --- /dev/null +++ b/src/main/java/io/netty/buffer/api/BufRef.java @@ -0,0 +1,68 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import java.lang.invoke.VarHandle; + +/** + * A mutable reference to a buffer. + */ +public final class BufRef extends BufHolder { + /** + * Create a reference to the given {@linkplain Buf buffer}. + * This increments the reference count of the buffer. + * + * @param buf The buffer to reference. + */ + public BufRef(Buf buf) { + super(buf); + VarHandle.fullFence(); + } + + /** + * Create a reference that holds the exclusive ownership of the sent buffer. + * + * @param send The {@linkplain Send sent} buffer to take ownership of. + */ + public BufRef(Send send) { + super(send); + VarHandle.fullFence(); + } + + @Override + protected BufRef receive(Buf buf) { + return new BufRef(buf); + } + + @Override + public void replaceVolatile(Buf newBuf) { + super.replaceVolatile(newBuf); + } + + @Override + public void replaceVolatile(Send send) { + super.replaceVolatile(send); + } + + /** + * Access the buffer in this reference. + * + * @return The buffer held by the reference. + */ + public Buf contents() { + return getBufVolatile(); + } +} diff --git a/src/main/java/io/netty/buffer/api/CompositeBuf.java b/src/main/java/io/netty/buffer/api/CompositeBuf.java index 7d2577e..d3c4f07 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuf.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuf.java @@ -31,6 +31,7 @@ final class CompositeBuf extends RcSupport implements Buf { for (Buf b : buf.bufs) { b.close(); } + buf.makeInaccessible(); }; private final Allocator allocator; @@ -43,6 +44,7 @@ final class CompositeBuf extends RcSupport implements Buf { private int woff; private int subOffset; // The next offset *within* a consituent buffer to read from or write to. private ByteOrder order; + private boolean closed; CompositeBuf(Allocator allocator, Buf[] bufs) { this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array. @@ -885,6 +887,7 @@ final class CompositeBuf extends RcSupport implements Buf { } throw throwable; } + makeInaccessible(); return new Owned() { @Override public CompositeBuf transferOwnership(Drop drop) { @@ -899,6 +902,13 @@ final class CompositeBuf extends RcSupport implements Buf { }; } + void makeInaccessible() { + capacity = 0; + roff = 0; + woff = 0; + closed = true; + } + @Override protected IllegalStateException notSendableException() { if (!isSendable) { @@ -979,7 +989,10 @@ final class CompositeBuf extends RcSupport implements Buf { } } - private IndexOutOfBoundsException indexOutOfBounds(int index) { + private RuntimeException indexOutOfBounds(int index) { + if (closed) { + return new IllegalStateException("This buffer is closed."); + } return new IndexOutOfBoundsException( "Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " + (capacity - 1) + "]."); diff --git a/src/main/java/io/netty/buffer/api/Scope.java b/src/main/java/io/netty/buffer/api/Scope.java index 755b263..fa24987 100644 --- a/src/main/java/io/netty/buffer/api/Scope.java +++ b/src/main/java/io/netty/buffer/api/Scope.java @@ -30,7 +30,7 @@ import java.util.ArrayDeque; *

* Note that scopes are not thread-safe. They are intended to be used from a single thread. */ -public class Scope implements AutoCloseable { +public final class Scope implements AutoCloseable { private final ArrayDeque> deque = new ArrayDeque<>(); /** diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java index 2e17421..0322ad5 100644 --- a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java +++ b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java @@ -20,7 +20,7 @@ import io.netty.buffer.api.Drop; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; -class BifurcatedDrop implements Drop { +class BifurcatedDrop implements Drop { private static final VarHandle COUNT; static { try { @@ -30,12 +30,12 @@ class BifurcatedDrop implements Drop { } } - private final T originalBuf; - private final Drop delegate; + private final MemSegBuf originalBuf; + private final Drop delegate; @SuppressWarnings("FieldMayBeFinal") private volatile int count; - BifurcatedDrop(T originalBuf, Drop delegate) { + BifurcatedDrop(MemSegBuf originalBuf, Drop delegate) { this.originalBuf = originalBuf; this.delegate = delegate; count = 2; // These are created by buffer bifurcation, so we initially have 2 references to this drop. @@ -50,7 +50,7 @@ class BifurcatedDrop implements Drop { } @Override - public synchronized void drop(T obj) { + public synchronized void drop(MemSegBuf buf) { int c; int n; do { @@ -62,14 +62,15 @@ class BifurcatedDrop implements Drop { delegate.attach(originalBuf); delegate.drop(originalBuf); } + buf.makeInaccessible(); } @Override - public void attach(T obj) { + public void attach(MemSegBuf obj) { delegate.attach(obj); } - Drop unwrap() { + Drop unwrap() { return delegate; } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java index 7c429a1..6631824 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuf.java @@ -43,7 +43,19 @@ import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset; import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset; class MemSegBuf extends RcSupport implements Buf { - static final Drop SEGMENT_CLOSE = buf -> buf.seg.close(); + private static final MemorySegment CLOSED_SEGMENT; + static final Drop SEGMENT_CLOSE; + + static { + CLOSED_SEGMENT = MemorySegment.ofArray(new byte[0]); + CLOSED_SEGMENT.close(); + SEGMENT_CLOSE = buf -> { + try (var ignore = buf.seg) { + buf.makeInaccessible(); + } + }; + } + private final AllocatorControl alloc; private final boolean isSendable; private MemorySegment seg; @@ -130,7 +142,10 @@ class MemSegBuf extends RcSupport implements Buf { } var slice = seg.asSlice(offset, length); acquire(); - Drop drop = b -> close(); + Drop drop = b -> { + close(); + b.makeInaccessible(); + }; var sendable = false; // Sending implies ownership change, which we can't do for slices. return new MemSegBuf(slice, drop, alloc, sendable).writerOffset(length).order(order()); } @@ -177,6 +192,9 @@ class MemSegBuf extends RcSupport implements Buf { @Override public ByteCursor openCursor(int fromOffset, int length) { + if (seg == CLOSED_SEGMENT) { + throw bufferIsClosed(); + } if (fromOffset < 0) { throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.'); } @@ -238,6 +256,9 @@ class MemSegBuf extends RcSupport implements Buf { @Override public ByteCursor openReverseCursor(int fromOffset, int length) { + if (seg == CLOSED_SEGMENT) { + throw bufferIsClosed(); + } if (fromOffset < 0) { throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.'); } @@ -320,9 +341,13 @@ class MemSegBuf extends RcSupport implements Buf { var drop = unsafeGetDrop(); if (drop instanceof BifurcatedDrop) { // Disconnect from the bifurcated drop, since we'll get our own fresh memory segment. + int roff = this.roff; + int woff = this.woff; drop.drop(this); - drop = ((BifurcatedDrop) drop).unwrap(); + drop = ((BifurcatedDrop) drop).unwrap(); unsafeSetDrop(drop); + this.roff = roff; + this.woff = woff; } else { alloc.recoverMemory(recoverableMemory()); } @@ -343,9 +368,9 @@ class MemSegBuf extends RcSupport implements Buf { drop.attach(this); } if (drop instanceof BifurcatedDrop) { - ((BifurcatedDrop) drop).increment(); + ((BifurcatedDrop) drop).increment(); } else { - drop = new BifurcatedDrop(new MemSegBuf(seg, drop, alloc), drop); + drop = new BifurcatedDrop(new MemSegBuf(seg, drop, alloc), drop); unsafeSetDrop(drop); } var bifurcatedSeg = seg.asSlice(0, woff); @@ -390,28 +415,44 @@ class MemSegBuf extends RcSupport implements Buf { @Override public Buf writeByte(byte value) { - setByteAtOffset(seg, woff, value); - woff += Byte.BYTES; - return this; + try { + setByteAtOffset(seg, woff, value); + woff += Byte.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setByte(int woff, byte value) { - setByteAtOffset(seg, woff, value); - return this; + try { + setByteAtOffset(seg, woff, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf writeUnsignedByte(int value) { - setByteAtOffset(seg, woff, (byte) (value & 0xFF)); - woff += Byte.BYTES; - return this; + try { + setByteAtOffset(seg, woff, (byte) (value & 0xFF)); + woff += Byte.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setUnsignedByte(int woff, int value) { - setByteAtOffset(seg, woff, (byte) (value & 0xFF)); - return this; + try { + setByteAtOffset(seg, woff, (byte) (value & 0xFF)); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override @@ -430,15 +471,23 @@ class MemSegBuf extends RcSupport implements Buf { @Override public Buf writeChar(char value) { - setCharAtOffset(seg, woff, order, value); - woff += 2; - return this; + try { + setCharAtOffset(seg, woff, order, value); + woff += 2; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setChar(int woff, char value) { - setCharAtOffset(seg, woff, order, value); - return this; + try { + setCharAtOffset(seg, woff, order, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override @@ -471,28 +520,44 @@ class MemSegBuf extends RcSupport implements Buf { @Override public Buf writeShort(short value) { - setShortAtOffset(seg, woff, order, value); - woff += Short.BYTES; - return this; + try { + setShortAtOffset(seg, woff, order, value); + woff += Short.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setShort(int woff, short value) { - setShortAtOffset(seg, woff, order, value); - return this; + try { + setShortAtOffset(seg, woff, order, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf writeUnsignedShort(int value) { - setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF)); - woff += Short.BYTES; - return this; + try { + setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF)); + woff += Short.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setUnsignedShort(int woff, int value) { - setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF)); - return this; + try { + setShortAtOffset(seg, woff, order, (short) (value & 0xFFFF)); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override @@ -639,28 +704,44 @@ class MemSegBuf extends RcSupport implements Buf { @Override public Buf writeInt(int value) { - setIntAtOffset(seg, woff, order, value); - woff += Integer.BYTES; - return this; + try { + setIntAtOffset(seg, woff, order, value); + woff += Integer.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setInt(int woff, int value) { - setIntAtOffset(seg, woff, order, value); - return this; + try { + setIntAtOffset(seg, woff, order, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf writeUnsignedInt(long value) { - setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL)); - woff += Integer.BYTES; - return this; + try { + setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL)); + woff += Integer.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setUnsignedInt(int woff, long value) { - setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL)); - return this; + try { + setIntAtOffset(seg, woff, order, (int) (value & 0xFFFFFFFFL)); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override @@ -679,15 +760,23 @@ class MemSegBuf extends RcSupport implements Buf { @Override public Buf writeFloat(float value) { - setFloatAtOffset(seg, woff, order, value); - woff += Float.BYTES; - return this; + try { + setFloatAtOffset(seg, woff, order, value); + woff += Float.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setFloat(int woff, float value) { - setFloatAtOffset(seg, woff, order, value); - return this; + try { + setFloatAtOffset(seg, woff, order, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override @@ -706,15 +795,23 @@ class MemSegBuf extends RcSupport implements Buf { @Override public Buf writeLong(long value) { - setLongAtOffset(seg, woff, order, value); - woff += Long.BYTES; - return this; + try { + setLongAtOffset(seg, woff, order, value); + woff += Long.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setLong(int woff, long value) { - setLongAtOffset(seg, woff, order, value); - return this; + try { + setLongAtOffset(seg, woff, order, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override @@ -733,35 +830,52 @@ class MemSegBuf extends RcSupport implements Buf { @Override public Buf writeDouble(double value) { - setDoubleAtOffset(seg, woff, order, value); - woff += Double.BYTES; - return this; + try { + setDoubleAtOffset(seg, woff, order, value); + woff += Double.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } @Override public Buf setDouble(int woff, double value) { - setDoubleAtOffset(seg, woff, order, value); - return this; + try { + setDoubleAtOffset(seg, woff, order, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e); + } } // @Override protected Owned prepareSend() { - MemSegBuf outer = this; + var order = this.order; + var roff = this.roff; + var woff = this.woff; boolean isConfined = seg.ownerThread() == null; MemorySegment transferSegment = isConfined? seg : seg.share(); + makeInaccessible(); return new Owned() { @Override public MemSegBuf transferOwnership(Drop drop) { MemSegBuf copy = new MemSegBuf(transferSegment, drop, alloc); - copy.order = outer.order; - copy.roff = outer.roff; - copy.woff = outer.woff; + copy.order = order; + copy.roff = roff; + copy.woff = woff; return copy; } }; } + void makeInaccessible() { + seg = CLOSED_SEGMENT; + roff = 0; + woff = 0; + } + @Override protected IllegalStateException notSendableException() { if (!isSendable) { @@ -778,22 +892,36 @@ class MemSegBuf extends RcSupport implements Buf { private void checkRead(int index, int size) { if (index < 0 || woff < index + size) { - throw indexOutOfBounds(index); + throw accessCheckException(index); } } private void checkWrite(int index, int size) { if (index < 0 || seg.byteSize() < index + size) { - throw indexOutOfBounds(index); + throw accessCheckException(index); } } - private IndexOutOfBoundsException indexOutOfBounds(int index) { + private RuntimeException checkWriteState(IndexOutOfBoundsException ioobe) { + if (seg == CLOSED_SEGMENT) { + return bufferIsClosed(); + } + return ioobe; + } + + private RuntimeException accessCheckException(int index) { + if (seg == CLOSED_SEGMENT) { + throw bufferIsClosed(); + } return new IndexOutOfBoundsException( "Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " + (seg.byteSize() - 1) + "]."); } + private IllegalStateException bufferIsClosed() { + return new IllegalStateException("This buffer is closed."); + } + Object recoverableMemory() { return new RecoverableMemory(seg, alloc); } diff --git a/src/test/java/io/netty/buffer/api/BufRefTest.java b/src/test/java/io/netty/buffer/api/BufRefTest.java new file mode 100644 index 0000000..ce0783a --- /dev/null +++ b/src/test/java/io/netty/buffer/api/BufRefTest.java @@ -0,0 +1,110 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +class BufRefTest { + @Test + public void closingBufRefMustCloseOwnedBuf() { + try (Allocator allocator = Allocator.heap()) { + BufRef ref; + try (Buf b = allocator.allocate(8)) { + ref = new BufRef(b); + } + ref.contents().writeInt(42); + assertThat(ref.contents().readInt()).isEqualTo(42); + ref.close(); + assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32)); + } + } + + @Test + public void closingBufRefMustCloseOwnedBufFromSend() { + try (Allocator allocator = Allocator.heap(); + Buf buf = allocator.allocate(8)) { + BufRef ref = new BufRef(buf.send()); + ref.contents().writeInt(42); + assertThat(ref.contents().readInt()).isEqualTo(42); + ref.close(); + assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32)); + } + } + + @Test + public void mustCloseOwnedBufferWhenReplaced() { + try (Allocator allocator = Allocator.heap()) { + Buf orig; + BufRef ref; + try (Buf buf = allocator.allocate(8)) { + ref = new BufRef(orig = buf); + } + + orig.writeInt(42); + assertThat(ref.contents().readInt()).isEqualTo(42); + + try (Buf buf = allocator.allocate(8)) { + ref.replaceVolatile(buf); // Pass replacement directly. + } + + assertThrows(IllegalStateException.class, () -> orig.writeInt(32)); + ref.contents().writeInt(42); + assertThat(ref.contents().readInt()).isEqualTo(42); + ref.close(); + assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32)); + } + } + + @Test + public void mustCloseOwnedBufferWhenReplacedFromSend() { + try (Allocator allocator = Allocator.heap()) { + Buf orig; + BufRef ref; + try (Buf buf = allocator.allocate(8)) { + ref = new BufRef(orig = buf); + } + + orig.writeInt(42); + assertThat(ref.contents().readInt()).isEqualTo(42); + + try (Buf buf = allocator.allocate(8)) { + ref.replaceVolatile(buf.send()); // Pass replacement via send(). + } + + assertThrows(IllegalStateException.class, () -> orig.writeInt(32)); + ref.contents().writeInt(42); + assertThat(ref.contents().readInt()).isEqualTo(42); + ref.close(); + assertThrows(IllegalStateException.class, () -> ref.contents().writeInt(32)); + } + } + + @Test + public void sendingRefMustSendBuffer() { + try (Allocator allocator = Allocator.heap(); + BufRef refA = new BufRef(allocator.allocate(8).send())) { + refA.contents().writeInt(42); + var send = refA.send(); + assertThrows(IllegalStateException.class, () -> refA.contents().readInt()); + try (BufRef refB = send.receive()) { + assertThat(refB.contents().readInt()).isEqualTo(42); + } + } + } +} diff --git a/src/test/java/io/netty/buffer/api/BufTest.java b/src/test/java/io/netty/buffer/api/BufTest.java index e043801..bb5da96 100644 --- a/src/test/java/io/netty/buffer/api/BufTest.java +++ b/src/test/java/io/netty/buffer/api/BufTest.java @@ -369,6 +369,112 @@ public class BufTest { } } + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void originalBufferMustNotBeAccessibleAfterSend(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf orig = allocator.allocate(24)) { + orig.writeLong(42); + var send = orig.send(); + verifyInaccessible(orig); + try (Buf receive = send.receive()) { + assertEquals(42, receive.readInt()); + } + } + } + + private void verifyInaccessible(Buf buf) { + assertThrows(IllegalStateException.class, () -> buf.readByte()); + assertThrows(IllegalStateException.class, () -> buf.readUnsignedByte()); + assertThrows(IllegalStateException.class, () -> buf.readChar()); + assertThrows(IllegalStateException.class, () -> buf.readShort()); + assertThrows(IllegalStateException.class, () -> buf.readUnsignedShort()); + assertThrows(IllegalStateException.class, () -> buf.readMedium()); + assertThrows(IllegalStateException.class, () -> buf.readUnsignedMedium()); + assertThrows(IllegalStateException.class, () -> buf.readInt()); + assertThrows(IllegalStateException.class, () -> buf.readUnsignedInt()); + assertThrows(IllegalStateException.class, () -> buf.readFloat()); + assertThrows(IllegalStateException.class, () -> buf.readLong()); + assertThrows(IllegalStateException.class, () -> buf.readDouble()); + assertThrows(IllegalStateException.class, () -> buf.getByte(0)); + assertThrows(IllegalStateException.class, () -> buf.getUnsignedByte(0)); + assertThrows(IllegalStateException.class, () -> buf.getChar(0)); + assertThrows(IllegalStateException.class, () -> buf.getShort(0)); + assertThrows(IllegalStateException.class, () -> buf.getUnsignedShort(0)); + assertThrows(IllegalStateException.class, () -> buf.getMedium(0)); + assertThrows(IllegalStateException.class, () -> buf.getUnsignedMedium(0)); + assertThrows(IllegalStateException.class, () -> buf.getInt(0)); + assertThrows(IllegalStateException.class, () -> buf.getUnsignedInt(0)); + assertThrows(IllegalStateException.class, () -> buf.getFloat(0)); + assertThrows(IllegalStateException.class, () -> buf.getLong(0)); + assertThrows(IllegalStateException.class, () -> buf.getDouble(0)); + assertThrows(IllegalStateException.class, () -> buf.writeByte((byte) 32)); + assertThrows(IllegalStateException.class, () -> buf.writeUnsignedByte(32)); + assertThrows(IllegalStateException.class, () -> buf.writeChar('3')); + assertThrows(IllegalStateException.class, () -> buf.writeShort((short) 32)); + assertThrows(IllegalStateException.class, () -> buf.writeUnsignedShort(32)); + assertThrows(IllegalStateException.class, () -> buf.writeMedium(32)); + assertThrows(IllegalStateException.class, () -> buf.writeUnsignedMedium(32)); + assertThrows(IllegalStateException.class, () -> buf.writeInt(32)); + assertThrows(IllegalStateException.class, () -> buf.writeUnsignedInt(32)); + assertThrows(IllegalStateException.class, () -> buf.writeFloat(3.2f)); + assertThrows(IllegalStateException.class, () -> buf.writeLong(32)); + assertThrows(IllegalStateException.class, () -> buf.writeDouble(32)); + assertThrows(IllegalStateException.class, () -> buf.setByte(0, (byte) 32)); + assertThrows(IllegalStateException.class, () -> buf.setUnsignedByte(0, 32)); + assertThrows(IllegalStateException.class, () -> buf.setChar(0, '3')); + assertThrows(IllegalStateException.class, () -> buf.setShort(0, (short) 32)); + assertThrows(IllegalStateException.class, () -> buf.setUnsignedShort(0, 32)); + assertThrows(IllegalStateException.class, () -> buf.setMedium(0, 32)); + assertThrows(IllegalStateException.class, () -> buf.setUnsignedMedium(0, 32)); + assertThrows(IllegalStateException.class, () -> buf.setInt(0, 32)); + assertThrows(IllegalStateException.class, () -> buf.setUnsignedInt(0, 32)); + assertThrows(IllegalStateException.class, () -> buf.setFloat(0, 3.2f)); + assertThrows(IllegalStateException.class, () -> buf.setLong(0, 32)); + assertThrows(IllegalStateException.class, () -> buf.setDouble(0, 32)); + + assertThrows(IllegalStateException.class, () -> buf.ensureWritable(1)); + try (Allocator allocator = Allocator.heap(); + Buf target = allocator.allocate(24)) { + assertThrows(IllegalStateException.class, () -> buf.copyInto(0, target, 0, 1)); + if (Allocator.isComposite(buf)) { + assertThrows(IllegalStateException.class, () -> Allocator.extend(buf, target)); + } + } + assertThrows(IllegalStateException.class, () -> buf.bifurcate()); + assertThrows(IllegalStateException.class, () -> buf.send()); + assertThrows(IllegalStateException.class, () -> buf.acquire()); + assertThrows(IllegalStateException.class, () -> buf.slice()); + assertThrows(IllegalStateException.class, () -> buf.fill((byte) 0)); + assertThrows(IllegalStateException.class, () -> buf.openCursor()); + assertThrows(IllegalStateException.class, () -> buf.openCursor(0, 0)); + assertThrows(IllegalStateException.class, () -> buf.openReverseCursor()); + assertThrows(IllegalStateException.class, () -> buf.openReverseCursor(0, 0)); + } + + @ParameterizedTest + @MethodSource("nonSliceAllocators") + public void cannotSendMoreThanOnce(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator(); + Buf buf = allocator.allocate(8)) { + var send = buf.send(); + var exc = assertThrows(IllegalStateException.class, () -> buf.send()); + send.receive().close(); + assertThat(exc).hasMessageContaining("Cannot send()"); + } + } + + @ParameterizedTest + @MethodSource("allocators") + public void bufferShouldNotBeAccessibleAfterClose(Fixture fixture) { + try (Allocator allocator = fixture.createAllocator()) { + Buf buf = allocator.allocate(24); + buf.writeLong(42); + buf.close(); + verifyInaccessible(buf); + } + } + @ParameterizedTest @MethodSource("initialAllocators") void mustThrowWhenAllocatingZeroSizedBuffer(Fixture fixture) { @@ -1509,6 +1615,7 @@ public class BufTest { assertThat(buf.writableBytes()).isEqualTo(0); buf.ensureWritable(8); assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8); + assertThat(buf.capacity()).isGreaterThanOrEqualTo(16); buf.writeLong(0xA1A2A3A4A5A6A7A8L); assertThat(buf.readableBytes()).isEqualTo(16); assertThat(buf.readLong()).isEqualTo(0x0102030405060708L); @@ -1540,6 +1647,7 @@ public class BufTest { Buf buf = allocator.allocate(4)) { buf.ensureWritable(8); assertThat(buf.writableBytes()).isGreaterThanOrEqualTo(8); + assertThat(buf.capacity()).isGreaterThanOrEqualTo(8); buf.writeLong(0x0102030405060708L); try (Buf slice = buf.slice()) { assertEquals(0x0102030405060708L, slice.readLong()); @@ -1842,6 +1950,15 @@ public class BufTest { } } + @Test + public void composeMustThrowWhenBuffersHaveMismatchedByteOrder() { + try (Allocator allocator = Allocator.heap(); + Buf a = allocator.allocate(4, ByteOrder.BIG_ENDIAN); + Buf b = allocator.allocate(4, ByteOrder.LITTLE_ENDIAN)) { + assertThrows(IllegalArgumentException.class, () -> allocator.compose(a, b)); + } + } + @ParameterizedTest @MethodSource("nonSliceAllocators") public void bifurcateOfNonOwnedBufferMustThrow(Fixture fixture) { diff --git a/src/test/java/io/netty/buffer/api/ScopeTest.java b/src/test/java/io/netty/buffer/api/ScopeTest.java index 7c5e3b3..a48fa3e 100644 --- a/src/test/java/io/netty/buffer/api/ScopeTest.java +++ b/src/test/java/io/netty/buffer/api/ScopeTest.java @@ -15,17 +15,17 @@ */ package io.netty.buffer.api; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.ArrayList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ScopeTest { @Test - public void scopeMustCloseContainedRcsInReverseInsertOrder() { + void scopeMustCloseContainedRcsInReverseInsertOrder() { ArrayList closeOrder = new ArrayList<>(); try (Scope scope = new Scope()) { scope.add(new SomeRc(new OrderingDrop(1, closeOrder))); From 6b91751bea19bbf3e87610f7fda97aa8bacf9daf Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 16 Dec 2020 12:31:49 +0100 Subject: [PATCH 4/5] Small polishing that addresses PR comments --- src/main/java/io/netty/buffer/api/BufRef.java | 2 ++ src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/netty/buffer/api/BufRef.java b/src/main/java/io/netty/buffer/api/BufRef.java index 6e6d56a..7380d8f 100644 --- a/src/main/java/io/netty/buffer/api/BufRef.java +++ b/src/main/java/io/netty/buffer/api/BufRef.java @@ -29,6 +29,7 @@ public final class BufRef extends BufHolder { */ public BufRef(Buf buf) { super(buf); + // BufRef is meant to be atomic, so we need to add a fence to get the semantics of a volatile store. VarHandle.fullFence(); } @@ -39,6 +40,7 @@ public final class BufRef extends BufHolder { */ public BufRef(Send send) { super(send); + // BufRef is meant to be atomic, so we need to add a fence to get the semantics of a volatile store. VarHandle.fullFence(); } diff --git a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java index 0322ad5..91629d2 100644 --- a/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java +++ b/src/main/java/io/netty/buffer/api/memseg/BifurcatedDrop.java @@ -50,7 +50,7 @@ class BifurcatedDrop implements Drop { } @Override - public synchronized void drop(MemSegBuf buf) { + public void drop(MemSegBuf buf) { int c; int n; do { From 008c5ed6ec96cca7262cd6e21971235e3bb92d13 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 16 Dec 2020 17:06:22 +0100 Subject: [PATCH 5/5] Make BufHolder protected methods final if they're not meant to be overwritten --- .../java/io/netty/buffer/api/BufHolder.java | 21 +++++-------- src/main/java/io/netty/buffer/api/BufRef.java | 30 +++++++++++++++---- src/main/java/io/netty/buffer/api/Send.java | 1 + .../java/io/netty/buffer/api/BufRefTest.java | 4 +-- 4 files changed, 35 insertions(+), 21 deletions(-) diff --git a/src/main/java/io/netty/buffer/api/BufHolder.java b/src/main/java/io/netty/buffer/api/BufHolder.java index 3d691d7..aa88892 100644 --- a/src/main/java/io/netty/buffer/api/BufHolder.java +++ b/src/main/java/io/netty/buffer/api/BufHolder.java @@ -86,12 +86,7 @@ public abstract class BufHolder> implements Rc { @Override public Send send() { var send = buf.send(); - return new Send() { - @Override - public T receive() { - return BufHolder.this.receive(send.receive()); - } - }; + return () -> receive(send.receive()); } /** @@ -116,7 +111,7 @@ public abstract class BufHolder> implements Rc { * * @param newBuf The new {@link Buf} instance that is replacing the currently held buffer. */ - protected void replace(Buf newBuf) { + protected final void replaceBuf(Buf newBuf) { try (var ignore = buf) { buf = newBuf.acquire(); } @@ -134,7 +129,7 @@ public abstract class BufHolder> implements Rc { * * @param send The new {@link Buf} instance that is replacing the currently held buffer. */ - protected void replace(Send send) { + protected final void replaceBuf(Send send) { try (var ignore = buf) { buf = send.receive(); } @@ -152,7 +147,7 @@ public abstract class BufHolder> implements Rc { * * @param newBuf The new {@link Buf} instance that is replacing the currently held buffer. */ - protected void replaceVolatile(Buf newBuf) { + protected final void replaceBufVolatile(Buf newBuf) { var prev = (Buf) BUF.getAndSet(this, newBuf.acquire()); prev.close(); } @@ -167,9 +162,9 @@ public abstract class BufHolder> implements Rc { *

* The buffer assignment is performed using a volatile store. * - * @param send The new {@link Buf} instance that is replacing the currently held buffer. + * @param send The {@link Send} with the new {@link Buf} instance that is replacing the currently held buffer. */ - protected void replaceVolatile(Send send) { + protected final void replaceBufVolatile(Send send) { var prev = (Buf) BUF.getAndSet(this, send.receive()); prev.close(); } @@ -181,7 +176,7 @@ public abstract class BufHolder> implements Rc { * * @return The {@link Buf} instance being held by this {@linkplain T buffer holder}. */ - protected Buf getBuf() { + protected final Buf getBuf() { return buf; } @@ -192,7 +187,7 @@ public abstract class BufHolder> implements Rc { * * @return The {@link Buf} instance being held by this {@linkplain T buffer holder}. */ - protected Buf getBufVolatile() { + protected final Buf getBufVolatile() { return (Buf) BUF.getVolatile(this); } } diff --git a/src/main/java/io/netty/buffer/api/BufRef.java b/src/main/java/io/netty/buffer/api/BufRef.java index 7380d8f..2884a23 100644 --- a/src/main/java/io/netty/buffer/api/BufRef.java +++ b/src/main/java/io/netty/buffer/api/BufRef.java @@ -49,14 +49,32 @@ public final class BufRef extends BufHolder { return new BufRef(buf); } - @Override - public void replaceVolatile(Buf newBuf) { - super.replaceVolatile(newBuf); + /** + * Replace the underlying referenced buffer with the given buffer. + *

+ * Note: this method decreases the reference count of the current buffer, + * and increases the reference count of the new buffer. + *

+ * The buffer assignment is performed using a volatile store. + * + * @param newBuf The new {@link Buf} instance that is replacing the currently held buffer. + */ + public void replace(Buf newBuf) { + replaceBufVolatile(newBuf); } - @Override - public void replaceVolatile(Send send) { - super.replaceVolatile(send); + /** + * Replace the underlying referenced buffer with the given buffer. + *

+ * Note: this method decreases the reference count of the current buffer, + * and takes exclusive ownership of the sent buffer. + *

+ * The buffer assignment is performed using a volatile store. + * + * @param send The {@link Send} with the new {@link Buf} instance that is replacing the currently held buffer. + */ + public void replace(Send send) { + replaceBufVolatile(send); } /** diff --git a/src/main/java/io/netty/buffer/api/Send.java b/src/main/java/io/netty/buffer/api/Send.java index c752f90..96d4a7e 100644 --- a/src/main/java/io/netty/buffer/api/Send.java +++ b/src/main/java/io/netty/buffer/api/Send.java @@ -28,6 +28,7 @@ package io.netty.buffer.api; * * @param */ +@FunctionalInterface public interface Send> { /** * Receive the {@link Rc} instance being sent, and bind its ownership to the calling thread. The invalidation of the diff --git a/src/test/java/io/netty/buffer/api/BufRefTest.java b/src/test/java/io/netty/buffer/api/BufRefTest.java index ce0783a..85e5fd4 100644 --- a/src/test/java/io/netty/buffer/api/BufRefTest.java +++ b/src/test/java/io/netty/buffer/api/BufRefTest.java @@ -60,7 +60,7 @@ class BufRefTest { assertThat(ref.contents().readInt()).isEqualTo(42); try (Buf buf = allocator.allocate(8)) { - ref.replaceVolatile(buf); // Pass replacement directly. + ref.replace(buf); // Pass replacement directly. } assertThrows(IllegalStateException.class, () -> orig.writeInt(32)); @@ -84,7 +84,7 @@ class BufRefTest { assertThat(ref.contents().readInt()).isEqualTo(42); try (Buf buf = allocator.allocate(8)) { - ref.replaceVolatile(buf.send()); // Pass replacement via send(). + ref.replace(buf.send()); // Pass replacement via send(). } assertThrows(IllegalStateException.class, () -> orig.writeInt(32));