Implement BufferIntegratable for Nio- and UnsafeBuffer
This commit is contained in:
parent
d7d0c0fa93
commit
af119de4a7
@ -15,6 +15,9 @@
|
||||
*/
|
||||
package io.netty.buffer.api;
|
||||
|
||||
import io.netty.buffer.api.internal.MemoryManagersOverride;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
@ -59,6 +62,25 @@ public interface MemoryManagers {
|
||||
return loader.stream();
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a {@link MemoryManagers} implementation by its {@linkplain #implementationName() implementation name}.
|
||||
*
|
||||
* @param implementationName The named implementation to look for.
|
||||
* @return A {@link MemoryManagers} implementation, if any was found.
|
||||
*/
|
||||
static Optional<MemoryManagers> lookupImplementation(String implementationName) {
|
||||
return getAllManagers()
|
||||
.flatMap(provider -> {
|
||||
try {
|
||||
return Stream.ofNullable(provider.get());
|
||||
} catch (Exception e) {
|
||||
return Stream.empty();
|
||||
}
|
||||
})
|
||||
.filter(impl -> implementationName.equals(impl.implementationName()))
|
||||
.findFirst();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a {@link MemoryManager} instance that is suitable for allocating on-heap {@link Buffer} instances.
|
||||
*
|
||||
@ -72,4 +94,12 @@ public interface MemoryManagers {
|
||||
* @return An off-heap {@link MemoryManager}.
|
||||
*/
|
||||
MemoryManager getNativeMemoryManager();
|
||||
|
||||
/**
|
||||
* Get the name for this implementation, which can be used for finding this particular implementation via the
|
||||
* {@link #lookupImplementation(String)} method.
|
||||
*
|
||||
* @return The name of this memory managers implementation.
|
||||
*/
|
||||
String implementationName();
|
||||
}
|
||||
|
@ -29,6 +29,11 @@ public class ByteBufferMemoryManagers implements MemoryManagers {
|
||||
return new ByteBufferMemoryManager(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String implementationName() {
|
||||
return "ByteBuffer";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BB";
|
||||
|
@ -15,12 +15,16 @@
|
||||
*/
|
||||
package io.netty.buffer.api.bytebuffer;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.api.AllocatorControl;
|
||||
import io.netty.buffer.api.Buffer;
|
||||
import io.netty.buffer.api.BufferAllocator;
|
||||
import io.netty.buffer.api.ByteCursor;
|
||||
import io.netty.buffer.api.Drop;
|
||||
import io.netty.buffer.api.Owned;
|
||||
import io.netty.buffer.api.adaptor.BufferIntegratable;
|
||||
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
|
||||
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
|
||||
import io.netty.buffer.api.internal.ResourceSupport;
|
||||
import io.netty.buffer.api.ReadableComponent;
|
||||
import io.netty.buffer.api.ReadableComponentProcessor;
|
||||
@ -28,6 +32,7 @@ import io.netty.buffer.api.WritableComponent;
|
||||
import io.netty.buffer.api.WritableComponentProcessor;
|
||||
import io.netty.buffer.api.internal.ArcDrop;
|
||||
import io.netty.buffer.api.internal.Statics;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
@ -39,7 +44,8 @@ import static io.netty.buffer.api.internal.Statics.bbslice;
|
||||
import static io.netty.buffer.api.internal.Statics.bufferIsClosed;
|
||||
import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly;
|
||||
|
||||
class NioBuffer extends ResourceSupport<Buffer, NioBuffer> implements Buffer, ReadableComponent, WritableComponent {
|
||||
class NioBuffer extends ResourceSupport<Buffer, NioBuffer> implements Buffer, ReadableComponent, WritableComponent,
|
||||
BufferIntegratable {
|
||||
private static final ByteBuffer CLOSED_BUFFER = ByteBuffer.allocate(0);
|
||||
|
||||
private final AllocatorControl control;
|
||||
@ -1190,4 +1196,59 @@ class NioBuffer extends ResourceSupport<Buffer, NioBuffer> implements Buffer, Re
|
||||
ByteBuffer recoverable() {
|
||||
return base;
|
||||
}
|
||||
|
||||
// <editor-fold name="BufferIntegratable methods">
|
||||
private ByteBufAdaptor adaptor;
|
||||
@Override
|
||||
public ByteBuf asByteBuf() {
|
||||
ByteBufAdaptor bba = adaptor;
|
||||
if (bba == null) {
|
||||
ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor(
|
||||
BufferAllocator.heap(), BufferAllocator.direct());
|
||||
return adaptor = new ByteBufAdaptor(alloc, this);
|
||||
}
|
||||
return bba;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int refCnt() {
|
||||
return isAccessible()? 1 + countBorrows() : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted retain() {
|
||||
return retain(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted retain(int increment) {
|
||||
for (int i = 0; i < increment; i++) {
|
||||
acquire();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted touch() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted touch(Object hint) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release() {
|
||||
return release(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(int decrement) {
|
||||
for (int i = 0; i < decrement; i++) {
|
||||
close();
|
||||
}
|
||||
return !isAccessible();
|
||||
}
|
||||
// </editor-fold>
|
||||
}
|
||||
|
@ -13,8 +13,9 @@
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api;
|
||||
package io.netty.buffer.api.internal;
|
||||
|
||||
import io.netty.buffer.api.MemoryManagers;
|
||||
import io.netty.buffer.api.bytebuffer.ByteBufferMemoryManagers;
|
||||
|
||||
import java.util.Collections;
|
||||
@ -23,7 +24,7 @@ import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
final class MemoryManagersOverride {
|
||||
public final class MemoryManagersOverride {
|
||||
private static final MemoryManagers DEFAULT = new ByteBufferMemoryManagers();
|
||||
private static final AtomicInteger OVERRIDES_AVAILABLE = new AtomicInteger();
|
||||
private static final Map<Thread, MemoryManagers> OVERRIDES = Collections.synchronizedMap(new IdentityHashMap<>());
|
||||
@ -31,14 +32,14 @@ final class MemoryManagersOverride {
|
||||
private MemoryManagersOverride() {
|
||||
}
|
||||
|
||||
static MemoryManagers getManagers() {
|
||||
public static MemoryManagers getManagers() {
|
||||
if (OVERRIDES_AVAILABLE.get() > 0) {
|
||||
return OVERRIDES.getOrDefault(Thread.currentThread(), DEFAULT);
|
||||
}
|
||||
return DEFAULT;
|
||||
}
|
||||
|
||||
static <T> T using(MemoryManagers managers, Supplier<T> supplier) {
|
||||
public static <T> T using(MemoryManagers managers, Supplier<T> supplier) {
|
||||
Thread thread = Thread.currentThread();
|
||||
OVERRIDES.put(thread, managers);
|
||||
OVERRIDES_AVAILABLE.incrementAndGet();
|
@ -15,12 +15,16 @@
|
||||
*/
|
||||
package io.netty.buffer.api.unsafe;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.api.BufferAllocator;
|
||||
import io.netty.buffer.api.AllocatorControl;
|
||||
import io.netty.buffer.api.Buffer;
|
||||
import io.netty.buffer.api.ByteCursor;
|
||||
import io.netty.buffer.api.Drop;
|
||||
import io.netty.buffer.api.Owned;
|
||||
import io.netty.buffer.api.adaptor.BufferIntegratable;
|
||||
import io.netty.buffer.api.adaptor.ByteBufAdaptor;
|
||||
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
|
||||
import io.netty.buffer.api.internal.ResourceSupport;
|
||||
import io.netty.buffer.api.ReadableComponent;
|
||||
import io.netty.buffer.api.ReadableComponentProcessor;
|
||||
@ -28,6 +32,7 @@ import io.netty.buffer.api.WritableComponent;
|
||||
import io.netty.buffer.api.WritableComponentProcessor;
|
||||
import io.netty.buffer.api.internal.ArcDrop;
|
||||
import io.netty.buffer.api.internal.Statics;
|
||||
import io.netty.util.ReferenceCounted;
|
||||
import io.netty.util.internal.PlatformDependent;
|
||||
|
||||
import java.lang.ref.Reference;
|
||||
@ -40,7 +45,7 @@ import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly;
|
||||
import static io.netty.util.internal.PlatformDependent.BIG_ENDIAN_NATIVE_ORDER;
|
||||
|
||||
class UnsafeBuffer extends ResourceSupport<Buffer, UnsafeBuffer> implements Buffer, ReadableComponent,
|
||||
WritableComponent {
|
||||
WritableComponent, BufferIntegratable {
|
||||
private static final int CLOSED_SIZE = -1;
|
||||
private static final boolean ACCESS_UNALIGNED = PlatformDependent.isUnaligned();
|
||||
private UnsafeMemory memory; // The memory liveness; monitored by Cleaner.
|
||||
@ -1606,4 +1611,59 @@ class UnsafeBuffer extends ResourceSupport<Buffer, UnsafeBuffer> implements Buff
|
||||
Object recover() {
|
||||
return memory;
|
||||
}
|
||||
|
||||
// <editor-fold name="BufferIntegratable methods">
|
||||
private ByteBufAdaptor adaptor;
|
||||
@Override
|
||||
public ByteBuf asByteBuf() {
|
||||
ByteBufAdaptor bba = adaptor;
|
||||
if (bba == null) {
|
||||
ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor(
|
||||
BufferAllocator.heap(), BufferAllocator.direct());
|
||||
return adaptor = new ByteBufAdaptor(alloc, this);
|
||||
}
|
||||
return bba;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int refCnt() {
|
||||
return isAccessible()? 1 + countBorrows() : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted retain() {
|
||||
return retain(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted retain(int increment) {
|
||||
for (int i = 0; i < increment; i++) {
|
||||
acquire();
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted touch() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReferenceCounted touch(Object hint) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release() {
|
||||
return release(1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(int decrement) {
|
||||
for (int i = 0; i < decrement; i++) {
|
||||
close();
|
||||
}
|
||||
return !isAccessible();
|
||||
}
|
||||
// </editor-fold>
|
||||
}
|
||||
|
@ -39,6 +39,11 @@ public class UnsafeMemoryManagers implements MemoryManagers {
|
||||
return new UnsafeMemoryManager(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String implementationName() {
|
||||
return "Unsafe";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "US";
|
||||
|
@ -29,6 +29,11 @@ public class SegmentMemoryManagers implements MemoryManagers {
|
||||
return new NativeMemorySegmentManager();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String implementationName() {
|
||||
return "MemorySegment";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MS";
|
||||
|
@ -16,17 +16,25 @@
|
||||
package io.netty.buffer.api.tests.adaptor;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.api.BufferAllocator;
|
||||
import io.netty.buffer.api.MemoryManagers;
|
||||
import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
|
||||
public class ByteBufAdaptorTest extends AbstractByteBufTest {
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assumptions.assumeTrue;
|
||||
|
||||
public abstract class ByteBufAdaptorTest extends AbstractByteBufTest {
|
||||
static ByteBufAllocatorAdaptor alloc;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpAllocator() {
|
||||
alloc = new ByteBufAllocatorAdaptor();
|
||||
static void setUpAllocator(String name) {
|
||||
Optional<MemoryManagers> managers = MemoryManagers.lookupImplementation(name);
|
||||
assumeTrue(managers.isPresent());
|
||||
BufferAllocator onheap = MemoryManagers.using(managers.get(), BufferAllocator::pooledHeap);
|
||||
BufferAllocator offheap = MemoryManagers.using(managers.get(), BufferAllocator::pooledHeap);
|
||||
alloc = new ByteBufAllocatorAdaptor(onheap, offheap);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api.tests.adaptor;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
public class MemSegByteBufAdaptorTest extends ByteBufAdaptorTest {
|
||||
@BeforeClass
|
||||
public static void setUpAllocator() {
|
||||
setUpAllocator("MemorySegment");
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api.tests.adaptor;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
public class NioByteBufAdaptorTest extends ByteBufAdaptorTest {
|
||||
@BeforeClass
|
||||
public static void setUpAllocator() {
|
||||
setUpAllocator("ByteBuffer");
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright 2021 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.buffer.api.tests.adaptor;
|
||||
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
public class UnsafeByteBufAdaptorTest extends ByteBufAdaptorTest {
|
||||
@BeforeClass
|
||||
public static void setUpAllocator() {
|
||||
setUpAllocator("Unsafe");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user