Merge pull request #24 from netty/buffer-iterate
Add support for iterating underlying buffer components
This commit is contained in:
commit
202dd54ff2
@ -15,6 +15,11 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.api;
|
package io.netty.buffer.api;
|
||||||
|
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.ReadableComponentProcessor;
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.WritableComponentProcessor;
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.ReadableComponent;
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.WritableComponent;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
|
|
||||||
@ -187,7 +192,7 @@ public interface Buf extends Rc<Buf>, BufAccessors {
|
|||||||
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
|
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
|
||||||
* @return The native memory address, if any, otherwise 0.
|
* @return The native memory address, if any, otherwise 0.
|
||||||
*/
|
*/
|
||||||
long getNativeAddress();
|
long nativeAddress();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the read-only state of this buffer.
|
* Set the read-only state of this buffer.
|
||||||
@ -482,4 +487,119 @@ public interface Buf extends Rc<Buf>, BufAccessors {
|
|||||||
* or is {@linkplain #readOnly() read-only}.
|
* or is {@linkplain #readOnly() read-only}.
|
||||||
*/
|
*/
|
||||||
void compact();
|
void compact();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of "components" in this buffer. For composite buffers, this is the number of transitive
|
||||||
|
* constituent buffers, while non-composite buffers only have one component.
|
||||||
|
*
|
||||||
|
* @return The number of components in this buffer.
|
||||||
|
*/
|
||||||
|
int countComponents();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of "components" in this buffer, that are readable. These are the components that would be
|
||||||
|
* processed by {@link #forEachReadable(int, ReadableComponentProcessor)}. For composite buffers, this is the
|
||||||
|
* number of transitive constituent buffers that are readable, while non-composite buffers only have at most one
|
||||||
|
* readable component.
|
||||||
|
* <p>
|
||||||
|
* The number of readable components may be less than the {@link #countComponents() component count}, if not all of
|
||||||
|
* them have readable data.
|
||||||
|
*
|
||||||
|
* @return The number of readable components in this buffer.
|
||||||
|
*/
|
||||||
|
int countReadableComponents();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of "components" in this buffer, that are writable. These are the components that would be
|
||||||
|
* processed by {@link #forEachWritable(int, WritableComponentProcessor)}. For composite buffers, this is the
|
||||||
|
* number of transitive constituent buffers that are writable, while non-composite buffers only have at most one
|
||||||
|
* writable component.
|
||||||
|
* <p>
|
||||||
|
* The number of writable components may be less than the {@link #countComponents() component count}, if not all of
|
||||||
|
* them have space for writing.
|
||||||
|
*
|
||||||
|
* @return The number of writable components in this buffer.
|
||||||
|
*/
|
||||||
|
int countWritableComponents();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process all readable components of this buffer, and return the number of components processed.
|
||||||
|
* <p>
|
||||||
|
* The given {@linkplain ReadableComponentProcessor processor} is called for each readable component in this buffer,
|
||||||
|
* and passed a component index, for the given component in the iteration, and a {@link ReadableComponent} object
|
||||||
|
* for accessing the data within the given component.
|
||||||
|
* <p>
|
||||||
|
* The component index is specific to the particular invokation of this method. The first call to the consumer will
|
||||||
|
* be passed the given initial index, and the next call will be passed the initial index plus one, and so on.
|
||||||
|
* <p>
|
||||||
|
* The {@linkplain ReadableComponentProcessor component processor} may stop the iteration at any time by returning
|
||||||
|
* {@code false}.
|
||||||
|
* This will cause the number of components processed to be returned as a negative number (to signal early return),
|
||||||
|
* and the number of components processed may then be less than the
|
||||||
|
* {@linkplain #countReadableComponents() readable component count}.
|
||||||
|
* <p>
|
||||||
|
* <strong>Note</strong> that the {@link ReadableComponent} instance passed to the consumer could be reused for
|
||||||
|
* multiple calls, so the data must be extracted from the component in the context of the iteration.
|
||||||
|
* <p>
|
||||||
|
* The {@link ByteBuffer} instances obtained from the component, share life time with that internal component.
|
||||||
|
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
|
||||||
|
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
|
||||||
|
* <p>
|
||||||
|
* The best way to ensure this doesn't cause any trouble, is to use the buffers directly as part of the iteration,
|
||||||
|
* or immediately after the iteration while we are still in the scope of the method that triggered the iteration.
|
||||||
|
* <p>
|
||||||
|
* <strong>Note</strong> that the arrays, memory addresses, and byte buffers exposed as components by this method,
|
||||||
|
* should not be used for changing the buffer contents. Doing so may cause undefined behaviour.
|
||||||
|
* <p>
|
||||||
|
* Changes to position and limit of the byte buffers exposed via the processed components, are not reflected back to
|
||||||
|
* this buffer instance.
|
||||||
|
*
|
||||||
|
* @param initialIndex The initial index of the iteration, and the index that will be passed to the first call to
|
||||||
|
* the {@linkplain ReadableComponentProcessor#process(int, ReadableComponent) processor}.
|
||||||
|
* @param processor The processor that will be used to process the buffer components.
|
||||||
|
* @return The number of readable components processed, as a positive number of all readable components were
|
||||||
|
* processed, or as a negative number if the iteration was stopped because
|
||||||
|
* {@link ReadableComponentProcessor#process(int, ReadableComponent)} returned {@code false}.
|
||||||
|
* In any case, the number of components processed may be less than {@link #countComponents()}.
|
||||||
|
*/
|
||||||
|
int forEachReadable(int initialIndex, ReadableComponentProcessor processor);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process all writable components of this buffer, and return the number of components processed.
|
||||||
|
* <p>
|
||||||
|
* The given {@linkplain WritableComponentProcessor processor} is called for each writable component in this buffer,
|
||||||
|
* and passed a component index, for the given component in the iteration, and a {@link WritableComponent} object
|
||||||
|
* for accessing the data within the given component.
|
||||||
|
* <p>
|
||||||
|
* The component index is specific to the particular invokation of this method. The first call to the consumer will
|
||||||
|
* be passed the given initial index, and the next call will be passed the initial index plus one, and so on.
|
||||||
|
* <p>
|
||||||
|
* The {@link WritableComponentProcessor component processor} may stop the iteration at any time by returning
|
||||||
|
* {@code false}.
|
||||||
|
* This will cause the number of components processed to be returned as a negative number (to signal early return),
|
||||||
|
* and the number of components processed may then be less than the
|
||||||
|
* {@linkplain #countReadableComponents() readable component count}.
|
||||||
|
* <p>
|
||||||
|
* <strong>Note</strong> that the {@link WritableComponent} instance passed to the consumer could be reused for
|
||||||
|
* multiple calls, so the data must be extracted from the component in the context of the iteration.
|
||||||
|
* <p>
|
||||||
|
* The {@link ByteBuffer} instances obtained from the component, share life time with that internal component.
|
||||||
|
* This means they can be accessed as long as the internal memory store remain unchanged. Methods that may cause
|
||||||
|
* such changes, are any method that requires the buffer to be {@linkplain #isOwned() owned}.
|
||||||
|
* <p>
|
||||||
|
* The best way to ensure this doesn't cause any trouble, is to use the buffers directly as part of the iteration,
|
||||||
|
* or immediately after the iteration while we are still in the scope of the method that triggered the iteration.
|
||||||
|
* <p>
|
||||||
|
* Changes to position and limit of the byte buffers exposed via the processed components, are not reflected back to
|
||||||
|
* this buffer instance.
|
||||||
|
*
|
||||||
|
* @param initialIndex The initial index of the iteration, and the index that will be passed to the first call to
|
||||||
|
* the {@linkplain WritableComponentProcessor#process(int, WritableComponent) processor}.
|
||||||
|
* @param processor The processor that will be used to process the buffer components.
|
||||||
|
* @return The number of writable components processed, as a positive number of all writable components were
|
||||||
|
* processed, or as a negative number if the iteration was stopped because
|
||||||
|
* {@link WritableComponentProcessor#process(int, WritableComponent)} returned {@code false}.
|
||||||
|
* In any case, the number of components processed may be less than {@link #countComponents()}.
|
||||||
|
*/
|
||||||
|
int forEachWritable(int initialIndex, WritableComponentProcessor processor);
|
||||||
}
|
}
|
||||||
|
170
src/main/java/io/netty/buffer/api/ComponentProcessor.java
Normal file
170
src/main/java/io/netty/buffer/api/ComponentProcessor.java
Normal file
@ -0,0 +1,170 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2020 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.buffer.api;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This interface contain a collection of APIs used in the {@link Buf#forEachReadable(int, ReadableComponentProcessor)}
|
||||||
|
* and {@link Buf#forEachWritable(int, WritableComponentProcessor)} methods.
|
||||||
|
*/
|
||||||
|
public interface ComponentProcessor {
|
||||||
|
/**
|
||||||
|
* A processor of {@linkplain ReadableComponent readable components}.
|
||||||
|
*/
|
||||||
|
@FunctionalInterface
|
||||||
|
interface ReadableComponentProcessor extends ComponentProcessor {
|
||||||
|
/**
|
||||||
|
* Process the given component at the given index in the
|
||||||
|
* {@link Buf#forEachReadable(int, ReadableComponentProcessor) iteration}.
|
||||||
|
* <p>
|
||||||
|
* The component object itself is only valid during this call, but the {@link ByteBuffer byte buffers}, arrays,
|
||||||
|
* and native address pointers obtained from it, will be valid until any
|
||||||
|
* {@link Buf#isOwned() ownership} requiring operation is performed on the buffer.
|
||||||
|
*
|
||||||
|
* @param index The current index of the given buffer component, based on the initial index passed to the
|
||||||
|
* {@link Buf#forEachReadable(int, ReadableComponentProcessor)} method.
|
||||||
|
* @param component The current buffer component being processed.
|
||||||
|
* @return {@code true} if the iteration should continue and more components should be processed, otherwise
|
||||||
|
* {@code false} to stop the iteration early.
|
||||||
|
*/
|
||||||
|
boolean process(int index, ReadableComponent component);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A processor of {@linkplain WritableComponent writable components}.
|
||||||
|
*/
|
||||||
|
@FunctionalInterface
|
||||||
|
interface WritableComponentProcessor extends ComponentProcessor {
|
||||||
|
/**
|
||||||
|
* Process the given component at the given index in the
|
||||||
|
* {@link Buf#forEachWritable(int, WritableComponentProcessor)} iteration}.
|
||||||
|
* <p>
|
||||||
|
* The component object itself is only valid during this call, but the {@link ByteBuffer byte buffers}, arrays,
|
||||||
|
* and native address pointers obtained from it, will be valid until any
|
||||||
|
* {@link Buf#isOwned() ownership} requiring operation is performed on the buffer.
|
||||||
|
*
|
||||||
|
* @param index The current index of the given buffer component, based on the initial index passed to the
|
||||||
|
* {@link Buf#forEachWritable(int, WritableComponentProcessor)} method.
|
||||||
|
* @param component The current buffer component being processed.
|
||||||
|
* @return {@code true} if the iteration should continue and more components should be processed, otherwise
|
||||||
|
* {@code false} to stop the iteration early.
|
||||||
|
*/
|
||||||
|
boolean process(int index, WritableComponent component);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A view onto the buffer component being processed in a given iteration of
|
||||||
|
* {@link Buf#forEachReadable(int, ReadableComponentProcessor)}.
|
||||||
|
*/
|
||||||
|
interface ReadableComponent {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if this component is backed by a cached byte array than can be accessed cheaply.
|
||||||
|
* <p>
|
||||||
|
* <strong>Note</strong> that regardless of what this method returns, the array should not be used to modify the
|
||||||
|
* contents of this buffer component.
|
||||||
|
*
|
||||||
|
* @return {@code true} if {@link #readableArray()} is a cheap operation, otherwise {@code false}.
|
||||||
|
*/
|
||||||
|
boolean hasReadableArray();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a byte array of the contents of this component.
|
||||||
|
* <p>
|
||||||
|
* <strong>Note</strong> that the array is meant to be read-only. It may either be a direct reference to the
|
||||||
|
* concrete array instance that is backing this component, or it is a fresh copy.
|
||||||
|
* Writing to the array may produce undefined behaviour.
|
||||||
|
*
|
||||||
|
* @return A byte array of the contents of this component.
|
||||||
|
* @throws UnsupportedOperationException if {@link #hasReadableArray()} returns {@code false}.
|
||||||
|
*/
|
||||||
|
byte[] readableArray();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An offset into the {@link #readableArray()} where this component starts.
|
||||||
|
*
|
||||||
|
* @return An offset into {@link #readableArray()}.
|
||||||
|
* @throws UnsupportedOperationException if {@link #hasReadableArray()} returns {@code false}.
|
||||||
|
*/
|
||||||
|
int readableArrayOffset();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
|
||||||
|
* <p>
|
||||||
|
* <strong>Note</strong> that the address should not be used for writing to the buffer memory, and doing so may
|
||||||
|
* produce undefined behaviour.
|
||||||
|
*
|
||||||
|
* @return The native memory address, if any, otherwise 0.
|
||||||
|
*/
|
||||||
|
long readableNativeAddress();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a {@link ByteBuffer} instance for this memory component.
|
||||||
|
* <p>
|
||||||
|
* <strong>Note</strong> that the {@link ByteBuffer} is read-only, to prevent write accesses to the memory,
|
||||||
|
* when the buffer component is obtained through {@link Buf#forEachReadable(int, ReadableComponentProcessor)}.
|
||||||
|
*
|
||||||
|
* @return A new {@link ByteBuffer}, with its own position and limit, for this memory component.
|
||||||
|
*/
|
||||||
|
ByteBuffer readableBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A view onto the buffer component being processed in a given iteration of
|
||||||
|
* {@link Buf#forEachWritable(int, WritableComponentProcessor)}.
|
||||||
|
*/
|
||||||
|
interface WritableComponent {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if this component is backed by a cached byte array than can be accessed cheaply.
|
||||||
|
*
|
||||||
|
* @return {@code true} if {@link #writableArray()} is a cheap operation, otherwise {@code false}.
|
||||||
|
*/
|
||||||
|
boolean hasWritableArray();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a byte array of the contents of this component.
|
||||||
|
*
|
||||||
|
* @return A byte array of the contents of this component.
|
||||||
|
* @throws UnsupportedOperationException if {@link #hasWritableArray()} returns {@code false}.
|
||||||
|
*/
|
||||||
|
byte[] writableArray();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An offset into the {@link #writableArray()} where this component starts.
|
||||||
|
*
|
||||||
|
* @return An offset into {@link #writableArray()}.
|
||||||
|
* @throws UnsupportedOperationException if {@link #hasWritableArray()} returns {@code false}.
|
||||||
|
*/
|
||||||
|
int writableArrayOffset();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
|
||||||
|
*
|
||||||
|
* @return The native memory address, if any, otherwise 0.
|
||||||
|
*/
|
||||||
|
long writableNativeAddress();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a {@link ByteBuffer} instance for this memory component, which can be used for modifying the buffer
|
||||||
|
* contents.
|
||||||
|
*
|
||||||
|
* @return A new {@link ByteBuffer}, with its own position and limit, for this memory component.
|
||||||
|
*/
|
||||||
|
ByteBuffer writableBuffer();
|
||||||
|
}
|
||||||
|
}
|
@ -15,6 +15,9 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.api;
|
package io.netty.buffer.api;
|
||||||
|
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.ReadableComponentProcessor;
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.WritableComponentProcessor;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.ByteOrder;
|
import java.nio.ByteOrder;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -48,7 +51,19 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
|||||||
private boolean readOnly;
|
private boolean readOnly;
|
||||||
|
|
||||||
CompositeBuf(Allocator allocator, Buf[] bufs) {
|
CompositeBuf(Allocator allocator, Buf[] bufs) {
|
||||||
this(allocator, true, bufs.clone(), COMPOSITE_DROP); // Clone prevents external modification of array.
|
this(allocator, true, filterExternalBufs(bufs), COMPOSITE_DROP);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Buf[] filterExternalBufs(Buf[] bufs) {
|
||||||
|
// We filter out all zero-capacity buffers because they wouldn't contribute to the composite buffer anyway,
|
||||||
|
// and also, by ensuring that all constituent buffers contribute to the size of the composite buffer,
|
||||||
|
// we make sure that the number of composite buffers will never become greater than the number of bytes in
|
||||||
|
// the composite buffer.
|
||||||
|
// This restriction guarantees that methods like countComponents, forEachReadable and forEachWritable,
|
||||||
|
// will never overflow their component counts.
|
||||||
|
// Allocating a new array unconditionally also prevents external modification of the array.
|
||||||
|
// TODO if any buffer is itself a composite buffer, then we should unwrap its sub-buffers
|
||||||
|
return Arrays.stream(bufs).filter(b -> b.capacity() > 0).toArray(Buf[]::new);
|
||||||
}
|
}
|
||||||
|
|
||||||
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
|
private CompositeBuf(Allocator allocator, boolean isSendable, Buf[] bufs, Drop<CompositeBuf> drop) {
|
||||||
@ -203,7 +218,7 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getNativeAddress() {
|
public long nativeAddress() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -616,7 +631,17 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
|||||||
(extension.readOnly()? "read-only." : "writable."));
|
(extension.readOnly()? "read-only." : "writable."));
|
||||||
}
|
}
|
||||||
|
|
||||||
long newSize = capacity() + (long) extension.capacity();
|
long extensionCapacity = extension.capacity();
|
||||||
|
if (extensionCapacity == 0) {
|
||||||
|
// Extending by a zero-sized buffer makes no difference. Especially since it's not allowed to change the
|
||||||
|
// capacity of buffers that are constiuents of composite buffers.
|
||||||
|
// This also ensures that methods like countComponents, and forEachReadable, do not have to worry about
|
||||||
|
// overflow in their component counters.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// TODO if extension is itself a composite buffer, then we should extend ourselves by all of the sub-buffers
|
||||||
|
|
||||||
|
long newSize = capacity() + extensionCapacity;
|
||||||
Allocator.checkSize(newSize);
|
Allocator.checkSize(newSize);
|
||||||
|
|
||||||
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
|
Buf[] restoreTemp = bufs; // We need this to restore our buffer array, in case offset computations fail.
|
||||||
@ -700,6 +725,69 @@ final class CompositeBuf extends RcSupport<Buf, CompositeBuf> implements Buf {
|
|||||||
writerOffset(woff - distance);
|
writerOffset(woff - distance);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int countComponents() {
|
||||||
|
int sum = 0;
|
||||||
|
for (Buf buf : bufs) {
|
||||||
|
sum += buf.countComponents();
|
||||||
|
}
|
||||||
|
return sum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int countReadableComponents() {
|
||||||
|
int sum = 0;
|
||||||
|
for (Buf buf : bufs) {
|
||||||
|
sum += buf.countReadableComponents();
|
||||||
|
}
|
||||||
|
return sum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int countWritableComponents() {
|
||||||
|
int sum = 0;
|
||||||
|
for (Buf buf : bufs) {
|
||||||
|
sum += buf.countWritableComponents();
|
||||||
|
}
|
||||||
|
return sum;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) {
|
||||||
|
checkReadBounds(readerOffset(), Math.max(1, readableBytes()));
|
||||||
|
int visited = 0;
|
||||||
|
for (Buf buf : bufs) {
|
||||||
|
if (buf.readableBytes() > 0) {
|
||||||
|
int count = buf.forEachReadable(visited + initialIndex, processor);
|
||||||
|
if (count > 0) {
|
||||||
|
visited += count;
|
||||||
|
} else {
|
||||||
|
visited = -visited + count;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return visited;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int forEachWritable(int initialIndex, WritableComponentProcessor processor) {
|
||||||
|
checkWriteBounds(writerOffset(), Math.max(1, writableBytes()));
|
||||||
|
int visited = 0;
|
||||||
|
for (Buf buf : bufs) {
|
||||||
|
if (buf.writableBytes() > 0) {
|
||||||
|
int count = buf.forEachWritable(visited + initialIndex, processor);
|
||||||
|
if (count > 0) {
|
||||||
|
visited += count;
|
||||||
|
} else {
|
||||||
|
visited = -visited + count;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return visited;
|
||||||
|
}
|
||||||
|
|
||||||
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
|
// <editor-fold defaultstate="collapsed" desc="Primitive accessors.">
|
||||||
@Override
|
@Override
|
||||||
public byte readByte() {
|
public byte readByte() {
|
||||||
|
@ -19,6 +19,10 @@ import io.netty.buffer.api.Allocator;
|
|||||||
import io.netty.buffer.api.AllocatorControl;
|
import io.netty.buffer.api.AllocatorControl;
|
||||||
import io.netty.buffer.api.Buf;
|
import io.netty.buffer.api.Buf;
|
||||||
import io.netty.buffer.api.ByteCursor;
|
import io.netty.buffer.api.ByteCursor;
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.ReadableComponent;
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.ReadableComponentProcessor;
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.WritableComponent;
|
||||||
|
import io.netty.buffer.api.ComponentProcessor.WritableComponentProcessor;
|
||||||
import io.netty.buffer.api.Drop;
|
import io.netty.buffer.api.Drop;
|
||||||
import io.netty.buffer.api.Owned;
|
import io.netty.buffer.api.Owned;
|
||||||
import io.netty.buffer.api.RcSupport;
|
import io.netty.buffer.api.RcSupport;
|
||||||
@ -42,7 +46,7 @@ import static jdk.incubator.foreign.MemoryAccess.setIntAtOffset;
|
|||||||
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
|
import static jdk.incubator.foreign.MemoryAccess.setLongAtOffset;
|
||||||
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
|
import static jdk.incubator.foreign.MemoryAccess.setShortAtOffset;
|
||||||
|
|
||||||
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
|
class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf, ReadableComponent, WritableComponent {
|
||||||
private static final MemorySegment CLOSED_SEGMENT;
|
private static final MemorySegment CLOSED_SEGMENT;
|
||||||
static final Drop<MemSegBuf> SEGMENT_CLOSE;
|
static final Drop<MemSegBuf> SEGMENT_CLOSE;
|
||||||
|
|
||||||
@ -129,8 +133,78 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// <editor-fold defaultstate="collapsed" desc="Readable/WritableComponent implementation.">
|
||||||
@Override
|
@Override
|
||||||
public long getNativeAddress() {
|
public boolean hasReadableArray() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] readableArray() {
|
||||||
|
throw new UnsupportedOperationException("This component has no backing array.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int readableArrayOffset() {
|
||||||
|
throw new UnsupportedOperationException("This component has no backing array.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long readableNativeAddress() {
|
||||||
|
return nativeAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer readableBuffer() {
|
||||||
|
var buffer = seg.asByteBuffer();
|
||||||
|
if (buffer.isDirect()) {
|
||||||
|
// TODO Remove this when the slicing of shared, native segments JDK bug is fixed.
|
||||||
|
// See https://mail.openjdk.java.net/pipermail/panama-dev/2021-January/011810.html
|
||||||
|
ByteBuffer tmp = ByteBuffer.allocateDirect(buffer.capacity());
|
||||||
|
tmp.put(buffer);
|
||||||
|
buffer = tmp.position(0);
|
||||||
|
}
|
||||||
|
buffer = buffer.asReadOnlyBuffer();
|
||||||
|
// TODO avoid slicing and just set position+limit when JDK bug is fixed.
|
||||||
|
return buffer.slice(readerOffset(), readableBytes()).order(order);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasWritableArray() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] writableArray() {
|
||||||
|
throw new UnsupportedOperationException("This component has no backing array.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int writableArrayOffset() {
|
||||||
|
throw new UnsupportedOperationException("This component has no backing array.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long writableNativeAddress() {
|
||||||
|
return nativeAddress();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer writableBuffer() {
|
||||||
|
var buffer = wseg.asByteBuffer();
|
||||||
|
|
||||||
|
if (buffer.isDirect()) {
|
||||||
|
buffer = buffer.position(writerOffset()).limit(writerOffset() + writableBytes());
|
||||||
|
} else {
|
||||||
|
// TODO avoid slicing and just set position when JDK bug is fixed.
|
||||||
|
buffer = buffer.slice(writerOffset(), writableBytes());
|
||||||
|
}
|
||||||
|
return buffer.order(order);
|
||||||
|
}
|
||||||
|
// </editor-fold>
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long nativeAddress() {
|
||||||
try {
|
try {
|
||||||
return seg.address().toRawLongValue();
|
return seg.address().toRawLongValue();
|
||||||
} catch (UnsupportedOperationException e) {
|
} catch (UnsupportedOperationException e) {
|
||||||
@ -458,6 +532,33 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
|
|||||||
woff -= distance;
|
woff -= distance;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int countComponents() {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int countReadableComponents() {
|
||||||
|
return readableBytes() > 0? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int countWritableComponents() {
|
||||||
|
return writableBytes() > 0? 1 : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) {
|
||||||
|
checkRead(readerOffset(), Math.max(1, readableBytes()));
|
||||||
|
return processor.process(initialIndex, this)? 1 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int forEachWritable(int initialIndex, WritableComponentProcessor processor) {
|
||||||
|
checkWrite(writerOffset(), Math.max(1, writableBytes()));
|
||||||
|
return processor.process(initialIndex, this)? 1 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
|
// <editor-fold defaultstate="collapsed" desc="Primitive accessors implementation.">
|
||||||
@Override
|
@Override
|
||||||
public byte readByte() {
|
public byte readByte() {
|
||||||
@ -969,13 +1070,13 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
|
|||||||
|
|
||||||
private void checkRead(int index, int size) {
|
private void checkRead(int index, int size) {
|
||||||
if (index < 0 || woff < index + size) {
|
if (index < 0 || woff < index + size) {
|
||||||
throw accessCheckException(index);
|
throw readAccessCheckException(index);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkWrite(int index, int size) {
|
private void checkWrite(int index, int size) {
|
||||||
if (index < 0 || wseg.byteSize() < index + size) {
|
if (index < 0 || wseg.byteSize() < index + size) {
|
||||||
throw accessCheckException(index);
|
throw writeAccessCheckException(index);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -989,16 +1090,21 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
|
|||||||
return ioobe;
|
return ioobe;
|
||||||
}
|
}
|
||||||
|
|
||||||
private RuntimeException accessCheckException(int index) {
|
private RuntimeException readAccessCheckException(int index) {
|
||||||
|
if (seg == CLOSED_SEGMENT) {
|
||||||
|
throw bufferIsClosed();
|
||||||
|
}
|
||||||
|
return outOfBounds(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
private RuntimeException writeAccessCheckException(int index) {
|
||||||
if (seg == CLOSED_SEGMENT) {
|
if (seg == CLOSED_SEGMENT) {
|
||||||
throw bufferIsClosed();
|
throw bufferIsClosed();
|
||||||
}
|
}
|
||||||
if (wseg != seg) {
|
if (wseg != seg) {
|
||||||
return bufferIsReadOnly();
|
return bufferIsReadOnly();
|
||||||
}
|
}
|
||||||
return new IndexOutOfBoundsException(
|
return outOfBounds(index);
|
||||||
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
|
|
||||||
(seg.byteSize() - 1) + "].");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IllegalStateException bufferIsClosed() {
|
private static IllegalStateException bufferIsClosed() {
|
||||||
@ -1009,6 +1115,12 @@ class MemSegBuf extends RcSupport<Buf, MemSegBuf> implements Buf {
|
|||||||
return new IllegalStateException("This buffer is read-only.");
|
return new IllegalStateException("This buffer is read-only.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private IndexOutOfBoundsException outOfBounds(int index) {
|
||||||
|
return new IndexOutOfBoundsException(
|
||||||
|
"Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " +
|
||||||
|
(seg.byteSize() - 1) + "].");
|
||||||
|
}
|
||||||
|
|
||||||
Object recoverableMemory() {
|
Object recoverableMemory() {
|
||||||
return new RecoverableMemory(seg, alloc);
|
return new RecoverableMemory(seg, alloc);
|
||||||
}
|
}
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -56,6 +56,10 @@ public final class Fixture implements Supplier<Allocator> {
|
|||||||
return properties.contains(Properties.DIRECT);
|
return properties.contains(Properties.DIRECT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isComposite() {
|
||||||
|
return properties.contains(Properties.COMPOSITE);
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isPooled() {
|
public boolean isPooled() {
|
||||||
return properties.contains(Properties.POOLED);
|
return properties.contains(Properties.POOLED);
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user