Add copyInto methods to the Buf interface

Motivation:
Copy methods are useful for bulk moving data into more convenient locations for what comes next in a given context.

Modification:
Add bulk copyInto methods for copying regions of buffer contents into arrays, byte buffers, and other Buf instances.
Some of these implementations are not optimised at this point, however, since we're primarily concerned with getting the API right at this point, and implementation maturity comes later.

Result:
We can now bulk copy data from a Buf into other convenient forms.
This commit is contained in:
Chris Vest 2020-10-30 14:39:50 +01:00
parent fdeed0c94e
commit 91be83444d
6 changed files with 578 additions and 1 deletions

View File

@ -15,6 +15,7 @@
*/
package io.netty.buffer.b2;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
/**
@ -190,4 +191,73 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* that is a view of the given region of this buffer.
*/
Buf slice(int offset, int length);
/**
* Copies the given length of data from this buffer into the given destination array, beginning at the given source
* position in this buffer, and the given destination position in the destination array.
* <p>
* This method does not read or modify the {@linkplain #writerIndex() write offset} or the
* {@linkplain #readerIndex() read offset}.
*
* @param srcPos The byte offset into this buffer wherefrom the copying should start; the byte at this offset in
* this buffer will be copied to the {@code destPos} index in the {@code dest} array.
* @param dest The destination byte array.
* @param destPos The index into the {@code dest} array wherefrom the copying should start.
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
*/
void copyInto(int srcPos, byte[] dest, int destPos, int length);
/**
* Copies the given length of data from this buffer into the given destination byte buffer, beginning at the given
* source position in this buffer, and the given destination position in the destination byte buffer.
* <p>
* This method does not read or modify the {@linkplain #writerIndex() write offset} or the
* {@linkplain #readerIndex() read offset}, nor is the position of the destination buffer changed.
* <p>
* The position and limit of the destination byte buffer are also ignored, and do not influence {@code destPos}
* or {@code length}.
*
* @param srcPos The byte offset into this buffer wherefrom the copying should start; the byte at this offset in
* this buffer will be copied to the {@code destPos} index in the {@code dest} array.
* @param dest The destination byte buffer.
* @param destPos The index into the {@code dest} array wherefrom the copying should start.
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
*/
void copyInto(int srcPos, ByteBuffer dest, int destPos, int length);
/**
* Copies the given length of data from this buffer into the given destination buffer, beginning at the given
* source position in this buffer, and the given destination position in the destination buffer.
* <p>
* This method does not read or modify the {@linkplain #writerIndex() write offset} or the
* {@linkplain #readerIndex() read offset} on this buffer, nor on the destination buffer.
* <p>
* The read and write offsets of the destination buffer are also ignored, and do not influence {@code destPos}
* or {@code length}.
*
* @param srcPos The byte offset into this buffer wherefrom the copying should start; the byte at this offset in
* this buffer will be copied to the {@code destPos} index in the {@code dest} array.
* @param dest The destination buffer.
* @param destPos The index into the {@code dest} array wherefrom the copying should start.
* @param length The number of bytes to copy.
* @throws NullPointerException if the destination array is null.
* @throws IndexOutOfBoundsException if the source or destination positions, or the length, are negative,
* or if the resulting end positions reaches beyond the end of either this buffer or the destination array.
*/
void copyInto(int srcPos, Buf dest, int destPos, int length);
/**
* Resets the {@linkplain #readerIndex() read offset} and the {@linkplain #writerIndex() write offset} on this
* buffer to their initial values.
*/
default void reset() {
readerIndex(0);
writerIndex(0);
}
}

View File

