Add a ByteBuffer based implementation of Buffer

Motivation:
We need a new implementation of our new API that supports Java 11, since that is what Netty 5 will most likely baseline on.
We also need an implementation that does not rely on Unsafe.
This leaves us with ByteBuffer as the underlying currency of memory.

Modification:
- Add a NioBuffer implementation and associated supporting classes.
- The entry-point for this is a new MemoryManagers API, which is used to pick the implementation and provide the on-/off-heap MemoryManager implementations.
- Add a mechanism to configure/override which MemoryManagers implementation to use.
- The MemoryManagers implementations are service-loadable, so new ones can be discovered at runtime.
- The existing MemorySegment based implementation also get a MemoryManagers implementation.
- Expand the BufferTest to include all combinations of all implementations. We now run 360.000 tests in BufferTest.
- Some common infrastructure, like ArcDrop, is moved to its own package.
- Add a module-info.java to control the service loading, and the visibility in the various packages.
- Some pom.xml file updates to support our now module based project.

Result:
We have an implementation that should work on Java 11, but we currently don't build or test on 11.
More work needs to happen before that is a reality.
This commit is contained in:
Chris Vest 2021-03-18 15:18:22 +01:00
parent 0272b1cf84
commit 95709828bf
26 changed files with 1683 additions and 102 deletions

View File

@ -69,7 +69,7 @@
<properties>
<javaModuleName>io.netty.incubator.buffer</javaModuleName>
<netty.version>5.0.0.Final-SNAPSHOT</netty.version>
<netty.build.version>28</netty.build.version>
<netty.build.version>29-SNAPSHOT</netty.build.version>
<java.version>16</java.version>
<junit.version>5.7.0</junit.version>
<surefire.version>3.0.0-M5</surefire.version>
@ -122,7 +122,7 @@
</plugin>
<plugin>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>3.1.0</version>
<version>3.1.2</version>
<executions>
<execution>
<id>check-style</id>
@ -148,7 +148,7 @@
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>8.29</version>
<version>8.41</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
@ -235,7 +235,7 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<version>2.5.4</version>
<version>5.1.1</version>
<executions>
<execution>
<id>generate-manifest</id>
@ -399,6 +399,7 @@
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
<type>test-jar</type>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>

View File

