diff --git a/buffer/src/main/java/io/netty/buffer/b2/Buf.java b/buffer/src/main/java/io/netty/buffer/b2/Buf.java index 44182f3..54cff40 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/Buf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/Buf.java @@ -15,6 +15,7 @@ */ package io.netty.buffer.b2; +import java.nio.ByteBuffer; import java.nio.ByteOrder; /** @@ -190,4 +191,73 @@ public interface Buf extends Rc, BufAccessors { * that is a view of the given region of this buffer. */ Buf slice(int offset, int length); + + /** + * Copies the given length of data from this buffer into the given destination array, beginning at the given source + * position in this buffer, and the given destination position in the destination array. + *

+ * This method does not read or modify the {@linkplain #writerIndex() write offset} or the + * {@linkplain #readerIndex() read offset}. + * + * @param srcPos The byte offset into this buffer wherefrom the copying should start; the byte at this offset in + * this buffer will be copied to the {@code destPos} index in the {@code dest} array. + * @param dest The destination byte array. + * @param destPos The index into the {@code dest} array wherefrom the copying should start. + * @param length The number of bytes to copy. + * @throws NullPointerException if the destination array is null. + * @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative, + * or if the resulting end positions reaches beyond the end of either this buffer or the destination array. + */ + void copyInto(int srcPos, byte[] dest, int destPos, int length); + + /** + * Copies the given length of data from this buffer into the given destination byte buffer, beginning at the given + * source position in this buffer, and the given destination position in the destination byte buffer. + *

+ * This method does not read or modify the {@linkplain #writerIndex() write offset} or the + * {@linkplain #readerIndex() read offset}, nor is the position of the destination buffer changed. + *

+ * The position and limit of the destination byte buffer are also ignored, and do not influence {@code destPos} + * or {@code length}. + * + * @param srcPos The byte offset into this buffer wherefrom the copying should start; the byte at this offset in + * this buffer will be copied to the {@code destPos} index in the {@code dest} array. + * @param dest The destination byte buffer. + * @param destPos The index into the {@code dest} array wherefrom the copying should start. + * @param length The number of bytes to copy. + * @throws NullPointerException if the destination array is null. + * @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative, + * or if the resulting end positions reaches beyond the end of either this buffer or the destination array. + */ + void copyInto(int srcPos, ByteBuffer dest, int destPos, int length); + + /** + * Copies the given length of data from this buffer into the given destination buffer, beginning at the given + * source position in this buffer, and the given destination position in the destination buffer. + *

+ * This method does not read or modify the {@linkplain #writerIndex() write offset} or the + * {@linkplain #readerIndex() read offset} on this buffer, nor on the destination buffer. + *

+ * The read and write offsets of the destination buffer are also ignored, and do not influence {@code destPos} + * or {@code length}. + * + * @param srcPos The byte offset into this buffer wherefrom the copying should start; the byte at this offset in + * this buffer will be copied to the {@code destPos} index in the {@code dest} array. + * @param dest The destination buffer. + * @param destPos The index into the {@code dest} array wherefrom the copying should start. + * @param length The number of bytes to copy. + * @throws NullPointerException if the destination array is null. + * @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative, + * or if the resulting end positions reaches beyond the end of either this buffer or the destination array. + */ + void copyInto(int srcPos, Buf dest, int destPos, int length); + + /** + * Resets the {@linkplain #readerIndex() read offset} and the {@linkplain #writerIndex() write offset} on this + * buffer to their initial values. + */ + default void reset() { + readerIndex(0); + writerIndex(0); + } } \ No newline at end of file diff --git a/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java b/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java index 5544479..c2d684a 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/CompositeBuf.java @@ -15,6 +15,7 @@ */ package io.netty.buffer.b2; +import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; @@ -237,6 +238,58 @@ final class CompositeBuf extends RcSupport implements Buf { } } + @Override + public void copyInto(int srcPos, byte[] dest, int destPos, int length) { + copyInto(srcPos, (b, s, d, l) -> b.copyInto(s, dest, d, l), destPos, length); + } + + @Override + public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) { + copyInto(srcPos, (b, s, d, l) -> b.copyInto(s, dest, d, l), destPos, length); + } + + @Override + public void copyInto(int srcPos, Buf dest, int destPos, int length) { + if (length < 0) { + throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.'); + } + if (srcPos < 0) { + throw indexOutOfBounds(srcPos); + } + if (srcPos + length > capacity) { + throw indexOutOfBounds(srcPos + length); + } + // todo optimise by doing bulk copy via consituent buffers + for (int i = length - 1; i >= 0; i--) { // Iterate in reverse to account for src and dest buffer overlap. + dest.writeByte(destPos + i, readByte(srcPos + i)); + } + } + + private void copyInto(int srcPos, CopyInto dest, int destPos, int length) { + if (length < 0) { + throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.'); + } + if (srcPos < 0) { + throw indexOutOfBounds(srcPos); + } + if (srcPos + length > capacity) { + throw indexOutOfBounds(srcPos + length); + } + while (length > 0) { + var buf = (Buf) chooseBuffer(srcPos, 0); + int toCopy = buf.capacity() - subOffset; + dest.copyInto(buf, subOffset, destPos, toCopy); + srcPos += toCopy; + destPos += toCopy; + length -= toCopy; + } + } + + @FunctionalInterface + private interface CopyInto { + void copyInto(Buf src, int srcPos, int destPos, int length); + } + // @Override public byte readByte() { diff --git a/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java b/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java index 8ddb52d..84bfdb0 100644 --- a/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java +++ b/buffer/src/main/java/io/netty/buffer/b2/MemSegBuf.java @@ -17,6 +17,7 @@ package io.netty.buffer.b2; import jdk.incubator.foreign.MemorySegment; +import java.nio.ByteBuffer; import java.nio.ByteOrder; import static jdk.incubator.foreign.MemoryAccess.getByteAtOffset_BE; @@ -51,8 +52,8 @@ import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset_LE; class MemSegBuf extends RcSupport implements Buf { static final Drop SEGMENT_CLOSE = buf -> buf.seg.close(); final MemorySegment seg; - private boolean isBigEndian; private final boolean isSendable; + private boolean isBigEndian; private int roff; private int woff; @@ -136,6 +137,32 @@ class MemSegBuf extends RcSupport implements Buf { return new MemSegBuf(slice, drop, sendable).writerIndex(length).order(order()); } + @Override + public void copyInto(int srcPos, byte[] dest, int destPos, int length) { + try (var target = MemorySegment.ofArray(dest)) { + copyInto(srcPos, target, destPos, length); + } + } + + @Override + public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) { + try (var target = MemorySegment.ofByteBuffer(dest.duplicate().clear())) { + copyInto(srcPos, target, destPos, length); + } + } + + @Override + public void copyInto(int srcPos, Buf dest, int destPos, int length) { + // todo optimise: specialise for MemSegBuf; use ByteIterator. + for (int i = length - 1; i >= 0; i--) { // Iterate in reverse to account for src and dest buffer overlap. + dest.writeByte(destPos + i, readByte(srcPos + i)); + } + } + + private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) { + dest.asSlice(destPos, length).copyFrom(seg.asSlice(srcPos, length)); + } + // ### CODEGEN START primitive accessors implementation // diff --git a/buffer/src/main/java/io/netty/buffer/b2/Scope.java b/buffer/src/main/java/io/netty/buffer/b2/Scope.java new file mode 100644 index 0000000..0b993af --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/b2/Scope.java @@ -0,0 +1,60 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.b2; + +import java.util.ArrayDeque; + +/** + * A scope is a convenient mechanism for capturing the life cycles of multiple reference counted objects. Once the scope + * is closed, all of the added objects will also be closed in reverse insert order. That is, the most recently added + * object will be closed first. + *

