From 95709828bf42d6e61b913a78dda104d8b4be9f39 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 18 Mar 2021 15:18:22 +0100 Subject: [PATCH] Add a ByteBuffer based implementation of Buffer Motivation: We need a new implementation of our new API that supports Java 11, since that is what Netty 5 will most likely baseline on. We also need an implementation that does not rely on Unsafe. This leaves us with ByteBuffer as the underlying currency of memory. Modification: - Add a NioBuffer implementation and associated supporting classes. - The entry-point for this is a new MemoryManagers API, which is used to pick the implementation and provide the on-/off-heap MemoryManager implementations. - Add a mechanism to configure/override which MemoryManagers implementation to use. - The MemoryManagers implementations are service-loadable, so new ones can be discovered at runtime. - The existing MemorySegment based implementation also get a MemoryManagers implementation. - Expand the BufferTest to include all combinations of all implementations. We now run 360.000 tests in BufferTest. - Some common infrastructure, like ArcDrop, is moved to its own package. - Add a module-info.java to control the service loading, and the visibility in the various packages. - Some pom.xml file updates to support our now module based project. Result: We have an implementation that should work on Java 11, but we currently don't build or test on 11. More work needs to happen before that is a reality. --- pom.xml | 9 +- src/main/java/io/netty/buffer/api/Buffer.java | 4 +- .../io/netty/buffer/api/BufferAllocator.java | 10 +- .../io/netty/buffer/api/BufferHolder.java | 2 +- .../netty/buffer/api/CleanerPooledDrop.java | 4 +- .../io/netty/buffer/api/CompositeBuffer.java | 7 + .../buffer/api/ManagedBufferAllocator.java | 4 +- .../io/netty/buffer/api/MemoryManager.java | 14 +- .../io/netty/buffer/api/MemoryManagers.java | 75 ++ .../buffer/api/MemoryManagersOverride.java | 52 + .../netty/buffer/api/ReadableComponent.java | 11 + .../buffer/api/SizeClassedMemoryPool.java | 6 +- .../io/netty/buffer/api/TransferSend.java | 2 +- .../netty/buffer/api/WritableComponent.java | 11 + .../bytebuffer/ByteBufferMemoryManager.java | 76 ++ .../bytebuffer/ByteBufferMemoryManagers.java | 36 + .../buffer/api/bytebuffer/NioBuffer.java | 1155 +++++++++++++++++ .../buffer/api/bytebuffer/package-info.java | 20 + .../api/{memseg => internal}/ArcDrop.java | 38 +- .../buffer/api/{ => internal}/Statics.java | 33 +- .../buffer/api/internal/package-info.java | 20 + .../memseg/AbstractMemorySegmentManager.java | 19 +- .../netty/buffer/api/memseg/MemSegBuffer.java | 64 +- .../api/memseg/SegmentMemoryManagers.java | 36 + src/main/java/module-info.java | 36 + .../java/io/netty/buffer/api/BufferTest.java | 41 +- 26 files changed, 1683 insertions(+), 102 deletions(-) create mode 100644 src/main/java/io/netty/buffer/api/MemoryManagers.java create mode 100644 src/main/java/io/netty/buffer/api/MemoryManagersOverride.java create mode 100644 src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java create mode 100644 src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManagers.java create mode 100644 src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java create mode 100644 src/main/java/io/netty/buffer/api/bytebuffer/package-info.java rename src/main/java/io/netty/buffer/api/{memseg => internal}/ArcDrop.java (73%) rename src/main/java/io/netty/buffer/api/{ => internal}/Statics.java (54%) create mode 100644 src/main/java/io/netty/buffer/api/internal/package-info.java create mode 100644 src/main/java/io/netty/buffer/api/memseg/SegmentMemoryManagers.java create mode 100644 src/main/java/module-info.java diff --git a/pom.xml b/pom.xml index 1b45336..f026cad 100644 --- a/pom.xml +++ b/pom.xml @@ -69,7 +69,7 @@ io.netty.incubator.buffer 5.0.0.Final-SNAPSHOT - 28 + 29-SNAPSHOT 16 5.7.0 3.0.0-M5 @@ -122,7 +122,7 @@ maven-checkstyle-plugin - 3.1.0 + 3.1.2 check-style @@ -148,7 +148,7 @@ com.puppycrawl.tools checkstyle - 8.29 + 8.41 io.netty @@ -235,7 +235,7 @@ org.apache.felix maven-bundle-plugin - 2.5.4 + 5.1.1 generate-manifest @@ -399,6 +399,7 @@ netty-buffer ${netty.version} test-jar + tests test diff --git a/src/main/java/io/netty/buffer/api/Buffer.java b/src/main/java/io/netty/buffer/api/Buffer.java index 133e364..d44869f 100644 --- a/src/main/java/io/netty/buffer/api/Buffer.java +++ b/src/main/java/io/netty/buffer/api/Buffer.java @@ -408,9 +408,7 @@ public interface Buffer extends Rc, BufferAccessors { * * @return A {@link ByteCursor} for iterating the readable bytes of this buffer. */ - default ByteCursor openCursor() { - return openCursor(readerOffset(), readableBytes()); - } + ByteCursor openCursor(); /** * Open a cursor to iterate the given number bytes of this buffer, starting at the given offset. diff --git a/src/main/java/io/netty/buffer/api/BufferAllocator.java b/src/main/java/io/netty/buffer/api/BufferAllocator.java index a60bb06..1cd0fe8 100644 --- a/src/main/java/io/netty/buffer/api/BufferAllocator.java +++ b/src/main/java/io/netty/buffer/api/BufferAllocator.java @@ -15,6 +15,8 @@ */ package io.netty.buffer.api; +import io.netty.buffer.api.internal.Statics; + import java.nio.ByteOrder; /** @@ -75,18 +77,18 @@ public interface BufferAllocator extends AutoCloseable { } static BufferAllocator heap() { - return new ManagedBufferAllocator(MemoryManager.getHeapMemoryManager(), Statics.CLEANER); + return new ManagedBufferAllocator(MemoryManagers.getManagers().getHeapMemoryManager(), Statics.CLEANER); } static BufferAllocator direct() { - return new ManagedBufferAllocator(MemoryManager.getNativeMemoryManager(), Statics.CLEANER); + return new ManagedBufferAllocator(MemoryManagers.getManagers().getNativeMemoryManager(), Statics.CLEANER); } static BufferAllocator pooledHeap() { - return new SizeClassedMemoryPool(MemoryManager.getHeapMemoryManager()); + return new SizeClassedMemoryPool(MemoryManagers.getManagers().getHeapMemoryManager()); } static BufferAllocator pooledDirect() { - return new SizeClassedMemoryPool(MemoryManager.getNativeMemoryManager()); + return new SizeClassedMemoryPool(MemoryManagers.getManagers().getNativeMemoryManager()); } } diff --git a/src/main/java/io/netty/buffer/api/BufferHolder.java b/src/main/java/io/netty/buffer/api/BufferHolder.java index fd01d31..6a1f6db 100644 --- a/src/main/java/io/netty/buffer/api/BufferHolder.java +++ b/src/main/java/io/netty/buffer/api/BufferHolder.java @@ -18,7 +18,7 @@ package io.netty.buffer.api; import java.lang.invoke.VarHandle; import java.util.Objects; -import static io.netty.buffer.api.Statics.findVarHandle; +import static io.netty.buffer.api.internal.Statics.findVarHandle; import static java.lang.invoke.MethodHandles.lookup; /** diff --git a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java index 5bed302..35a0609 100644 --- a/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java +++ b/src/main/java/io/netty/buffer/api/CleanerPooledDrop.java @@ -20,8 +20,8 @@ import java.lang.ref.Cleaner.Cleanable; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicBoolean; -import static io.netty.buffer.api.Statics.CLEANER; -import static io.netty.buffer.api.Statics.findVarHandle; +import static io.netty.buffer.api.internal.Statics.CLEANER; +import static io.netty.buffer.api.internal.Statics.findVarHandle; import static java.lang.invoke.MethodHandles.lookup; class CleanerPooledDrop implements Drop { diff --git a/src/main/java/io/netty/buffer/api/CompositeBuffer.java b/src/main/java/io/netty/buffer/api/CompositeBuffer.java index 8906dab..c986d65 100644 --- a/src/main/java/io/netty/buffer/api/CompositeBuffer.java +++ b/src/main/java/io/netty/buffer/api/CompositeBuffer.java @@ -396,6 +396,11 @@ final class CompositeBuffer extends RcSupport implement } } + @Override + public ByteCursor openCursor() { + return openCursor(readerOffset(), readableBytes()); + } + @Override public ByteCursor openCursor(int fromOffset, int length) { if (fromOffset < 0) { @@ -1147,6 +1152,7 @@ final class CompositeBuffer extends RcSupport implement } throw throwable; } + boolean readOnly = this.readOnly; makeInaccessible(); return new Owned() { @Override @@ -1167,6 +1173,7 @@ final class CompositeBuffer extends RcSupport implement capacity = 0; roff = 0; woff = 0; + readOnly = false; closed = true; } diff --git a/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java b/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java index 85baa45..8921014 100644 --- a/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java +++ b/src/main/java/io/netty/buffer/api/ManagedBufferAllocator.java @@ -17,7 +17,7 @@ package io.netty.buffer.api; import java.lang.ref.Cleaner; -import static io.netty.buffer.api.Statics.NO_OP_DROP; +import static io.netty.buffer.api.internal.Statics.NO_OP_DROP; class ManagedBufferAllocator implements BufferAllocator, AllocatorControl { private final MemoryManager manager; @@ -44,6 +44,6 @@ class ManagedBufferAllocator implements BufferAllocator, AllocatorControl { @Override public void recoverMemory(Object memory) { // Free the recovered memory. - manager.recoverMemory(memory, manager.drop()).close(); + manager.recoverMemory(this, memory, manager.drop()).close(); } } diff --git a/src/main/java/io/netty/buffer/api/MemoryManager.java b/src/main/java/io/netty/buffer/api/MemoryManager.java index 024d2df..7f00cad 100644 --- a/src/main/java/io/netty/buffer/api/MemoryManager.java +++ b/src/main/java/io/netty/buffer/api/MemoryManager.java @@ -21,19 +21,11 @@ import io.netty.buffer.api.memseg.NativeMemorySegmentManager; import java.lang.ref.Cleaner; public interface MemoryManager { - static MemoryManager getHeapMemoryManager() { - return new HeapMemorySegmentManager(); - } - - static MemoryManager getNativeMemoryManager() { - return new NativeMemorySegmentManager(); - } - boolean isNative(); - Buffer allocateConfined(AllocatorControl alloc, long size, Drop drop, Cleaner cleaner); - Buffer allocateShared(AllocatorControl allo, long size, Drop drop, Cleaner cleaner); + Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner); + Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner); Drop drop(); Object unwrapRecoverableMemory(Buffer buf); int capacityOfRecoverableMemory(Object memory); - Buffer recoverMemory(Object recoverableMemory, Drop drop); + Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop drop); } diff --git a/src/main/java/io/netty/buffer/api/MemoryManagers.java b/src/main/java/io/netty/buffer/api/MemoryManagers.java new file mode 100644 index 0000000..f0f9d12 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/MemoryManagers.java @@ -0,0 +1,75 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import java.util.ServiceLoader; +import java.util.function.Supplier; +import java.util.stream.Stream; + +/** + * The MemoryManagers interface is the handle through which {@link BufferAllocator buffer allocators} access the low + * level memory management APIs. + *

