Add Buf.forEachWritable
Pass iteration indexes through.
This commit is contained in:
parent
d382017dc6
commit
46ed14577c
@ -17,7 +17,6 @@ package io.netty.buffer.api;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* A reference counted buffer of memory, with separate reader and writer offsets.
|
||||
@ -490,19 +489,74 @@ public interface Buf extends Rc<Buf>, BufAccessors {
|
||||
*
|
||||
* @return The number of components in this buffer.
|
||||
*/
|
||||
int componentCount();
|
||||
int countComponents();
|
||||
|
||||
/**
|
||||
* Process all readable components of this buffer, and return the number of components consumed.
|
||||
* Get the number of "components" in this buffer, that are readable. These are the components that would be
|
||||
* processed by {@link #forEachReadable(int, ComponentProcessor)}. For composite buffers, this is the number of
|
||||
* transitive constituent buffers that are readable, while non-composite buffers only have at most one readable
|
||||
* component.
|
||||
* <p>
|
||||
* The number of components consumed may be less than the {@linkplain #componentCount() component count} if not all
|
||||
* of them have readable data.
|
||||
* The number of readable components may be less than the {@link #countComponents() component count}, if not all of
|
||||
* them have readable data.
|
||||
*
|
||||
* @return The number of readable components in this buffer.
|
||||
*/
|
||||
int countReadableComponents();
|
||||
|
||||
/**
|
||||
* Get the number of "components" in this buffer, that are writable. These are the components that would be
|
||||
* processed by {@link #forEachWritable(int, ComponentProcessor)}. For composite buffers, this is the number of
|
||||
* transitive constituent buffers that are writable, while non-composite buffers only have at most one writable
|
||||
* component.
|
||||
* <p>
|
||||
* The number of writable components may be less than the {@link #countComponents() component count}, if not all of
|
||||
* them have space for writing.
|
||||
*
|
||||
* @return The number of writable components in this buffer.
|
||||
*/
|
||||
int countWritableComponents();
|
||||
|
||||
/**
|
||||
* Process all readable components of this buffer, and return the number of components processed.
|
||||
* <p>
|
||||
* The given {@linkplain ComponentProcessor processor} is called for each component in this buffer, and passed a
|
||||
* component index, for the given component in the iteration, and a {@link Component} object for accessing the data
|
||||
* within the given component.
|
||||
* <p>
|
||||
* The component index is specific to the particular invokation of this method, and may change. The first call to
|
||||
* the consumer will be passed the given initial index, and the next call will be passed the initial index plus one,
|
||||
* and so on.
|
||||
* <p>
|
||||
* The {@link ComponentProcessor} may stop the iteration at any time by returning {@code false}. This may cause the
|
||||
* number of components processed to be returned as a negative number (to signal early return), and the number of
|
||||
* components processed may then be less than the {@linkplain #countReadableComponents() readable component count}.
|
||||
* <p>
|
||||
* <strong>Note</strong> that the {@link Component} instance passed to the consumer could be reused for multiple
|
||||
* calls, so the data must be extracted from the component in the context of the iteration.
|
||||
* <p>
|
||||
* The {@link ByteBuffer} instances obtained from the component, share life time with that internal component.
|
||||
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
|
||||
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
|
||||
* <p>
|
||||
* The best way to ensure this doesn't cause any trouble, is to use the buffers directly as part of the iteration,
|
||||
* or immediately after the iteration.
|
||||
* <p>
|
||||
* <strong>Note</strong> that the arrays, memory addresses, and byte buffers exposed as components by this method,
|
||||
* should not be used for changing the buffer contents. Doing so may cause undefined behaviour.
|
||||
* <p>
|
||||
* Changes to position and limit of the byte buffers exposed via the processed components, are not reflected back to
|
||||
* this buffer instance.
|
||||
*
|
||||
* @param consumer The consumer that will be used to process the buffer components.
|
||||
* @return The number of readable components processed, which may be less than {@link #componentCount()}.
|
||||
* @param initialIndex The initial index of the iteration, and the index that will be passed to the first call to
|
||||
* the {@linkplain ComponentProcessor#process(int, Component) processor}.
|
||||
* @param processor The processor that will be used to process the buffer components.
|
||||
* @return The number of readable components processed, as a positive number of all readable components were
|
||||
* processed, or as a negative number if the iteration was stopped because
|
||||
* {@link ComponentProcessor#process(int, Component)} returned {@code false}.
|
||||
* In any case, the number of components processed may be less than {@link #countComponents()}.
|
||||
*/
|
||||
int forEachReadable(Consumer<Component> consumer);
|
||||
int forEachReadable(int initialIndex, ComponentProcessor processor);
|
||||
|
||||
int forEachWritable(int initialIndex, ComponentProcessor processor);
|
||||
}
|
||||
|
@ -16,10 +16,10 @@
|
||||
package io.netty.buffer.api;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* A view onto the buffer component being processed in a given iteration of {@link Buf#forEachReadable(Consumer)}.
|
||||
* A view onto the buffer component being processed in a given iteration of
|
||||
* {@link Buf#forEachReadable(int, ComponentProcessor)}, or {@link Buf#forEachWritable(int, ComponentProcessor)}.
|
||||
* <p>
|
||||
* Instances of this interface are allowed to be mutable behind the scenes, and the data is only guaranteed to be
|
||||
* consistent within the given iteration.
|
||||
@ -28,30 +28,44 @@ public interface Component {
|
||||
|
||||
/**
|
||||
* Check if this component is backed by a cached byte array than can be accessed cheaply.
|
||||
* <p>
|
||||
* <strong>Note</strong> that regardless of what this method returns, the array should not be used to modify the
|
||||
* contents of this buffer component.
|
||||
*
|
||||
* @return {@code true} if {@link #array()} is a cheap operation, otherwise {@code false}.
|
||||
*/
|
||||
boolean hasCachedArray();
|
||||
boolean hasArray();
|
||||
|
||||
/**
|
||||
* Get a byte array of the contents of this component.
|
||||
* <p>
|
||||
* <strong>Note</strong> that the array is meant to be read-only. It may either be a direct reference to the
|
||||
* concrete array instance that is backing this component, or it is a fresh copy.
|
||||
* Writing to the array may produce undefined behaviour.
|
||||
*
|
||||
* @return A byte array of the contents of this component.
|
||||
*/
|
||||
byte[] array();
|
||||
|
||||
int arrayOffset();
|
||||
|
||||
/**
|
||||
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
|
||||
* <p>
|
||||
* <strong>Note</strong> that the address should not be used for writing to the buffer memory, and doing so may
|
||||
* produce undefined behaviour.
|
||||
*
|
||||
* @return The native memory address, if any, otherwise 0.
|
||||
*/
|
||||
long nativeAddress();
|
||||
|
||||
/**
|
||||
* Build a {@link ByteBuffer} instance for this memory component.
|
||||
* Get a {@link ByteBuffer} instance for this memory component.
|
||||
* <p>
|
||||
* <strong>Note</strong> that the {@link ByteBuffer} is read-only, to prevent write accesses to the memory,
|
||||
* when the buffer component is obtained through {@link Buf#forEachReadable(int, ComponentProcessor)}.
|
||||
*
|
||||
* @return A new {@link ByteBuffer} for this memory component.
|
||||
*/
|
||||
ByteBuffer byteBuffer();
|
||||
ByteBuffer buffer();
|
||||
}
|
||||
|
21
src/main/java/io/netty/buffer/api/ComponentProcessor.java
Normal file
21
src/main/java/io/netty/buffer/api/ComponentProcessor.java
Normal file
@ -0,0 +1,21 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface ComponentProcessor {
|
||||
boolean process(int index, Component component);
|
||||
}
|
@ -19,7 +19,6 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
/**
|
||||
@ -49,7 +48,12 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
private boolean readOnly;
|
||||
|
||||
CompositeBuf(Allocator allocator, Buf[] bufs) {
|
||||
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
|
||||
this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP);
|
||||
}
|
||||
|
||||
private static Buf[] filterExternalBufs(Buf[] bufs) {
|
||||
// Allocating a new array unconditionally also prevents external modification of the array.
|
||||
return Arrays.stream(bufs).filter(b -> b.capacity() > 0).toArray(Buf[]::new);
|
||||
}
|
||||
|
||||
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
|
||||
@ -617,7 +621,14 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
(extension.readOnly()? "read-only." : "writable."));
|
||||
}
|
||||
|
||||
long newSize = capacity() + (long) extension.capacity();
|
||||
long extensionCapacity = extension.capacity();
|
||||
if (extensionCapacity == 0) {
|
||||
// Extending by a zero-sized buffer makes no difference. Especially since it's not allowed to change the
|
||||
// capacity of buffers that are constiuents of composite buffers.
|
||||
return;
|
||||
}
|
||||
|
||||
long newSize = capacity() + extensionCapacity;
|
||||
Allocator.checkSize(newSize);
|
||||
|
||||
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
|
||||
@ -702,21 +713,63 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int componentCount() {
|
||||
public int countComponents() {
|
||||
int sum = 0;
|
||||
for (Buf buf : bufs) {
|
||||
sum += buf.componentCount();
|
||||
sum += buf.countComponents();
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int forEachReadable(Consumer<Component> consumer) {
|
||||
public int countReadableComponents() {
|
||||
int sum = 0;
|
||||
for (Buf buf : bufs) {
|
||||
sum += buf.countReadableComponents();
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int countWritableComponents() {
|
||||
int sum = 0;
|
||||
for (Buf buf : bufs) {
|
||||
sum += buf.countWritableComponents();
|
||||
}
|
||||
return sum;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int forEachReadable(int initialIndex, ComponentProcessor processor) {
|
||||
checkReadBounds(readerOffset(), Math.max(1, readableBytes()));
|
||||
int visited = 0;
|
||||
for (Buf buf : bufs) {
|
||||
if (buf.readableBytes() > 0) {
|
||||
visited += buf.forEachReadable(consumer);
|
||||
int count = buf.forEachReadable(visited + initialIndex, processor);
|
||||
if (count > 0) {
|
||||
visited += count;
|
||||
} else {
|
||||
visited = -visited + count;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return visited;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int forEachWritable(int initialIndex, ComponentProcessor processor) {
|
||||
checkWriteBounds(writerOffset(), Math.max(1, writableBytes()));
|
||||
int visited = 0;
|
||||
for (Buf buf : bufs) {
|
||||
if (buf.writableBytes() > 0) {
|
||||
int count = buf.forEachWritable(visited + initialIndex, processor);
|
||||
if (count > 0) {
|
||||
visited += count;
|
||||
} else {
|
||||
visited = -visited + count;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return visited;
|
||||
|
@ -20,6 +20,7 @@ import io.netty.buffer.api.AllocatorControl;
|
||||
import io.netty.buffer.api.Buf;
|
||||
import io.netty.buffer.api.ByteCursor;
|
||||
import io.netty.buffer.api.Component;
|
||||
import io.netty.buffer.api.ComponentProcessor;
|
||||
import io.netty.buffer.api.Drop;
|
||||
import io.netty.buffer.api.Owned;
|
||||
import io.netty.buffer.api.RcSupport;
|
||||
@ -27,7 +28,6 @@ import jdk.incubator.foreign.MemorySegment;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static jdk.incubator.foreign.MemoryAccess.getByteAtOffset;
|
||||
import static jdk.incubator.foreign.MemoryAccess.getCharAtOffset;
|
||||
@ -135,13 +135,18 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf, Component {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasCachedArray() {
|
||||
public boolean hasArray() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] array() {
|
||||
return seg.toByteArray();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int arrayOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -154,19 +159,17 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf, Component {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer byteBuffer() {
|
||||
public ByteBuffer buffer() {
|
||||
var buffer = seg.asByteBuffer();
|
||||
int base = baseOffset;
|
||||
if (buffer.isDirect()) {
|
||||
// TODO Remove this when JDK bug is fixed.
|
||||
// TODO Remove this when the slicing of shared, native segments JDK bug is fixed.
|
||||
ByteBuffer tmp = ByteBuffer.allocateDirect(buffer.capacity());
|
||||
tmp.put(buffer);
|
||||
buffer = tmp.position(0);
|
||||
base = 0; // TODO native memory segments do not have the buffer-of-slice bug.
|
||||
}
|
||||
if (readOnly()) {
|
||||
buffer = buffer.asReadOnlyBuffer();
|
||||
}
|
||||
buffer = buffer.asReadOnlyBuffer();
|
||||
// TODO avoid slicing and just set position+limit when JDK bug is fixed.
|
||||
return buffer.slice(base + readerOffset(), readableBytes()).order(order);
|
||||
}
|
||||
@ -492,15 +495,75 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf, Component {
|
||||
}
|
||||
|
||||
@Override
|
||||
public int componentCount() {
|
||||
public int countComponents() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int forEachReadable(Consumer<Component> consumer) {
|
||||
public int countReadableComponents() {
|
||||
return readableBytes() > 0? 1 : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int countWritableComponents() {
|
||||
return writableBytes() > 0? 1 : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int forEachReadable(int initialIndex, ComponentProcessor processor) {
|
||||
checkRead(readerOffset(), Math.max(1, readableBytes()));
|
||||
consumer.accept(this);
|
||||
return 1;
|
||||
return processor.process(initialIndex, this)? 1 : -1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int forEachWritable(int initialIndex, ComponentProcessor processor) {
|
||||
checkWrite(writerOffset(), Math.max(1, writableBytes()));
|
||||
|
||||
var buffer = wseg.asByteBuffer();
|
||||
|
||||
if (buffer.isDirect()) {
|
||||
buffer = buffer.position(writerOffset()).limit(writerOffset() + writableBytes());
|
||||
} else {
|
||||
// TODO avoid slicing and just set position when JDK bug is fixed.
|
||||
buffer = buffer.slice(baseOffset + writerOffset(), writableBytes());
|
||||
}
|
||||
buffer = buffer.order(order);
|
||||
return processor.process(initialIndex, new WritableComponent(wseg, buffer))? 1 : -1;
|
||||
}
|
||||
|
||||
private static final class WritableComponent implements Component {
|
||||
private final MemorySegment segment;
|
||||
private final ByteBuffer buffer;
|
||||
|
||||
private WritableComponent(MemorySegment segment, ByteBuffer buffer) {
|
||||
this.segment = segment;
|
||||
this.buffer = buffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasArray() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] array() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int arrayOffset() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long nativeAddress() {
|
||||
return buffer.isDirect()? segment.address().toRawLongValue() : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer buffer() {
|
||||
return buffer;
|
||||
}
|
||||
}
|
||||
|
||||
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
|
||||
|
@ -17,6 +17,7 @@ package io.netty.buffer.api;
|
||||
|
||||
import io.netty.buffer.api.Fixture.Properties;
|
||||
import io.netty.buffer.api.memseg.NativeMemorySegmentManager;
|
||||
import org.assertj.core.api.AtomicIntegerArrayAssert;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
@ -40,6 +41,7 @@ import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.Stream.Builder;
|
||||
@ -2588,7 +2590,29 @@ public class BufTest {
|
||||
public void componentCountOfNonCompositeBufferMustBeOne(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
assertThat(buf.componentCount()).isOne();
|
||||
assertThat(buf.countComponents()).isOne();
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonCompositeAllocators")
|
||||
public void readableComponentCountMustBeOneIfThereAreReadableBytes(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
assertThat(buf.countReadableComponents()).isZero();
|
||||
buf.writeByte((byte) 1);
|
||||
assertThat(buf.countReadableComponents()).isOne();
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonCompositeAllocators")
|
||||
public void writableComponentCountMustBeOneIfThereAreWritableBytes(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
assertThat(buf.countWritableComponents()).isOne();
|
||||
buf.writeLong(1);
|
||||
assertThat(buf.countWritableComponents()).isZero();
|
||||
}
|
||||
}
|
||||
|
||||
@ -2602,7 +2626,27 @@ public class BufTest {
|
||||
Buf x = allocator.compose(b, c)) {
|
||||
buf = allocator.compose(a, x);
|
||||
}
|
||||
assertThat(buf.componentCount()).isEqualTo(3);
|
||||
assertThat(buf.countComponents()).isEqualTo(3);
|
||||
assertThat(buf.countReadableComponents()).isZero();
|
||||
assertThat(buf.countWritableComponents()).isEqualTo(3);
|
||||
buf.writeInt(1);
|
||||
assertThat(buf.countReadableComponents()).isOne();
|
||||
assertThat(buf.countWritableComponents()).isEqualTo(3);
|
||||
buf.writeInt(1);
|
||||
assertThat(buf.countReadableComponents()).isOne();
|
||||
assertThat(buf.countWritableComponents()).isEqualTo(2);
|
||||
buf.writeInt(1);
|
||||
assertThat(buf.countReadableComponents()).isEqualTo(2);
|
||||
assertThat(buf.countWritableComponents()).isEqualTo(2);
|
||||
buf.writeInt(1);
|
||||
assertThat(buf.countReadableComponents()).isEqualTo(2);
|
||||
assertThat(buf.countWritableComponents()).isOne();
|
||||
buf.writeInt(1);
|
||||
assertThat(buf.countReadableComponents()).isEqualTo(3);
|
||||
assertThat(buf.countWritableComponents()).isOne();
|
||||
buf.writeInt(1);
|
||||
assertThat(buf.countReadableComponents()).isEqualTo(3);
|
||||
assertThat(buf.countWritableComponents()).isZero();
|
||||
}
|
||||
}
|
||||
|
||||
@ -2614,16 +2658,16 @@ public class BufTest {
|
||||
Buf bufLERW = allocator.allocate(8).order(LITTLE_ENDIAN).writeLong(0x0102030405060708L);
|
||||
Buf bufBERO = allocator.allocate(8).order(BIG_ENDIAN).writeLong(0x0102030405060708L).readOnly(true);
|
||||
Buf bufLERO = allocator.allocate(8).order(LITTLE_ENDIAN).writeLong(0x0102030405060708L).readOnly(true)) {
|
||||
verifyForEachReadableSignleComponent(fixture, bufBERW);
|
||||
verifyForEachReadableSignleComponent(fixture, bufLERW);
|
||||
verifyForEachReadableSignleComponent(fixture, bufBERO);
|
||||
verifyForEachReadableSignleComponent(fixture, bufLERO);
|
||||
verifyForEachReadableSingleComponent(fixture, bufBERW);
|
||||
verifyForEachReadableSingleComponent(fixture, bufLERW);
|
||||
verifyForEachReadableSingleComponent(fixture, bufBERO);
|
||||
verifyForEachReadableSingleComponent(fixture, bufLERO);
|
||||
}
|
||||
}
|
||||
|
||||
private static void verifyForEachReadableSignleComponent(Fixture fixture, Buf buf) {
|
||||
buf.forEachReadable(component -> {
|
||||
var buffer = component.byteBuffer();
|
||||
private static void verifyForEachReadableSingleComponent(Fixture fixture, Buf buf) {
|
||||
buf.forEachReadable(0, (index, component) -> {
|
||||
var buffer = component.buffer();
|
||||
assertThat(buffer.position()).isZero();
|
||||
assertThat(buffer.limit()).isEqualTo(8);
|
||||
assertThat(buffer.capacity()).isEqualTo(8);
|
||||
@ -2635,21 +2679,17 @@ public class BufTest {
|
||||
assertThat(component.nativeAddress()).isZero();
|
||||
}
|
||||
|
||||
byte[] array = component.array();
|
||||
if (buffer.order() == BIG_ENDIAN) {
|
||||
assertThat(array).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08);
|
||||
} else {
|
||||
assertThat(array).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01);
|
||||
if (component.hasArray()) {
|
||||
byte[] array = component.array();
|
||||
if (buffer.order() == BIG_ENDIAN) {
|
||||
assertThat(array).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08);
|
||||
} else {
|
||||
assertThat(array).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01);
|
||||
}
|
||||
}
|
||||
|
||||
if (buf.readOnly()) {
|
||||
assertTrue(buffer.isReadOnly());
|
||||
assertThrows(ReadOnlyBufferException.class, () -> buffer.put(0, (byte) 0xFF));
|
||||
} else {
|
||||
assertFalse(buffer.isReadOnly());
|
||||
buffer.put(0, (byte) 0xFF);
|
||||
assertEquals((byte) 0xFF, buffer.get(0));
|
||||
}
|
||||
assertThrows(ReadOnlyBufferException.class, () -> buffer.put(0, (byte) 0xFF));
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
@ -2666,9 +2706,56 @@ public class BufTest {
|
||||
composite = allocator.compose(a, b, c);
|
||||
}
|
||||
var list = new LinkedList<Integer>(List.of(1, 2, 3));
|
||||
composite.forEachReadable(component -> {
|
||||
assertEquals(list.pollFirst().intValue(), component.byteBuffer().getInt());
|
||||
int count = composite.forEachReadable(0, (index, component) -> {
|
||||
var buffer = component.buffer();
|
||||
int bufferValue = buffer.getInt();
|
||||
assertEquals(list.pollFirst().intValue(), bufferValue);
|
||||
assertEquals(bufferValue, index + 1);
|
||||
assertThrows(ReadOnlyBufferException.class, () -> buffer.put(0, (byte) 0xFF));
|
||||
return true;
|
||||
});
|
||||
assertEquals(3, count);
|
||||
assertThat(list).isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void forEachReadableMustReturnNegativeCountWhenProcessorReturnsFalse(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
buf.writeLong(0x0102030405060708L);
|
||||
int count = buf.forEachReadable(0, (index, component) -> false);
|
||||
assertEquals(-1, count);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void forEachReadableMustStopIterationWhenProcessorReturnsFalse() {
|
||||
try (Allocator allocator = Allocator.heap()) {
|
||||
Buf composite;
|
||||
try (Buf a = allocator.allocate(4);
|
||||
Buf b = allocator.allocate(4);
|
||||
Buf c = allocator.allocate(4)) {
|
||||
a.writeInt(1);
|
||||
b.writeInt(2);
|
||||
c.writeInt(3);
|
||||
composite = allocator.compose(a, b, c);
|
||||
}
|
||||
int readPos = composite.readerOffset();
|
||||
int writePos = composite.writerOffset();
|
||||
var list = new LinkedList<Integer>(List.of(1, 2, 3));
|
||||
int count = composite.forEachReadable(0, (index, component) -> {
|
||||
var buffer = component.buffer();
|
||||
int bufferValue = buffer.getInt();
|
||||
assertEquals(list.pollFirst().intValue(), bufferValue);
|
||||
assertEquals(bufferValue, index + 1);
|
||||
return false;
|
||||
});
|
||||
assertEquals(-1, count);
|
||||
assertThat(list).containsExactly(2, 3);
|
||||
assertEquals(readPos, composite.readerOffset());
|
||||
assertEquals(writePos, composite.writerOffset());
|
||||
}
|
||||
}
|
||||
|
||||
@ -2679,7 +2766,210 @@ public class BufTest {
|
||||
var buf = allocator.allocate(8);
|
||||
buf.writeLong(0);
|
||||
buf.close();
|
||||
assertThrows(IllegalStateException.class, () -> buf.forEachReadable(component -> { }));
|
||||
assertThrows(IllegalStateException.class, () -> buf.forEachReadable(0, (component, index) -> true));
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void forEachReadableMustAllowCollectingBuffersInArray(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator()) {
|
||||
Buf buf;
|
||||
try (Buf a = allocator.allocate(4);
|
||||
Buf b = allocator.allocate(4);
|
||||
Buf c = allocator.allocate(4)) {
|
||||
buf = allocator.compose(a, b, c);
|
||||
}
|
||||
int i = 1;
|
||||
while (buf.writableBytes() > 0) {
|
||||
buf.writeByte((byte) i++);
|
||||
}
|
||||
ByteBuffer[] buffers = new ByteBuffer[buf.countReadableComponents()];
|
||||
buf.forEachReadable(0, (index, component) -> {
|
||||
buffers[index] = component.buffer();
|
||||
return true;
|
||||
});
|
||||
i = 1;
|
||||
assertThat(buffers.length).isGreaterThanOrEqualTo(1);
|
||||
for (ByteBuffer buffer : buffers) {
|
||||
while (buffer.hasRemaining()) {
|
||||
assertEquals((byte) i++, buffer.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonCompositeAllocators")
|
||||
public void forEachWritableMustVisitBuffer(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf bufBERW = allocator.allocate(8).order(BIG_ENDIAN);
|
||||
Buf bufLERW = allocator.allocate(8).order(LITTLE_ENDIAN)) {
|
||||
verifyForEachWritableSingleComponent(fixture, bufBERW);
|
||||
verifyForEachWritableSingleComponent(fixture, bufLERW);
|
||||
}
|
||||
}
|
||||
|
||||
private static void verifyForEachWritableSingleComponent(Fixture fixture, Buf buf) {
|
||||
buf.forEachWritable(0, (index, component) -> {
|
||||
var buffer = component.buffer();
|
||||
assertThat(buffer.position()).isZero();
|
||||
assertThat(buffer.limit()).isEqualTo(8);
|
||||
assertThat(buffer.capacity()).isEqualTo(8);
|
||||
buffer.putLong(0x0102030405060708L);
|
||||
buffer.flip();
|
||||
assertEquals(0x0102030405060708L, buffer.getLong());
|
||||
buf.writerOffset(8);
|
||||
assertEquals(0x0102030405060708L, buf.getLong(0));
|
||||
|
||||
if (fixture.isDirect()) {
|
||||
assertThat(component.nativeAddress()).isNotZero();
|
||||
} else {
|
||||
assertThat(component.nativeAddress()).isZero();
|
||||
}
|
||||
|
||||
if (component.hasArray()) {
|
||||
byte[] array = component.array();
|
||||
if (buffer.order() == BIG_ENDIAN) {
|
||||
assertThat(array).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08);
|
||||
} else {
|
||||
assertThat(array).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01);
|
||||
}
|
||||
}
|
||||
|
||||
buffer.put(0, (byte) 0xFF);
|
||||
assertEquals((byte) 0xFF, buffer.get(0));
|
||||
assertEquals((byte) 0xFF, buf.getByte(0));
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void forEachWritableMustVisitAllWritableConstituentBuffersInOrder() {
|
||||
try (Allocator allocator = Allocator.heap()) {
|
||||
Buf buf;
|
||||
try (Buf a = allocator.allocate(8);
|
||||
Buf b = allocator.allocate(8);
|
||||
Buf c = allocator.allocate(8)) {
|
||||
buf = allocator.compose(a, b, c);
|
||||
}
|
||||
buf.order(BIG_ENDIAN);
|
||||
buf.forEachWritable(0, (index, component) -> {
|
||||
component.buffer().putLong(0x0102030405060708L + 0x1010101010101010L * index);
|
||||
return true;
|
||||
});
|
||||
buf.writerOffset(3 * 8);
|
||||
assertEquals(0x0102030405060708L, buf.readLong());
|
||||
assertEquals(0x1112131415161718L, buf.readLong());
|
||||
assertEquals(0x2122232425262728L, buf.readLong());
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void forEachWritableMustReturnNegativeCountWhenProcessorReturnsFalse(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
int count = buf.forEachWritable(0, (index, component) -> false);
|
||||
assertEquals(-1, count);
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void forEachWritableMustStopIterationWhenProcessorRetursFalse(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
buf.forEachWritable(0, (index, component) -> {
|
||||
counter.incrementAndGet();
|
||||
return false;
|
||||
});
|
||||
assertEquals(1, counter.get());
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void forEachWritableChangesMadeToByteBufferComponentMustBeReflectedInBuffer(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(9).order(BIG_ENDIAN)) {
|
||||
buf.writeByte((byte) 0xFF);
|
||||
AtomicInteger writtenCounter = new AtomicInteger();
|
||||
buf.forEachWritable(0, (index, component) -> {
|
||||
var buffer = component.buffer();
|
||||
while (buffer.hasRemaining()) {
|
||||
buffer.put((byte) writtenCounter.incrementAndGet());
|
||||
}
|
||||
return true;
|
||||
});
|
||||
buf.writerOffset(9);
|
||||
assertEquals((byte) 0xFF, buf.readByte());
|
||||
assertEquals(0x0102030405060708L, buf.readLong());
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void changesMadeToByteBufferComponentsShouldBeReflectedInBuffer(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
buf.forEachWritable(0, (index, component) -> {
|
||||
var buffer = component.buffer();
|
||||
while (buffer.hasRemaining()) {
|
||||
buffer.put((byte) counter.incrementAndGet());
|
||||
}
|
||||
return true;
|
||||
});
|
||||
buf.writerOffset(buf.capacity());
|
||||
for (int i = 0; i < 8; i++) {
|
||||
assertEquals((byte) i + 1, buf.getByte(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void forEachWritableOnClosedBufferMustThrow(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator()) {
|
||||
Buf buf = allocator.allocate(8);
|
||||
buf.close();
|
||||
assertThrows(IllegalStateException.class, () -> buf.forEachWritable(0, (index, component) -> true));
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void forEachWritableOnReadOnlyBufferMustThrow(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8).readOnly(true)) {
|
||||
assertThrows(IllegalStateException.class, () -> buf.forEachWritable(0, (index, component) -> true));
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allocators")
|
||||
public void forEachWritableMustAllowCollectingBuffersInArray(Fixture fixture) {
|
||||
try (Allocator allocator = fixture.createAllocator();
|
||||
Buf buf = allocator.allocate(8)) {
|
||||
ByteBuffer[] buffers = new ByteBuffer[buf.countWritableComponents()];
|
||||
buf.forEachWritable(0, (index, component) -> {
|
||||
buffers[index] = component.buffer();
|
||||
return true;
|
||||
});
|
||||
assertThat(buffers.length).isGreaterThanOrEqualTo(1);
|
||||
int i = 1;
|
||||
for (ByteBuffer buffer : buffers) {
|
||||
while (buffer.hasRemaining()) {
|
||||
buffer.put((byte) i++);
|
||||
}
|
||||
}
|
||||
buf.writerOffset(buf.capacity());
|
||||
i = 1;
|
||||
while (buf.readableBytes() > 0) {
|
||||
assertEquals((byte) i++, buf.readByte());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user