+ * Scopes can be reused. After a scope has been closed, new objects can be added to it, and they will be closed when the + * scope is closed again. + *

+ * Objects will not be closed multiple times if the scope is closed multiple times, unless said objects are also added + * multiple times. + *

+ * Note that scopes are not thread-safe. They are intended to be used from a single thread. + */ +public class Scope implements AutoCloseable { + private final ArrayDeque> deque = new ArrayDeque<>(); + + /** + * Add the given reference counted object to this scope, so that it will be {@linkplain Rc#close() closed} when this + * scope is {@linkplain #close() closed}. + * + * @param obj The reference counted object to add to this scope. + * @param The type of the reference counted object. + * @return The same exact object that was added; further operations can be chained on the object after this method + * call. + */ + public > T add(T obj) { + deque.addLast(obj); + return obj; + } + + /** + * Close this scope and all the reference counted object it contains. + */ + @Override + public void close() { + Rc obj; + while ((obj = deque.pollLast()) != null) { + obj.close(); + } + } +} diff --git a/buffer/src/test/java/io/netty/buffer/b2/BufTest.java b/buffer/src/test/java/io/netty/buffer/b2/BufTest.java index d7f6213..cae8abf 100644 --- a/buffer/src/test/java/io/netty/buffer/b2/BufTest.java +++ b/buffer/src/test/java/io/netty/buffer/b2/BufTest.java @@ -21,12 +21,14 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; +import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.SynchronousQueue; +import java.util.function.Function; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; @@ -326,6 +328,16 @@ public abstract class BufTest { } } + @Test + public void resetMustSetReaderAndWriterOffsetsToTheirInitialPositions() { + try (Buf buf = allocate(8)) { + buf.writeInt(0).readShort(); + buf.reset(); + assertEquals(0, buf.readerIndex()); + assertEquals(0, buf.writerIndex()); + } + } + @Test public void sliceWithoutOffsetAndSizeMustReturnReadableRegion() { try (Buf buf = allocate(8)) { @@ -518,6 +530,306 @@ public abstract class BufTest { } } + @Test + public void copyIntoByteArray() { + try (Buf buf = allocate(8)) { + buf.order(ByteOrder.BIG_ENDIAN).writeLong(0x0102030405060708L); + byte[] array = new byte[8]; + buf.copyInto(0, array, 0, array.length); + assertArrayEquals(new byte[]{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, array); + + buf.writerIndex(0).order(ByteOrder.LITTLE_ENDIAN).writeLong(0x0102030405060708L); + buf.copyInto(0, array, 0, array.length); + assertArrayEquals(new byte[]{0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01}, array); + + array = new byte[6]; + buf.copyInto(1, array, 1, 3); + assertArrayEquals(new byte[] {0x00, 0x07, 0x06, 0x05, 0x00, 0x00}, array); + } + } + + @Test + public void copyIntoHeapByteBuffer() { + testCopyIntoByteBuffer(ByteBuffer::allocate); + } + + @Test + public void copyIntoDirectByteBuffer() { + testCopyIntoByteBuffer(ByteBuffer::allocateDirect); + } + + private void testCopyIntoByteBuffer(Function bbAlloc) { + try (Buf buf = allocate(8)) { + buf.order(ByteOrder.BIG_ENDIAN).writeLong(0x0102030405060708L); + ByteBuffer buffer = bbAlloc.apply(8); + buf.copyInto(0, buffer, 0, buffer.capacity()); + assertEquals((byte) 0x01, buffer.get()); + assertEquals((byte) 0x02, buffer.get()); + assertEquals((byte) 0x03, buffer.get()); + assertEquals((byte) 0x04, buffer.get()); + assertEquals((byte) 0x05, buffer.get()); + assertEquals((byte) 0x06, buffer.get()); + assertEquals((byte) 0x07, buffer.get()); + assertEquals((byte) 0x08, buffer.get()); + buffer.clear(); + + buf.writerIndex(0).order(ByteOrder.LITTLE_ENDIAN).writeLong(0x0102030405060708L); + buf.copyInto(0, buffer, 0, buffer.capacity()); + assertEquals((byte) 0x08, buffer.get()); + assertEquals((byte) 0x07, buffer.get()); + assertEquals((byte) 0x06, buffer.get()); + assertEquals((byte) 0x05, buffer.get()); + assertEquals((byte) 0x04, buffer.get()); + assertEquals((byte) 0x03, buffer.get()); + assertEquals((byte) 0x02, buffer.get()); + assertEquals((byte) 0x01, buffer.get()); + buffer.clear(); + + buffer = bbAlloc.apply(6); + buf.copyInto(1, buffer, 1, 3); + assertEquals((byte) 0x00, buffer.get()); + assertEquals((byte) 0x07, buffer.get()); + assertEquals((byte) 0x06, buffer.get()); + assertEquals((byte) 0x05, buffer.get()); + assertEquals((byte) 0x00, buffer.get()); + assertEquals((byte) 0x00, buffer.get()); + buffer.clear(); + + buffer = bbAlloc.apply(6); + buffer.position(3).limit(3); + buf.copyInto(1, buffer, 1, 3); + assertEquals(3, buffer.position()); + assertEquals(3, buffer.limit()); + buffer.clear(); + assertEquals((byte) 0x00, buffer.get()); + assertEquals((byte) 0x07, buffer.get()); + assertEquals((byte) 0x06, buffer.get()); + assertEquals((byte) 0x05, buffer.get()); + assertEquals((byte) 0x00, buffer.get()); + assertEquals((byte) 0x00, buffer.get()); + } + } + + @Test + public void copyIntoOnHeapBuf() { + testCopyIntoBuf(Allocator.heap()::allocate); + } + + @Test + public void copyIntoOffHeapBuf() { + testCopyIntoBuf(Allocator.direct()::allocate); + } + + @Test + public void copyIntoOnHeapBufSlice() { + try (Scope scope = new Scope()) { + testCopyIntoBuf(size -> scope.add(Allocator.heap().allocate(size)).writerIndex(size).slice()); + } + } + + @Test + public void copyIntoOffHeapBufSlice() { + try (Scope scope = new Scope()) { + testCopyIntoBuf(size -> scope.add(Allocator.direct().allocate(size)).writerIndex(size).slice()); + } + } + + @Test + public void copyIntoCompositeOnHeapOnHeapBuf() { + try (var a = Allocator.heap(); + var b = Allocator.heap()) { + testCopyIntoBuf(size -> { + int first = size / 2; + int second = size - first; + try (var bufFirst = a.allocate(first); + var bufSecond = b.allocate(second)) { + return Buf.compose(bufFirst, bufSecond); + } + }); + } + } + + @Test + public void copyIntoCompositeOnHeapOffHeapBuf() { + try (var a = Allocator.heap(); + var b = Allocator.direct()) { + testCopyIntoBuf(size -> { + int first = size / 2; + int second = size - first; + try (var bufFirst = a.allocate(first); + var bufSecond = b.allocate(second)) { + return Buf.compose(bufFirst, bufSecond); + } + }); + } + } + + @Test + public void copyIntoCompositeOffHeapOnHeapBuf() { + try (var a = Allocator.direct(); + var b = Allocator.heap()) { + testCopyIntoBuf(size -> { + int first = size / 2; + int second = size - first; + try (var bufFirst = a.allocate(first); + var bufSecond = b.allocate(second)) { + return Buf.compose(bufFirst, bufSecond); + } + }); + } + } + + @Test + public void copyIntoCompositeOffHeapOffHeapBuf() { + try (var a = Allocator.direct(); + var b = Allocator.direct()) { + testCopyIntoBuf(size -> { + int first = size / 2; + int second = size - first; + try (var bufFirst = a.allocate(first); + var bufSecond = b.allocate(second)) { + return Buf.compose(bufFirst, bufSecond); + } + }); + } + } + + @Test + public void copyIntoCompositeOnHeapOnHeapBufSlice() { + try (var a = Allocator.heap(); + var b = Allocator.heap(); + var scope = new Scope()) { + testCopyIntoBuf(size -> { + int first = size / 2; + int second = size - first; + try (var bufFirst = a.allocate(first); + var bufSecond = b.allocate(second)) { + return scope.add(Buf.compose(bufFirst, bufSecond)).writerIndex(size).slice(); + } + }); + } + } + + @Test + public void copyIntoCompositeOnHeapOffHeapBufSlice() { + try (var a = Allocator.heap(); + var b = Allocator.direct(); + var scope = new Scope()) { + testCopyIntoBuf(size -> { + int first = size / 2; + int second = size - first; + try (var bufFirst = a.allocate(first); + var bufSecond = b.allocate(second)) { + return scope.add(Buf.compose(bufFirst, bufSecond)).writerIndex(size).slice(); + } + }); + } + } + + @Test + public void copyIntoCompositeOffHeapOnHeapBufSlice() { + try (var a = Allocator.direct(); + var b = Allocator.heap(); + var scope = new Scope()) { + testCopyIntoBuf(size -> { + int first = size / 2; + int second = size - first; + try (var bufFirst = a.allocate(first); + var bufSecond = b.allocate(second)) { + return scope.add(Buf.compose(bufFirst, bufSecond)).writerIndex(size).slice(); + } + }); + } + } + + @Test + public void copyIntoCompositeOffHeapOffHeapBufSlice() { + try (var a = Allocator.direct(); + var b = Allocator.direct(); + var scope = new Scope()) { + testCopyIntoBuf(size -> { + int first = size / 2; + int second = size - first; + try (var bufFirst = a.allocate(first); + var bufSecond = b.allocate(second)) { + return scope.add(Buf.compose(bufFirst, bufSecond)).writerIndex(size).slice(); + } + }); + } + } + + private void testCopyIntoBuf(Function bbAlloc) { + try (Buf buf = allocate(8)) { + buf.order(ByteOrder.BIG_ENDIAN).writeLong(0x0102030405060708L); + Buf buffer = bbAlloc.apply(8); + buffer.writerIndex(8); + buf.copyInto(0, buffer, 0, buffer.capacity()); + assertEquals((byte) 0x01, buffer.readByte()); + assertEquals((byte) 0x02, buffer.readByte()); + assertEquals((byte) 0x03, buffer.readByte()); + assertEquals((byte) 0x04, buffer.readByte()); + assertEquals((byte) 0x05, buffer.readByte()); + assertEquals((byte) 0x06, buffer.readByte()); + assertEquals((byte) 0x07, buffer.readByte()); + assertEquals((byte) 0x08, buffer.readByte()); + buffer.reset(); + + buf.writerIndex(0).order(ByteOrder.LITTLE_ENDIAN).writeLong(0x0102030405060708L); + buf.copyInto(0, buffer, 0, buffer.capacity()); + buffer.writerIndex(8); + assertEquals((byte) 0x08, buffer.readByte()); + assertEquals((byte) 0x07, buffer.readByte()); + assertEquals((byte) 0x06, buffer.readByte()); + assertEquals((byte) 0x05, buffer.readByte()); + assertEquals((byte) 0x04, buffer.readByte()); + assertEquals((byte) 0x03, buffer.readByte()); + assertEquals((byte) 0x02, buffer.readByte()); + assertEquals((byte) 0x01, buffer.readByte()); + buffer.reset(); + + buffer.close(); + buffer = bbAlloc.apply(6); + buf.copyInto(1, buffer, 1, 3); + buffer.writerIndex(6); + assertEquals((byte) 0x00, buffer.readByte()); + assertEquals((byte) 0x07, buffer.readByte()); + assertEquals((byte) 0x06, buffer.readByte()); + assertEquals((byte) 0x05, buffer.readByte()); + assertEquals((byte) 0x00, buffer.readByte()); + assertEquals((byte) 0x00, buffer.readByte()); + + buffer.close(); + buffer = bbAlloc.apply(6); + buffer.writerIndex(3).readerIndex(3); + buf.copyInto(1, buffer, 1, 3); + assertEquals(3, buffer.readerIndex()); + assertEquals(3, buffer.writerIndex()); + buffer.reset(); + buffer.writerIndex(6); + assertEquals((byte) 0x00, buffer.readByte()); + assertEquals((byte) 0x07, buffer.readByte()); + assertEquals((byte) 0x06, buffer.readByte()); + assertEquals((byte) 0x05, buffer.readByte()); + assertEquals((byte) 0x00, buffer.readByte()); + assertEquals((byte) 0x00, buffer.readByte()); + buffer.close(); + + buf.reset(); + buf.order(ByteOrder.BIG_ENDIAN).writeLong(0x0102030405060708L); + // Testing copyInto for overlapping writes: + // + // 0x0102030405060708 + // └──┬──┬──┘ │ + // └─▶└┬───────┘ + // ▼ + // 0x0102030102030405 + buf.copyInto(0, buf, 3, 5); + assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x01, 0x02, 0x03, 0x04, 0x05}, buf.copy()); + } + } + // todo resize copying must preserve contents + // todo resize sharing + // ### CODEGEN START primitive accessors tests // diff --git a/buffer/src/test/java/io/netty/buffer/b2/ScopeTest.java b/buffer/src/test/java/io/netty/buffer/b2/ScopeTest.java new file mode 100644 index 0000000..6e5a829 --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/b2/ScopeTest.java @@ -0,0 +1,55 @@ +package io.netty.buffer.b2; + +import org.junit.Test; + +import java.util.ArrayList; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ScopeTest { + @Test + public void scopeMustCloseContainedRcsInReverseInsertOrder() { + ArrayList closeOrder = new ArrayList<>(); + try (Scope scope = new Scope()) { + scope.add(new SomeRc(new OrderingDrop(1, closeOrder))); + scope.add(new SomeRc(new OrderingDrop(2, closeOrder))); + scope.add(new SomeRc(new OrderingDrop(3, closeOrder))); + } + var itr = closeOrder.iterator(); + assertTrue(itr.hasNext()); + assertEquals(3, (int) itr.next()); + assertTrue(itr.hasNext()); + assertEquals(2, (int) itr.next()); + assertTrue(itr.hasNext()); + assertEquals(1, (int) itr.next()); + assertFalse(itr.hasNext()); + } + + private static final class SomeRc extends RcSupport { + SomeRc(Drop drop) { + super(drop); + } + + @Override + protected Owned prepareSend() { + return null; + } + } + + private static final class OrderingDrop implements Drop { + private final int order; + private final ArrayList list; + + private OrderingDrop(int order, ArrayList list) { + this.order = order; + this.list = list; + } + + @Override + public void drop(SomeRc obj) { + list.add(order); + } + } +} \ No newline at end of file