Allow to obtain informations of used direct and heap memory for ByteBufAllocator implementations

Motivation:

Often its useful for the user to be able to get some stats about the memory allocated via an allocator.

Modifications:

- Allow to obtain the used heap and direct memory for an allocator
- Add test case

Result:

Fixes [#6341]
This commit is contained in:
Norman Maurer 2017-02-24 20:06:17 +01:00
parent 5954cc1b85
commit 57fd316a82
10 changed files with 355 additions and 43 deletions

View File

@ -0,0 +1,31 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;
/**
* {@link ByteBufAllocator} which exposes metrics.
*/
public interface InstrumentedByteBufAllocator extends ByteBufAllocator {
/**
* Returns the number of bytes of heap memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
*/
long usedHeapMemory();
/**
* Returns the number of bytes of direct memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
*/
long usedDirectMemory();
}

View File

@ -29,7 +29,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements InstrumentedByteBufAllocator {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
private static final int DEFAULT_NUM_HEAP_ARENA;
@ -184,9 +184,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
}
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
tinyCacheSize, smallCacheSize, normalCacheSize,
useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
@ -528,6 +528,30 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
return chunkSize;
}
@Override
public final long usedHeapMemory() {
return usedMemory(heapArenas);
}
@Override
public final long usedDirectMemory() {
return usedMemory(directArenas);
}
private static long usedMemory(PoolArena<?>... arenas) {
if (arenas == null) {
return -1;
}
long used = 0;
for (PoolArena<?> arena : arenas) {
used += arena.numActiveBytes();
if (used < 0) {
return Long.MAX_VALUE;
}
}
return used;
}
final PoolThreadCache threadCache() {
return threadCache.get();
}

View File

@ -15,13 +15,18 @@
*/
package io.netty.buffer;
import io.netty.util.internal.LongCounter;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
/**
* Simplistic {@link ByteBufAllocator} implementation that does not pool anything.
*/
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements InstrumentedByteBufAllocator {
private final LongCounter directCounter = PlatformDependent.newLongCounter();
private final LongCounter heapCounter = PlatformDependent.newLongCounter();
private final boolean disableLeakDetector;
/**
@ -56,16 +61,21 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
: new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = PlatformDependent.useDirectBufferNoCleaner() ?
new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
@ -85,4 +95,126 @@ public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
public boolean isDirectBufferPooled() {
return false;
}
@Override
public long usedHeapMemory() {
return heapCounter.value();
}
@Override
public long usedDirectMemory() {
return directCounter.value();
}
private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf {
InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
byte[] allocateArray(int initialCapacity) {
byte[] bytes = super.allocateArray(initialCapacity);
((UnpooledByteBufAllocator) alloc()).heapCounter.add(bytes.length);
return bytes;
}
@Override
void freeArray(byte[] array) {
int length = array.length;
super.freeArray(array);
((UnpooledByteBufAllocator) alloc()).heapCounter.add(-length);
}
}
private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf {
InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
byte[] allocateArray(int initialCapacity) {
byte[] bytes = super.allocateArray(initialCapacity);
((UnpooledByteBufAllocator) alloc()).heapCounter.add(bytes.length);
return bytes;
}
@Override
void freeArray(byte[] array) {
int length = array.length;
super.freeArray(array);
((UnpooledByteBufAllocator) alloc()).heapCounter.add(-length);
}
}
private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
extends UnpooledUnsafeNoCleanerDirectByteBuf {
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).directCounter.add(buffer.capacity());
return buffer;
}
@Override
ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
int capacity = oldBuffer.capacity();
ByteBuffer buffer = super.reallocateDirect(oldBuffer, initialCapacity);
((UnpooledByteBufAllocator) alloc()).directCounter.add(buffer.capacity() - capacity);
return buffer;
}
@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).directCounter.add(-capacity);
}
}
private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
InstrumentedUnpooledUnsafeDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).directCounter.add(buffer.capacity());
return buffer;
}
@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).directCounter.add(-capacity);
}
}
private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf {
InstrumentedUnpooledDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).directCounter.add(buffer.capacity());
return buffer;
}
@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).directCounter.add(-capacity);
}
}
}

View File

@ -26,6 +26,8 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import static io.netty.util.internal.ObjectUtil.checkNotNull;
/**
* Big endian Java heap buffer implementation.
*/
@ -42,7 +44,18 @@ public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
* @param maxCapacity the max capacity of the underlying byte array
*/
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
super(maxCapacity);
checkNotNull(alloc, "alloc");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setArray(allocateArray(initialCapacity));
setIndex(0, 0);
}
/**
@ -52,20 +65,11 @@ public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
* @param maxCapacity the max capacity of the underlying byte array
*/
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
this(alloc, initialArray, 0, initialArray.length, maxCapacity);
}
private UnpooledHeapByteBuf(
ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialArray == null) {
throw new NullPointerException("initialArray");
}
checkNotNull(alloc, "alloc");
checkNotNull(initialArray, "initialArray");
if (initialArray.length > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
@ -73,7 +77,15 @@ public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
this.alloc = alloc;
setArray(initialArray);
setIndex(readerIndex, writerIndex);
setIndex(0, initialArray.length);
}
byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}
void freeArray(byte[] array) {
// NOOP
}
private void setArray(byte[] initialArray) {
@ -107,23 +119,26 @@ public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
checkNewCapacity(newCapacity);
int oldCapacity = array.length;
byte[] oldArray = array;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
byte[] newArray = allocateArray(newCapacity);
System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
setArray(newArray);
freeArray(oldArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
byte[] newArray = allocateArray(newCapacity);
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
freeArray(oldArray);
}
return this;
}
@ -411,6 +426,7 @@ public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
@Override
protected void deallocate() {
freeArray(array);
array = null;
}

