Add support for iterating underlying buffer components

Motivation:
It's desirable to be able to access the contents of a Buf via an array or a ByteBuffer.
However, we would also like to have a unified API that works for both composite and non-composite buffers.
Even for nested composite buffers.

Modification:
Add a forEachReadable method, which uses internal iteration to process all buffer components.
The internal iteration allows us to hide any nesting of composite buffers.
The consumer in the internal iteration is presented with a Component object, which exposes the contents in various ways.
The data is exposed from the Component via methods, such that anything that is expensive to create, will not have to be paid for unless it is used.
This mechanism also let us avoid any allocation unnecessary allocation; the ByteBuffers and arrays will necessarily have to be allocated, but the consumer may or may not need allocation depending on how it's implemented, and the component objects do not need to be allocated, because the non-composite buffers can directly implement the Component interface.

Result:
It's now possible to access the contents of Buf instances as arrays or ByteBuffers, without having to copy the data.
This commit is contained in:
Chris Vest 2021-01-11 16:10:00 +01:00
parent 8cdcfd53c9
commit d382017dc6
6 changed files with 924 additions and 108 deletions

View File

@ -17,6 +17,7 @@ package io.netty.buffer.api;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.Consumer;
/**
* A reference counted buffer of memory, with separate reader and writer offsets.
@ -187,7 +188,7 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
* @return The native memory address, if any, otherwise 0.
*/
long getNativeAddress();
long nativeAddress();
/**
* Set the read-only state of this buffer.
@ -482,4 +483,26 @@ public interface Buf extends Rc<Buf>, BufAccessors {
* or is {@linkplain #readOnly() read-only}.
*/
void compact();
/**
* Get the number of "components" in this buffer. For composite buffers, this is the number of transitive
* constituent buffers, while non-composite buffers only have one component.
*
* @return The number of components in this buffer.
*/
int componentCount();
/**
* Process all readable components of this buffer, and return the number of components consumed.
* <p>
* The number of components consumed may be less than the {@linkplain #componentCount() component count} if not all
* of them have readable data.
*
* <strong>Note</strong> that the {@link Component} instance passed to the consumer could be reused for multiple
* calls, so the data must be extracted from the component in the context of the iteration.
*
* @param consumer The consumer that will be used to process the buffer components.
* @return The number of readable components processed, which may be less than {@link #componentCount()}.
*/
int forEachReadable(Consumer<Component> consumer);
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
/**
* A view onto the buffer component being processed in a given iteration of {@link Buf#forEachReadable(Consumer)}.
* <p>
* Instances of this interface are allowed to be mutable behind the scenes, and the data is only guaranteed to be
* consistent within the given iteration.
*/
public interface Component {
/**
* Check if this component is backed by a cached byte array than can be accessed cheaply.
*
* @return {@code true} if {@link #array()} is a cheap operation, otherwise {@code false}.
*/
boolean hasCachedArray();
/**
* Get a byte array of the contents of this component.
* <p>
* <strong>Note</strong> that the array is meant to be read-only. It may either be a direct reference to the
* concrete array instance that is backing this component, or it is a fresh copy.
*
* @return A byte array of the contents of this component.
*/
byte[] array();
/**
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
* @return The native memory address, if any, otherwise 0.
*/
long nativeAddress();
/**
* Build a {@link ByteBuffer} instance for this memory component.
* @return A new {@link ByteBuffer} for this memory component.
*/
ByteBuffer byteBuffer();
}

View File

