Merge pull request #72 from netty/integration
Implement BufferIntegratable for Nio- and UnsafeBuffer
This commit is contained in:
commit
f0a57ed023
@ -76,7 +76,7 @@ import java.nio.ByteOrder;
|
|||||||
*
|
*
|
||||||
* <h3 name="split">Splitting buffers</h3>
|
* <h3 name="split">Splitting buffers</h3>
|
||||||
*
|
*
|
||||||
* The {@link #split()} method break a buffer into two.
|
* The {@link #split()} method breaks a buffer into two.
|
||||||
* The two buffers will share the underlying memory, but their regions will not overlap, ensuring that the memory is
|
* The two buffers will share the underlying memory, but their regions will not overlap, ensuring that the memory is
|
||||||
* safely shared between the two.
|
* safely shared between the two.
|
||||||
* <p>
|
* <p>
|
||||||
@ -86,7 +86,7 @@ import java.nio.ByteOrder;
|
|||||||
* further processing, as split buffer regions, once their data has been received in its entirety.
|
* further processing, as split buffer regions, once their data has been received in its entirety.
|
||||||
*
|
*
|
||||||
* If you instead wish to temporarily share a region of a buffer, you will have to pass offset and length along with the
|
* If you instead wish to temporarily share a region of a buffer, you will have to pass offset and length along with the
|
||||||
* buffer, or you will have to make a copy of the region in a new buffer.
|
* buffer, or you will have to make a copy of the region.
|
||||||
*
|
*
|
||||||
* <h3>Buffers as constants</h3>
|
* <h3>Buffers as constants</h3>
|
||||||
*
|
*
|
||||||
@ -377,7 +377,7 @@ public interface Buffer extends Resource<Buffer>, BufferAccessors {
|
|||||||
* {@code false}.
|
* {@code false}.
|
||||||
*
|
*
|
||||||
* @param size The requested number of bytes of space that should be available for writing.
|
* @param size The requested number of bytes of space that should be available for writing.
|
||||||
* @throws IllegalStateException if this buffer is not in a bad state, or is {@linkplain #readOnly() read-only}.
|
* @throws IllegalStateException if this buffer is in a bad state, or is {@linkplain #readOnly() read-only}.
|
||||||
*/
|
*/
|
||||||
default void ensureWritable(int size) {
|
default void ensureWritable(int size) {
|
||||||
ensureWritable(size, 1, true);
|
ensureWritable(size, 1, true);
|
||||||
@ -418,7 +418,7 @@ public interface Buffer extends Resource<Buffer>, BufferAccessors {
|
|||||||
* @param allowCompaction {@code true} if the method is allowed to modify the
|
* @param allowCompaction {@code true} if the method is allowed to modify the
|
||||||
* {@linkplain #readerOffset() reader offset} and
|
* {@linkplain #readerOffset() reader offset} and
|
||||||
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
|
* {@linkplain #writerOffset() writer offset}, otherwise {@code false}.
|
||||||
* @throws IllegalStateException if this buffer is not in a bad state, or is {@linkplain #readOnly() read-only}.
|
* @throws IllegalStateException if this buffer is in a bad state, or is {@linkplain #readOnly() read-only}.
|
||||||
*/
|
*/
|
||||||
void ensureWritable(int size, int minimumGrowth, boolean allowCompaction);
|
void ensureWritable(int size, int minimumGrowth, boolean allowCompaction);
|
||||||
|
|
||||||
@ -553,7 +553,7 @@ public interface Buffer extends Resource<Buffer>, BufferAccessors {
|
|||||||
/**
|
/**
|
||||||
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
|
* Discards the read bytes, and moves the buffer contents to the beginning of the buffer.
|
||||||
*
|
*
|
||||||
* @throws IllegalStateException if this buffer is not in a bad state, or is {@linkplain #readOnly() read-only}.
|
* @throws IllegalStateException if this buffer is in a bad state, or is {@linkplain #readOnly() read-only}.
|
||||||
*/
|
*/
|
||||||
void compact();
|
void compact();
|
||||||
|
|
||||||
|
@ -97,8 +97,9 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Resourc
|
|||||||
* @param send The new {@link Buffer} instance that is replacing the currently held buffer.
|
* @param send The new {@link Buffer} instance that is replacing the currently held buffer.
|
||||||
*/
|
*/
|
||||||
protected final void replaceBuffer(Send<Buffer> send) {
|
protected final void replaceBuffer(Send<Buffer> send) {
|
||||||
|
Buffer received = send.receive();
|
||||||
buf.close();
|
buf.close();
|
||||||
buf = send.receive();
|
buf = received;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -114,7 +115,8 @@ public abstract class BufferHolder<T extends BufferHolder<T>> implements Resourc
|
|||||||
* @param send The {@link Send} with the new {@link Buffer} instance that is replacing the currently held buffer.
|
* @param send The {@link Send} with the new {@link Buffer} instance that is replacing the currently held buffer.
|
||||||
*/
|
*/
|
||||||
protected final void replaceBufferVolatile(Send<Buffer> send) {
|
protected final void replaceBufferVolatile(Send<Buffer> send) {
|
||||||
var prev = (Buffer) BUF.getAndSet(this, send.receive());
|
Buffer received = send.receive();
|
||||||
|
var prev = (Buffer) BUF.getAndSet(this, received);
|
||||||
prev.close();
|
prev.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,6 +15,9 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.api;
|
package io.netty.buffer.api;
|
||||||
|
|
||||||
|
import io.netty.buffer.api.internal.MemoryManagersOverride;
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
@ -51,6 +54,8 @@ public interface MemoryManagers {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a lazy-loading stream of all available memory managers.
|
* Get a lazy-loading stream of all available memory managers.
|
||||||
|
* <p>
|
||||||
|
* Note: All available {@link MemoryManagers} instances are service loaded and instantiated on every call.
|
||||||
*
|
*
|
||||||
* @return A stream of providers of memory managers instances.
|
* @return A stream of providers of memory managers instances.
|
||||||
*/
|
*/
|
||||||
@ -59,6 +64,28 @@ public interface MemoryManagers {
|
|||||||
return loader.stream();
|
return loader.stream();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find a {@link MemoryManagers} implementation by its {@linkplain #getImplementationName() implementation name}.
|
||||||
|
* <p>
|
||||||
|
* Note: All available {@link MemoryManagers} instances are service loaded and instantiated every time this
|
||||||
|
* method is called.
|
||||||
|
*
|
||||||
|
* @param implementationName The named implementation to look for.
|
||||||
|
* @return A {@link MemoryManagers} implementation, if any was found.
|
||||||
|
*/
|
||||||
|
static Optional<MemoryManagers> lookupImplementation(String implementationName) {
|
||||||
|
return getAllManagers()
|
||||||
|
.flatMap(provider -> {
|
||||||
|
try {
|
||||||
|
return Stream.ofNullable(provider.get());
|
||||||
|
} catch (Exception e) {
|
||||||
|
return Stream.empty();
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.filter(impl -> implementationName.equals(impl.getImplementationName()))
|
||||||
|
.findFirst();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a {@link MemoryManager} instance that is suitable for allocating on-heap {@link Buffer} instances.
|
* Get a {@link MemoryManager} instance that is suitable for allocating on-heap {@link Buffer} instances.
|
||||||
*
|
*
|
||||||
@ -72,4 +99,12 @@ public interface MemoryManagers {
|
|||||||
* @return An off-heap {@link MemoryManager}.
|
* @return An off-heap {@link MemoryManager}.
|
||||||
*/
|
*/
|
||||||
MemoryManager getNativeMemoryManager();
|
MemoryManager getNativeMemoryManager();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the name for this implementation, which can be used for finding this particular implementation via the
|
||||||
|
* {@link #lookupImplementation(String)} method.
|
||||||
|
*
|
||||||
|
* @return The name of this memory managers implementation.
|
||||||
|
*/
|
||||||
|
String getImplementationName();
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,18 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2021 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
package io.netty.buffer.api;
|
package io.netty.buffer.api;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1419,8 +1419,9 @@ public final class ByteBufAdaptor extends ByteBuf {
|
|||||||
@Override
|
@Override
|
||||||
public ByteBuf retainedSlice(int index, int length) {
|
public ByteBuf retainedSlice(int index, int length) {
|
||||||
checkAccess();
|
checkAccess();
|
||||||
|
Slice slice = new Slice(this, index, length);
|
||||||
retain();
|
retain();
|
||||||
return new Slice(this, index, length);
|
return slice;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class Slice extends SlicedByteBuf {
|
private static final class Slice extends SlicedByteBuf {
|
||||||
@ -1677,8 +1678,16 @@ public final class ByteBufAdaptor extends ByteBuf {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean release(int decrement) {
|
public boolean release(int decrement) {
|
||||||
|
int refCount = 1 + Statics.countBorrows((ResourceSupport<?, ?>) buffer);
|
||||||
|
if (!buffer.isAccessible() || decrement > refCount) {
|
||||||
|
throw new IllegalReferenceCountException(refCount, -decrement);
|
||||||
|
}
|
||||||
for (int i = 0; i < decrement; i++) {
|
for (int i = 0; i < decrement; i++) {
|
||||||
buffer.close();
|
try {
|
||||||
|
buffer.close();
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
throw new IllegalReferenceCountException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return !buffer.isAccessible();
|
return !buffer.isAccessible();
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,11 @@ public class ByteBufferMemoryManagers implements MemoryManagers {
|
|||||||
return new ByteBufferMemoryManager(true);
|
return new ByteBufferMemoryManager(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getImplementationName() {
|
||||||
|
return "ByteBuffer";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "BB";
|
return "BB";
|
||||||
|
@ -15,12 +15,16 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.api.bytebuffer;
|
package io.netty.buffer.api.bytebuffer;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.api.AllocatorControl;
|
import io.netty.buffer.api.AllocatorControl;
|
||||||
import io.netty.buffer.api.Buffer;
|
import io.netty.buffer.api.Buffer;
|
||||||
import io.netty.buffer.api.BufferAllocator;
|
import io.netty.buffer.api.BufferAllocator;
|
||||||
import io.netty.buffer.api.ByteCursor;
|
import io.netty.buffer.api.ByteCursor;
|
||||||
import io.netty.buffer.api.Drop;
|
import io.netty.buffer.api.Drop;
|
||||||
import io.netty.buffer.api.Owned;
|
import io.netty.buffer.api.Owned;
|
||||||
|
import io.netty.buffer.api.adaptor.BufferIntegratable;
|
||||||
|
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
|
||||||
|
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
|
||||||
import io.netty.buffer.api.internal.ResourceSupport;
|
import io.netty.buffer.api.internal.ResourceSupport;
|
||||||
import io.netty.buffer.api.ReadableComponent;
|
import io.netty.buffer.api.ReadableComponent;
|
||||||
import io.netty.buffer.api.ReadableComponentProcessor;
|
import io.netty.buffer.api.ReadableComponentProcessor;
|
||||||
@ -28,6 +32,8 @@ import io.netty.buffer.api.WritableComponent;
|
|||||||
import io.netty.buffer.api.WritableComponentProcessor;
|
import io.netty.buffer.api.WritableComponentProcessor;
|
||||||
import io.netty.buffer.api.internal.ArcDrop;
|
import io.netty.buffer.api.internal.ArcDrop;
|
||||||
import io.netty.buffer.api.internal.Statics;
|
import io.netty.buffer.api.internal.Statics;
|
||||||
|
import io.netty.util.IllegalReferenceCountException;
|
||||||
|
import io.netty.util.ReferenceCounted;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
@ -39,7 +45,8 @@ import static io.netty.buffer.api.internal.Statics.bbslice;
|
|||||||
import static io.netty.buffer.api.internal.Statics.bufferIsClosed;
|
import static io.netty.buffer.api.internal.Statics.bufferIsClosed;
|
||||||
import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly;
|
import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly;
|
||||||
|
|
||||||
class NioBuffer extends ResourceSupport<Buffer, NioBuffer> implements Buffer, ReadableComponent, WritableComponent {
|
class NioBuffer extends ResourceSupport<Buffer, NioBuffer> implements Buffer, ReadableComponent, WritableComponent,
|
||||||
|
BufferIntegratable {
|
||||||
private static final ByteBuffer CLOSED_BUFFER = ByteBuffer.allocate(0);
|
private static final ByteBuffer CLOSED_BUFFER = ByteBuffer.allocate(0);
|
||||||
|
|
||||||
private final AllocatorControl control;
|
private final AllocatorControl control;
|
||||||
@ -1190,4 +1197,67 @@ class NioBuffer extends ResourceSupport<Buffer, NioBuffer> implements Buffer, Re
|
|||||||
ByteBuffer recoverable() {
|
ByteBuffer recoverable() {
|
||||||
return base;
|
return base;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// <editor-fold name="BufferIntegratable methods">
|
||||||
|
private ByteBufAdaptor adaptor;
|
||||||
|
@Override
|
||||||
|
public ByteBuf asByteBuf() {
|
||||||
|
ByteBufAdaptor bba = adaptor;
|
||||||
|
if (bba == null) {
|
||||||
|
ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor(
|
||||||
|
BufferAllocator.heap(), BufferAllocator.direct());
|
||||||
|
return adaptor = new ByteBufAdaptor(alloc, this);
|
||||||
|
}
|
||||||
|
return bba;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int refCnt() {
|
||||||
|
return isAccessible()? 1 + countBorrows() : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted retain() {
|
||||||
|
return retain(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted retain(int increment) {
|
||||||
|
for (int i = 0; i < increment; i++) {
|
||||||
|
acquire();
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted touch() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted touch(Object hint) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean release() {
|
||||||
|
return release(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean release(int decrement) {
|
||||||
|
int refCount = 1 + countBorrows();
|
||||||
|
if (!isAccessible() || decrement > refCount) {
|
||||||
|
throw new IllegalReferenceCountException(refCount, -decrement);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < decrement; i++) {
|
||||||
|
try {
|
||||||
|
close();
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
throw new IllegalReferenceCountException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return !isAccessible();
|
||||||
|
}
|
||||||
|
// </editor-fold>
|
||||||
}
|
}
|
||||||
|
@ -13,8 +13,9 @@
|
|||||||
* License for the specific language governing permissions and limitations
|
* License for the specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
package io.netty.buffer.api;
|
package io.netty.buffer.api.internal;
|
||||||
|
|
||||||
|
import io.netty.buffer.api.MemoryManagers;
|
||||||
import io.netty.buffer.api.bytebuffer.ByteBufferMemoryManagers;
|
import io.netty.buffer.api.bytebuffer.ByteBufferMemoryManagers;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -23,7 +24,7 @@ import java.util.Map;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
final class MemoryManagersOverride {
|
public final class MemoryManagersOverride {
|
||||||
private static final MemoryManagers DEFAULT = new ByteBufferMemoryManagers();
|
private static final MemoryManagers DEFAULT = new ByteBufferMemoryManagers();
|
||||||
private static final AtomicInteger OVERRIDES_AVAILABLE = new AtomicInteger();
|
private static final AtomicInteger OVERRIDES_AVAILABLE = new AtomicInteger();
|
||||||
private static final Map<Thread, MemoryManagers> OVERRIDES = Collections.synchronizedMap(new IdentityHashMap<>());
|
private static final Map<Thread, MemoryManagers> OVERRIDES = Collections.synchronizedMap(new IdentityHashMap<>());
|
||||||
@ -31,14 +32,14 @@ final class MemoryManagersOverride {
|
|||||||
private MemoryManagersOverride() {
|
private MemoryManagersOverride() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static MemoryManagers getManagers() {
|
public static MemoryManagers getManagers() {
|
||||||
if (OVERRIDES_AVAILABLE.get() > 0) {
|
if (OVERRIDES_AVAILABLE.get() > 0) {
|
||||||
return OVERRIDES.getOrDefault(Thread.currentThread(), DEFAULT);
|
return OVERRIDES.getOrDefault(Thread.currentThread(), DEFAULT);
|
||||||
}
|
}
|
||||||
return DEFAULT;
|
return DEFAULT;
|
||||||
}
|
}
|
||||||
|
|
||||||
static <T> T using(MemoryManagers managers, Supplier<T> supplier) {
|
public static <T> T using(MemoryManagers managers, Supplier<T> supplier) {
|
||||||
Thread thread = Thread.currentThread();
|
Thread thread = Thread.currentThread();
|
||||||
OVERRIDES.put(thread, managers);
|
OVERRIDES.put(thread, managers);
|
||||||
OVERRIDES_AVAILABLE.incrementAndGet();
|
OVERRIDES_AVAILABLE.incrementAndGet();
|
@ -38,6 +38,17 @@ public abstract class ResourceSupport<I extends Resource<I>, T extends ResourceS
|
|||||||
tracer = LifecycleTracer.get();
|
tracer = LifecycleTracer.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulation bypass for calling {@link #acquire()} on the given object.
|
||||||
|
* <p>
|
||||||
|
* Note: this {@code acquire} method does not check the type of the return value from acquire at compile time.
|
||||||
|
* The type is instead checked at runtime, and will cause a {@link ClassCastException} to be thrown if done
|
||||||
|
* incorrectly.
|
||||||
|
*
|
||||||
|
* @param obj The object we wish to acquire (increment reference count) on.
|
||||||
|
* @param <T> The type of the acquired object, given by target-typing.
|
||||||
|
* @return The acquired object.
|
||||||
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
static <T> T acquire(ResourceSupport<?, ?> obj) {
|
static <T> T acquire(ResourceSupport<?, ?> obj) {
|
||||||
return (T) obj.acquire();
|
return (T) obj.acquire();
|
||||||
@ -103,6 +114,13 @@ public abstract class ResourceSupport<I extends Resource<I>, T extends ResourceS
|
|||||||
return new TransferSend<I, T>(owned, drop, getClass());
|
return new TransferSend<I, T>(owned, drop, getClass());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attach a trace of the life-cycle of this object as suppressed exceptions to the given throwable.
|
||||||
|
*
|
||||||
|
* @param throwable The throwable to attach a life-cycle trace to.
|
||||||
|
* @param <E> The concrete exception type.
|
||||||
|
* @return The given exception, which can then be thrown.
|
||||||
|
*/
|
||||||
protected <E extends Throwable> E attachTrace(E throwable) {
|
protected <E extends Throwable> E attachTrace(E throwable) {
|
||||||
return tracer.attachTrace(throwable);
|
return tracer.attachTrace(throwable);
|
||||||
}
|
}
|
||||||
@ -117,14 +135,34 @@ public abstract class ResourceSupport<I extends Resource<I>, T extends ResourceS
|
|||||||
"Cannot send() a reference counted object with " + countBorrows() + " borrows: " + this + '.');
|
"Cannot send() a reference counted object with " + countBorrows() + " borrows: " + this + '.');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulation bypass to call {@link #isOwned()} on the given object.
|
||||||
|
*
|
||||||
|
* @param obj The object to query the ownership state on.
|
||||||
|
* @return {@code true} if the given object is owned, otherwise {@code false}.
|
||||||
|
*/
|
||||||
static boolean isOwned(ResourceSupport<?, ?> obj) {
|
static boolean isOwned(ResourceSupport<?, ?> obj) {
|
||||||
return obj.isOwned();
|
return obj.isOwned();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Query if this object is in an "owned" state, which means no other references have been
|
||||||
|
* {@linkplain #acquire() acquired} to it.
|
||||||
|
*
|
||||||
|
* This would usually be the case, since there are no public methods for acquiring references to these objects.
|
||||||
|
*
|
||||||
|
* @return {@code true} if this object is in an owned state, otherwise {@code false}.
|
||||||
|
*/
|
||||||
protected boolean isOwned() {
|
protected boolean isOwned() {
|
||||||
return acquires == 0;
|
return acquires == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Encapsulation bypass to call {@link #countBorrows()} on the given object.
|
||||||
|
*
|
||||||
|
* @param obj The object to count borrows on.
|
||||||
|
* @return The number of borrows, or outstanding {@linkplain #acquire() acquires}, if any, of the given object.
|
||||||
|
*/
|
||||||
static int countBorrows(ResourceSupport<?, ?> obj) {
|
static int countBorrows(ResourceSupport<?, ?> obj) {
|
||||||
return obj.countBorrows();
|
return obj.countBorrows();
|
||||||
}
|
}
|
||||||
|
@ -54,6 +54,7 @@ public interface Statics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("JavaLangInvokeHandleSignature")
|
||||||
static MethodHandle getByteBufferPutOffsetsMethodHandle() {
|
static MethodHandle getByteBufferPutOffsetsMethodHandle() {
|
||||||
try {
|
try {
|
||||||
Lookup lookup = MethodHandles.lookup();
|
Lookup lookup = MethodHandles.lookup();
|
||||||
@ -64,7 +65,7 @@ public interface Statics {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings({"unchecked", "unused"})
|
||||||
static <T extends Buffer> Drop<T> noOpDrop() {
|
static <T extends Buffer> Drop<T> noOpDrop() {
|
||||||
return (Drop<T>) NO_OP_DROP;
|
return (Drop<T>) NO_OP_DROP;
|
||||||
}
|
}
|
||||||
|
@ -15,12 +15,16 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.buffer.api.unsafe;
|
package io.netty.buffer.api.unsafe;
|
||||||
|
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.api.BufferAllocator;
|
import io.netty.buffer.api.BufferAllocator;
|
||||||
import io.netty.buffer.api.AllocatorControl;
|
import io.netty.buffer.api.AllocatorControl;
|
||||||
import io.netty.buffer.api.Buffer;
|
import io.netty.buffer.api.Buffer;
|
||||||
import io.netty.buffer.api.ByteCursor;
|
import io.netty.buffer.api.ByteCursor;
|
||||||
import io.netty.buffer.api.Drop;
|
import io.netty.buffer.api.Drop;
|
||||||
import io.netty.buffer.api.Owned;
|
import io.netty.buffer.api.Owned;
|
||||||
|
import io.netty.buffer.api.adaptor.BufferIntegratable;
|
||||||
|
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
|
||||||
|
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
|
||||||
import io.netty.buffer.api.internal.ResourceSupport;
|
import io.netty.buffer.api.internal.ResourceSupport;
|
||||||
import io.netty.buffer.api.ReadableComponent;
|
import io.netty.buffer.api.ReadableComponent;
|
||||||
import io.netty.buffer.api.ReadableComponentProcessor;
|
import io.netty.buffer.api.ReadableComponentProcessor;
|
||||||
@ -28,6 +32,8 @@ import io.netty.buffer.api.WritableComponent;
|
|||||||
import io.netty.buffer.api.WritableComponentProcessor;
|
import io.netty.buffer.api.WritableComponentProcessor;
|
||||||
import io.netty.buffer.api.internal.ArcDrop;
|
import io.netty.buffer.api.internal.ArcDrop;
|
||||||
import io.netty.buffer.api.internal.Statics;
|
import io.netty.buffer.api.internal.Statics;
|
||||||
|
import io.netty.util.IllegalReferenceCountException;
|
||||||
|
import io.netty.util.ReferenceCounted;
|
||||||
import io.netty.util.internal.PlatformDependent;
|
import io.netty.util.internal.PlatformDependent;
|
||||||
|
|
||||||
import java.lang.ref.Reference;
|
import java.lang.ref.Reference;
|
||||||
@ -40,7 +46,7 @@ import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly;
|
|||||||
import static io.netty.util.internal.PlatformDependent.BIG_ENDIAN_NATIVE_ORDER;
|
import static io.netty.util.internal.PlatformDependent.BIG_ENDIAN_NATIVE_ORDER;
|
||||||
|
|
||||||
class UnsafeBuffer extends ResourceSupport<Buffer, UnsafeBuffer> implements Buffer, ReadableComponent,
|
class UnsafeBuffer extends ResourceSupport<Buffer, UnsafeBuffer> implements Buffer, ReadableComponent,
|
||||||
WritableComponent {
|
WritableComponent, BufferIntegratable {
|
||||||
private static final int CLOSED_SIZE = -1;
|
private static final int CLOSED_SIZE = -1;
|
||||||
private static final boolean ACCESS_UNALIGNED = PlatformDependent.isUnaligned();
|
private static final boolean ACCESS_UNALIGNED = PlatformDependent.isUnaligned();
|
||||||
private UnsafeMemory memory; // The memory liveness; monitored by Cleaner.
|
private UnsafeMemory memory; // The memory liveness; monitored by Cleaner.
|
||||||
@ -1606,4 +1612,67 @@ class UnsafeBuffer extends ResourceSupport<Buffer, UnsafeBuffer> implements Buff
|
|||||||
Object recover() {
|
Object recover() {
|
||||||
return memory;
|
return memory;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// <editor-fold name="BufferIntegratable methods">
|
||||||
|
private ByteBufAdaptor adaptor;
|
||||||
|
@Override
|
||||||
|
public ByteBuf asByteBuf() {
|
||||||
|
ByteBufAdaptor bba = adaptor;
|
||||||
|
if (bba == null) {
|
||||||
|
ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor(
|
||||||
|
BufferAllocator.heap(), BufferAllocator.direct());
|
||||||
|
return adaptor = new ByteBufAdaptor(alloc, this);
|
||||||
|
}
|
||||||
|
return bba;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int refCnt() {
|
||||||
|
return isAccessible()? 1 + countBorrows() : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted retain() {
|
||||||
|
return retain(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted retain(int increment) {
|
||||||
|
for (int i = 0; i < increment; i++) {
|
||||||
|
acquire();
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted touch() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted touch(Object hint) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean release() {
|
||||||
|
return release(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean release(int decrement) {
|
||||||
|
int refCount = 1 + countBorrows();
|
||||||
|
if (!isAccessible() || decrement > refCount) {
|
||||||
|
throw new IllegalReferenceCountException(refCount, -decrement);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < decrement; i++) {
|
||||||
|
try {
|
||||||
|
close();
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
throw new IllegalReferenceCountException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return !isAccessible();
|
||||||
|
}
|
||||||
|
// </editor-fold>
|
||||||
}
|
}
|
||||||
|
@ -39,6 +39,11 @@ public class UnsafeMemoryManagers implements MemoryManagers {
|
|||||||
return new UnsafeMemoryManager(true);
|
return new UnsafeMemoryManager(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getImplementationName() {
|
||||||
|
return "Unsafe";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "US";
|
return "US";
|
||||||
|
@ -32,6 +32,7 @@ import io.netty.buffer.api.WritableComponentProcessor;
|
|||||||
import io.netty.buffer.api.Drop;
|
import io.netty.buffer.api.Drop;
|
||||||
import io.netty.buffer.api.Owned;
|
import io.netty.buffer.api.Owned;
|
||||||
import io.netty.buffer.api.internal.ResourceSupport;
|
import io.netty.buffer.api.internal.ResourceSupport;
|
||||||
|
import io.netty.util.IllegalReferenceCountException;
|
||||||
import jdk.incubator.foreign.MemorySegment;
|
import jdk.incubator.foreign.MemorySegment;
|
||||||
import jdk.incubator.foreign.ResourceScope;
|
import jdk.incubator.foreign.ResourceScope;
|
||||||
|
|
||||||
@ -1266,8 +1267,16 @@ class MemSegBuffer extends ResourceSupport<Buffer, MemSegBuffer> implements Buff
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean release(int decrement) {
|
public boolean release(int decrement) {
|
||||||
|
int refCount = 1 + countBorrows();
|
||||||
|
if (!isAccessible() || decrement > refCount) {
|
||||||
|
throw new IllegalReferenceCountException(refCount, -decrement);
|
||||||
|
}
|
||||||
for (int i = 0; i < decrement; i++) {
|
for (int i = 0; i < decrement; i++) {
|
||||||
close();
|
try {
|
||||||
|
close();
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
throw new IllegalReferenceCountException(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return !isAccessible();
|
return !isAccessible();
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,11 @@ public class SegmentMemoryManagers implements MemoryManagers {
|
|||||||
return new NativeMemorySegmentManager();
|
return new NativeMemorySegmentManager();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getImplementationName() {
|
||||||
|
return "MemorySegment";
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "MS";
|
return "MS";
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -16,22 +16,32 @@
|
|||||||
package io.netty.buffer.api.tests.adaptor;
|
package io.netty.buffer.api.tests.adaptor;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.api.BufferAllocator;
|
||||||
|
import io.netty.buffer.api.MemoryManagers;
|
||||||
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
|
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
|
||||||
import org.junit.AfterClass;
|
import org.junit.jupiter.api.AfterAll;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.Ignore;
|
|
||||||
|
|
||||||
public class ByteBufAdaptorTest extends AbstractByteBufTest {
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||||
|
|
||||||
|
public abstract class ByteBufAdaptorTest extends AbstractByteBufTest {
|
||||||
static ByteBufAllocatorAdaptor alloc;
|
static ByteBufAllocatorAdaptor alloc;
|
||||||
|
|
||||||
@BeforeClass
|
static void setUpAllocator(String name) {
|
||||||
public static void setUpAllocator() {
|
Optional<MemoryManagers> managers = MemoryManagers.lookupImplementation(name);
|
||||||
alloc = new ByteBufAllocatorAdaptor();
|
assumeTrue(managers.isPresent(), () -> "Memory implementation '" + name + "' not found.");
|
||||||
|
BufferAllocator onheap = MemoryManagers.using(managers.get(), BufferAllocator::pooledHeap);
|
||||||
|
BufferAllocator offheap = MemoryManagers.using(managers.get(), BufferAllocator::pooledHeap);
|
||||||
|
alloc = new ByteBufAllocatorAdaptor(onheap, offheap);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterAll
|
||||||
public static void tearDownAllocator() throws Exception {
|
public static void tearDownAllocator() throws Exception {
|
||||||
alloc.close();
|
if (alloc != null) {
|
||||||
|
alloc.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -39,50 +49,50 @@ public class ByteBufAdaptorTest extends AbstractByteBufTest {
|
|||||||
return alloc.buffer(capacity, capacity);
|
return alloc.buffer(capacity, capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("This test codifies that asking to reading 0 bytes from an empty but unclosed stream should return -1, " +
|
@Disabled("This test codifies that asking to reading 0 bytes from an empty but unclosed stream should return -1, " +
|
||||||
"which is just weird.")
|
"which is just weird.")
|
||||||
@Override
|
@Override
|
||||||
public void testStreamTransfer1() throws Exception {
|
public void testStreamTransfer1() throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Relies on capacity and max capacity being separate things.")
|
@Disabled("Relies on capacity and max capacity being separate things.")
|
||||||
@Override
|
@Override
|
||||||
public void testCapacityIncrease() {
|
public void testCapacityIncrease() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Decreasing capacity not supported in new API.")
|
@Disabled("Decreasing capacity not supported in new API.")
|
||||||
@Override
|
@Override
|
||||||
public void testCapacityDecrease() {
|
public void testCapacityDecrease() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Decreasing capacity not supported in new API.")
|
@Disabled("Decreasing capacity not supported in new API.")
|
||||||
@Override
|
@Override
|
||||||
public void testCapacityNegative() {
|
public void testCapacityNegative() {
|
||||||
throw new IllegalArgumentException(); // Can't ignore tests annotated with throws expectation?
|
throw new IllegalArgumentException(); // Can't ignore tests annotated with throws expectation?
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Decreasing capacity not supported in new API.")
|
@Disabled("Decreasing capacity not supported in new API.")
|
||||||
@Override
|
@Override
|
||||||
public void testCapacityEnforceMaxCapacity() {
|
public void testCapacityEnforceMaxCapacity() {
|
||||||
throw new IllegalArgumentException(); // Can't ignore tests annotated with throws expectation?
|
throw new IllegalArgumentException(); // Can't ignore tests annotated with throws expectation?
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Decreasing capacity not supported in new API.")
|
@Disabled("Decreasing capacity not supported in new API.")
|
||||||
@Override
|
@Override
|
||||||
public void testMaxFastWritableBytes() {
|
public void testMaxFastWritableBytes() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Impossible to expose entire memory as a ByteBuffer using new API.")
|
@Disabled("Impossible to expose entire memory as a ByteBuffer using new API.")
|
||||||
@Override
|
@Override
|
||||||
public void testNioBufferExposeOnlyRegion() {
|
public void testNioBufferExposeOnlyRegion() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("Impossible to expose entire memory as a ByteBuffer using new API.")
|
@Disabled("Impossible to expose entire memory as a ByteBuffer using new API.")
|
||||||
@Override
|
@Override
|
||||||
public void testToByteBuffer2() {
|
public void testToByteBuffer2() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore("No longer allowed to allocate 0 sized buffers, except for composite buffers with no components.")
|
@Disabled("No longer allowed to allocate 0 sized buffers, except for composite buffers with no components.")
|
||||||
@Override
|
@Override
|
||||||
public void testLittleEndianWithExpand() {
|
public void testLittleEndianWithExpand() {
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,25 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2021 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.buffer.api.tests.adaptor;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
|
||||||
|
public class MemSegByteBufAdaptorTest extends ByteBufAdaptorTest {
|
||||||
|
@BeforeAll
|
||||||
|
public static void setUpAllocator() {
|
||||||
|
setUpAllocator("MemorySegment");
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2021 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.buffer.api.tests.adaptor;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
|
||||||
|
public class NioByteBufAdaptorTest extends ByteBufAdaptorTest {
|
||||||
|
@BeforeAll
|
||||||
|
public static void setUpAllocator() {
|
||||||
|
setUpAllocator("ByteBuffer");
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2021 The Netty Project
|
||||||
|
*
|
||||||
|
* The Netty Project licenses this file to you under the Apache License,
|
||||||
|
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.netty.buffer.api.tests.adaptor;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
|
||||||
|
public class UnsafeByteBufAdaptorTest extends ByteBufAdaptorTest {
|
||||||
|
@BeforeAll
|
||||||
|
public static void setUpAllocator() {
|
||||||
|
setUpAllocator("Unsafe");
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user