@ -15,6 +15,7 @@
*/
package io.netty.buffer.b2;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
@ -237,6 +238,58 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
}
@Override
public void copyInto(int srcPos, byte[] dest, int destPos, int length) {
copyInto(srcPos, (b, s, d, l) -> b.copyInto(s, dest, d, l), destPos, length);
}
@Override
public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) {
copyInto(srcPos, (b, s, d, l) -> b.copyInto(s, dest, d, l), destPos, length);
}
@Override
public void copyInto(int srcPos, Buf dest, int destPos, int length) {
if (length < 0) {
throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.');
}
if (srcPos < 0) {
throw indexOutOfBounds(srcPos);
}
if (srcPos + length > capacity) {
throw indexOutOfBounds(srcPos + length);
}
// todo optimise by doing bulk copy via consituent buffers
for (int i = length - 1; i >= 0; i--) { // Iterate in reverse to account for src and dest buffer overlap.
dest.writeByte(destPos + i, readByte(srcPos + i));
}
}
private void copyInto(int srcPos, CopyInto dest, int destPos, int length) {
if (length < 0) {
throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.');
}
if (srcPos < 0) {
throw indexOutOfBounds(srcPos);
}
if (srcPos + length > capacity) {
throw indexOutOfBounds(srcPos + length);
}
while (length > 0) {
var buf = (Buf) chooseBuffer(srcPos, 0);
int toCopy = buf.capacity() - subOffset;
dest.copyInto(buf, subOffset, destPos, toCopy);
srcPos += toCopy;
destPos += toCopy;
length -= toCopy;
}
}
@FunctionalInterface
private interface CopyInto {
void copyInto(Buf src, int srcPos, int destPos, int length);
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
@Override
public byte readByte() {

View File

@ -17,6 +17,7 @@ package io.netty.buffer.b2;
import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import static jdk.incubator.foreign.MemoryAccess.getByteAtOffset_BE;
@ -51,8 +52,8 @@ import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset_LE;
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
static final Drop<MemSegBuf> SEGMENT_CLOSE = buf -> buf.seg.close();
final MemorySegment seg;
private boolean isBigEndian;
private final boolean isSendable;
private boolean isBigEndian;
private int roff;
private int woff;
@ -136,6 +137,32 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
return new MemSegBuf(slice, drop, sendable).writerIndex(length).order(order());
}
@Override
public void copyInto(int srcPos, byte[] dest, int destPos, int length) {
try (var target = MemorySegment.ofArray(dest)) {
copyInto(srcPos, target, destPos, length);
}
}
@Override
public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) {
try (var target = MemorySegment.ofByteBuffer(dest.duplicate().clear())) {
copyInto(srcPos, target, destPos, length);
}
}
@Override
public void copyInto(int srcPos, Buf dest, int destPos, int length) {
// todo optimise: specialise for MemSegBuf; use ByteIterator.
for (int i = length - 1; i >= 0; i--) { // Iterate in reverse to account for src and dest buffer overlap.
dest.writeByte(destPos + i, readByte(srcPos + i));
}
}
private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) {
dest.asSlice(destPos, length).copyFrom(seg.asSlice(srcPos, length));
}
// ### CODEGEN START primitive accessors implementation
// <editor-fold defaultstate="collapsed" desc="Generated primitive accessors implementation.">

View File

@ -0,0 +1,60 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.b2;
import java.util.ArrayDeque;
/**
* A scope is a convenient mechanism for capturing the life cycles of multiple reference counted objects. Once the scope
* is closed, all of the added objects will also be closed in reverse insert order. That is, the most recently added
* object will be closed first.
* <p>
* Scopes can be reused. After a scope has been closed, new objects can be added to it, and they will be closed when the
* scope is closed again.
* <p>
* Objects will not be closed multiple times if the scope is closed multiple times, unless said objects are also added
* multiple times.
* <p>
* Note that scopes are not thread-safe. They are intended to be used from a single thread.
*/
public class Scope implements AutoCloseable {
private final ArrayDeque<Rc<?>> deque = new ArrayDeque<>();
/**
* Add the given reference counted object to this scope, so that it will be {@linkplain Rc#close() closed} when this
* scope is {@linkplain #close() closed}.
*
* @param obj The reference counted object to add to this scope.
* @param <T> The type of the reference counted object.
* @return The same exact object that was added; further operations can be chained on the object after this method
* call.
*/
public <T extends Rc<T>> T add(T obj) {
deque.addLast(obj);
return obj;
}
/**
* Close this scope and all the reference counted object it contains.
*/
@Override
public void close() {
Rc<?> obj;
while ((obj = deque.pollLast()) != null) {
obj.close();
}
}
}