@ -19,6 +19,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;
final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
/**
@ -203,7 +204,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
}
@Override
public long getNativeAddress() {
public long nativeAddress() {
return 0;
}
@ -700,6 +701,27 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
writerOffset(woff - distance);
}
@Override
public int componentCount() {
int sum = 0;
for (Buf buf : bufs) {
sum += buf.componentCount();
}
return sum;
}
@Override
public int forEachReadable(Consumer<Component> consumer) {
checkReadBounds(readerOffset(), Math.max(1, readableBytes()));
int visited = 0;
for (Buf buf : bufs) {
if (buf.readableBytes() > 0) {
visited += buf.forEachReadable(consumer);
}
}
return visited;
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
@Override
public byte readByte() {

View File

@ -19,6 +19,7 @@ import io.netty.buffer.api.Allocator;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buf;
import io.netty.buffer.api.ByteCursor;
import io.netty.buffer.api.Component;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.Owned;
import io.netty.buffer.api.RcSupport;
@ -26,6 +27,7 @@ import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.function.Consumer;
import static jdk.incubator.foreign.MemoryAccess.getByteAtOffset;
import static jdk.incubator.foreign.MemoryAccess.getCharAtOffset;
@ -42,7 +44,7 @@ import static jdk.incubator.foreign.MemoryAccess.setIntAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf, Component {
private static final MemorySegment CLOSED_SEGMENT;
static final Drop<MemSegBuf> SEGMENT_CLOSE;
@ -58,6 +60,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
private final AllocatorControl alloc;
private final boolean isSendable;
private final int baseOffset; // TODO remove this when JDK bug is fixed (slices of heap buffers)
private MemorySegment seg;
private MemorySegment wseg;
private ByteOrder order;
@ -65,15 +68,17 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
private int woff;
MemSegBuf(MemorySegment segmet, Drop<MemSegBuf> drop, AllocatorControl alloc) {
this(segmet, drop, alloc, true);
this(segmet, drop, alloc, true, 0);
}
private MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop, AllocatorControl alloc, boolean isSendable) {
private MemSegBuf(MemorySegment segment, Drop<MemSegBuf> drop, AllocatorControl alloc, boolean isSendable,
int baseOffset) {
super(drop);
this.alloc = alloc;
seg = segment;
wseg = segment;
this.isSendable = isSendable;
this.baseOffset = baseOffset;
order = ByteOrder.nativeOrder();
}
@ -130,7 +135,17 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
@Override
public long getNativeAddress() {
public boolean hasCachedArray() {
return false;
}
@Override
public byte[] array() {
return seg.toByteArray();
}
@Override
public long nativeAddress() {
try {
return seg.address().toRawLongValue();
} catch (UnsupportedOperationException e) {
@ -138,6 +153,24 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
}
}
@Override
public ByteBuffer byteBuffer() {
var buffer = seg.asByteBuffer();
int base = baseOffset;
if (buffer.isDirect()) {
// TODO Remove this when JDK bug is fixed.
ByteBuffer tmp = ByteBuffer.allocateDirect(buffer.capacity());
tmp.put(buffer);
buffer = tmp.position(0);
base = 0; // TODO native memory segments do not have the buffer-of-slice bug.
}
if (readOnly()) {
buffer = buffer.asReadOnlyBuffer();
}
// TODO avoid slicing and just set position+limit when JDK bug is fixed.
return buffer.slice(base + readerOffset(), readableBytes()).order(order);
}
@Override
public Buf readOnly(boolean readOnly) {
wseg = readOnly? CLOSED_SEGMENT : seg;
@ -161,7 +194,7 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
b.makeInaccessible();
};
var sendable = false; // Sending implies ownership change, which we can't do for slices.
return new MemSegBuf(slice, drop, alloc, sendable)
return new MemSegBuf(slice, drop, alloc, sendable, baseOffset + offset)
.writerOffset(length)
.order(order())
.readOnly(readOnly());
@ -458,6 +491,18 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
woff -= distance;
}
@Override
public int componentCount() {
return 1;
}
@Override
public int forEachReadable(Consumer<Component> consumer) {
checkRead(readerOffset(), Math.max(1, readableBytes()));
consumer.accept(this);
return 1;
}
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
@Override
public byte readByte() {
@ -969,13 +1014,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
private void checkRead(int index, int size) {
if (index < 0 || woff < index + size) {
throw accessCheckException(index);
throw readAccessCheckException(index);
}
}
private void checkWrite(int index, int size) {
if (index < 0 || wseg.byteSize() < index + size) {
throw accessCheckException(index);
throw writeAccessCheckException(index);
}
}
@ -989,16 +1034,21 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
return ioobe;
}
private RuntimeException accessCheckException(int index) {
private RuntimeException readAccessCheckException(int index) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
return outOfBounds(index);
}
private RuntimeException writeAccessCheckException(int index) {
if (seg == CLOSED_SEGMENT) {
throw bufferIsClosed();
}
if (wseg != seg) {
return bufferIsReadOnly();
}
return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(seg.byteSize() - 1) + "].");
return outOfBounds(index);
}
private static IllegalStateException bufferIsClosed() {
@ -1009,6 +1059,12 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
return new IllegalStateException("This buffer is read-only.");
}
private IndexOutOfBoundsException outOfBounds(int index) {
return new IndexOutOfBoundsException(
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
(seg.byteSize() - 1) + "].");
}
Object recoverableMemory() {
return new RecoverableMemory(seg, alloc);
}

File diff suppressed because it is too large Load Diff

View File

@ -56,6 +56,10 @@ public final class Fixture implements Supplier<Allocator> {
return properties.contains(Properties.DIRECT);
}
public boolean isComposite() {
return properties.contains(Properties.COMPOSITE);
}
public boolean isPooled() {
return properties.contains(Properties.POOLED);
}