Introduce ByteIterator, and Buf.iterate

Motivation:
We need a simple API to efficiently iterate a buffer.
We've used the ByteProcessor so far, and while its internal iteration API is simple, it looses some efficiency by forcing code to only consider one byte at a time.

Modification:
The ByteIterator fills the same niche as the ByteProcessor, but uses external iteration instead of internal iteration.
This allows integrators to control the pace of iteration, and it makes it possible to expose methods for consuming bytes in bulk; one long of 8 bytes at a time.
This makes it possible to use the iterator in SIMD-Within-A-Register, or SWAR, data processing algorithms.

Result:
We have a ByteIterator for efficiently processing data within a buffer.
This commit is contained in:
Chris Vest 2020-11-05 15:15:34 +01:00
parent 91be83444d
commit 68795fb1a5
7 changed files with 478 additions and 5 deletions

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer.b2;
import io.netty.util.ByteIterator;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@ -260,4 +262,20 @@ public interface Buf extends Rc<Buf>, BufAccessors {
readerIndex(0);
writerIndex(0);
}
/**
* Iterate the readable bytes of this buffer. The {@linkplain #readerIndex() reader offset} and
* {@linkplain #writerIndex() witer offset} are not modified by the iterator.
*
* Care should be taken to ensure that the buffers lifetime extends beyond the iteration, and the
* {@linkplain #readerIndex() reader offset} and {@linkplain #writerIndex() writer offset} are not modified while
* the iteration takes place. Otherwise unpredictable behaviour might result.
*
* @return A {@link ByteIterator} for the readable bytes of this buffer.
*/
default ByteIterator iterate() {
return iterate(readerIndex(), readableBytes());
}
ByteIterator iterate(int fromOffset, int length);
}

View File

@ -15,9 +15,12 @@
*/
package io.netty.buffer.b2;
import io.netty.util.ByteIterator;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.NoSuchElementException;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
@ -39,7 +42,6 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
private final boolean isSendable;
private final Buf[] bufs;
private final int[] offsets; // The offset, for the composite buffer, where each constituent buffer starts.
private final int[] offsetSums; // The cumulative composite buffer offset.
private final int capacity;
private int roff;
private int woff;
@ -92,11 +94,9 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
"The given buffers place the read offset ahead of the write offset: " + Arrays.toString(bufs) + '.';
}
offsets = new int[bufs.length];
offsetSums = new int[bufs.length];
long cap = 0;
for (int i = 0; i < bufs.length; i++) {
offsets[i] = (int) cap;
offsetSums[i] = (int) (cap + (i == 0? 0 : offsets[i - 1]));
cap += bufs[i].capacity();
}
if (cap > MAX_CAPACITY) {
@ -265,6 +265,80 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
}
@Override
public ByteIterator iterate(int fromOffset, int length) {
int startBufferIndex = searchOffsets(fromOffset);
int off = fromOffset - offsets[startBufferIndex];
Buf startBuf = bufs[startBufferIndex];
ByteIterator startIterator = startBuf.iterate(off, Math.min(startBuf.capacity() - off, length));
return new ByteIterator() {
int index = fromOffset;
final int end = fromOffset + length;
int bufferIndex = startBufferIndex;
ByteIterator itr = startIterator;
@Override
public boolean hasNextLong() {
return bytesLeft() >= Long.BYTES;
}
@Override
public long nextLong() {
if (itr.hasNextLong()) {
long val = itr.nextLong();
index += Long.BYTES;
return val;
}
if (!hasNextLong()) {
throw new NoSuchElementException();
}
return nextLongFromBytes(); // Leave index increments to 'nextByte'
}
private long nextLongFromBytes() {
long val = 0;
for (int i = 0; i < 8; i++) {
val <<= 8;
val |= nextByte();
}
return val;
}
@Override
public boolean hasNextByte() {
return index < end;
}
@Override
public byte nextByte() {
if (itr.hasNextByte()) {
byte val = itr.nextByte();
index++;
return val;
}
if (!hasNextByte()) {
throw new NoSuchElementException();
}
bufferIndex++;
Buf nextBuf = bufs[bufferIndex];
itr = nextBuf.iterate(0, Math.min(nextBuf.capacity(), bytesLeft()));
byte val = itr.nextByte();
index++;
return val;
}
@Override
public int currentOffset() {
return index;
}
@Override
public int bytesLeft() {
return end - index;
}
};
}
private void copyInto(int srcPos, CopyInto dest, int destPos, int length) {
if (length < 0) {
throw new IndexOutOfBoundsException("Length cannot be negative: " + length + '.');
@ -676,7 +750,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
// In that case it should not matter what buffer is returned, because it shouldn't be used anyway.
return null;
}
int off = index - offsetSums[i];
int off = index - offsets[i];
Buf candidate = bufs[i];
if (off + size <= candidate.capacity()) {
subOffset = off;
@ -696,6 +770,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
return i < 0? -(i+2) : i;
}
// <editor-fold defaultstate="collapsed" desc="Torn buffer access.">
private static final class TornBufAccessors implements BufAccessors {
private final CompositeBuf buf;
@ -1191,4 +1266,5 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
buf.writePassThrough(woff, value);
}
}
// </editor-fold>
}