@ -408,9 +408,7 @@ public interface Buffer extends Rc<Buffer>, BufferAccessors {
*
* @return A {@link ByteCursor} for iterating the readable bytes of this buffer.
*/
default ByteCursor openCursor() {
return openCursor(readerOffset(), readableBytes());
}
ByteCursor openCursor();
/**
* Open a cursor to iterate the given number bytes of this buffer, starting at the given offset.

View File

@ -15,6 +15,8 @@
*/
package io.netty.buffer.api;
import io.netty.buffer.api.internal.Statics;
import java.nio.ByteOrder;
/**
@ -75,18 +77,18 @@ public interface BufferAllocator extends AutoCloseable {
}
static BufferAllocator heap() {
return new ManagedBufferAllocator(MemoryManager.getHeapMemoryManager(), Statics.CLEANER);
return new ManagedBufferAllocator(MemoryManagers.getManagers().getHeapMemoryManager(), Statics.CLEANER);
}
static BufferAllocator direct() {
return new ManagedBufferAllocator(MemoryManager.getNativeMemoryManager(), Statics.CLEANER);
return new ManagedBufferAllocator(MemoryManagers.getManagers().getNativeMemoryManager(), Statics.CLEANER);
}
static BufferAllocator pooledHeap() {
return new SizeClassedMemoryPool(MemoryManager.getHeapMemoryManager());
return new SizeClassedMemoryPool(MemoryManagers.getManagers().getHeapMemoryManager());
}
static BufferAllocator pooledDirect() {
return new SizeClassedMemoryPool(MemoryManager.getNativeMemoryManager());
return new SizeClassedMemoryPool(MemoryManagers.getManagers().getNativeMemoryManager());
}
}

View File

@ -18,7 +18,7 @@ package io.netty.buffer.api;
import java.lang.invoke.VarHandle;
import java.util.Objects;
import static io.netty.buffer.api.Statics.findVarHandle;
import static io.netty.buffer.api.internal.Statics.findVarHandle;
import static java.lang.invoke.MethodHandles.lookup;
/**

View File

@ -20,8 +20,8 @@ import java.lang.ref.Cleaner.Cleanable;
import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.buffer.api.Statics.CLEANER;
import static io.netty.buffer.api.Statics.findVarHandle;
import static io.netty.buffer.api.internal.Statics.CLEANER;
import static io.netty.buffer.api.internal.Statics.findVarHandle;
import static java.lang.invoke.MethodHandles.lookup;
class CleanerPooledDrop implements Drop<Buffer> {

View File

@ -396,6 +396,11 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
}
@Override
public ByteCursor openCursor() {
return openCursor(readerOffset(), readableBytes());
}
@Override
public ByteCursor openCursor(int fromOffset, int length) {
if (fromOffset < 0) {
@ -1147,6 +1152,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
}
throw throwable;
}
boolean readOnly = this.readOnly;
makeInaccessible();
return new Owned<CompositeBuffer>() {
@Override
@ -1167,6 +1173,7 @@ final class CompositeBuffer extends RcSupport<Buffer, CompositeBuffer> implement
capacity = 0;
roff = 0;
woff = 0;
readOnly = false;
closed = true;
}

View File

@ -17,7 +17,7 @@ package io.netty.buffer.api;
import java.lang.ref.Cleaner;
import static io.netty.buffer.api.Statics.NO_OP_DROP;
import static io.netty.buffer.api.internal.Statics.NO_OP_DROP;
class ManagedBufferAllocator implements BufferAllocator, AllocatorControl {
private final MemoryManager manager;
@ -44,6 +44,6 @@ class ManagedBufferAllocator implements BufferAllocator, AllocatorControl {
@Override
public void recoverMemory(Object memory) {
// Free the recovered memory.
manager.recoverMemory(memory, manager.drop()).close();
manager.recoverMemory(this, memory, manager.drop()).close();
}
}

View File

@ -21,19 +21,11 @@ import io.netty.buffer.api.memseg.NativeMemorySegmentManager;
import java.lang.ref.Cleaner;
public interface MemoryManager {
static MemoryManager getHeapMemoryManager() {
return new HeapMemorySegmentManager();
}
static MemoryManager getNativeMemoryManager() {
return new NativeMemorySegmentManager();
}
boolean isNative();
Buffer allocateConfined(AllocatorControl alloc, long size, Drop<Buffer> drop, Cleaner cleaner);
Buffer allocateShared(AllocatorControl allo, long size, Drop<Buffer> drop, Cleaner cleaner);
Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner);
Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner);
Drop<Buffer> drop();
Object unwrapRecoverableMemory(Buffer buf);
int capacityOfRecoverableMemory(Object memory);
Buffer recoverMemory(Object recoverableMemory, Drop<Buffer> drop);
Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop);
}

View File

@ -0,0 +1,75 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import java.util.ServiceLoader;
import java.util.function.Supplier;
import java.util.stream.Stream;
/**
* The MemoryManagers interface is the handle through which {@link BufferAllocator buffer allocators} access the low
* level memory management APIs.
* <p>
* This is hidden behind this interface in order to make allocation and pool agnostic and reusable across buffer and
* memory implementations.
*/
public interface MemoryManagers {
/**
* Get the default, or currently configured, memory managers instance.
* @return A MemoryManagers instance.
*/
static MemoryManagers getManagers() {
return MemoryManagersOverride.getManagers();
}
/**
* Temporarily override the default configured memory managers instance.
* <p>
* Calls to {@link #getManagers()} from within the given supplier will get the given managers instance.
*
* @param managers Override the default configured managers instance with this instance.
* @param supplier The supplier function to be called while the override is in place.
* @param <T> The result type from the supplier.
* @return The result from the supplier.
*/
static <T> T using(MemoryManagers managers, Supplier<T> supplier) {
return MemoryManagersOverride.using(managers, supplier);
}
/**
* Get a lazy-loading stream of all available memory managers.
*
* @return A stream of providers of memory managers instances.
*/
static Stream<ServiceLoader.Provider<MemoryManagers>> getAllManagers() {
var loader = ServiceLoader.load(MemoryManagers.class);
return loader.stream();
}
/**
* Get a {@link MemoryManager} instance that is suitable for allocating on-heap {@link Buffer} instances.
*
* @return An on-heap {@link MemoryManager}.
*/
MemoryManager getHeapMemoryManager();
/**
* Get a {@link MemoryManager} instance that is suitable for allocating off-heap {@link Buffer} instances.
*
* @return An off-heap {@link MemoryManager}.
*/
MemoryManager getNativeMemoryManager();
}

View File

@ -0,0 +1,52 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
import io.netty.buffer.api.memseg.SegmentMemoryManagers;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
final class MemoryManagersOverride {
private static final MemoryManagers DEFAULT = new SegmentMemoryManagers();
private static final AtomicInteger OVERRIDES_AVAILABLE = new AtomicInteger();
private static final Map<Thread, MemoryManagers> OVERRIDES = Collections.synchronizedMap(new IdentityHashMap<>());
private MemoryManagersOverride() {
}
static MemoryManagers getManagers() {
if (OVERRIDES_AVAILABLE.get() > 0) {
return OVERRIDES.getOrDefault(Thread.currentThread(), DEFAULT);
}
return DEFAULT;
}
static <T> T using(MemoryManagers managers, Supplier<T> supplier) {
Thread thread = Thread.currentThread();
OVERRIDES.put(thread, managers);
OVERRIDES_AVAILABLE.incrementAndGet();
try {
return supplier.get();
} finally {
OVERRIDES_AVAILABLE.decrementAndGet();
OVERRIDES.remove(thread);
}
}
}

View File

@ -42,6 +42,8 @@ public interface ReadableComponent {
*
* @return A byte array of the contents of this component.
* @throws UnsupportedOperationException if {@link #hasReadableArray()} returns {@code false}.
* @see #readableArrayOffset()
* @see #readableArrayLength()
*/
byte[] readableArray();
@ -53,6 +55,15 @@ public interface ReadableComponent {
*/
int readableArrayOffset();
/**
* The number of bytes in the {@link #readableArray()} that belong to this component.
*
* @return The number of bytes, from the {@link #readableArrayOffset()} into the {@link #readableArray()},
* that belong to this component.
* @throws UnsupportedOperationException if {@link #hasReadableArray()} returns {@code false}.
*/
int readableArrayLength();
/**
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
* <p>

View File

@ -15,13 +15,15 @@
*/
package io.netty.buffer.api;
import io.netty.buffer.api.internal.Statics;
import java.lang.invoke.VarHandle;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import static io.netty.buffer.api.Statics.NO_OP_DROP;
import static io.netty.buffer.api.internal.Statics.NO_OP_DROP;
import static java.lang.invoke.MethodHandles.lookup;
class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<Buffer> {
@ -129,7 +131,7 @@ class SizeClassedMemoryPool implements BufferAllocator, AllocatorControl, Drop<B
private Buffer recoverMemoryIntoBuffer(Object memory) {
var drop = getDrop();
var buf = manager.recoverMemory(memory, drop);
var buf = manager.recoverMemory(this, memory, drop);
drop.attach(buf);
return buf;
}

View File

@ -17,7 +17,7 @@ package io.netty.buffer.api;
import java.lang.invoke.VarHandle;
import static io.netty.buffer.api.Statics.findVarHandle;
import static io.netty.buffer.api.internal.Statics.findVarHandle;
import static java.lang.invoke.MethodHandles.lookup;
class TransferSend<I extends Rc<I>, T extends Rc<I>> implements Send<I> {

View File

@ -35,6 +35,8 @@ public interface WritableComponent {
*
* @return A byte array of the contents of this component.
* @throws UnsupportedOperationException if {@link #hasWritableArray()} returns {@code false}.
* @see #writableArrayOffset()
* @see #writableArrayLength()
*/
byte[] writableArray();
@ -46,6 +48,15 @@ public interface WritableComponent {
*/
int writableArrayOffset();
/**
* The number of bytes in the {@link #writableArray()} that belong to this component.
*
* @return The number of bytes, from the {@link #writableArrayOffset()} into the {@link #writableArray()},
* that belong to this component.
* @throws UnsupportedOperationException if {@link #hasWritableArray()} returns {@code false}.
*/
int writableArrayLength();
/**
* Give the native memory address backing this buffer, or return 0 if this buffer has no native memory address.
*

View File

@ -0,0 +1,76 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.bytebuffer;
import io.netty.buffer.api.AllocatorControl;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.internal.Statics;
import java.lang.ref.Cleaner;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import static io.netty.buffer.api.internal.Statics.convert;
public class ByteBufferMemoryManager implements MemoryManager {
private final boolean direct;
public ByteBufferMemoryManager(boolean direct) {
this.direct = direct;
}
@Override
public boolean isNative() {
return direct;
}
@Override
public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
return allocateShared(allocatorControl, size, drop, cleaner);
}
@Override
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
int capacity = Math.toIntExact(size);
ByteBuffer buffer = direct? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
buffer.order(ByteOrder.nativeOrder());
return new NioBuffer(buffer, buffer, allocatorControl, convert(drop));
}
@Override
public Drop<Buffer> drop() {
return Statics.NO_OP_DROP;
}
@Override
public Object unwrapRecoverableMemory(Buffer buf) {
return ((NioBuffer) buf).recoverable();
}
@Override
public int capacityOfRecoverableMemory(Object memory) {
//noinspection OverlyStrongTypeCast
return ((ByteBuffer) memory).capacity();
}
@Override
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop) {
ByteBuffer memory = (ByteBuffer) recoverableMemory;
return new NioBuffer(memory, memory, allocatorControl, convert(drop));
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.bytebuffer;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.MemoryManagers;
public class ByteBufferMemoryManagers implements MemoryManagers {
@Override
public MemoryManager getHeapMemoryManager() {
return new ByteBufferMemoryManager(false);
}
@Override
public MemoryManager getNativeMemoryManager() {
return new ByteBufferMemoryManager(true);
}
@Override
public String toString() {
return "BB";
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,20 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Safe ByteBuffer based implementation.
*/
package io.netty.buffer.api.bytebuffer;

View File

@ -13,14 +13,14 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.memseg;
package io.netty.buffer.api.internal;
import io.netty.buffer.api.Drop;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
final class ArcDrop implements Drop<MemSegBuffer> {
public final class ArcDrop<T> implements Drop<T> {
private static final VarHandle COUNT;
static {
try {
@ -30,31 +30,31 @@ final class ArcDrop implements Drop<MemSegBuffer> {
}
}
private final Drop<MemSegBuffer> delegate;
private final Drop<T> delegate;
@SuppressWarnings("FieldMayBeFinal")
private volatile int count;
ArcDrop(Drop<MemSegBuffer> delegate) {
public ArcDrop(Drop<T> delegate) {
this.delegate = delegate;
count = 1;
}
static Drop<MemSegBuffer> wrap(Drop<MemSegBuffer> drop) {
public static <X> Drop<X> wrap(Drop<X> drop) {
if (drop.getClass() == ArcDrop.class) {
return drop;
}
return new ArcDrop(drop);
return new ArcDrop<X>(drop);
}
static Drop<MemSegBuffer> acquire(Drop<MemSegBuffer> drop) {
public static <X> Drop<X> acquire(Drop<X> drop) {
if (drop.getClass() == ArcDrop.class) {
((ArcDrop) drop).increment();
((ArcDrop<X>) drop).increment();
return drop;
}
return new ArcDrop(drop);
return new ArcDrop<X>(drop);
}
ArcDrop increment() {
public ArcDrop<T> increment() {
int c;
do {
c = count;
@ -64,7 +64,7 @@ final class ArcDrop implements Drop<MemSegBuffer> {
}
@Override
public void drop(MemSegBuffer buf) {
public void drop(T obj) {
int c;
int n;
do {
@ -73,33 +73,33 @@ final class ArcDrop implements Drop<MemSegBuffer> {
checkValidState(c);
} while (!COUNT.compareAndSet(this, c, n));
if (n == 0) {
delegate.drop(buf);
delegate.drop(obj);
}
}
@Override
public void attach(MemSegBuffer obj) {
public void attach(T obj) {
delegate.attach(obj);
}
boolean isOwned() {
public boolean isOwned() {
return count <= 1;
}
int countBorrows() {
public int countBorrows() {
return count - 1;
}
Drop<MemSegBuffer> unwrap() {
public Drop<T> unwrap() {
return delegate;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder().append("ArcDrop(").append(count).append(", ");
Drop<MemSegBuffer> drop = this;
while ((drop = ((ArcDrop) drop).unwrap()) instanceof ArcDrop) {
builder.append(((ArcDrop) drop).count).append(", ");
Drop<T> drop = this;
while ((drop = ((ArcDrop<T>) drop).unwrap()) instanceof ArcDrop) {
builder.append(((ArcDrop<T>) drop).count).append(", ");
}
return builder.append(drop).append(')').toString();
}

View File

@ -13,13 +13,17 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api;
package io.netty.buffer.api.internal;
import io.netty.buffer.api.Buffer;
import io.netty.buffer.api.Drop;
import java.lang.invoke.MethodHandles.Lookup;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner;
import java.nio.ByteOrder;
interface Statics {
public interface Statics {
Cleaner CLEANER = Cleaner.create();
Drop<Buffer> NO_OP_DROP = new Drop<Buffer>() {
@Override
@ -39,4 +43,29 @@ interface Statics {
throw new ExceptionInInitializerError(e);
}
}
@SuppressWarnings("unchecked")
static <T, R> Drop<R> convert(Drop<T> drop) {
return (Drop<R>) drop;
}
static void copyToViaReverseCursor(Buffer src, int srcPos, Buffer dest, int destPos, int length) {
// Iterate in reverse to account for src and dest buffer overlap.
var itr = src.openReverseCursor(srcPos + length - 1, length);
ByteOrder prevOrder = dest.order();
// We read longs in BE, in reverse, so they need to be flipped for writing.
dest.order(ByteOrder.LITTLE_ENDIAN);
try {
while (itr.readLong()) {
long val = itr.getLong();
length -= Long.BYTES;
dest.setLong(destPos + length, val);
}
while (itr.readByte()) {
dest.setByte(destPos + --length, itr.getByte());
}
} finally {
dest.order(prevOrder);
}
}
}

View File

@ -0,0 +1,20 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Internal implementation details that can be shared among Buffer implementations.
*/
package io.netty.buffer.api.internal;

View File

@ -24,26 +24,28 @@ import jdk.incubator.foreign.MemorySegment;
import java.lang.ref.Cleaner;
import static io.netty.buffer.api.internal.Statics.convert;
public abstract class AbstractMemorySegmentManager implements MemoryManager {
@Override
public abstract boolean isNative();
@Override
public Buffer allocateConfined(AllocatorControl alloc, long size, Drop<Buffer> drop, Cleaner cleaner) {
public Buffer allocateConfined(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
var segment = createSegment(size);
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuffer(segment, segment, convert(drop), alloc);
return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
}
@Override
public Buffer allocateShared(AllocatorControl alloc, long size, Drop<Buffer> drop, Cleaner cleaner) {
public Buffer allocateShared(AllocatorControl allocatorControl, long size, Drop<Buffer> drop, Cleaner cleaner) {
var segment = createSegment(size).share();
if (cleaner != null) {
segment = segment.registerCleaner(cleaner);
}
return new MemSegBuffer(segment, segment, convert(drop), alloc);
return new MemSegBuffer(segment, segment, convert(drop), allocatorControl);
}
protected abstract MemorySegment createSegment(long size);
@ -65,13 +67,8 @@ public abstract class AbstractMemorySegmentManager implements MemoryManager {
}
@Override
public Buffer recoverMemory(Object recoverableMemory, Drop<Buffer> drop) {
var recovery = (RecoverableMemory) recoverableMemory;
public Buffer recoverMemory(AllocatorControl allocatorControl, Object recoverableMemory, Drop<Buffer> drop) {
var recovery = (RecoverableMemory) recoverableMemory; // TODO get rid of RecoverableMemory
return recovery.recover(convert(drop));
}
@SuppressWarnings("unchecked")
private static <T, R> Drop<R> convert(Drop<T> drop) {
return (Drop<R>) drop;
}
}

View File

@ -30,6 +30,8 @@ import io.netty.buffer.api.RcSupport;
import io.netty.buffer.api.adaptor.BufferIntegratable;
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
import io.netty.buffer.api.internal.ArcDrop;
import io.netty.buffer.api.internal.Statics;
import jdk.incubator.foreign.MemorySegment;
import java.nio.ByteBuffer;
@ -153,9 +155,9 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
@Override
public MemSegBuffer readerOffset(int index) {
checkRead(index, 0);
roff = index;
public MemSegBuffer readerOffset(int offset) {
checkRead(offset, 0);
roff = offset;
return this;
}
@ -165,9 +167,9 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
}
@Override
public MemSegBuffer writerOffset(int index) {
checkWrite(index, 0);
woff = index;
public MemSegBuffer writerOffset(int offset) {
checkWrite(offset, 0);
woff = offset;
return this;
}
@ -194,6 +196,11 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
throw new UnsupportedOperationException("This component has no backing array.");
}
@Override
public int readableArrayLength() {
throw new UnsupportedOperationException("This component has no backing array.");
}
@Override
public long readableNativeAddress() {
return nativeAddress();
@ -229,6 +236,11 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
throw new UnsupportedOperationException("This component has no backing array.");
}
@Override
public int writableArrayLength() {
throw new UnsupportedOperationException("This component has no backing array.");
}
@Override
public long writableNativeAddress() {
return nativeAddress();
@ -324,23 +336,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
return;
}
// Iterate in reverse to account for src and dest buffer overlap.
var itr = openReverseCursor(srcPos + length - 1, length);
ByteOrder prevOrder = dest.order();
// We read longs in BE, in reverse, so they need to be flipped for writing.
dest.order(ByteOrder.LITTLE_ENDIAN);
try {
while (itr.readLong()) {
long val = itr.getLong();
length -= Long.BYTES;
dest.setLong(destPos + length, val);
}
while (itr.readByte()) {
dest.setByte(destPos + --length, itr.getByte());
}
} finally {
dest.order(prevOrder);
}
Statics.copyToViaReverseCursor(this, srcPos, dest, destPos, length);
}
@Override
@ -526,12 +522,13 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
int woff = this.woff;
drop.drop(this);
while (drop instanceof ArcDrop) {
drop = ((ArcDrop) drop).unwrap();
drop = ((ArcDrop<MemSegBuffer>) drop).unwrap();
}
unsafeSetDrop(new ArcDrop(drop));
unsafeSetDrop(new ArcDrop<>(drop));
this.roff = roff;
this.woff = woff;
} else {
// TODO would we ever get here?
alloc.recoverMemory(recoverableMemory());
}
@ -546,10 +543,10 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
if (!isOwned()) {
throw attachTrace(new IllegalStateException("Cannot bifurcate a buffer that is not owned."));
}
var drop = (ArcDrop) unsafeGetDrop();
unsafeSetDrop(new ArcDrop(drop));
var drop = (ArcDrop<MemSegBuffer>) unsafeGetDrop();
unsafeSetDrop(new ArcDrop<>(drop));
var bifurcatedSeg = seg.asSlice(0, woff);
var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop(drop.increment()), alloc);
var bifurcatedBuf = new MemSegBuffer(base, bifurcatedSeg, new ArcDrop<>(drop.increment()), alloc);
bifurcatedBuf.woff = woff;
bifurcatedBuf.roff = roff;
bifurcatedBuf.order(order);
@ -1083,7 +1080,7 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
var woff = this.woff;
var readOnly = readOnly();
boolean isConfined = seg.ownerThread() == null;
MemorySegment transferSegment = isConfined? seg : seg.share();
MemorySegment transferSegment = isConfined? seg : seg.share(); // TODO remove confimenent checks
MemorySegment base = this.base;
makeInaccessible();
return new Owned<MemSegBuffer>() {
@ -1109,12 +1106,12 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
@Override
public boolean isOwned() {
return super.isOwned() && ((ArcDrop) unsafeGetDrop()).isOwned();
return super.isOwned() && ((ArcDrop<MemSegBuffer>) unsafeGetDrop()).isOwned();
}
@Override
public int countBorrows() {
return super.countBorrows() + ((ArcDrop) unsafeGetDrop()).countBorrows();
return super.countBorrows() + ((ArcDrop<MemSegBuffer>) unsafeGetDrop()).countBorrows();
}
private void checkRead(int index, int size) {
@ -1199,11 +1196,6 @@ class MemSegBuffer extends RcSupport<Buffer, MemSegBuffer> implements Buffer, Re
return bba;
}
@Override
public int readableBytes() {
return writerOffset() - readerOffset();
}
@Override
public MemSegBuffer retain(int increment) {
for (int i = 0; i < increment; i++) {

View File

@ -0,0 +1,36 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer.api.memseg;
import io.netty.buffer.api.MemoryManager;
import io.netty.buffer.api.MemoryManagers;
public class SegmentMemoryManagers implements MemoryManagers {
@Override
public MemoryManager getHeapMemoryManager() {
return new HeapMemorySegmentManager();
}
@Override
public MemoryManager getNativeMemoryManager() {
return new NativeMemorySegmentManager();
}
@Override
public String toString() {
return "MS";
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2021 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
module netty.incubator.buffer {
requires jdk.incubator.foreign;
requires io.netty.common;
requires io.netty.buffer;
// Optional dependencies, needed for some of the examples.
requires static java.logging;
exports io.netty.buffer.api;
exports io.netty.buffer.api.adaptor;
uses io.netty.buffer.api.MemoryManagers;
// Permit reflective access to non-public members.
// Also means we don't have to make all test methods etc. public for JUnit to access them.
opens io.netty.buffer.api;
provides io.netty.buffer.api.MemoryManagers with
io.netty.buffer.api.memseg.SegmentMemoryManagers,
io.netty.buffer.api.bytebuffer.ByteBufferMemoryManagers;
}

View File

@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.ReadOnlyBufferException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@ -50,6 +51,7 @@ import static io.netty.buffer.api.Fixture.Properties.COMPOSITE;
import static io.netty.buffer.api.Fixture.Properties.DIRECT;
import static io.netty.buffer.api.Fixture.Properties.HEAP;
import static io.netty.buffer.api.Fixture.Properties.POOLED;
import static io.netty.buffer.api.MemoryManagers.using;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static java.nio.ByteOrder.LITTLE_ENDIAN;
import static org.assertj.core.api.Assertions.assertThat;
@ -102,6 +104,20 @@ public class BufferTest {
private static Stream<Fixture> fixtureCombinations() {
List<Fixture> initFixtures = initialAllocators();
// Multiply by all MemoryManagers.
List<MemoryManagers> loadableManagers = new ArrayList<>();
MemoryManagers.getAllManagers().forEach(provider -> {
loadableManagers.add(provider.get());
});
initFixtures = initFixtures.stream().flatMap(f -> {
Stream.Builder<Fixture> builder = Stream.builder();
for (MemoryManagers managers : loadableManagers) {
builder.add(new Fixture(f + "/" + managers, () -> using(managers, f), f.getProperties()));
}
return builder.build();
}).toList();
Builder<Fixture> builder = Stream.builder();
initFixtures.forEach(builder);
@ -2497,6 +2513,17 @@ public class BufferTest {
}
}
@ParameterizedTest
@MethodSource("allocators")
public void closedBuffersAreNotReadOnly(Fixture fixture) {
try (BufferAllocator allocator = fixture.createAllocator()) {
Buffer buf = allocator.allocate(8);
buf.readOnly(true);
buf.close();
assertFalse(buf.readOnly());
}
}
@ParameterizedTest
@MethodSource("allocators")
public void readOnlyBufferMustBecomeWritableAgainAfterTogglingReadOnlyOff(Fixture fixture) {
@ -2833,10 +2860,12 @@ public class BufferTest {
if (component.hasReadableArray()) {
byte[] array = component.readableArray();
byte[] arrayCopy = new byte[component.readableArrayLength()];
System.arraycopy(array, component.readableArrayOffset(), arrayCopy, 0, arrayCopy.length);
if (buffer.order() == BIG_ENDIAN) {
assertThat(array).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08);
assertThat(arrayCopy).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08);
} else {
assertThat(array).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01);
assertThat(arrayCopy).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01);
}
}
@ -3013,12 +3042,16 @@ public class BufferTest {
assertThat(component.writableNativeAddress()).isZero();
}
buf.writerOffset(0);
if (component.hasWritableArray()) {
byte[] array = component.writableArray();
int offset = component.writableArrayOffset();
byte[] arrayCopy = new byte[component.writableArrayLength()];
System.arraycopy(array, offset, arrayCopy, 0, arrayCopy.length);
if (buffer.order() == BIG_ENDIAN) {
assertThat(array).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08);
assertThat(arrayCopy).containsExactly(0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08);
} else {
assertThat(array).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01);
assertThat(arrayCopy).containsExactly(0x08, 0x07, 0x06, 0x05, 0x04, 0x03, 0x02, 0x01);
}
}