+ * This is hidden behind this interface in order to make allocation and pool agnostic and reusable across buffer and + * memory implementations. + */ +public interface MemoryManagers { + /** + * Get the default, or currently configured, memory managers instance. + * @return A MemoryManagers instance. + */ + static MemoryManagers getManagers() { + return MemoryManagersOverride.getManagers(); + } + + /** + * Temporarily override the default configured memory managers instance. + *

+ * Calls to {@link #getManagers()} from within the given supplier will get the given managers instance. + * + * @param managers Override the default configured managers instance with this instance. + * @param supplier The supplier function to be called while the override is in place. + * @param The result type from the supplier. + * @return The result from the supplier. + */ + static T using(MemoryManagers managers, Supplier supplier) { + return MemoryManagersOverride.using(managers, supplier); + } + + /** + * Get a lazy-loading stream of all available memory managers. + * + * @return A stream of providers of memory managers instances. + */ + static Stream> getAllManagers() { + var loader = ServiceLoader.load(MemoryManagers.class); + return loader.stream(); + } + + /** + * Get a {@link MemoryManager} instance that is suitable for allocating on-heap {@link Buffer} instances. + * + * @return An on-heap {@link MemoryManager}. + */ + MemoryManager getHeapMemoryManager(); + + /** + * Get a {@link MemoryManager} instance that is suitable for allocating off-heap {@link Buffer} instances. + * + * @return An off-heap {@link MemoryManager}. + */ + MemoryManager getNativeMemoryManager(); +} diff --git a/src/main/java/io/netty/buffer/api/MemoryManagersOverride.java b/src/main/java/io/netty/buffer/api/MemoryManagersOverride.java new file mode 100644 index 0000000..4b7cf8a --- /dev/null +++ b/src/main/java/io/netty/buffer/api/MemoryManagersOverride.java @@ -0,0 +1,52 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api; + +import io.netty.buffer.api.memseg.SegmentMemoryManagers; + +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +final class MemoryManagersOverride { + private static final MemoryManagers DEFAULT = new SegmentMemoryManagers(); + private static final AtomicInteger OVERRIDES_AVAILABLE = new AtomicInteger(); + private static final Map OVERRIDES = Collections.synchronizedMap(new IdentityHashMap<>()); + + private MemoryManagersOverride() { + } + + static MemoryManagers getManagers() { + if (OVERRIDES_AVAILABLE.get() > 0) { + return OVERRIDES.getOrDefault(Thread.currentThread(), DEFAULT); + } + return DEFAULT; + } + + static T using(MemoryManagers managers, Supplier supplier) { + Thread thread = Thread.currentThread(); + OVERRIDES.put(thread, managers); + OVERRIDES_AVAILABLE.incrementAndGet(); + try { + return supplier.get(); + } finally { + OVERRIDES_AVAILABLE.decrementAndGet(); + OVERRIDES.remove(thread); + } + } +} diff --git a/src/main/java/io/netty/buffer/api/ReadableComponent.java b/src/main/java/io/netty/buffer/api/ReadableComponent.java index 72e4b29..bdcc045 100644 --- a/src/main/java/io/netty/buffer/api/ReadableComponent.java +++ b/src/main/java/io/netty/buffer/api/ReadableComponent.java @@ -42,6 +42,8 @@ public interface ReadableComponent { * * @return A byte array of the contents of this component. * @throws UnsupportedOperationException if {@link #hasReadableArray()} returns {@code false}. + * @see #readableArrayOffset() + * @see #readableArrayLength() */ byte[] readableArray(); @@ -53,6 +55,15 @@ public interface ReadableComponent { */ int readableArrayOffset(); + /** + * The number of bytes in the {@link #readableArray()} that belong to this component. + * + * @return The number of bytes, from the {@link #readableArrayOffset()} into the {@link #readableArray()}, + * that belong to this component. + * @throws UnsupportedOperationException if {@link #hasReadableArray()} returns {@code false}. + */ + int readableArrayLength(); + /** * Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address. *

diff --git a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java index 265bd82..4ec5f73 100644 --- a/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java +++ b/src/main/java/io/netty/buffer/api/SizeClassedMemoryPool.java @@ -15,13 +15,15 @@ */ package io.netty.buffer.api; +import io.netty.buffer.api.internal.Statics; + import java.lang.invoke.VarHandle; import java.nio.ByteOrder; import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import static io.netty.buffer.api.Statics.NO_OP_DROP; +import static io.netty.buffer.api.internal.Statics.NO_OP_DROP; import static java.lang.invoke.MethodHandles.lookup; class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop { @@ -129,7 +131,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop, T extends Rc> implements Send { diff --git a/src/main/java/io/netty/buffer/api/WritableComponent.java b/src/main/java/io/netty/buffer/api/WritableComponent.java index 5d7485a..4ee6c92 100644 --- a/src/main/java/io/netty/buffer/api/WritableComponent.java +++ b/src/main/java/io/netty/buffer/api/WritableComponent.java @@ -35,6 +35,8 @@ public interface WritableComponent { * * @return A byte array of the contents of this component. * @throws UnsupportedOperationException if {@link #hasWritableArray()} returns {@code false}. + * @see #writableArrayOffset() + * @see #writableArrayLength() */ byte[] writableArray(); @@ -46,6 +48,15 @@ public interface WritableComponent { */ int writableArrayOffset(); + /** + * The number of bytes in the {@link #writableArray()} that belong to this component. + * + * @return The number of bytes, from the {@link #writableArrayOffset()} into the {@link #writableArray()}, + * that belong to this component. + * @throws UnsupportedOperationException if {@link #hasWritableArray()} returns {@code false}. + */ + int writableArrayLength(); + /** * Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address. * diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java new file mode 100644 index 0000000..23be2a0 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManager.java @@ -0,0 +1,76 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.bytebuffer; + +import io.netty.buffer.api.AllocatorControl; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.Drop; +import io.netty.buffer.api.MemoryManager; +import io.netty.buffer.api.internal.Statics; + +import java.lang.ref.Cleaner; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import static io.netty.buffer.api.internal.Statics.convert; + +public class ByteBufferMemoryManager implements MemoryManager { + private final boolean direct; + + public ByteBufferMemoryManager(boolean direct) { + this.direct = direct; + } + + @Override + public boolean isNative() { + return direct; + } + + @Override + public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner) { + return allocateShared(allocatorControl, size, drop, cleaner); + } + + @Override + public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner) { + int capacity = Math.toIntExact(size); + ByteBuffer buffer = direct? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity); + buffer.order(ByteOrder.nativeOrder()); + return new NioBuffer(buffer, buffer, allocatorControl, convert(drop)); + } + + @Override + public Drop drop() { + return Statics.NO_OP_DROP; + } + + @Override + public Object unwrapRecoverableMemory(Buffer buf) { + return ((NioBuffer) buf).recoverable(); + } + + @Override + public int capacityOfRecoverableMemory(Object memory) { + //noinspection OverlyStrongTypeCast + return ((ByteBuffer) memory).capacity(); + } + + @Override + public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop drop) { + ByteBuffer memory = (ByteBuffer) recoverableMemory; + return new NioBuffer(memory, memory, allocatorControl, convert(drop)); + } +} diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManagers.java b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManagers.java new file mode 100644 index 0000000..8eca2fc --- /dev/null +++ b/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManagers.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.bytebuffer; + +import io.netty.buffer.api.MemoryManager; +import io.netty.buffer.api.MemoryManagers; + +public class ByteBufferMemoryManagers implements MemoryManagers { + @Override + public MemoryManager getHeapMemoryManager() { + return new ByteBufferMemoryManager(false); + } + + @Override + public MemoryManager getNativeMemoryManager() { + return new ByteBufferMemoryManager(true); + } + + @Override + public String toString() { + return "BB"; + } +} diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java new file mode 100644 index 0000000..84d6b61 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java @@ -0,0 +1,1155 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.bytebuffer; + +import io.netty.buffer.api.AllocatorControl; +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.BufferAllocator; +import io.netty.buffer.api.ByteCursor; +import io.netty.buffer.api.Drop; +import io.netty.buffer.api.Owned; +import io.netty.buffer.api.RcSupport; +import io.netty.buffer.api.ReadableComponent; +import io.netty.buffer.api.ReadableComponentProcessor; +import io.netty.buffer.api.WritableComponent; +import io.netty.buffer.api.WritableComponentProcessor; +import io.netty.buffer.api.internal.ArcDrop; +import io.netty.buffer.api.internal.Statics; +import io.netty.util.internal.PlatformDependent; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.ReadOnlyBufferException; + +class NioBuffer extends RcSupport implements Buffer, ReadableComponent, WritableComponent { + private static final ByteBuffer CLOSED_BUFFER = ByteBuffer.allocate(0); + + private final AllocatorControl control; + private ByteBuffer base; + private ByteBuffer rmem; // For reading. + private ByteBuffer wmem; // For writing. + + private int roff; + private int woff; + + NioBuffer(ByteBuffer base, ByteBuffer memory, AllocatorControl control, Drop drop) { + super(new MakeInaccisbleOnDrop(ArcDrop.wrap(drop))); + this.base = base; + rmem = memory; + wmem = memory; + this.control = control; + } + + private static final class MakeInaccisbleOnDrop implements Drop { + final Drop delegate; + + private MakeInaccisbleOnDrop(Drop delegate) { + this.delegate = delegate; + } + + @Override + public void drop(NioBuffer buf) { + try { + delegate.drop(buf); + } finally { + buf.makeInaccessible(); + } + } + + @Override + public void attach(NioBuffer buf) { + delegate.attach(buf); + } + + @Override + public String toString() { + return "MemSegDrop(" + delegate + ')'; + } + } + + @Override + protected Drop unsafeGetDrop() { + MakeInaccisbleOnDrop drop = (MakeInaccisbleOnDrop) super.unsafeGetDrop(); + return drop.delegate; + } + + @Override + protected void unsafeSetDrop(Drop replacement) { + super.unsafeSetDrop(new MakeInaccisbleOnDrop(replacement)); + } + + @Override + public String toString() { + return "Buffer[roff:" + roff + ", woff:" + woff + ", cap:" + rmem.capacity() + ']'; + } + + @Override + public Buffer order(ByteOrder order) { + rmem.order(order); + return this; + } + + @Override + public ByteOrder order() { + return rmem.order(); + } + + @Override + public int capacity() { + return rmem.capacity(); + } + + @Override + public int readerOffset() { + return roff; + } + + @Override + public Buffer readerOffset(int offset) { + checkRead(offset, 0); + roff = offset; + return this; + } + + @Override + public int writerOffset() { + return woff; + } + + @Override + public Buffer writerOffset(int offset) { + checkWrite(offset, 0); + woff = offset; + return this; + } + + @Override + public Buffer fill(byte value) { + int capacity = capacity(); + checkSet(0, capacity); + if (rmem == CLOSED_BUFFER) { + throw bufferIsClosed(); + } + for (int i = 0; i < capacity; i++) { + wmem.put(i, value); + } + return this; + } + + @Override + public long nativeAddress() { + return rmem.isDirect() && PlatformDependent.hasUnsafe()? PlatformDependent.directBufferAddress(rmem) : 0; + } + + @Override + public Buffer readOnly(boolean readOnly) { + if (readOnly && wmem == rmem) { + wmem = CLOSED_BUFFER; + } else if (!readOnly && wmem != rmem) { + wmem = rmem; + } + return this; + } + + @Override + public boolean readOnly() { + return wmem == CLOSED_BUFFER && rmem != CLOSED_BUFFER; + } + + @Override + public Buffer slice(int offset, int length) { + if (length < 0) { + throw new IllegalArgumentException("Length cannot be negative: " + length + '.'); + } + if (!isAccessible()) { + throw new IllegalStateException("This buffer is closed: " + this + '.'); + } + ByteBuffer slice = rmem.slice(offset, length); + ArcDrop drop = (ArcDrop) unsafeGetDrop(); + drop.increment(); + return new NioBuffer(base, slice, control, drop) + .writerOffset(length) + .order(order()) + .readOnly(readOnly()); + } + + @Override + public void copyInto(int srcPos, byte[] dest, int destPos, int length) { + copyInto(srcPos, ByteBuffer.wrap(dest), destPos, length); + } + + @Override + public void copyInto(int srcPos, ByteBuffer dest, int destPos, int length) { + if (rmem == CLOSED_BUFFER) { + throw bufferIsClosed(); + } + if (srcPos < 0) { + throw new IllegalArgumentException("The srcPos cannot be negative: " + srcPos + '.'); + } + if (length < 0) { + throw new IllegalArgumentException("The length cannot be negative: " + length + '.'); + } + if (capacity() < srcPos + length) { + throw new IllegalArgumentException("The srcPos + length is beyond the end of the buffer: " + + "srcPos = " + srcPos + ", length = " + length + '.'); + } + dest = dest.duplicate().clear(); + dest.put(destPos, rmem, srcPos, length); + } + + @Override + public void copyInto(int srcPos, Buffer dest, int destPos, int length) { + if (dest instanceof NioBuffer) { + var nb = (NioBuffer) dest; + nb.checkSet(destPos, length); + copyInto(srcPos, nb.wmem, destPos, length); + return; + } + + Statics.copyToViaReverseCursor(this, srcPos, dest, destPos, length); + } + + @Override + public ByteCursor openCursor() { + return openCursor(readerOffset(), readableBytes()); + } + + @Override + public ByteCursor openCursor(int fromOffset, int length) { + if (rmem == CLOSED_BUFFER) { + throw bufferIsClosed(); + } + if (fromOffset < 0) { + throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.'); + } + if (length < 0) { + throw new IllegalArgumentException("The length cannot be negative: " + length + '.'); + } + if (capacity() < fromOffset + length) { + throw new IllegalArgumentException("The fromOffset + length is beyond the end of the buffer: " + + "fromOffset = " + fromOffset + ", length = " + length + '.'); + } + return new ByteCursor() { + // Duplicate source buffer to keep our own byte order state. + final ByteBuffer buffer = rmem.duplicate().order(ByteOrder.BIG_ENDIAN); + int index = fromOffset; + final int end = index + length; + long longValue = -1; + byte byteValue = -1; + + @Override + public boolean readLong() { + if (index + Long.BYTES <= end) { + longValue = buffer.getLong(index); + index += Long.BYTES; + return true; + } + return false; + } + + @Override + public long getLong() { + return longValue; + } + + @Override + public boolean readByte() { + if (index < end) { + byteValue = buffer.get(index); + index++; + return true; + } + return false; + } + + @Override + public byte getByte() { + return byteValue; + } + + @Override + public int currentOffset() { + return index; + } + + @Override + public int bytesLeft() { + return end - index; + } + }; + } + + @Override + public ByteCursor openReverseCursor(int fromOffset, int length) { + if (rmem == CLOSED_BUFFER) { + throw bufferIsClosed(); + } + if (fromOffset < 0) { + throw new IllegalArgumentException("The fromOffset cannot be negative: " + fromOffset + '.'); + } + if (length < 0) { + throw new IllegalArgumentException("The length cannot be negative: " + length + '.'); + } + if (capacity() <= fromOffset) { + throw new IllegalArgumentException("The fromOffset is beyond the end of the buffer: " + fromOffset + '.'); + } + if (fromOffset - length < -1) { + throw new IllegalArgumentException("The fromOffset - length would underflow the buffer: " + + "fromOffset = " + fromOffset + ", length = " + length + '.'); + } + return new ByteCursor() { + final ByteBuffer buffer = rmem.duplicate().order(ByteOrder.LITTLE_ENDIAN); + int index = fromOffset; + final int end = index - length; + long longValue = -1; + byte byteValue = -1; + + @Override + public boolean readLong() { + if (index - Long.BYTES >= end) { + index -= 7; + longValue = buffer.getLong(index); + index--; + return true; + } + return false; + } + + @Override + public long getLong() { + return longValue; + } + + @Override + public boolean readByte() { + if (index > end) { + byteValue = buffer.get(index); + index--; + return true; + } + return false; + } + + @Override + public byte getByte() { + return byteValue; + } + + @Override + public int currentOffset() { + return index; + } + + @Override + public int bytesLeft() { + return index - end; + } + }; + } + + @Override + public void ensureWritable(int size, boolean allowCompaction) { + if (!isOwned()) { + throw attachTrace(new IllegalStateException( + "Buffer is not owned. Only owned buffers can call ensureWritable.")); + } + if (size < 0) { + throw new IllegalArgumentException("Cannot ensure writable for a negative size: " + size + '.'); + } + if (rmem != wmem) { + throw bufferIsReadOnly(); + } + if (writableBytes() >= size) { + // We already have enough space. + return; + } + + if (allowCompaction && writableBytes() + readerOffset() >= size) { + // We can solve this with compaction. + compact(); + return; + } + + // Allocate a bigger buffer. + long newSize = capacity() + size - (long) writableBytes(); + BufferAllocator.checkSize(newSize); + ByteBuffer buffer = (ByteBuffer) control.allocateUntethered(this, (int) newSize); + buffer.order(order()); + + // Copy contents. + copyInto(0, buffer, 0, capacity()); + + // Release old memory: + var drop = (Drop) unsafeGetDrop(); + int roff = this.roff; + int woff = this.woff; + drop.drop(this); + while (drop instanceof ArcDrop) { + drop = ((ArcDrop) drop).unwrap(); + } + unsafeSetDrop(new ArcDrop<>(drop)); + this.roff = roff; + this.woff = woff; + + base = buffer; + rmem = buffer; + wmem = buffer; + drop.attach(this); + } + + @Override + public Buffer bifurcate() { + if (!isOwned()) { + throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); + } + var drop = (ArcDrop) unsafeGetDrop(); + unsafeSetDrop(new ArcDrop<>(drop)); + var bifurcatedSeg = rmem.slice(0, woff); + // TODO maybe incrementing the existing ArcDrop is enough; maybe we don't need to wrap it in another ArcDrop. + var bifurcatedBuf = new NioBuffer(base, bifurcatedSeg, control, new ArcDrop<>(drop.increment())); + bifurcatedBuf.woff = woff; + bifurcatedBuf.roff = roff; + bifurcatedBuf.order(order()); + boolean readOnly = readOnly(); + bifurcatedBuf.readOnly(readOnly); + rmem = rmem.slice(woff, rmem.capacity() - woff); + if (!readOnly) { + wmem = rmem; + } + woff = 0; + roff = 0; + return bifurcatedBuf; + } + + @Override + public void compact() { + if (!isOwned()) { + throw attachTrace(new IllegalStateException("Buffer must be owned in order to compact.")); + } + if (readOnly()) { + throw new IllegalStateException("Buffer must be writable in order to compact, but was read-only."); + } + if (roff == 0) { + return; + } + rmem.limit(woff).position(roff).compact().clear(); + woff -= roff; + roff = 0; + } + + @Override + public int countComponents() { + return 1; + } + + @Override + public int countReadableComponents() { + return readableBytes() > 0? 1 : 0; + } + + @Override + public int countWritableComponents() { + return writableBytes() > 0? 1 : 0; + } + + // + @Override + public boolean hasReadableArray() { + return rmem.hasArray(); + } + + @Override + public byte[] readableArray() { + return rmem.array(); + } + + @Override + public int readableArrayOffset() { + return rmem.arrayOffset() + roff; + } + + @Override + public int readableArrayLength() { + return woff - roff; + } + + @Override + public long readableNativeAddress() { + return nativeAddress(); + } + + @Override + public ByteBuffer readableBuffer() { + return rmem.asReadOnlyBuffer().slice(readerOffset(), readableBytes()).order(order()); + } + + @Override + public boolean hasWritableArray() { + return wmem.hasArray(); + } + + @Override + public byte[] writableArray() { + return wmem.array(); + } + + @Override + public int writableArrayOffset() { + return wmem.arrayOffset() + woff; + } + + @Override + public int writableArrayLength() { + return capacity() - woff; + } + + @Override + public long writableNativeAddress() { + return nativeAddress(); + } + + @Override + public ByteBuffer writableBuffer() { + return wmem.slice(writerOffset(), writableBytes()).order(order()); + } + // + + @Override + public int forEachReadable(int initialIndex, ReadableComponentProcessor processor) + throws E { + checkRead(readerOffset(), Math.max(1, readableBytes())); + return processor.process(initialIndex, this)? 1 : -1; + } + + @Override + public int forEachWritable(int initialIndex, WritableComponentProcessor processor) + throws E { + checkWrite(writerOffset(), Math.max(1, writableBytes())); + return processor.process(initialIndex, this)? 1 : -1; + } + + @Override + public byte readByte() { + checkRead(roff, Byte.BYTES); + var value = rmem.get(roff); + roff += Byte.BYTES; + return value; + } + + @Override + public byte getByte(int roff) { + checkGet(roff, Byte.BYTES); + return rmem.get(roff); + } + + @Override + public int readUnsignedByte() { + return readByte() & 0xFF; + } + + @Override + public int getUnsignedByte(int roff) { + return getByte(roff) & 0xFF; + } + + @Override + public Buffer writeByte(byte value) { + try { + wmem.put(woff, value); + woff += Byte.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setByte(int woff, byte value) { + try { + wmem.put(woff, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer writeUnsignedByte(int value) { + try { + wmem.put(woff, (byte) (value & 0xFF)); + woff += Byte.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setUnsignedByte(int woff, int value) { + try { + wmem.put(woff, (byte) (value & 0xFF)); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public char readChar() { + checkRead(roff, 2); + var value = rmem.getChar(roff); + roff += 2; + return value; + } + + @Override + public char getChar(int roff) { + checkGet(roff, 2); + return rmem.getChar(roff); + } + + @Override + public Buffer writeChar(char value) { + try { + wmem.putChar(woff, value); + woff += 2; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setChar(int woff, char value) { + try { + wmem.putChar(woff, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public short readShort() { + checkRead(roff, Short.BYTES); + var value = rmem.getShort(roff); + roff += 2; + return value; + } + + @Override + public short getShort(int roff) { + checkGet(roff, Short.BYTES); + return rmem.getShort(roff); + } + + @Override + public int readUnsignedShort() { + checkRead(roff, Short.BYTES); + var value = rmem.getShort(roff) & 0xFFFF; + roff += 2; + return value; + } + + @Override + public int getUnsignedShort(int roff) { + checkGet(roff, Short.BYTES); + return rmem.getShort(roff) & 0xFFFF; + } + + @Override + public Buffer writeShort(short value) { + try { + wmem.putShort(woff, value); + woff += Short.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setShort(int woff, short value) { + try { + wmem.putShort(woff, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer writeUnsignedShort(int value) { + try { + wmem.putShort(woff, (short) (value & 0xFFFF)); + woff += Short.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setUnsignedShort(int woff, int value) { + try { + wmem.putShort(woff, (short) (value & 0xFFFF)); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public int readMedium() { + checkRead(roff, 3); + int value = order() == ByteOrder.BIG_ENDIAN? + rmem.get(roff) << 16 | + (rmem.get(roff + 1) & 0xFF) << 8 | + rmem.get(roff + 2) & 0xFF : + rmem.get(roff) & 0xFF | + (rmem.get(roff + 1) & 0xFF) << 8 | + rmem.get(roff + 2) << 16; + roff += 3; + return value; + } + + @Override + public int getMedium(int roff) { + checkGet(roff, 3); + return order() == ByteOrder.BIG_ENDIAN? + rmem.get(roff) << 16 | + (rmem.get(roff + 1) & 0xFF) << 8 | + rmem.get(roff + 2) & 0xFF : + rmem.get(roff) & 0xFF | + (rmem.get(roff + 1) & 0xFF) << 8 | + rmem.get(roff + 2) << 16; + } + + @Override + public int readUnsignedMedium() { + checkRead(roff, 3); + int value = order() == ByteOrder.BIG_ENDIAN? + (rmem.get(roff) << 16 | + (rmem.get(roff + 1) & 0xFF) << 8 | + rmem.get(roff + 2) & 0xFF) & 0xFFFFFF : + (rmem.get(roff) & 0xFF | + (rmem.get(roff + 1) & 0xFF) << 8 | + rmem.get(roff + 2) << 16) & 0xFFFFFF; + roff += 3; + return value; + } + + @Override + public int getUnsignedMedium(int roff) { + checkGet(roff, 3); + return order() == ByteOrder.BIG_ENDIAN? + (rmem.get(roff) << 16 | + (rmem.get(roff + 1) & 0xFF) << 8 | + rmem.get(roff + 2) & 0xFF) & 0xFFFFFF : + (rmem.get(roff) & 0xFF | + (rmem.get(roff + 1) & 0xFF) << 8 | + rmem.get(roff + 2) << 16) & 0xFFFFFF; + } + + @Override + public Buffer writeMedium(int value) { + checkWrite(woff, 3); + if (order() == ByteOrder.BIG_ENDIAN) { + wmem.put(woff, (byte) (value >> 16)); + wmem.put(woff + 1, (byte) (value >> 8 & 0xFF)); + wmem.put(woff + 2, (byte) (value & 0xFF)); + } else { + wmem.put(woff, (byte) (value & 0xFF)); + wmem.put(woff + 1, (byte) (value >> 8 & 0xFF)); + wmem.put(woff + 2, (byte) (value >> 16 & 0xFF)); + } + woff += 3; + return this; + } + + @Override + public Buffer setMedium(int woff, int value) { + checkSet(woff, 3); + if (order() == ByteOrder.BIG_ENDIAN) { + wmem.put(woff, (byte) (value >> 16)); + wmem.put(woff + 1, (byte) (value >> 8 & 0xFF)); + wmem.put(woff + 2, (byte) (value & 0xFF)); + } else { + wmem.put(woff, (byte) (value & 0xFF)); + wmem.put(woff + 1, (byte) (value >> 8 & 0xFF)); + wmem.put(woff + 2, (byte) (value >> 16 & 0xFF)); + } + return this; + } + + @Override + public Buffer writeUnsignedMedium(int value) { + checkWrite(woff, 3); + if (order() == ByteOrder.BIG_ENDIAN) { + wmem.put(woff, (byte) (value >> 16)); + wmem.put(woff + 1, (byte) (value >> 8 & 0xFF)); + wmem.put(woff + 2, (byte) (value & 0xFF)); + } else { + wmem.put(woff, (byte) (value & 0xFF)); + wmem.put(woff + 1, (byte) (value >> 8 & 0xFF)); + wmem.put(woff + 2, (byte) (value >> 16 & 0xFF)); + } + woff += 3; + return this; + } + + @Override + public Buffer setUnsignedMedium(int woff, int value) { + checkSet(woff, 3); + if (order() == ByteOrder.BIG_ENDIAN) { + wmem.put(woff, (byte) (value >> 16)); + wmem.put(woff + 1, (byte) (value >> 8 & 0xFF)); + wmem.put(woff + 2, (byte) (value & 0xFF)); + } else { + wmem.put(woff, (byte) (value & 0xFF)); + wmem.put(woff + 1, (byte) (value >> 8 & 0xFF)); + wmem.put(woff + 2, (byte) (value >> 16 & 0xFF)); + } + return this; + } + + @Override + public int readInt() { + checkRead(roff, Integer.BYTES); + var value = rmem.getInt(roff); + roff += Integer.BYTES; + return value; + } + + @Override + public int getInt(int roff) { + checkGet(roff, Integer.BYTES); + return rmem.getInt(roff); + } + + @Override + public long readUnsignedInt() { + checkRead(roff, Integer.BYTES); + var value = rmem.getInt(roff) & 0xFFFFFFFFL; + roff += Integer.BYTES; + return value; + } + + @Override + public long getUnsignedInt(int roff) { + checkGet(roff, Integer.BYTES); + return rmem.getInt(roff) & 0xFFFFFFFFL; + } + + @Override + public Buffer writeInt(int value) { + try { + wmem.putInt(woff, value); + woff += Integer.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setInt(int woff, int value) { + try { + wmem.putInt(woff, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, NioBuffer.this.woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer writeUnsignedInt(long value) { + try { + wmem.putInt(woff, (int) (value & 0xFFFFFFFFL)); + woff += Integer.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setUnsignedInt(int woff, long value) { + try { + wmem.putInt(woff, (int) (value & 0xFFFFFFFFL)); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, NioBuffer.this.woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public float readFloat() { + checkRead(roff, Float.BYTES); + var value = rmem.getFloat(roff); + roff += Float.BYTES; + return value; + } + + @Override + public float getFloat(int roff) { + checkGet(roff, Float.BYTES); + return rmem.getFloat(roff); + } + + @Override + public Buffer writeFloat(float value) { + try { + wmem.putFloat(woff, value); + woff += Float.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setFloat(int woff, float value) { + try { + wmem.putFloat(woff, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public long readLong() { + checkRead(roff, Long.BYTES); + var value = rmem.getLong(roff); + roff += Long.BYTES; + return value; + } + + @Override + public long getLong(int roff) { + checkGet(roff, Long.BYTES); + return rmem.getLong(roff); + } + + @Override + public Buffer writeLong(long value) { + try { + wmem.putLong(woff, value); + woff += Long.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setLong(int woff, long value) { + try { + wmem.putLong(woff, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public double readDouble() { + checkRead(roff, Double.BYTES); + var value = rmem.getDouble(roff); + roff += Double.BYTES; + return value; + } + + @Override + public double getDouble(int roff) { + checkGet(roff, Double.BYTES); + return rmem.getDouble(roff); + } + + @Override + public Buffer writeDouble(double value) { + try { + wmem.putDouble(woff, value); + woff += Double.BYTES; + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + public Buffer setDouble(int woff, double value) { + try { + wmem.putDouble(woff, value); + return this; + } catch (IndexOutOfBoundsException e) { + throw checkWriteState(e, woff); + } catch (ReadOnlyBufferException e) { + throw bufferIsReadOnly(); + } + } + + @Override + protected Owned prepareSend() { + var order = order(); + var roff = this.roff; + var woff = this.woff; + var readOnly = readOnly(); + ByteBuffer base = this.base; + ByteBuffer rmem = this.rmem; + makeInaccessible(); + return new Owned() { + @Override + public NioBuffer transferOwnership(Drop drop) { + NioBuffer copy = new NioBuffer(base, rmem, control, drop); + copy.order(order); + copy.roff = roff; + copy.woff = woff; + copy.readOnly(readOnly); + return copy; + } + }; + } + + void makeInaccessible() { + base = CLOSED_BUFFER; + rmem = CLOSED_BUFFER; + wmem = CLOSED_BUFFER; + roff = 0; + woff = 0; + } + + @Override + public boolean isOwned() { + return super.isOwned() && ((ArcDrop) unsafeGetDrop()).isOwned(); + } + + @Override + public int countBorrows() { + return super.countBorrows() + ((ArcDrop) unsafeGetDrop()).countBorrows(); + } + + private void checkRead(int index, int size) { + if (index < 0 || woff < index + size) { + throw readAccessCheckException(index); + } + } + + private void checkGet(int index, int size) { + if (index < 0 || capacity() < index + size) { + throw readAccessCheckException(index); + } + } + + private void checkWrite(int index, int size) { + if (index < roff || wmem.capacity() < index + size) { + throw writeAccessCheckException(index); + } + } + + private void checkSet(int index, int size) { + if (index < 0 || wmem.capacity() < index + size) { + throw writeAccessCheckException(index); + } + } + + private RuntimeException checkWriteState(IndexOutOfBoundsException ioobe, int offset) { + if (rmem == CLOSED_BUFFER) { + return bufferIsClosed(); + } + if (wmem != rmem) { + return bufferIsReadOnly(); + } + + IndexOutOfBoundsException exception = outOfBounds(offset); + exception.addSuppressed(ioobe); + return exception; + } + + private RuntimeException readAccessCheckException(int index) { + if (rmem == CLOSED_BUFFER) { + throw bufferIsClosed(); + } + return outOfBounds(index); + } + + private RuntimeException writeAccessCheckException(int index) { + if (rmem == CLOSED_BUFFER) { + throw bufferIsClosed(); + } + if (wmem != rmem) { + return bufferIsReadOnly(); + } + return outOfBounds(index); + } + + private static IllegalStateException bufferIsClosed() { + return new IllegalStateException("This buffer is closed."); + } + + private static IllegalStateException bufferIsReadOnly() { + return new IllegalStateException("This buffer is read-only."); + } + + private IndexOutOfBoundsException outOfBounds(int index) { + return new IndexOutOfBoundsException( + "Index " + index + " is out of bounds: [read 0 to " + woff + ", write 0 to " + + rmem.capacity() + "]."); + } + + ByteBuffer recoverable() { + return base; + } +} diff --git a/src/main/java/io/netty/buffer/api/bytebuffer/package-info.java b/src/main/java/io/netty/buffer/api/bytebuffer/package-info.java new file mode 100644 index 0000000..b49f818 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/bytebuffer/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Safe ByteBuffer based implementation. + */ +package io.netty.buffer.api.bytebuffer; diff --git a/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java b/src/main/java/io/netty/buffer/api/internal/ArcDrop.java similarity index 73% rename from src/main/java/io/netty/buffer/api/memseg/ArcDrop.java rename to src/main/java/io/netty/buffer/api/internal/ArcDrop.java index 293a88f..87fe945 100644 --- a/src/main/java/io/netty/buffer/api/memseg/ArcDrop.java +++ b/src/main/java/io/netty/buffer/api/internal/ArcDrop.java @@ -13,14 +13,14 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.buffer.api.memseg; +package io.netty.buffer.api.internal; import io.netty.buffer.api.Drop; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; -final class ArcDrop implements Drop { +public final class ArcDrop implements Drop { private static final VarHandle COUNT; static { try { @@ -30,31 +30,31 @@ final class ArcDrop implements Drop { } } - private final Drop delegate; + private final Drop delegate; @SuppressWarnings("FieldMayBeFinal") private volatile int count; - ArcDrop(Drop delegate) { + public ArcDrop(Drop delegate) { this.delegate = delegate; count = 1; } - static Drop wrap(Drop drop) { + public static Drop wrap(Drop drop) { if (drop.getClass() == ArcDrop.class) { return drop; } - return new ArcDrop(drop); + return new ArcDrop(drop); } - static Drop acquire(Drop drop) { + public static Drop acquire(Drop drop) { if (drop.getClass() == ArcDrop.class) { - ((ArcDrop) drop).increment(); + ((ArcDrop) drop).increment(); return drop; } - return new ArcDrop(drop); + return new ArcDrop(drop); } - ArcDrop increment() { + public ArcDrop increment() { int c; do { c = count; @@ -64,7 +64,7 @@ final class ArcDrop implements Drop { } @Override - public void drop(MemSegBuffer buf) { + public void drop(T obj) { int c; int n; do { @@ -73,33 +73,33 @@ final class ArcDrop implements Drop { checkValidState(c); } while (!COUNT.compareAndSet(this, c, n)); if (n == 0) { - delegate.drop(buf); + delegate.drop(obj); } } @Override - public void attach(MemSegBuffer obj) { + public void attach(T obj) { delegate.attach(obj); } - boolean isOwned() { + public boolean isOwned() { return count <= 1; } - int countBorrows() { + public int countBorrows() { return count - 1; } - Drop unwrap() { + public Drop unwrap() { return delegate; } @Override public String toString() { StringBuilder builder = new StringBuilder().append("ArcDrop(").append(count).append(", "); - Drop drop = this; - while ((drop = ((ArcDrop) drop).unwrap()) instanceof ArcDrop) { - builder.append(((ArcDrop) drop).count).append(", "); + Drop drop = this; + while ((drop = ((ArcDrop) drop).unwrap()) instanceof ArcDrop) { + builder.append(((ArcDrop) drop).count).append(", "); } return builder.append(drop).append(')').toString(); } diff --git a/src/main/java/io/netty/buffer/api/Statics.java b/src/main/java/io/netty/buffer/api/internal/Statics.java similarity index 54% rename from src/main/java/io/netty/buffer/api/Statics.java rename to src/main/java/io/netty/buffer/api/internal/Statics.java index 1df8e9e..61d9b16 100644 --- a/src/main/java/io/netty/buffer/api/Statics.java +++ b/src/main/java/io/netty/buffer/api/internal/Statics.java @@ -13,13 +13,17 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.buffer.api; +package io.netty.buffer.api.internal; + +import io.netty.buffer.api.Buffer; +import io.netty.buffer.api.Drop; import java.lang.invoke.MethodHandles.Lookup; import java.lang.invoke.VarHandle; import java.lang.ref.Cleaner; +import java.nio.ByteOrder; -interface Statics { +public interface Statics { Cleaner CLEANER = Cleaner.create(); Drop NO_OP_DROP = new Drop() { @Override @@ -39,4 +43,29 @@ interface Statics { throw new ExceptionInInitializerError(e); } } + + @SuppressWarnings("unchecked") + static Drop convert(Drop drop) { + return (Drop) drop; + } + + static void copyToViaReverseCursor(Buffer src, int srcPos, Buffer dest, int destPos, int length) { + // Iterate in reverse to account for src and dest buffer overlap. + var itr = src.openReverseCursor(srcPos + length - 1, length); + ByteOrder prevOrder = dest.order(); + // We read longs in BE, in reverse, so they need to be flipped for writing. + dest.order(ByteOrder.LITTLE_ENDIAN); + try { + while (itr.readLong()) { + long val = itr.getLong(); + length -= Long.BYTES; + dest.setLong(destPos + length, val); + } + while (itr.readByte()) { + dest.setByte(destPos + --length, itr.getByte()); + } + } finally { + dest.order(prevOrder); + } + } } diff --git a/src/main/java/io/netty/buffer/api/internal/package-info.java b/src/main/java/io/netty/buffer/api/internal/package-info.java new file mode 100644 index 0000000..dcf90af --- /dev/null +++ b/src/main/java/io/netty/buffer/api/internal/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Internal implementation details that can be shared among Buffer implementations. + */ +package io.netty.buffer.api.internal; diff --git a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java index db7b359..00ceea5 100644 --- a/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java +++ b/src/main/java/io/netty/buffer/api/memseg/AbstractMemorySegmentManager.java @@ -24,26 +24,28 @@ import jdk.incubator.foreign.MemorySegment; import java.lang.ref.Cleaner; +import static io.netty.buffer.api.internal.Statics.convert; + public abstract class AbstractMemorySegmentManager implements MemoryManager { @Override public abstract boolean isNative(); @Override - public Buffer allocateConfined(AllocatorControl alloc, long size, Drop drop, Cleaner cleaner) { + public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner) { var segment = createSegment(size); if (cleaner != null) { segment = segment.registerCleaner(cleaner); } - return new MemSegBuffer(segment, segment, convert(drop), alloc); + return new MemSegBuffer(segment, segment, convert(drop), allocatorControl); } @Override - public Buffer allocateShared(AllocatorControl alloc, long size, Drop drop, Cleaner cleaner) { + public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop drop, Cleaner cleaner) { var segment = createSegment(size).share(); if (cleaner != null) { segment = segment.registerCleaner(cleaner); } - return new MemSegBuffer(segment, segment, convert(drop), alloc); + return new MemSegBuffer(segment, segment, convert(drop), allocatorControl); } protected abstract MemorySegment createSegment(long size); @@ -65,13 +67,8 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager { } @Override - public Buffer recoverMemory(Object recoverableMemory, Drop drop) { - var recovery = (RecoverableMemory) recoverableMemory; + public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop drop) { + var recovery = (RecoverableMemory) recoverableMemory; // TODO get rid of RecoverableMemory return recovery.recover(convert(drop)); } - - @SuppressWarnings("unchecked") - private static Drop convert(Drop drop) { - return (Drop) drop; - } } diff --git a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java index 01d3e08..b49e8fa 100644 --- a/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java +++ b/src/main/java/io/netty/buffer/api/memseg/MemSegBuffer.java @@ -30,6 +30,8 @@ import io.netty.buffer.api.RcSupport; import io.netty.buffer.api.adaptor.BufferIntegratable; import io.netty.buffer.api.adaptor.ByteBufAdaptor; import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; +import io.netty.buffer.api.internal.ArcDrop; +import io.netty.buffer.api.internal.Statics; import jdk.incubator.foreign.MemorySegment; import java.nio.ByteBuffer; @@ -153,9 +155,9 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - public MemSegBuffer readerOffset(int index) { - checkRead(index, 0); - roff = index; + public MemSegBuffer readerOffset(int offset) { + checkRead(offset, 0); + roff = offset; return this; } @@ -165,9 +167,9 @@ class MemSegBuffer extends RcSupport implements Buffer, Re } @Override - public MemSegBuffer writerOffset(int index) { - checkWrite(index, 0); - woff = index; + public MemSegBuffer writerOffset(int offset) { + checkWrite(offset, 0); + woff = offset; return this; } @@ -194,6 +196,11 @@ class MemSegBuffer extends RcSupport implements Buffer, Re throw new UnsupportedOperationException("This component has no backing array."); } + @Override + public int readableArrayLength() { + throw new UnsupportedOperationException("This component has no backing array."); + } + @Override public long readableNativeAddress() { return nativeAddress(); @@ -229,6 +236,11 @@ class MemSegBuffer extends RcSupport implements Buffer, Re throw new UnsupportedOperationException("This component has no backing array."); } + @Override + public int writableArrayLength() { + throw new UnsupportedOperationException("This component has no backing array."); + } + @Override public long writableNativeAddress() { return nativeAddress(); @@ -324,23 +336,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re return; } - // Iterate in reverse to account for src and dest buffer overlap. - var itr = openReverseCursor(srcPos + length - 1, length); - ByteOrder prevOrder = dest.order(); - // We read longs in BE, in reverse, so they need to be flipped for writing. - dest.order(ByteOrder.LITTLE_ENDIAN); - try { - while (itr.readLong()) { - long val = itr.getLong(); - length -= Long.BYTES; - dest.setLong(destPos + length, val); - } - while (itr.readByte()) { - dest.setByte(destPos + --length, itr.getByte()); - } - } finally { - dest.order(prevOrder); - } + Statics.copyToViaReverseCursor(this, srcPos, dest, destPos, length); } @Override @@ -526,12 +522,13 @@ class MemSegBuffer extends RcSupport implements Buffer, Re int woff = this.woff; drop.drop(this); while (drop instanceof ArcDrop) { - drop = ((ArcDrop) drop).unwrap(); + drop = ((ArcDrop) drop).unwrap(); } - unsafeSetDrop(new ArcDrop(drop)); + unsafeSetDrop(new ArcDrop<>(drop)); this.roff = roff; this.woff = woff; } else { + // TODO would we ever get here? alloc.recoverMemory(recoverableMemory()); } @@ -546,10 +543,10 @@ class MemSegBuffer extends RcSupport implements Buffer, Re if (!isOwned()) { throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned.")); } - var drop = (ArcDrop) unsafeGetDrop(); - unsafeSetDrop(new ArcDrop(drop)); + var drop = (ArcDrop) unsafeGetDrop(); + unsafeSetDrop(new ArcDrop<>(drop)); var bifurcatedSeg = seg.asSlice(0, woff); - var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop(drop.increment()), alloc); + var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop<>(drop.increment()), alloc); bifurcatedBuf.woff = woff; bifurcatedBuf.roff = roff; bifurcatedBuf.order(order); @@ -1083,7 +1080,7 @@ class MemSegBuffer extends RcSupport implements Buffer, Re var woff = this.woff; var readOnly = readOnly(); boolean isConfined = seg.ownerThread() == null; - MemorySegment transferSegment = isConfined? seg : seg.share(); + MemorySegment transferSegment = isConfined? seg : seg.share(); // TODO remove confimenent checks MemorySegment base = this.base; makeInaccessible(); return new Owned() { @@ -1109,12 +1106,12 @@ class MemSegBuffer extends RcSupport implements Buffer, Re @Override public boolean isOwned() { - return super.isOwned() && ((ArcDrop) unsafeGetDrop()).isOwned(); + return super.isOwned() && ((ArcDrop) unsafeGetDrop()).isOwned(); } @Override public int countBorrows() { - return super.countBorrows() + ((ArcDrop) unsafeGetDrop()).countBorrows(); + return super.countBorrows() + ((ArcDrop) unsafeGetDrop()).countBorrows(); } private void checkRead(int index, int size) { @@ -1199,11 +1196,6 @@ class MemSegBuffer extends RcSupport implements Buffer, Re return bba; } - @Override - public int readableBytes() { - return writerOffset() - readerOffset(); - } - @Override public MemSegBuffer retain(int increment) { for (int i = 0; i < increment; i++) { diff --git a/src/main/java/io/netty/buffer/api/memseg/SegmentMemoryManagers.java b/src/main/java/io/netty/buffer/api/memseg/SegmentMemoryManagers.java new file mode 100644 index 0000000..c004270 --- /dev/null +++ b/src/main/java/io/netty/buffer/api/memseg/SegmentMemoryManagers.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.memseg; + +import io.netty.buffer.api.MemoryManager; +import io.netty.buffer.api.MemoryManagers; + +public class SegmentMemoryManagers implements MemoryManagers { + @Override + public MemoryManager getHeapMemoryManager() { + return new HeapMemorySegmentManager(); + } + + @Override + public MemoryManager getNativeMemoryManager() { + return new NativeMemorySegmentManager(); + } + + @Override + public String toString() { + return "MS"; + } +} diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java new file mode 100644 index 0000000..4063afe --- /dev/null +++ b/src/main/java/module-info.java @@ -0,0 +1,36 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +module netty.incubator.buffer { + requires jdk.incubator.foreign; + requires io.netty.common; + requires io.netty.buffer; + + // Optional dependencies, needed for some of the examples. + requires static java.logging; + + exports io.netty.buffer.api; + exports io.netty.buffer.api.adaptor; + + uses io.netty.buffer.api.MemoryManagers; + + // Permit reflective access to non-public members. + // Also means we don't have to make all test methods etc. public for JUnit to access them. + opens io.netty.buffer.api; + + provides io.netty.buffer.api.MemoryManagers with + io.netty.buffer.api.memseg.SegmentMemoryManagers, + io.netty.buffer.api.bytebuffer.ByteBufferMemoryManagers; +} \ No newline at end of file diff --git a/src/test/java/io/netty/buffer/api/BufferTest.java b/src/test/java/io/netty/buffer/api/BufferTest.java index d542abf..f2a9c43 100644 --- a/src/test/java/io/netty/buffer/api/BufferTest.java +++ b/src/test/java/io/netty/buffer/api/BufferTest.java @@ -31,6 +31,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.ReadOnlyBufferException; import java.text.ParseException; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedList; import java.util.List; @@ -50,6 +51,7 @@ import static io.netty.buffer.api.Fixture.Properties.COMPOSITE; import static io.netty.buffer.api.Fixture.Properties.DIRECT; import static io.netty.buffer.api.Fixture.Properties.HEAP; import static io.netty.buffer.api.Fixture.Properties.POOLED; +import static io.netty.buffer.api.MemoryManagers.using; import static java.nio.ByteOrder.BIG_ENDIAN; import static java.nio.ByteOrder.LITTLE_ENDIAN; import static org.assertj.core.api.Assertions.assertThat; @@ -102,6 +104,20 @@ public class BufferTest { private static Stream fixtureCombinations() { List initFixtures = initialAllocators(); + + // Multiply by all MemoryManagers. + List loadableManagers = new ArrayList<>(); + MemoryManagers.getAllManagers().forEach(provider -> { + loadableManagers.add(provider.get()); + }); + initFixtures = initFixtures.stream().flatMap(f -> { + Stream.Builder builder = Stream.builder(); + for (MemoryManagers managers : loadableManagers) { + builder.add(new Fixture(f + "/" + managers, () -> using(managers, f), f.getProperties())); + } + return builder.build(); + }).toList(); + Builder builder = Stream.builder(); initFixtures.forEach(builder); @@ -2497,6 +2513,17 @@ public class BufferTest { } } + @ParameterizedTest + @MethodSource("allocators") + public void closedBuffersAreNotReadOnly(Fixture fixture) { + try (BufferAllocator allocator = fixture.createAllocator()) { + Buffer buf = allocator.allocate(8); + buf.readOnly(true); + buf.close(); + assertFalse(buf.readOnly()); + } + } + @ParameterizedTest @MethodSource("allocators") public void readOnlyBufferMustBecomeWritableAgainAfterTogglingReadOnlyOff(Fixture fixture) { @@ -2833,10 +2860,12 @@ public class BufferTest { if (component.hasReadableArray()) { byte[] array = component.readableArray(); + byte[] arrayCopy = new byte[component.readableArrayLength()]; + System.arraycopy(array, component.readableArrayOffset(), arrayCopy, 0, arrayCopy.length); if (buffer.order() == BIG_ENDIAN) { - assertThat(array).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08); + assertThat(arrayCopy).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08); } else { - assertThat(array).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01); + assertThat(arrayCopy).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01); } } @@ -3013,12 +3042,16 @@ public class BufferTest { assertThat(component.writableNativeAddress()).isZero(); } + buf.writerOffset(0); if (component.hasWritableArray()) { byte[] array = component.writableArray(); + int offset = component.writableArrayOffset(); + byte[] arrayCopy = new byte[component.writableArrayLength()]; + System.arraycopy(array, offset, arrayCopy, 0, arrayCopy.length); if (buffer.order() == BIG_ENDIAN) { - assertThat(array).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08); + assertThat(arrayCopy).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08); } else { - assertThat(array).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01); + assertThat(arrayCopy).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01); } }