From 62655c00a93a3c4c9e421fabbf5991c19dfb4161 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 23 May 2016 11:59:55 +0200 Subject: [PATCH] Allow to create Unsafe ByteBuf implementations that not use a Cleaner to clean the native memory. Motivation: Using the Cleaner to release the native memory has a few drawbacks: - Cleaner.clean() uses static synchronized internally which means it can be a performance bottleneck - It put more load on the GC Modifications: Add new buffer implementations that can be enabled with a system flag as optimizations. In this case no Cleaner is used at all and the user must ensure everything is always released. Result: Less performance impact by direct buffers when need to be allocated and released. --- .../main/java/io/netty/buffer/PoolArena.java | 16 +++- .../netty/buffer/PooledByteBufAllocator.java | 2 +- .../buffer/UnpooledByteBufAllocator.java | 2 +- .../UnpooledUnsafeNoCleanerDirectByteBuf.java | 37 ++++++++ .../io/netty/buffer/UnsafeByteBufUtil.java | 8 ++ ...ndianUnsafeNoCleanerDirectByteBufTest.java | 34 ++++++++ ...ndianUnsafeNoCleanerDirectByteBufTest.java | 33 ++++++++ .../util/internal/OutOfDirectMemoryError.java | 30 +++++++ .../util/internal/PlatformDependent.java | 84 +++++++++++++++++++ .../util/internal/PlatformDependent0.java | 43 ++++++++++ 10 files changed, 284 insertions(+), 5 deletions(-) create mode 100644 buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java create mode 100644 buffer/src/test/java/io/netty/buffer/BigEndianUnsafeNoCleanerDirectByteBufTest.java create mode 100644 buffer/src/test/java/io/netty/buffer/LittleEndianUnsafeNoCleanerDirectByteBufTest.java create mode 100644 common/src/main/java/io/netty/util/internal/OutOfDirectMemoryError.java diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index e667d936c0..3694988bf2 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -688,17 +688,27 @@ abstract class PoolArena implements PoolArenaMetric { @Override protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunk( - this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize); + this, allocateDirect(chunkSize), + pageSize, maxOrder, pageShifts, chunkSize); } @Override protected PoolChunk newUnpooledChunk(int capacity) { - return new PoolChunk(this, ByteBuffer.allocateDirect(capacity), capacity); + return new PoolChunk(this, allocateDirect(capacity), capacity); + } + + private static ByteBuffer allocateDirect(int capacity) { + return PlatformDependent.useDirectBufferNoCleaner() ? + PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity); } @Override protected void destroyChunk(PoolChunk chunk) { - PlatformDependent.freeDirectBuffer(chunk.memory); + if (PlatformDependent.useDirectBufferNoCleaner()) { + PlatformDependent.freeDirectNoCleaner(chunk.memory); + } else { + PlatformDependent.freeDirectBuffer(chunk.memory); + } } @Override diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index f3b31259ba..a0c8b1fdc0 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -271,7 +271,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { - buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); + buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java index 90278211bc..d4fe78afde 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java @@ -63,7 +63,7 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator { @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { ByteBuf buf = PlatformDependent.hasUnsafe() ? - new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : + UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); return disableLeakDetector ? buf : toLeakAwareBuffer(buf); diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java new file mode 100644 index 0000000000..5011a1a520 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java @@ -0,0 +1,37 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import io.netty.util.internal.PlatformDependent; + +import java.nio.ByteBuffer; + +final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf { + + UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { + super(alloc, initialCapacity, maxCapacity); + } + + @Override + protected ByteBuffer allocateDirect(int initialCapacity) { + return PlatformDependent.allocateDirectNoCleaner(initialCapacity); + } + + @Override + protected void freeDirect(ByteBuffer buffer) { + PlatformDependent.freeDirectNoCleaner(buffer); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java b/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java index b809ba8aa9..6713add639 100644 --- a/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java +++ b/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java @@ -420,5 +420,13 @@ final class UnsafeByteBufUtil { PlatformDependent.setMemory(addr, length, ZERO); } + static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf( + ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { + if (PlatformDependent.useDirectBufferNoCleaner()) { + return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity); + } + return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity); + } + private UnsafeByteBufUtil() { } } diff --git a/buffer/src/test/java/io/netty/buffer/BigEndianUnsafeNoCleanerDirectByteBufTest.java b/buffer/src/test/java/io/netty/buffer/BigEndianUnsafeNoCleanerDirectByteBufTest.java new file mode 100644 index 0000000000..dc0ff6aa1b --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/BigEndianUnsafeNoCleanerDirectByteBufTest.java @@ -0,0 +1,34 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + + +import io.netty.util.internal.PlatformDependent; +import org.junit.Assume; +import org.junit.Before; + +public class BigEndianUnsafeNoCleanerDirectByteBufTest extends BigEndianDirectByteBufTest { + + @Before + public void checkHasUnsafe() { + Assume.assumeTrue("sun.misc.Unsafe not found, skip tests", PlatformDependent.hasUnsafe()); + } + + @Override + protected ByteBuf newBuffer(int length) { + return new UnpooledUnsafeNoCleanerDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, length, Integer.MAX_VALUE); + } +} diff --git a/buffer/src/test/java/io/netty/buffer/LittleEndianUnsafeNoCleanerDirectByteBufTest.java b/buffer/src/test/java/io/netty/buffer/LittleEndianUnsafeNoCleanerDirectByteBufTest.java new file mode 100644 index 0000000000..ef9043a763 --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/LittleEndianUnsafeNoCleanerDirectByteBufTest.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import io.netty.util.internal.PlatformDependent; +import org.junit.Assume; +import org.junit.Before; + +public class LittleEndianUnsafeNoCleanerDirectByteBufTest extends LittleEndianDirectByteBufTest { + + @Before + public void checkHasUnsafe() { + Assume.assumeTrue("sun.misc.Unsafe not found, skip tests", PlatformDependent.hasUnsafe()); + } + + @Override + protected ByteBuf newBuffer(int length) { + return new UnpooledUnsafeNoCleanerDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, length, Integer.MAX_VALUE); + } +} diff --git a/common/src/main/java/io/netty/util/internal/OutOfDirectMemoryError.java b/common/src/main/java/io/netty/util/internal/OutOfDirectMemoryError.java new file mode 100644 index 0000000000..ff3b679d1d --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/OutOfDirectMemoryError.java @@ -0,0 +1,30 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import java.nio.ByteBuffer; + +/** + * {@link OutOfMemoryError} that is throws if {@link PlatformDependent#allocateDirectNoCleaner(int)} can not allocate + * a new {@link ByteBuffer} due memory restrictions. + */ +public final class OutOfDirectMemoryError extends OutOfMemoryError { + private static final long serialVersionUID = 4228264016184011555L; + + OutOfDirectMemoryError(String s) { + super(s); + } +} diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 8253f50e57..b402ca38c8 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -30,6 +30,7 @@ import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.Deque; import java.util.List; import java.util.Locale; @@ -87,6 +88,11 @@ public final class PlatformDependent { private static final int BIT_MODE = bitMode0(); private static final int ADDRESS_SIZE = addressSize0(); + private static final boolean USE_DIRECT_BUFFER_NO_CLEANER; + private static final AtomicLong DIRECT_MEMORY_COUNTER; + private static final long DIRECT_MEMORY_LIMIT; + + public static final boolean BIG_ENDIAN_NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; static { if (logger.isDebugEnabled()) { @@ -99,6 +105,34 @@ public final class PlatformDependent { "Unless explicitly requested, heap buffer will always be preferred to avoid potential system " + "unstability."); } + + // Here is how the system property is used: + // + // * < 0 - Don't use cleaner, and inherit max direct memory from java. In this case the + // "practical max direct memory" would be 2 * max memory as defined by the JDK. + // * == 0 - Use cleaner, Netty will not enforce max memory, and instead will defer to JDK. + // * > 0 - Don't use cleaner. This will limit Netty's total direct memory + // (note: that JDK's direct memory limit is independent of this). + long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1); + + if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) { + USE_DIRECT_BUFFER_NO_CLEANER = false; + DIRECT_MEMORY_COUNTER = null; + } else { + USE_DIRECT_BUFFER_NO_CLEANER = true; + if (maxDirectMemory < 0) { + maxDirectMemory = maxDirectMemory0(); + if (maxDirectMemory <= 0) { + DIRECT_MEMORY_COUNTER = null; + } else { + DIRECT_MEMORY_COUNTER = new AtomicLong(); + } + } else { + DIRECT_MEMORY_COUNTER = new AtomicLong(); + } + } + DIRECT_MEMORY_LIMIT = maxDirectMemory; + logger.debug("io.netty.maxDirectMemory: {} bytes", maxDirectMemory); } /** @@ -416,6 +450,56 @@ public final class PlatformDependent { PlatformDependent0.setMemory(address, bytes, value); } + /** + * Allocate a new {@link ByteBuffer} with the given {@code capacity}. {@link ByteBuffer}s allocated with + * this method MUST be deallocated via {@link #freeDirectNoCleaner(ByteBuffer)}. + */ + public static ByteBuffer allocateDirectNoCleaner(int capacity) { + assert USE_DIRECT_BUFFER_NO_CLEANER; + + if (DIRECT_MEMORY_COUNTER != null) { + for (;;) { + long usedMemory = DIRECT_MEMORY_COUNTER.get(); + long newUsedMemory = usedMemory + capacity; + if (newUsedMemory > DIRECT_MEMORY_LIMIT) { + throw new OutOfDirectMemoryError("failed to allocate " + capacity + + " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')'); + } + if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) { + break; + } + } + } + try { + return PlatformDependent0.allocateDirectNoCleaner(capacity); + } catch (Throwable e) { + if (DIRECT_MEMORY_COUNTER != null) { + DIRECT_MEMORY_COUNTER.addAndGet(-capacity); + } + throwException(e); + return null; + } + } + + /** + * This method MUST only be called for {@link ByteBuffer}s that were allocated via + * {@link #allocateDirectNoCleaner(int)}. + */ + public static void freeDirectNoCleaner(ByteBuffer buffer) { + assert USE_DIRECT_BUFFER_NO_CLEANER; + + int capacity = buffer.capacity(); + PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer)); + if (DIRECT_MEMORY_COUNTER != null) { + long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity); + assert usedMemory >= 0; + } + } + + public static boolean useDirectBufferNoCleaner() { + return USE_DIRECT_BUFFER_NO_CLEANER; + } + /** * Create a new optimized {@link AtomicReferenceFieldUpdater} or {@code null} if it * could not be created. Because of this the caller need to check for {@code null} and if {@code null} is returned diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java index e5c1622c82..394a2c1b15 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java @@ -19,6 +19,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import sun.misc.Unsafe; +import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.Buffer; @@ -40,6 +41,7 @@ final class PlatformDependent0 { static final Unsafe UNSAFE; private static final long ADDRESS_FIELD_OFFSET; private static final long BYTE_ARRAY_BASE_OFFSET; + private static final Constructor DIRECT_BUFFER_CONSTRUCTOR; /** * Limits the number of bytes to copy per {@link Unsafe#copyMemory(long, long, long)} to allow safepoint polling @@ -68,6 +70,7 @@ final class PlatformDependent0 { // Failed to access the address field. addressField = null; } + logger.debug("java.nio.Buffer.address: {}", addressField != null? "available" : "unavailable"); Unsafe unsafe; @@ -110,7 +113,26 @@ final class PlatformDependent0 { BYTE_ARRAY_BASE_OFFSET = -1; ADDRESS_FIELD_OFFSET = -1; UNALIGNED = false; + DIRECT_BUFFER_CONSTRUCTOR = null; } else { + Constructor directBufferConstructor; + long address = -1; + try { + directBufferConstructor = direct.getClass().getDeclaredConstructor(long.class, int.class); + directBufferConstructor.setAccessible(true); + address = UNSAFE.allocateMemory(1); + + // Try to use the constructor now + directBufferConstructor.newInstance(address, 1); + } catch (Throwable t) { + directBufferConstructor = null; + } finally { + if (address != -1) { + UNSAFE.freeMemory(address); + } + } + DIRECT_BUFFER_CONSTRUCTOR = directBufferConstructor; + ADDRESS_FIELD_OFFSET = objectFieldOffset(addressField); boolean unaligned; try { @@ -129,6 +151,9 @@ final class PlatformDependent0 { logger.debug("java.nio.Bits.unaligned: {}", UNALIGNED); BYTE_ARRAY_BASE_OFFSET = arrayBaseOffset(); } + + logger.debug("java.nio.DirectByteBuffer.(long, int): {}", + DIRECT_BUFFER_CONSTRUCTOR != null? "available" : "unavailable"); } static boolean isUnaligned() { @@ -144,6 +169,24 @@ final class PlatformDependent0 { UNSAFE.throwException(checkNotNull(cause, "cause")); } + static boolean hasDirectBufferNoCleanerConstructor() { + return DIRECT_BUFFER_CONSTRUCTOR != null; + } + + static ByteBuffer allocateDirectNoCleaner(int capacity) { + assert DIRECT_BUFFER_CONSTRUCTOR != null; + long address = UNSAFE.allocateMemory(capacity); + try { + return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity); + } catch (Throwable cause) { + // Not expected to ever throw! + if (cause instanceof Error) { + throw (Error) cause; + } + throw new Error(cause); + } + } + static void freeDirectBuffer(ByteBuffer buffer) { // Delegate to other class to not break on android // See https://github.com/netty/netty/issues/2604