From 3d29bcfc8ddeda50e0bff0234d0c5240f404ddb5 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 23 May 2016 11:59:55 +0200 Subject: [PATCH] Allow to create Unsafe ByteBuf implementations that not use a Cleaner to clean the native memory. Motivation: Using the Cleaner to release the native memory has a few drawbacks: - Cleaner.clean() uses static synchronized internally which means it can be a performance bottleneck - It put more load on the GC Modifications: Add new buffer implementations that can be enabled with a system flag as optimizations. In this case no Cleaner is used at all and the user must ensure everything is always released. Result: Less performance impact by direct buffers when need to be allocated and released. --- .../main/java/io/netty/buffer/PoolArena.java | 16 +++- .../netty/buffer/PooledByteBufAllocator.java | 2 +- .../buffer/UnpooledByteBufAllocator.java | 2 +- .../UnpooledUnsafeNoCleanerDirectByteBuf.java | 37 +++++++++ .../io/netty/buffer/UnsafeByteBufUtil.java | 8 ++ ...ndianUnsafeNoCleanerDirectByteBufTest.java | 34 ++++++++ ...ndianUnsafeNoCleanerDirectByteBufTest.java | 33 ++++++++ .../util/internal/OutOfDirectMemoryError.java | 30 +++++++ .../util/internal/PlatformDependent.java | 82 +++++++++++++++++++ .../util/internal/PlatformDependent0.java | 44 ++++++++++ 10 files changed, 283 insertions(+), 5 deletions(-) create mode 100644 buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java create mode 100644 buffer/src/test/java/io/netty/buffer/BigEndianUnsafeNoCleanerDirectByteBufTest.java create mode 100644 buffer/src/test/java/io/netty/buffer/LittleEndianUnsafeNoCleanerDirectByteBufTest.java create mode 100644 common/src/main/java/io/netty/util/internal/OutOfDirectMemoryError.java diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index 27bac5d206..646cedb059 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -686,17 +686,27 @@ abstract class PoolArena implements PoolArenaMetric { @Override protected PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunk( - this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize); + this, allocateDirect(chunkSize), + pageSize, maxOrder, pageShifts, chunkSize); } @Override protected PoolChunk newUnpooledChunk(int capacity) { - return new PoolChunk(this, ByteBuffer.allocateDirect(capacity), capacity); + return new PoolChunk(this, allocateDirect(capacity), capacity); + } + + private static ByteBuffer allocateDirect(int capacity) { + return PlatformDependent.useDirectBufferNoCleaner() ? + PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity); } @Override protected void destroyChunk(PoolChunk chunk) { - PlatformDependent.freeDirectBuffer(chunk.memory); + if (PlatformDependent.useDirectBufferNoCleaner()) { + PlatformDependent.freeDirectNoCleaner(chunk.memory); + } else { + PlatformDependent.freeDirectBuffer(chunk.memory); + } } @Override diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index 9fbcce999a..e646b4832c 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -262,7 +262,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { buf = directArena.allocate(cache, initialCapacity, maxCapacity); } else { if (PlatformDependent.hasUnsafe()) { - buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); + buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); } else { buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); } diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java index 90278211bc..d4fe78afde 100644 --- a/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java @@ -63,7 +63,7 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator { @Override protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { ByteBuf buf = PlatformDependent.hasUnsafe() ? - new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : + UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); return disableLeakDetector ? buf : toLeakAwareBuffer(buf); diff --git a/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java new file mode 100644 index 0000000000..5011a1a520 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/UnpooledUnsafeNoCleanerDirectByteBuf.java @@ -0,0 +1,37 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import io.netty.util.internal.PlatformDependent; + +import java.nio.ByteBuffer; + +final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf { + + UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { + super(alloc, initialCapacity, maxCapacity); + } + + @Override + protected ByteBuffer allocateDirect(int initialCapacity) { + return PlatformDependent.allocateDirectNoCleaner(initialCapacity); + } + + @Override + protected void freeDirect(ByteBuffer buffer) { + PlatformDependent.freeDirectNoCleaner(buffer); + } +} diff --git a/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java b/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java index 74d72d5083..5865a0ed98 100644 --- a/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java +++ b/buffer/src/main/java/io/netty/buffer/UnsafeByteBufUtil.java @@ -619,5 +619,13 @@ final class UnsafeByteBufUtil { PlatformDependent.setMemory(addr, length, ZERO); } + static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf( + ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { + if (PlatformDependent.useDirectBufferNoCleaner()) { + return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity); + } + return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity); + } + private UnsafeByteBufUtil() { } } diff --git a/buffer/src/test/java/io/netty/buffer/BigEndianUnsafeNoCleanerDirectByteBufTest.java b/buffer/src/test/java/io/netty/buffer/BigEndianUnsafeNoCleanerDirectByteBufTest.java new file mode 100644 index 0000000000..dc0ff6aa1b --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/BigEndianUnsafeNoCleanerDirectByteBufTest.java @@ -0,0 +1,34 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + + +import io.netty.util.internal.PlatformDependent; +import org.junit.Assume; +import org.junit.Before; + +public class BigEndianUnsafeNoCleanerDirectByteBufTest extends BigEndianDirectByteBufTest { + + @Before + public void checkHasUnsafe() { + Assume.assumeTrue("sun.misc.Unsafe not found, skip tests", PlatformDependent.hasUnsafe()); + } + + @Override + protected ByteBuf newBuffer(int length) { + return new UnpooledUnsafeNoCleanerDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, length, Integer.MAX_VALUE); + } +} diff --git a/buffer/src/test/java/io/netty/buffer/LittleEndianUnsafeNoCleanerDirectByteBufTest.java b/buffer/src/test/java/io/netty/buffer/LittleEndianUnsafeNoCleanerDirectByteBufTest.java new file mode 100644 index 0000000000..ef9043a763 --- /dev/null +++ b/buffer/src/test/java/io/netty/buffer/LittleEndianUnsafeNoCleanerDirectByteBufTest.java @@ -0,0 +1,33 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +import io.netty.util.internal.PlatformDependent; +import org.junit.Assume; +import org.junit.Before; + +public class LittleEndianUnsafeNoCleanerDirectByteBufTest extends LittleEndianDirectByteBufTest { + + @Before + public void checkHasUnsafe() { + Assume.assumeTrue("sun.misc.Unsafe not found, skip tests", PlatformDependent.hasUnsafe()); + } + + @Override + protected ByteBuf newBuffer(int length) { + return new UnpooledUnsafeNoCleanerDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, length, Integer.MAX_VALUE); + } +} diff --git a/common/src/main/java/io/netty/util/internal/OutOfDirectMemoryError.java b/common/src/main/java/io/netty/util/internal/OutOfDirectMemoryError.java new file mode 100644 index 0000000000..ff3b679d1d --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/OutOfDirectMemoryError.java @@ -0,0 +1,30 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import java.nio.ByteBuffer; + +/** + * {@link OutOfMemoryError} that is throws if {@link PlatformDependent#allocateDirectNoCleaner(int)} can not allocate + * a new {@link ByteBuffer} due memory restrictions. + */ +public final class OutOfDirectMemoryError extends OutOfMemoryError { + private static final long serialVersionUID = 4228264016184011555L; + + OutOfDirectMemoryError(String s) { + super(s); + } +} diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 98a4b69dc4..fb7ffef18e 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -92,6 +92,10 @@ public final class PlatformDependent { private static final int BIT_MODE = bitMode0(); private static final int ADDRESS_SIZE = addressSize0(); + private static final boolean USE_DIRECT_BUFFER_NO_CLEANER; + private static final AtomicLong DIRECT_MEMORY_COUNTER; + private static final long DIRECT_MEMORY_LIMIT; + public static final boolean BIG_ENDIAN_NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; static { @@ -105,6 +109,34 @@ public final class PlatformDependent { "Unless explicitly requested, heap buffer will always be preferred to avoid potential system " + "unstability."); } + + // Here is how the system property is used: + // + // * < 0 - Don't use cleaner, and inherit max direct memory from java. In this case the + // "practical max direct memory" would be 2 * max memory as defined by the JDK. + // * == 0 - Use cleaner, Netty will not enforce max memory, and instead will defer to JDK. + // * > 0 - Don't use cleaner. This will limit Netty's total direct memory + // (note: that JDK's direct memory limit is independent of this). + long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1); + + if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) { + USE_DIRECT_BUFFER_NO_CLEANER = false; + DIRECT_MEMORY_COUNTER = null; + } else { + USE_DIRECT_BUFFER_NO_CLEANER = true; + if (maxDirectMemory < 0) { + maxDirectMemory = maxDirectMemory0(); + if (maxDirectMemory <= 0) { + DIRECT_MEMORY_COUNTER = null; + } else { + DIRECT_MEMORY_COUNTER = new AtomicLong(); + } + } else { + DIRECT_MEMORY_COUNTER = new AtomicLong(); + } + } + DIRECT_MEMORY_LIMIT = maxDirectMemory; + logger.debug("io.netty.maxDirectMemory: {} bytes", maxDirectMemory); } /** @@ -504,6 +536,56 @@ public final class PlatformDependent { PlatformDependent0.setMemory(address, bytes, value); } + /** + * Allocate a new {@link ByteBuffer} with the given {@code capacity}. {@link ByteBuffer}s allocated with + * this method MUST be deallocated via {@link #freeDirectNoCleaner(ByteBuffer)}. + */ + public static ByteBuffer allocateDirectNoCleaner(int capacity) { + assert USE_DIRECT_BUFFER_NO_CLEANER; + + if (DIRECT_MEMORY_COUNTER != null) { + for (;;) { + long usedMemory = DIRECT_MEMORY_COUNTER.get(); + long newUsedMemory = usedMemory + capacity; + if (newUsedMemory > DIRECT_MEMORY_LIMIT) { + throw new OutOfDirectMemoryError("failed to allocate " + capacity + + " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')'); + } + if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) { + break; + } + } + } + try { + return PlatformDependent0.allocateDirectNoCleaner(capacity); + } catch (Throwable e) { + if (DIRECT_MEMORY_COUNTER != null) { + DIRECT_MEMORY_COUNTER.addAndGet(-capacity); + } + throwException(e); + return null; + } + } + + /** + * This method MUST only be called for {@link ByteBuffer}s that were allocated via + * {@link #allocateDirectNoCleaner(int)}. + */ + public static void freeDirectNoCleaner(ByteBuffer buffer) { + assert USE_DIRECT_BUFFER_NO_CLEANER; + + int capacity = buffer.capacity(); + PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer)); + if (DIRECT_MEMORY_COUNTER != null) { + long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity); + assert usedMemory >= 0; + } + } + + public static boolean useDirectBufferNoCleaner() { + return USE_DIRECT_BUFFER_NO_CLEANER; + } + /** * Compare two {@code byte} arrays for equality. For performance reasons no bounds checking on the * parameters is performed. diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java index 50885273c9..aebfff81b6 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java @@ -19,6 +19,7 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import sun.misc.Unsafe; +import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.nio.Buffer; @@ -44,6 +45,8 @@ final class PlatformDependent0 { private static final long CHAR_ARRAY_INDEX_SCALE; private static final long STRING_CHAR_VALUE_FIELD_OFFSET; private static final long STRING_BYTE_VALUE_FIELD_OFFSET; + private static final Constructor DIRECT_BUFFER_CONSTRUCTOR; + static final int HASH_CODE_ASCII_SEED = 0xc2b2ae35; // constant borrowed from murmur3 /** @@ -73,6 +76,7 @@ final class PlatformDependent0 { // Failed to access the address field. addressField = null; } + logger.debug("java.nio.Buffer.address: {}", addressField != null? "available" : "unavailable"); Unsafe unsafe; @@ -116,7 +120,26 @@ final class PlatformDependent0 { BYTE_ARRAY_BASE_OFFSET = CHAR_ARRAY_BASE_OFFSET = CHAR_ARRAY_INDEX_SCALE = -1; UNALIGNED = false; STRING_CHAR_VALUE_FIELD_OFFSET = STRING_BYTE_VALUE_FIELD_OFFSET = -1; + DIRECT_BUFFER_CONSTRUCTOR = null; } else { + Constructor directBufferConstructor; + long address = -1; + try { + directBufferConstructor = direct.getClass().getDeclaredConstructor(long.class, int.class); + directBufferConstructor.setAccessible(true); + address = UNSAFE.allocateMemory(1); + + // Try to use the constructor now + directBufferConstructor.newInstance(address, 1); + } catch (Throwable t) { + directBufferConstructor = null; + } finally { + if (address != -1) { + UNSAFE.freeMemory(address); + } + } + DIRECT_BUFFER_CONSTRUCTOR = directBufferConstructor; + ADDRESS_FIELD_OFFSET = objectFieldOffset(addressField); BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); CHAR_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(char[].class); @@ -179,6 +202,9 @@ final class PlatformDependent0 { } } } + + logger.debug("java.nio.DirectByteBuffer.(long, int): {}", + DIRECT_BUFFER_CONSTRUCTOR != null? "available" : "unavailable"); } static boolean isUnaligned() { @@ -198,6 +224,24 @@ final class PlatformDependent0 { UNSAFE.throwException(checkNotNull(cause, "cause")); } + static boolean hasDirectBufferNoCleanerConstructor() { + return DIRECT_BUFFER_CONSTRUCTOR != null; + } + + static ByteBuffer allocateDirectNoCleaner(int capacity) { + assert DIRECT_BUFFER_CONSTRUCTOR != null; + long address = UNSAFE.allocateMemory(capacity); + try { + return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity); + } catch (Throwable cause) { + // Not expected to ever throw! + if (cause instanceof Error) { + throw (Error) cause; + } + throw new Error(cause); + } + } + static void freeDirectBuffer(ByteBuffer buffer) { // Delegate to other class to not break on android // See https://github.com/netty/netty/issues/2604