View File

@ -17,7 +17,7 @@ package io.netty.buffer;
import io.netty.util.internal.PlatformDependent;
final class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {
class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {
/**
* Creates a new heap buffer with a newly allocated byte array.

View File

@ -19,7 +19,7 @@ import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
@ -30,6 +30,10 @@ final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByt
return PlatformDependent.allocateDirectNoCleaner(initialCapacity);
}
ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
return PlatformDependent.reallocateDirectNoCleaner(oldBuffer, initialCapacity);
}
@Override
protected void freeDirect(ByteBuffer buffer) {
PlatformDependent.freeDirectNoCleaner(buffer);
@ -45,7 +49,7 @@ final class UnpooledUnsafeNoCleanerDirectByteBuf extends UnpooledUnsafeDirectByt
if (newCapacity > oldCapacity) {
ByteBuffer oldBuffer = buffer;
ByteBuffer newBuffer = PlatformDependent.reallocateDirectNoCleaner(oldBuffer, newCapacity);
ByteBuffer newBuffer = reallocateDirect(oldBuffer, newCapacity);
setByteBuffer(newBuffer, false);
} else if (newCapacity < oldCapacity) {
ByteBuffer oldBuffer = buffer;

View File

@ -17,16 +17,18 @@ package io.netty.buffer;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assume;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public abstract class AbstractByteBufAllocatorTest extends ByteBufAllocatorTest {
public abstract class AbstractByteBufAllocatorTest<T extends AbstractByteBufAllocator> extends ByteBufAllocatorTest {
@Override
protected abstract AbstractByteBufAllocator newAllocator(boolean preferDirect);
protected abstract T newAllocator(boolean preferDirect);
protected abstract AbstractByteBufAllocator newUnpooledAllocator();
protected abstract T newUnpooledAllocator();
@Override
protected boolean isDirectExpected(boolean preferDirect) {
@ -45,7 +47,7 @@ public abstract class AbstractByteBufAllocatorTest extends ByteBufAllocatorTest
@Test
public void testUnsafeHeapBufferAndUnsafeDirectBuffer() {
AbstractByteBufAllocator allocator = newUnpooledAllocator();
T allocator = newUnpooledAllocator();
ByteBuf directBuffer = allocator.directBuffer();
assertInstanceOf(directBuffer,
PlatformDependent.hasUnsafe() ? UnpooledUnsafeDirectByteBuf.class : UnpooledDirectByteBuf.class);
@ -61,4 +63,50 @@ public abstract class AbstractByteBufAllocatorTest extends ByteBufAllocatorTest
// Unwrap if needed
assertTrue(clazz.isInstance(buffer instanceof SimpleLeakAwareByteBuf ? buffer.unwrap() : buffer));
}
@SuppressWarnings("unchecked")
@Test
public void testUsedDirectMemory() {
InstrumentedByteBufAllocator allocator = (InstrumentedByteBufAllocator) newAllocator(true);
assertEquals(0, allocator.usedDirectMemory());
ByteBuf buffer = allocator.directBuffer(1024, 4096);
int capacity = buffer.capacity();
assertEquals(expectedUsedMemory((T) allocator, capacity), allocator.usedDirectMemory());
// Double the size of the buffer
buffer.capacity(capacity << 1);
capacity = buffer.capacity();
assertEquals(buffer.toString(), expectedUsedMemory((T) allocator, capacity), allocator.usedDirectMemory());
buffer.release();
assertEquals(expectedUsedMemoryAfterRelease((T) allocator, capacity), allocator.usedDirectMemory());
}
@SuppressWarnings("unchecked")
@Test
public void testUsedHeapMemory() {
InstrumentedByteBufAllocator allocator = (InstrumentedByteBufAllocator) newAllocator(true);
Assume.assumeTrue(allocator instanceof InstrumentedByteBufAllocator);
assertEquals(0, allocator.usedHeapMemory());
ByteBuf buffer = allocator.heapBuffer(1024, 4096);
int capacity = buffer.capacity();
assertEquals(expectedUsedMemory((T) allocator, capacity), allocator.usedHeapMemory());
// Double the size of the buffer
buffer.capacity(capacity << 1);
capacity = buffer.capacity();
assertEquals(expectedUsedMemory((T) allocator, capacity), allocator.usedHeapMemory());
buffer.release();
assertEquals(expectedUsedMemoryAfterRelease((T) allocator, capacity), allocator.usedHeapMemory());
}
protected long expectedUsedMemory(T allocator, int capacity) {
return capacity;
}
protected long expectedUsedMemoryAfterRelease(T allocator, int capacity) {
return 0;
}
}

View File

@ -38,21 +38,34 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest {
public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<PooledByteBufAllocator> {
@Override
protected AbstractByteBufAllocator newAllocator(boolean preferDirect) {
protected PooledByteBufAllocator newAllocator(boolean preferDirect) {
return new PooledByteBufAllocator(preferDirect);
}
@Override
protected AbstractByteBufAllocator newUnpooledAllocator() {
protected PooledByteBufAllocator newUnpooledAllocator() {
return new PooledByteBufAllocator(0, 0, 8192, 1);
}
@Override
protected long expectedUsedMemory(PooledByteBufAllocator allocator, int capacity) {
return allocator.chunkSize();
}
@Override
protected long expectedUsedMemoryAfterRelease(PooledByteBufAllocator allocator, int capacity) {
// This is the case as allocations will start in qInit and chunks in qInit will never be released until
// these are moved to q000.
// See https://www.bsdcan.org/2006/papers/jemalloc.pdf
return allocator.chunkSize();
}
@Test
public void testPooledUnsafeHeapBufferAndUnsafeDirectBuffer() {
AbstractByteBufAllocator allocator = newAllocator(true);
PooledByteBufAllocator allocator = newAllocator(true);
ByteBuf directBuffer = allocator.directBuffer();
assertInstanceOf(directBuffer,
PlatformDependent.hasUnsafe() ? PooledUnsafeDirectByteBuf.class : PooledDirectByteBuf.class);

View File

@ -15,15 +15,15 @@
*/
package io.netty.buffer;
public class UnpooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest {
public class UnpooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<UnpooledByteBufAllocator> {
@Override
protected AbstractByteBufAllocator newAllocator(boolean preferDirect) {
protected UnpooledByteBufAllocator newAllocator(boolean preferDirect) {
return new UnpooledByteBufAllocator(preferDirect);
}
@Override
protected AbstractByteBufAllocator newUnpooledAllocator() {
protected UnpooledByteBufAllocator newUnpooledAllocator() {
return new UnpooledByteBufAllocator(false);
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.microbench.buffer;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.microbench.util.AbstractMicrobenchmark;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
@State(Scope.Benchmark)
@Warmup(iterations = 5)
@Measurement(iterations = 10)
@Threads(8)
public class ByteBufAllocatorConcurrentBenchmark extends AbstractMicrobenchmark {
private static final ByteBufAllocator unpooledAllocator = new UnpooledByteBufAllocator(true, true);
@Param({ "00064", "00256", "01024", "04096" })
public int size;
@Benchmark
public boolean allocateRelease() {
return unpooledAllocator.directBuffer(size).release();
}
}