Allow to create Unsafe ByteBuf implementations that not use a Cleaner to clean the native memory.

Motivation:

Using the Cleaner to release the native memory has a few drawbacks:

- Cleaner.clean() uses static synchronized internally which means it can be a performance bottleneck
- It put more load on the GC

Modifications:

Add new buffer implementations that can be enabled with a system flag as optimizations. In this case no Cleaner is used at all and the user must ensure everything is always released.

Result:

Less performance impact by direct buffers when need to be allocated and released.
This commit is contained in:
Norman Maurer 2016-05-23 11:59:55 +02:00
parent 789b6d62a6
commit 62655c00a9
10 changed files with 284 additions and 5 deletions

View File

@ -688,17 +688,27 @@ abstract class PoolArena<T> implements PoolArenaMetric {
@Override @Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
return new PoolChunk<ByteBuffer>( return new PoolChunk<ByteBuffer>(
this, ByteBuffer.allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize); this, allocateDirect(chunkSize),
pageSize, maxOrder, pageShifts, chunkSize);
} }
@Override @Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) { protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
return new PoolChunk<ByteBuffer>(this, ByteBuffer.allocateDirect(capacity), capacity); return new PoolChunk<ByteBuffer>(this, allocateDirect(capacity), capacity);
}
private static ByteBuffer allocateDirect(int capacity) {
return PlatformDependent.useDirectBufferNoCleaner() ?
PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
} }
@Override @Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) { protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
PlatformDependent.freeDirectBuffer(chunk.memory); if (PlatformDependent.useDirectBufferNoCleaner()) {
PlatformDependent.freeDirectNoCleaner(chunk.memory);
} else {
PlatformDependent.freeDirectBuffer(chunk.memory);
}
} }
@Override @Override

View File

@ -271,7 +271,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
buf = directArena.allocate(cache, initialCapacity, maxCapacity); buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else { } else {
if (PlatformDependent.hasUnsafe()) { if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity); buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else { } else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
} }

View File

@ -63,7 +63,7 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
@Override @Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf = PlatformDependent.hasUnsafe() ? ByteBuf buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
return disableLeakDetector ? buf : toLeakAwareBuffer(buf); return disableLeakDetector ? buf : toLeakAwareBuffer(buf);

View File

@ -0,0 +1,37 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
return PlatformDependent.allocateDirectNoCleaner(initialCapacity);
}
@Override
protected void freeDirect(ByteBuffer buffer) {
PlatformDependent.freeDirectNoCleaner(buffer);
}
}

View File

@ -420,5 +420,13 @@ final class UnsafeByteBufUtil {
PlatformDependent.setMemory(addr, length, ZERO); PlatformDependent.setMemory(addr, length, ZERO);
} }
static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(
ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
if (PlatformDependent.useDirectBufferNoCleaner()) {
return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
private UnsafeByteBufUtil() { } private UnsafeByteBufUtil() { }
} }

View File