View File

@ -21,12 +21,14 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.function.Function;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@ -326,6 +328,16 @@ public abstract class BufTest {
}
}
@Test
public void resetMustSetReaderAndWriterOffsetsToTheirInitialPositions() {
try (Buf buf = allocate(8)) {
buf.writeInt(0).readShort();
buf.reset();
assertEquals(0, buf.readerIndex());
assertEquals(0, buf.writerIndex());
}
}
@Test
public void sliceWithoutOffsetAndSizeMustReturnReadableRegion() {
try (Buf buf = allocate(8)) {
@ -518,6 +530,306 @@ public abstract class BufTest {
}
}
@Test
public void copyIntoByteArray() {
try (Buf buf = allocate(8)) {
buf.order(ByteOrder.BIG_ENDIAN).writeLong(0x0102030405060708L);
byte[] array = new byte[8];
buf.copyInto(0, array, 0, array.length);
assertArrayEquals(new byte[]{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, array);
buf.writerIndex(0).order(ByteOrder.LITTLE_ENDIAN).writeLong(0x0102030405060708L);
buf.copyInto(0, array, 0, array.length);
assertArrayEquals(new byte[]{0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01}, array);
array = new byte[6];
buf.copyInto(1, array, 1, 3);
assertArrayEquals(new byte[] {0x00, 0x07, 0x06, 0x05, 0x00, 0x00}, array);
}
}
@Test
public void copyIntoHeapByteBuffer() {
testCopyIntoByteBuffer(ByteBuffer::allocate);
}
@Test
public void copyIntoDirectByteBuffer() {
testCopyIntoByteBuffer(ByteBuffer::allocateDirect);
}
private void testCopyIntoByteBuffer(Function<Integer, ByteBuffer> bbAlloc) {
try (Buf buf = allocate(8)) {
buf.order(ByteOrder.BIG_ENDIAN).writeLong(0x0102030405060708L);
ByteBuffer buffer = bbAlloc.apply(8);
buf.copyInto(0, buffer, 0, buffer.capacity());
assertEquals((byte) 0x01, buffer.get());
assertEquals((byte) 0x02, buffer.get());
assertEquals((byte) 0x03, buffer.get());
assertEquals((byte) 0x04, buffer.get());
assertEquals((byte) 0x05, buffer.get());
assertEquals((byte) 0x06, buffer.get());
assertEquals((byte) 0x07, buffer.get());
assertEquals((byte) 0x08, buffer.get());
buffer.clear();
buf.writerIndex(0).order(ByteOrder.LITTLE_ENDIAN).writeLong(0x0102030405060708L);
buf.copyInto(0, buffer, 0, buffer.capacity());
assertEquals((byte) 0x08, buffer.get());
assertEquals((byte) 0x07, buffer.get());
assertEquals((byte) 0x06, buffer.get());
assertEquals((byte) 0x05, buffer.get());
assertEquals((byte) 0x04, buffer.get());
assertEquals((byte) 0x03, buffer.get());
assertEquals((byte) 0x02, buffer.get());
assertEquals((byte) 0x01, buffer.get());
buffer.clear();
buffer = bbAlloc.apply(6);
buf.copyInto(1, buffer, 1, 3);
assertEquals((byte) 0x00, buffer.get());
assertEquals((byte) 0x07, buffer.get());
assertEquals((byte) 0x06, buffer.get());
assertEquals((byte) 0x05, buffer.get());
assertEquals((byte) 0x00, buffer.get());
assertEquals((byte) 0x00, buffer.get());
buffer.clear();
buffer = bbAlloc.apply(6);
buffer.position(3).limit(3);
buf.copyInto(1, buffer, 1, 3);
assertEquals(3, buffer.position());
assertEquals(3, buffer.limit());
buffer.clear();
assertEquals((byte) 0x00, buffer.get());
assertEquals((byte) 0x07, buffer.get());
assertEquals((byte) 0x06, buffer.get());
assertEquals((byte) 0x05, buffer.get());
assertEquals((byte) 0x00, buffer.get());
assertEquals((byte) 0x00, buffer.get());
}
}
@Test
public void copyIntoOnHeapBuf() {
testCopyIntoBuf(Allocator.heap()::allocate);
}
@Test
public void copyIntoOffHeapBuf() {
testCopyIntoBuf(Allocator.direct()::allocate);
}
@Test
public void copyIntoOnHeapBufSlice() {
try (Scope scope = new Scope()) {
testCopyIntoBuf(size -> scope.add(Allocator.heap().allocate(size)).writerIndex(size).slice());
}
}
@Test
public void copyIntoOffHeapBufSlice() {
try (Scope scope = new Scope()) {
testCopyIntoBuf(size -> scope.add(Allocator.direct().allocate(size)).writerIndex(size).slice());
}
}
@Test
public void copyIntoCompositeOnHeapOnHeapBuf() {
try (var a = Allocator.heap();
var b = Allocator.heap()) {
testCopyIntoBuf(size -> {
int first = size / 2;
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buf.compose(bufFirst, bufSecond);
}
});
}
}
@Test
public void copyIntoCompositeOnHeapOffHeapBuf() {
try (var a = Allocator.heap();
var b = Allocator.direct()) {
testCopyIntoBuf(size -> {
int first = size / 2;
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buf.compose(bufFirst, bufSecond);
}
});
}
}
@Test
public void copyIntoCompositeOffHeapOnHeapBuf() {
try (var a = Allocator.direct();
var b = Allocator.heap()) {
testCopyIntoBuf(size -> {
int first = size / 2;
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buf.compose(bufFirst, bufSecond);
}
});
}
}
@Test
public void copyIntoCompositeOffHeapOffHeapBuf() {
try (var a = Allocator.direct();
var b = Allocator.direct()) {
testCopyIntoBuf(size -> {
int first = size / 2;
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return Buf.compose(bufFirst, bufSecond);
}
});
}
}
@Test
public void copyIntoCompositeOnHeapOnHeapBufSlice() {
try (var a = Allocator.heap();
var b = Allocator.heap();
var scope = new Scope()) {
testCopyIntoBuf(size -> {
int first = size / 2;
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buf.compose(bufFirst, bufSecond)).writerIndex(size).slice();
}
});
}
}
@Test
public void copyIntoCompositeOnHeapOffHeapBufSlice() {
try (var a = Allocator.heap();
var b = Allocator.direct();
var scope = new Scope()) {
testCopyIntoBuf(size -> {
int first = size / 2;
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buf.compose(bufFirst, bufSecond)).writerIndex(size).slice();
}
});
}
}
@Test
public void copyIntoCompositeOffHeapOnHeapBufSlice() {
try (var a = Allocator.direct();
var b = Allocator.heap();
var scope = new Scope()) {
testCopyIntoBuf(size -> {
int first = size / 2;
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buf.compose(bufFirst, bufSecond)).writerIndex(size).slice();
}
});
}
}
@Test
public void copyIntoCompositeOffHeapOffHeapBufSlice() {
try (var a = Allocator.direct();
var b = Allocator.direct();
var scope = new Scope()) {
testCopyIntoBuf(size -> {
int first = size / 2;
int second = size - first;
try (var bufFirst = a.allocate(first);
var bufSecond = b.allocate(second)) {
return scope.add(Buf.compose(bufFirst, bufSecond)).writerIndex(size).slice();
}
});
}
}
private void testCopyIntoBuf(Function<Integer, Buf> bbAlloc) {
try (Buf buf = allocate(8)) {
buf.order(ByteOrder.BIG_ENDIAN).writeLong(0x0102030405060708L);
Buf buffer = bbAlloc.apply(8);
buffer.writerIndex(8);
buf.copyInto(0, buffer, 0, buffer.capacity());
assertEquals((byte) 0x01, buffer.readByte());
assertEquals((byte) 0x02, buffer.readByte());
assertEquals((byte) 0x03, buffer.readByte());
assertEquals((byte) 0x04, buffer.readByte());
assertEquals((byte) 0x05, buffer.readByte());
assertEquals((byte) 0x06, buffer.readByte());
assertEquals((byte) 0x07, buffer.readByte());
assertEquals((byte) 0x08, buffer.readByte());
buffer.reset();
buf.writerIndex(0).order(ByteOrder.LITTLE_ENDIAN).writeLong(0x0102030405060708L);
buf.copyInto(0, buffer, 0, buffer.capacity());
buffer.writerIndex(8);
assertEquals((byte) 0x08, buffer.readByte());
assertEquals((byte) 0x07, buffer.readByte());
assertEquals((byte) 0x06, buffer.readByte());
assertEquals((byte) 0x05, buffer.readByte());
assertEquals((byte) 0x04, buffer.readByte());
assertEquals((byte) 0x03, buffer.readByte());
assertEquals((byte) 0x02, buffer.readByte());
assertEquals((byte) 0x01, buffer.readByte());
buffer.reset();
buffer.close();
buffer = bbAlloc.apply(6);
buf.copyInto(1, buffer, 1, 3);
buffer.writerIndex(6);
assertEquals((byte) 0x00, buffer.readByte());
assertEquals((byte) 0x07, buffer.readByte());
assertEquals((byte) 0x06, buffer.readByte());
assertEquals((byte) 0x05, buffer.readByte());
assertEquals((byte) 0x00, buffer.readByte());
assertEquals((byte) 0x00, buffer.readByte());
buffer.close();
buffer = bbAlloc.apply(6);
buffer.writerIndex(3).readerIndex(3);
buf.copyInto(1, buffer, 1, 3);
assertEquals(3, buffer.readerIndex());
assertEquals(3, buffer.writerIndex());
buffer.reset();
buffer.writerIndex(6);
assertEquals((byte) 0x00, buffer.readByte());
assertEquals((byte) 0x07, buffer.readByte());
assertEquals((byte) 0x06, buffer.readByte());
assertEquals((byte) 0x05, buffer.readByte());
assertEquals((byte) 0x00, buffer.readByte());
assertEquals((byte) 0x00, buffer.readByte());
buffer.close();
buf.reset();
buf.order(ByteOrder.BIG_ENDIAN).writeLong(0x0102030405060708L);
// Testing copyInto for overlapping writes:
//
// 0x0102030405060708
//
//
//
// 0x0102030102030405
buf.copyInto(0, buf, 3, 5);
assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x01, 0x02, 0x03, 0x04, 0x05}, buf.copy());
}
}
// todo resize copying must preserve contents
// todo resize sharing
// ### CODEGEN START primitive accessors tests
// <editor-fold defaultstate="collapsed" desc="Generated primitive accessors tests.">