View File

@ -15,10 +15,14 @@
*/
package io.netty.buffer.b2;
import io.netty.util.ByteIterator;
import io.netty.util.ByteProcessor;
import io.netty.util.internal.PlatformDependent;
import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.NoSuchElementException;
import static jdk.incubator.foreign.MemoryAccess.getByteAtOffset_BE;
import static jdk.incubator.foreign.MemoryAccess.getByteAtOffset_LE;
@ -159,6 +163,55 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
}
@Override
public ByteIterator iterate(int fromOffset, int length) {
return new ByteIterator() {
final MemorySegment segment = seg;
int index = fromOffset;
final int end = index + length;
@Override
public boolean hasNextLong() {
return index + Long.BYTES <= end;
}
@Override
public long nextLong() {
if (!hasNextLong()) {
throw new NoSuchElementException();
}
long val = getLongAtOffset_BE(segment, index);
index += Long.BYTES;
return val;
}
@Override
public boolean hasNextByte() {
return index < end;
}
@Override
public byte nextByte() {
if (!hasNextByte()) {
throw new NoSuchElementException();
}
byte val = getByteAtOffset_BE(segment, index);
index++;
return val;
}
@Override
public int currentOffset() {
return index;
}
@Override
public int bytesLeft() {
return end - index;
}
};
}
private void copyInto(int srcPos, MemorySegment dest, int destPos, int length) {
dest.asSlice(destPos, length).copyFrom(seg.asSlice(srcPos, length));
}

View File

