From af119de4a73dcd7a84363877165810679a94ac67 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 28 May 2021 11:51:13 +0200 Subject: [PATCH] Implement BufferIntegratable for Nio- and UnsafeBuffer --- .../io/netty/buffer/api/MemoryManagers.java | 30 +++++++++ .../bytebuffer/ByteBufferMemoryManagers.java | 5 ++ .../buffer/api/bytebuffer/NioBuffer.java | 63 ++++++++++++++++++- .../MemoryManagersOverride.java | 9 +-- .../netty/buffer/api/unsafe/UnsafeBuffer.java | 62 +++++++++++++++++- .../api/unsafe/UnsafeMemoryManagers.java | 5 ++ .../api/memseg/SegmentMemoryManagers.java | 5 ++ .../api/tests/adaptor/ByteBufAdaptorTest.java | 18 ++++-- .../adaptor/MemSegByteBufAdaptorTest.java | 25 ++++++++ .../tests/adaptor/NioByteBufAdaptorTest.java | 25 ++++++++ .../adaptor/UnsafeByteBufAdaptorTest.java | 25 ++++++++ 11 files changed, 261 insertions(+), 11 deletions(-) rename buffer-api/src/main/java/io/netty/buffer/api/{ => internal}/MemoryManagersOverride.java (86%) create mode 100644 buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/MemSegByteBufAdaptorTest.java create mode 100644 buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/NioByteBufAdaptorTest.java create mode 100644 buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/UnsafeByteBufAdaptorTest.java diff --git a/buffer-api/src/main/java/io/netty/buffer/api/MemoryManagers.java b/buffer-api/src/main/java/io/netty/buffer/api/MemoryManagers.java index f0f9d12..262af5b 100644 --- a/buffer-api/src/main/java/io/netty/buffer/api/MemoryManagers.java +++ b/buffer-api/src/main/java/io/netty/buffer/api/MemoryManagers.java @@ -15,6 +15,9 @@ */ package io.netty.buffer.api; +import io.netty.buffer.api.internal.MemoryManagersOverride; + +import java.util.Optional; import java.util.ServiceLoader; import java.util.function.Supplier; import java.util.stream.Stream; @@ -59,6 +62,25 @@ public interface MemoryManagers { return loader.stream(); } + /** + * Find a {@link MemoryManagers} implementation by its {@linkplain #implementationName() implementation name}. + * + * @param implementationName The named implementation to look for. + * @return A {@link MemoryManagers} implementation, if any was found. + */ + static Optional lookupImplementation(String implementationName) { + return getAllManagers() + .flatMap(provider -> { + try { + return Stream.ofNullable(provider.get()); + } catch (Exception e) { + return Stream.empty(); + } + }) + .filter(impl -> implementationName.equals(impl.implementationName())) + .findFirst(); + } + /** * Get a {@link MemoryManager} instance that is suitable for allocating on-heap {@link Buffer} instances. * @@ -72,4 +94,12 @@ public interface MemoryManagers { * @return An off-heap {@link MemoryManager}. */ MemoryManager getNativeMemoryManager(); + + /** + * Get the name for this implementation, which can be used for finding this particular implementation via the + * {@link #lookupImplementation(String)} method. + * + * @return The name of this memory managers implementation. + */ + String implementationName(); } diff --git a/buffer-api/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManagers.java b/buffer-api/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManagers.java index 8eca2fc..cb33511 100644 --- a/buffer-api/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManagers.java +++ b/buffer-api/src/main/java/io/netty/buffer/api/bytebuffer/ByteBufferMemoryManagers.java @@ -29,6 +29,11 @@ public class ByteBufferMemoryManagers implements MemoryManagers { return new ByteBufferMemoryManager(true); } + @Override + public String implementationName() { + return "ByteBuffer"; + } + @Override public String toString() { return "BB"; diff --git a/buffer-api/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java b/buffer-api/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java index 8fa5a7e..329b347 100644 --- a/buffer-api/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java +++ b/buffer-api/src/main/java/io/netty/buffer/api/bytebuffer/NioBuffer.java @@ -15,12 +15,16 @@ */ package io.netty.buffer.api.bytebuffer; +import io.netty.buffer.ByteBuf; import io.netty.buffer.api.AllocatorControl; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.ByteCursor; import io.netty.buffer.api.Drop; import io.netty.buffer.api.Owned; +import io.netty.buffer.api.adaptor.BufferIntegratable; +import io.netty.buffer.api.adaptor.ByteBufAdaptor; +import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; import io.netty.buffer.api.internal.ResourceSupport; import io.netty.buffer.api.ReadableComponent; import io.netty.buffer.api.ReadableComponentProcessor; @@ -28,6 +32,7 @@ import io.netty.buffer.api.WritableComponent; import io.netty.buffer.api.WritableComponentProcessor; import io.netty.buffer.api.internal.ArcDrop; import io.netty.buffer.api.internal.Statics; +import io.netty.util.ReferenceCounted; import io.netty.util.internal.PlatformDependent; import java.nio.ByteBuffer; @@ -39,7 +44,8 @@ import static io.netty.buffer.api.internal.Statics.bbslice; import static io.netty.buffer.api.internal.Statics.bufferIsClosed; import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly; -class NioBuffer extends ResourceSupport implements Buffer, ReadableComponent, WritableComponent { +class NioBuffer extends ResourceSupport implements Buffer, ReadableComponent, WritableComponent, + BufferIntegratable { private static final ByteBuffer CLOSED_BUFFER = ByteBuffer.allocate(0); private final AllocatorControl control; @@ -1190,4 +1196,59 @@ class NioBuffer extends ResourceSupport implements Buffer, Re ByteBuffer recoverable() { return base; } + + // + private ByteBufAdaptor adaptor; + @Override + public ByteBuf asByteBuf() { + ByteBufAdaptor bba = adaptor; + if (bba == null) { + ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor( + BufferAllocator.heap(), BufferAllocator.direct()); + return adaptor = new ByteBufAdaptor(alloc, this); + } + return bba; + } + + @Override + public int refCnt() { + return isAccessible()? 1 + countBorrows() : 0; + } + + @Override + public ReferenceCounted retain() { + return retain(1); + } + + @Override + public ReferenceCounted retain(int increment) { + for (int i = 0; i < increment; i++) { + acquire(); + } + return this; + } + + @Override + public ReferenceCounted touch() { + return this; + } + + @Override + public ReferenceCounted touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + for (int i = 0; i < decrement; i++) { + close(); + } + return !isAccessible(); + } + // } diff --git a/buffer-api/src/main/java/io/netty/buffer/api/MemoryManagersOverride.java b/buffer-api/src/main/java/io/netty/buffer/api/internal/MemoryManagersOverride.java similarity index 86% rename from buffer-api/src/main/java/io/netty/buffer/api/MemoryManagersOverride.java rename to buffer-api/src/main/java/io/netty/buffer/api/internal/MemoryManagersOverride.java index c59125b..e4b0255 100644 --- a/buffer-api/src/main/java/io/netty/buffer/api/MemoryManagersOverride.java +++ b/buffer-api/src/main/java/io/netty/buffer/api/internal/MemoryManagersOverride.java @@ -13,8 +13,9 @@ * License for the specific language governing permissions and limitations * under the License. */ -package io.netty.buffer.api; +package io.netty.buffer.api.internal; +import io.netty.buffer.api.MemoryManagers; import io.netty.buffer.api.bytebuffer.ByteBufferMemoryManagers; import java.util.Collections; @@ -23,7 +24,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -final class MemoryManagersOverride { +public final class MemoryManagersOverride { private static final MemoryManagers DEFAULT = new ByteBufferMemoryManagers(); private static final AtomicInteger OVERRIDES_AVAILABLE = new AtomicInteger(); private static final Map OVERRIDES = Collections.synchronizedMap(new IdentityHashMap<>()); @@ -31,14 +32,14 @@ final class MemoryManagersOverride { private MemoryManagersOverride() { } - static MemoryManagers getManagers() { + public static MemoryManagers getManagers() { if (OVERRIDES_AVAILABLE.get() > 0) { return OVERRIDES.getOrDefault(Thread.currentThread(), DEFAULT); } return DEFAULT; } - static T using(MemoryManagers managers, Supplier supplier) { + public static T using(MemoryManagers managers, Supplier supplier) { Thread thread = Thread.currentThread(); OVERRIDES.put(thread, managers); OVERRIDES_AVAILABLE.incrementAndGet(); diff --git a/buffer-api/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java b/buffer-api/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java index 8f875f0..793688b 100644 --- a/buffer-api/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java +++ b/buffer-api/src/main/java/io/netty/buffer/api/unsafe/UnsafeBuffer.java @@ -15,12 +15,16 @@ */ package io.netty.buffer.api.unsafe; +import io.netty.buffer.ByteBuf; import io.netty.buffer.api.BufferAllocator; import io.netty.buffer.api.AllocatorControl; import io.netty.buffer.api.Buffer; import io.netty.buffer.api.ByteCursor; import io.netty.buffer.api.Drop; import io.netty.buffer.api.Owned; +import io.netty.buffer.api.adaptor.BufferIntegratable; +import io.netty.buffer.api.adaptor.ByteBufAdaptor; +import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; import io.netty.buffer.api.internal.ResourceSupport; import io.netty.buffer.api.ReadableComponent; import io.netty.buffer.api.ReadableComponentProcessor; @@ -28,6 +32,7 @@ import io.netty.buffer.api.WritableComponent; import io.netty.buffer.api.WritableComponentProcessor; import io.netty.buffer.api.internal.ArcDrop; import io.netty.buffer.api.internal.Statics; +import io.netty.util.ReferenceCounted; import io.netty.util.internal.PlatformDependent; import java.lang.ref.Reference; @@ -40,7 +45,7 @@ import static io.netty.buffer.api.internal.Statics.bufferIsReadOnly; import static io.netty.util.internal.PlatformDependent.BIG_ENDIAN_NATIVE_ORDER; class UnsafeBuffer extends ResourceSupport implements Buffer, ReadableComponent, - WritableComponent { + WritableComponent, BufferIntegratable { private static final int CLOSED_SIZE = -1; private static final boolean ACCESS_UNALIGNED = PlatformDependent.isUnaligned(); private UnsafeMemory memory; // The memory liveness; monitored by Cleaner. @@ -1606,4 +1611,59 @@ class UnsafeBuffer extends ResourceSupport implements Buff Object recover() { return memory; } + + // + private ByteBufAdaptor adaptor; + @Override + public ByteBuf asByteBuf() { + ByteBufAdaptor bba = adaptor; + if (bba == null) { + ByteBufAllocatorAdaptor alloc = new ByteBufAllocatorAdaptor( + BufferAllocator.heap(), BufferAllocator.direct()); + return adaptor = new ByteBufAdaptor(alloc, this); + } + return bba; + } + + @Override + public int refCnt() { + return isAccessible()? 1 + countBorrows() : 0; + } + + @Override + public ReferenceCounted retain() { + return retain(1); + } + + @Override + public ReferenceCounted retain(int increment) { + for (int i = 0; i < increment; i++) { + acquire(); + } + return this; + } + + @Override + public ReferenceCounted touch() { + return this; + } + + @Override + public ReferenceCounted touch(Object hint) { + return this; + } + + @Override + public boolean release() { + return release(1); + } + + @Override + public boolean release(int decrement) { + for (int i = 0; i < decrement; i++) { + close(); + } + return !isAccessible(); + } + // } diff --git a/buffer-api/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManagers.java b/buffer-api/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManagers.java index e1f0ed3..0c4ef5c 100644 --- a/buffer-api/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManagers.java +++ b/buffer-api/src/main/java/io/netty/buffer/api/unsafe/UnsafeMemoryManagers.java @@ -39,6 +39,11 @@ public class UnsafeMemoryManagers implements MemoryManagers { return new UnsafeMemoryManager(true); } + @Override + public String implementationName() { + return "Unsafe"; + } + @Override public String toString() { return "US"; diff --git a/buffer-memseg/src/main/java/io/netty/buffer/api/memseg/SegmentMemoryManagers.java b/buffer-memseg/src/main/java/io/netty/buffer/api/memseg/SegmentMemoryManagers.java index c004270..e28561f 100644 --- a/buffer-memseg/src/main/java/io/netty/buffer/api/memseg/SegmentMemoryManagers.java +++ b/buffer-memseg/src/main/java/io/netty/buffer/api/memseg/SegmentMemoryManagers.java @@ -29,6 +29,11 @@ public class SegmentMemoryManagers implements MemoryManagers { return new NativeMemorySegmentManager(); } + @Override + public String implementationName() { + return "MemorySegment"; + } + @Override public String toString() { return "MS"; diff --git a/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/ByteBufAdaptorTest.java b/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/ByteBufAdaptorTest.java index 34df8ff..743d32d 100644 --- a/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/ByteBufAdaptorTest.java +++ b/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/ByteBufAdaptorTest.java @@ -16,17 +16,25 @@ package io.netty.buffer.api.tests.adaptor; import io.netty.buffer.ByteBuf; +import io.netty.buffer.api.BufferAllocator; +import io.netty.buffer.api.MemoryManagers; import io.netty.buffer.api.adaptor.ByteBufAllocatorAdaptor; import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Ignore; -public class ByteBufAdaptorTest extends AbstractByteBufTest { +import java.util.Optional; + +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +public abstract class ByteBufAdaptorTest extends AbstractByteBufTest { static ByteBufAllocatorAdaptor alloc; - @BeforeClass - public static void setUpAllocator() { - alloc = new ByteBufAllocatorAdaptor(); + static void setUpAllocator(String name) { + Optional managers = MemoryManagers.lookupImplementation(name); + assumeTrue(managers.isPresent()); + BufferAllocator onheap = MemoryManagers.using(managers.get(), BufferAllocator::pooledHeap); + BufferAllocator offheap = MemoryManagers.using(managers.get(), BufferAllocator::pooledHeap); + alloc = new ByteBufAllocatorAdaptor(onheap, offheap); } @AfterClass diff --git a/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/MemSegByteBufAdaptorTest.java b/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/MemSegByteBufAdaptorTest.java new file mode 100644 index 0000000..ebd34d7 --- /dev/null +++ b/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/MemSegByteBufAdaptorTest.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.tests.adaptor; + +import org.junit.BeforeClass; + +public class MemSegByteBufAdaptorTest extends ByteBufAdaptorTest { + @BeforeClass + public static void setUpAllocator() { + setUpAllocator("MemorySegment"); + } +} diff --git a/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/NioByteBufAdaptorTest.java b/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/NioByteBufAdaptorTest.java new file mode 100644 index 0000000..34d5637 --- /dev/null +++ b/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/NioByteBufAdaptorTest.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.tests.adaptor; + +import org.junit.BeforeClass; + +public class NioByteBufAdaptorTest extends ByteBufAdaptorTest { + @BeforeClass + public static void setUpAllocator() { + setUpAllocator("ByteBuffer"); + } +} diff --git a/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/UnsafeByteBufAdaptorTest.java b/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/UnsafeByteBufAdaptorTest.java new file mode 100644 index 0000000..f7535c3 --- /dev/null +++ b/buffer-tests/src/test/java/io/netty/buffer/api/tests/adaptor/UnsafeByteBufAdaptorTest.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer.api.tests.adaptor; + +import org.junit.BeforeClass; + +public class UnsafeByteBufAdaptorTest extends ByteBufAdaptorTest { + @BeforeClass + public static void setUpAllocator() { + setUpAllocator("Unsafe"); + } +}