@ -0,0 +1,34 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assume;
import org.junit.Before;
public class BigEndianUnsafeNoCleanerDirectByteBufTest extends BigEndianDirectByteBufTest {
@Before
public void checkHasUnsafe() {
Assume.assumeTrue("sun.misc.Unsafe not found, skip tests", PlatformDependent.hasUnsafe());
}
@Override
protected ByteBuf newBuffer(int length) {
return new UnpooledUnsafeNoCleanerDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, length, Integer.MAX_VALUE);
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assume;
import org.junit.Before;
public class LittleEndianUnsafeNoCleanerDirectByteBufTest extends LittleEndianDirectByteBufTest {
@Before
public void checkHasUnsafe() {
Assume.assumeTrue("sun.misc.Unsafe not found, skip tests", PlatformDependent.hasUnsafe());
}
@Override
protected ByteBuf newBuffer(int length) {
return new UnpooledUnsafeNoCleanerDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, length, Integer.MAX_VALUE);
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import java.nio.ByteBuffer;
/**
* {@link OutOfMemoryError} that is throws if {@link PlatformDependent#allocateDirectNoCleaner(int)} can not allocate
* a new {@link ByteBuffer} due memory restrictions.
*/
public final class OutOfDirectMemoryError extends OutOfMemoryError {
private static final long serialVersionUID = 4228264016184011555L;
OutOfDirectMemoryError(String s) {
super(s);
}
}

View File

@ -30,6 +30,7 @@ import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Deque; import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -87,6 +88,11 @@ public final class PlatformDependent {
private static final int BIT_MODE = bitMode0(); private static final int BIT_MODE = bitMode0();
private static final int ADDRESS_SIZE = addressSize0(); private static final int ADDRESS_SIZE = addressSize0();
private static final boolean USE_DIRECT_BUFFER_NO_CLEANER;
private static final AtomicLong DIRECT_MEMORY_COUNTER;
private static final long DIRECT_MEMORY_LIMIT;
public static final boolean BIG_ENDIAN_NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
static { static {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -99,6 +105,34 @@ public final class PlatformDependent {
"Unless explicitly requested, heap buffer will always be preferred to avoid potential system " + "Unless explicitly requested, heap buffer will always be preferred to avoid potential system " +
"unstability."); "unstability.");
} }
// Here is how the system property is used:
//
// * < 0 - Don't use cleaner, and inherit max direct memory from java. In this case the
// "practical max direct memory" would be 2 * max memory as defined by the JDK.
// * == 0 - Use cleaner, Netty will not enforce max memory, and instead will defer to JDK.
// * > 0 - Don't use cleaner. This will limit Netty's total direct memory
// (note: that JDK's direct memory limit is independent of this).
long maxDirectMemory = SystemPropertyUtil.getLong("io.netty.maxDirectMemory", -1);
if (maxDirectMemory == 0 || !hasUnsafe() || !PlatformDependent0.hasDirectBufferNoCleanerConstructor()) {
USE_DIRECT_BUFFER_NO_CLEANER = false;
DIRECT_MEMORY_COUNTER = null;
} else {
USE_DIRECT_BUFFER_NO_CLEANER = true;
if (maxDirectMemory < 0) {
maxDirectMemory = maxDirectMemory0();
if (maxDirectMemory <= 0) {
DIRECT_MEMORY_COUNTER = null;
} else {
DIRECT_MEMORY_COUNTER = new AtomicLong();
}
} else {
DIRECT_MEMORY_COUNTER = new AtomicLong();
}
}
DIRECT_MEMORY_LIMIT = maxDirectMemory;
logger.debug("io.netty.maxDirectMemory: {} bytes", maxDirectMemory);
} }
/** /**
@ -416,6 +450,56 @@ public final class PlatformDependent {
PlatformDependent0.setMemory(address, bytes, value); PlatformDependent0.setMemory(address, bytes, value);
} }
/**
* Allocate a new {@link ByteBuffer} with the given {@code capacity}. {@link ByteBuffer}s allocated with
* this method <strong>MUST</strong> be deallocated via {@link #freeDirectNoCleaner(ByteBuffer)}.
*/
public static ByteBuffer allocateDirectNoCleaner(int capacity) {
assert USE_DIRECT_BUFFER_NO_CLEANER;
if (DIRECT_MEMORY_COUNTER != null) {
for (;;) {
long usedMemory = DIRECT_MEMORY_COUNTER.get();
long newUsedMemory = usedMemory + capacity;
if (newUsedMemory > DIRECT_MEMORY_LIMIT) {
throw new OutOfDirectMemoryError("failed to allocate " + capacity
+ " byte(s) of direct memory (used: " + usedMemory + ", max: " + DIRECT_MEMORY_LIMIT + ')');
}
if (DIRECT_MEMORY_COUNTER.compareAndSet(usedMemory, newUsedMemory)) {
break;
}
}
}
try {
return PlatformDependent0.allocateDirectNoCleaner(capacity);
} catch (Throwable e) {
if (DIRECT_MEMORY_COUNTER != null) {
DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
}
throwException(e);
return null;
}
}
/**
* This method <strong>MUST</strong> only be called for {@link ByteBuffer}s that were allocated via
* {@link #allocateDirectNoCleaner(int)}.
*/
public static void freeDirectNoCleaner(ByteBuffer buffer) {
assert USE_DIRECT_BUFFER_NO_CLEANER;
int capacity = buffer.capacity();
PlatformDependent0.freeMemory(PlatformDependent0.directBufferAddress(buffer));
if (DIRECT_MEMORY_COUNTER != null) {
long usedMemory = DIRECT_MEMORY_COUNTER.addAndGet(-capacity);
assert usedMemory >= 0;
}
}
public static boolean useDirectBufferNoCleaner() {
return USE_DIRECT_BUFFER_NO_CLEANER;
}
/** /**
* Create a new optimized {@link AtomicReferenceFieldUpdater} or {@code null} if it * Create a new optimized {@link AtomicReferenceFieldUpdater} or {@code null} if it
* could not be created. Because of this the caller need to check for {@code null} and if {@code null} is returned * could not be created. Because of this the caller need to check for {@code null} and if {@code null} is returned

View File

@ -19,6 +19,7 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import sun.misc.Unsafe; import sun.misc.Unsafe;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.Buffer; import java.nio.Buffer;
@ -40,6 +41,7 @@ final class PlatformDependent0 {
static final Unsafe UNSAFE; static final Unsafe UNSAFE;
private static final long ADDRESS_FIELD_OFFSET; private static final long ADDRESS_FIELD_OFFSET;
private static final long BYTE_ARRAY_BASE_OFFSET; private static final long BYTE_ARRAY_BASE_OFFSET;
private static final Constructor<?> DIRECT_BUFFER_CONSTRUCTOR;
/** /**
* Limits the number of bytes to copy per {@link Unsafe#copyMemory(long, long, long)} to allow safepoint polling * Limits the number of bytes to copy per {@link Unsafe#copyMemory(long, long, long)} to allow safepoint polling
@ -68,6 +70,7 @@ final class PlatformDependent0 {
// Failed to access the address field. // Failed to access the address field.
addressField = null; addressField = null;
} }
logger.debug("java.nio.Buffer.address: {}", addressField != null? "available" : "unavailable"); logger.debug("java.nio.Buffer.address: {}", addressField != null? "available" : "unavailable");
Unsafe unsafe; Unsafe unsafe;
@ -110,7 +113,26 @@ final class PlatformDependent0 {
BYTE_ARRAY_BASE_OFFSET = -1; BYTE_ARRAY_BASE_OFFSET = -1;
ADDRESS_FIELD_OFFSET = -1; ADDRESS_FIELD_OFFSET = -1;
UNALIGNED = false; UNALIGNED = false;
DIRECT_BUFFER_CONSTRUCTOR = null;
} else { } else {
Constructor<?> directBufferConstructor;
long address = -1;
try {
directBufferConstructor = direct.getClass().getDeclaredConstructor(long.class, int.class);
directBufferConstructor.setAccessible(true);
address = UNSAFE.allocateMemory(1);
// Try to use the constructor now
directBufferConstructor.newInstance(address, 1);
} catch (Throwable t) {
directBufferConstructor = null;
} finally {
if (address != -1) {
UNSAFE.freeMemory(address);
}
}
DIRECT_BUFFER_CONSTRUCTOR = directBufferConstructor;
ADDRESS_FIELD_OFFSET = objectFieldOffset(addressField); ADDRESS_FIELD_OFFSET = objectFieldOffset(addressField);
boolean unaligned; boolean unaligned;
try { try {
@ -129,6 +151,9 @@ final class PlatformDependent0 {
logger.debug("java.nio.Bits.unaligned: {}", UNALIGNED); logger.debug("java.nio.Bits.unaligned: {}", UNALIGNED);
BYTE_ARRAY_BASE_OFFSET = arrayBaseOffset(); BYTE_ARRAY_BASE_OFFSET = arrayBaseOffset();
} }
logger.debug("java.nio.DirectByteBuffer.<init>(long, int): {}",
DIRECT_BUFFER_CONSTRUCTOR != null? "available" : "unavailable");
} }
static boolean isUnaligned() { static boolean isUnaligned() {
@ -144,6 +169,24 @@ final class PlatformDependent0 {
UNSAFE.throwException(checkNotNull(cause, "cause")); UNSAFE.throwException(checkNotNull(cause, "cause"));
} }
static boolean hasDirectBufferNoCleanerConstructor() {
return DIRECT_BUFFER_CONSTRUCTOR != null;
}
static ByteBuffer allocateDirectNoCleaner(int capacity) {
assert DIRECT_BUFFER_CONSTRUCTOR != null;
long address = UNSAFE.allocateMemory(capacity);
try {
return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
} catch (Throwable cause) {
// Not expected to ever throw!
if (cause instanceof Error) {
throw (Error) cause;
}
throw new Error(cause);
}
}
static void freeDirectBuffer(ByteBuffer buffer) { static void freeDirectBuffer(ByteBuffer buffer) {
// Delegate to other class to not break on android // Delegate to other class to not break on android
// See https://github.com/netty/netty/issues/2604 // See https://github.com/netty/netty/issues/2604