View File

@ -0,0 +1,55 @@
package io.netty.buffer.b2;
import org.junit.Test;
import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class ScopeTest {
@Test
public void scopeMustCloseContainedRcsInReverseInsertOrder() {
ArrayList<Integer> closeOrder = new ArrayList<>();
try (Scope scope = new Scope()) {
scope.add(new SomeRc(new OrderingDrop(1, closeOrder)));
scope.add(new SomeRc(new OrderingDrop(2, closeOrder)));
scope.add(new SomeRc(new OrderingDrop(3, closeOrder)));
}
var itr = closeOrder.iterator();
assertTrue(itr.hasNext());
assertEquals(3, (int) itr.next());
assertTrue(itr.hasNext());
assertEquals(2, (int) itr.next());
assertTrue(itr.hasNext());
assertEquals(1, (int) itr.next());
assertFalse(itr.hasNext());
}
private static final class SomeRc extends RcSupport<SomeRc, SomeRc> {
SomeRc(Drop<SomeRc> drop) {
super(drop);
}
@Override
protected Owned<SomeRc> prepareSend() {
return null;
}
}
private static final class OrderingDrop implements Drop<SomeRc> {
private final int order;
private final ArrayList<Integer> list;
private OrderingDrop(int order, ArrayList<Integer> list) {
this.order = order;
this.list = list;
}
@Override
public void drop(SomeRc obj) {
list.add(order);
}
}
}