@ -23,6 +23,7 @@ import org.junit.Test;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -827,6 +828,212 @@ public abstract class BufTest {
assertArrayEquals(new byte[] {0x01, 0x02, 0x03, 0x01, 0x02, 0x03, 0x04, 0x05}, buf.copy());
}
}
@Test
public void readableBytesMustMatchWhatWasWritten() {
try (Buf buf = allocate(16)) {
buf.writeLong(0);
assertEquals(Long.BYTES, buf.readableBytes());
buf.readShort();
assertEquals(Long.BYTES - Short.BYTES, buf.readableBytes());
}
}
@Test
public void byteIterationOfBigEndianBuffers() {
try (Buf buf = allocate(0x28)) {
buf.order(ByteOrder.BIG_ENDIAN); // The byte order should have no impact.
checkByteIteration(buf);
buf.reset();
checkByteIterationOfRegion(buf);
}
}
@Test
public void byteIterationOfLittleEndianBuffers() {
try (Buf buf = allocate(0x28)) {
buf.order(ByteOrder.LITTLE_ENDIAN); // The byte order should have no impact.
checkByteIteration(buf);
buf.reset();
checkByteIterationOfRegion(buf);
}
}
private static void checkByteIteration(Buf buf) {
var itr = buf.iterate();
assertFalse(itr.hasNextByte());
assertFalse(itr.hasNextLong());
assertEquals(0, itr.bytesLeft());
try {
itr.nextLong();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
try {
itr.nextByte();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
for (int i = 0; i < 0x27; i++) {
buf.writeByte((byte) (i + 1));
}
int roff = buf.readerIndex();
int woff = buf.writerIndex();
itr = buf.iterate();
assertEquals(0x27, itr.bytesLeft());
assertTrue(itr.hasNextByte());
assertTrue(itr.hasNextLong());
assertEquals(0x0102030405060708L, itr.nextLong());
assertEquals(0x1F, itr.bytesLeft());
assertTrue(itr.hasNextLong());
assertEquals(0x090A0B0C0D0E0F10L, itr.nextLong());
assertTrue(itr.hasNextLong());
assertEquals(0x17, itr.bytesLeft());
assertEquals(0x1112131415161718L, itr.nextLong());
assertTrue(itr.hasNextLong());
assertEquals(0x0F, itr.bytesLeft());
assertEquals(0x191A1B1C1D1E1F20L, itr.nextLong());
assertFalse(itr.hasNextLong());
assertEquals(7, itr.bytesLeft());
try {
itr.nextLong();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
assertTrue(itr.hasNextByte());
assertEquals((byte) 0x21, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(6, itr.bytesLeft());
assertEquals((byte) 0x22, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(5, itr.bytesLeft());
assertEquals((byte) 0x23, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(4, itr.bytesLeft());
assertEquals((byte) 0x24, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(3, itr.bytesLeft());
assertEquals((byte) 0x25, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(2, itr.bytesLeft());
assertEquals((byte) 0x26, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(1, itr.bytesLeft());
assertEquals((byte) 0x27, itr.nextByte());
assertFalse(itr.hasNextByte());
assertEquals(0, itr.bytesLeft());
try {
itr.nextLong();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
try {
itr.nextByte();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
assertEquals(roff, buf.readerIndex());
assertEquals(woff, buf.writerIndex());
}
private static void checkByteIterationOfRegion(Buf buf) {
var itr = buf.iterate(1, 0);
assertFalse(itr.hasNextByte());
assertFalse(itr.hasNextLong());
assertEquals(0, itr.bytesLeft());
try {
itr.nextLong();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
try {
itr.nextByte();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
for (int i = 0; i < 0x27; i++) {
buf.writeByte((byte) (i + 1));
}
int roff = buf.readerIndex();
int woff = buf.writerIndex();
itr = buf.iterate(buf.readerIndex() + 1, buf.readableBytes() - 2);
assertEquals(0x25, itr.bytesLeft());
assertTrue(itr.hasNextByte());
assertTrue(itr.hasNextLong());
assertEquals(0x0203040506070809L, itr.nextLong());
assertEquals(0x1D, itr.bytesLeft());
assertTrue(itr.hasNextLong());
assertEquals(0x0A0B0C0D0E0F1011L, itr.nextLong());
assertTrue(itr.hasNextLong());
assertEquals(0x15, itr.bytesLeft());
assertEquals(0x1213141516171819L, itr.nextLong());
assertTrue(itr.hasNextLong());
assertEquals(0x0D, itr.bytesLeft());
assertEquals(0x1A1B1C1D1E1F2021L, itr.nextLong());
assertFalse(itr.hasNextLong());
assertEquals(5, itr.bytesLeft());
try {
itr.nextLong();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
assertTrue(itr.hasNextByte());
assertEquals((byte) 0x22, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(4, itr.bytesLeft());
assertEquals((byte) 0x23, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(3, itr.bytesLeft());
assertEquals((byte) 0x24, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(2, itr.bytesLeft());
assertEquals((byte) 0x25, itr.nextByte());
assertTrue(itr.hasNextByte());
assertEquals(1, itr.bytesLeft());
assertEquals((byte) 0x26, itr.nextByte());
assertFalse(itr.hasNextByte());
assertEquals(0, itr.bytesLeft());
try {
itr.nextLong();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
try {
itr.nextByte();
fail("Expected a no such element exception.");
} catch (NoSuchElementException ignore) {
// Good.
}
itr = buf.iterate(buf.readerIndex() + 1, 2);
assertEquals(2, itr.bytesLeft());
assertTrue(itr.hasNextByte());
assertFalse(itr.hasNextLong());
assertEquals((byte) 0x02, itr.nextByte());
assertEquals(1, itr.bytesLeft());
assertTrue(itr.hasNextByte());
assertFalse(itr.hasNextLong());
assertEquals((byte) 0x03, itr.nextByte());
assertEquals(0, itr.bytesLeft());
assertFalse(itr.hasNextByte());
assertFalse(itr.hasNextLong());
assertEquals(roff, buf.readerIndex());
assertEquals(woff, buf.writerIndex());
}
// todo reverse iteration
// todo reverse iteration of region
// todo resize copying must preserve contents
// todo resize sharing

View File

@ -65,6 +65,29 @@ public class CompositeBufTest extends BufTest {
combinations.add(new Object[]{name, allocator});
}
}
// Also add a 3-way composite buffer.
String name = "compose(heap,heap,heap)";
Supplier<Allocator> allocator = () -> {
return new Allocator() {
final Allocator alloc = Allocator.heap();
@Override
public Buf allocate(int size) {
int part = size / 3;
try (Buf a = alloc.allocate(part);
Buf b = alloc.allocate(part);
Buf c = alloc.allocate(size - part * 2)) {
return Buf.compose(a, b, c);
}
}
@Override
public void close() {
alloc.close();
}
};
};
combinations.add(new Object[]{name, allocator});
return combinations;
}

View File

@ -0,0 +1,96 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.netty.util;
/**
* The ByteIterator scans through a sequence of bytes.
* This is similar to {@link ByteProcessor}, but for external iteration rather than internal iteration.
* The external iteration allows the callers to control the pace of the iteration.
* The API includes methods for reading {@code long}s as a batch of 8 bytes.
* The long values are always in big-endian format, so that the highest-order byte in the long value, contain the byte
* that would otherwise have been returned by the next call to {@link #nextByte()}.
*/
public interface ByteIterator {
/**
* Check if the iterator has at least 8 bytes left.
* Note that when this method returns {@code false}, the {@link #hasNextByte()} can still return {@code true}.
* It is recommended to have any long-processing loop be followed by a byte-processing loop for the 7 or fewer
* bytes that might form a tail in the iterator.
*
* @return {@code true} if a call to {@link #nextLong()} would succeed, otherwise {@code false}.
*/
boolean hasNextLong();
/**
* Read and return the next 8 bytes, and move the iterator position forward by 8 bytes.
* The bytes are packed and return as a {@code long} value in big-endian format, such that the highest-order byte
* in the long, is the byte that would otherwise have been returned by the next call to {@link #nextByte()}.
*
* @return The next 8 bytes in big-endian format.
* @throws java.util.NoSuchElementException If the iterator has fewer than 8 bytes left.
*/
long nextLong();
/**
* Check if the iterator has at least one byte left.
*
* @return {@code true} if the next call to {@link #nextByte()} would succeed, otherwise {@code false}.
*/
boolean hasNextByte();
/**
* Read and return the next byte, and move the iterator position] forward by one byte.
*
* @return The next byte.
* @throws java.util.NoSuchElementException If the iterator has no more bytes left.
*/
byte nextByte();
/**
* The current position of this iterator into the underlying sequence of bytes.
* For instance, if we are iterating a buffer, this would be the iterators current offset into the buffer.
*
* @return The current iterator offset into the underlying sequence of bytes.
*/
int currentOffset();
/**
* Get the current number of bytes left in the iterator.
*
* @return The number of bytes left in the iterator.
*/
int bytesLeft();
/**
* Process the remaining bytes in this iterator with the given {@link ByteProcessor}.
* This method consumes the iterator.
*
* @param processor The processor to use for processing the bytes in the iterator.
* @return The number of bytes processed, if the {@link ByteProcessor#process(byte) process} method returned
* {@code false}, or {@code -1} if the whole iterator was processed.
*/
default int process(ByteProcessor processor) {
boolean requestMore = true;
int index = currentOffset();
if (hasNextByte()) {
byte val = nextByte();
while ((requestMore = processor.process(val)) && hasNextByte()) {
val = nextByte();
index++;
}
}
return requestMore? -1 : index;
}
}

View File

@ -5,7 +5,7 @@
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT