Add pinnedHeapMemory/pinnedDirectMemory methods to PooledByteBufAllocator (#11667)

Motivation:
The "used memory" is the amount of memory that a pooled allocator has currently allocated and committed for itself.
This is useful for managing the pool's resource usage relative to the available system resources.
However, it is not useful for relating the memory held by the currently circulating buffer instances to the size of the pool.
The pinned memory is the memory currently in use by buffers in circulation, plus memory held in the thread-local caches.

Modification:
Add pinned memory accounting to PoolChunk.
We cannot just use the existing freeBytes because that field is only updated when pool subpages are retired, and a chunk will never retire its last subpage instance.
The accounting statistics are available on the PooledByteBufAllocator only, since the metrics interfaces cannot be changed due to backwards compatibility.

Result:
It is now possible to get a fairly accurate picture (with slight over-counting due to the thread-local caches) of how much memory is held by buffer instances at any given moment.
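
As a quick illustration of the new accessors (a minimal sketch, not part of this commit; it assumes the shared PooledByteBufAllocator.DEFAULT instance), the pinned figures are read directly from the allocator, while the metric() view still exposes only the pre-existing used-memory figures:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class PinnedMemoryExample {
    public static void main(String[] args) {
        // Illustrative example class; any PooledByteBufAllocator instance works.
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        ByteBuf buf = alloc.directBuffer(8192);
        try {
            // "Used": memory the pool has committed for itself (whole chunks).
            long used = alloc.metric().usedDirectMemory();
            // "Pinned": memory backing live buffers, plus the thread-local caches.
            long pinned = alloc.pinnedDirectMemory();
            System.out.printf("used=%d, pinned=%d%n", used, pinned);
        } finally {
            buf.release();
        }
    }
}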

Fixes #11637
Chris Vest 2021-09-15 16:22:57 +02:00
parent 29cae0445a
commit cf1ab852d1
5 changed files with 198 additions and 4 deletions

PoolArena.java

@@ -455,6 +455,22 @@ abstract class PoolArena<T> extends SizeClasses implements PoolArenaMetric {
return max(0, val);
}
/**
* Returns the number of bytes that are currently pinned to buffer instances by the arena. The pinned memory is not
* accessible for use by any other allocation until the buffers using it have all been released.
*/
public long numPinnedBytes() {
long val = activeBytesHuge.longValue(); // Huge chunks are exact-sized for the buffers they were allocated to.
synchronized (this) {
for (int i = 0; i < chunkListMetrics.size(); i++) {
for (PoolChunkMetric m: chunkListMetrics.get(i)) {
val += ((PoolChunk<?>) m).pinnedBytes();
}
}
}
return max(0, val);
}
protected abstract PoolChunk<T> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize);
protected abstract PoolChunk<T> newUnpooledChunk(int capacity);
protected abstract PooledByteBuf<T> newByteBuf(int maxCapacity);

PoolChunk.java

@@ -175,6 +175,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
private final Deque<ByteBuffer> cachedNioBuffers;
int freeBytes;
int pinnedBytes;
PoolChunkList<T> parent;
PoolChunk<T> prev;
@@ -342,7 +343,9 @@ final class PoolChunk<T> implements PoolChunkMetric {
handle = splitLargeRun(handle, pages);
}
freeBytes -= runSize(pageShifts, handle);
int pinnedSize = runSize(pageShifts, handle);
freeBytes -= pinnedSize;
pinnedBytes += pinnedSize;
return handle;
}
}
@@ -451,6 +454,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
* @param handle handle to free
*/
void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
int runSize = runSize(pageShifts, handle);
pinnedBytes -= runSize;
if (isSubpage(handle)) {
int sizeIdx = arena.size2SizeIdx(normCapacity);
PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
@@ -473,8 +478,6 @@ final class PoolChunk<T> implements PoolChunkMetric {
}
//start free run
int pages = runPages(handle);
synchronized (runsAvail) {
// collapse continuous runs, successfully collapsed runs
// will be removed from runsAvail and runsAvailMap
@@ -486,7 +489,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
freeBytes += pages << pageShifts;
freeBytes += runSize;
}
if (nioBuffer != null && cachedNioBuffers != null &&
@@ -588,6 +591,12 @@ final class PoolChunk<T> implements PoolChunkMetric {
}
}
public int pinnedBytes() {
synchronized (arena) {
return pinnedBytes;
}
}
@Override
public String toString() {
final int freeBytes;

PooledByteBufAllocator.java

@@ -658,6 +658,40 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
return used;
}
/**
* Returns the number of bytes of heap memory that is currently pinned to heap buffers allocated by a
* {@link ByteBufAllocator}, or {@code -1} if unknown.
* A buffer can pin more memory than its {@linkplain ByteBuf#capacity() capacity} might indicate,
* due to implementation details of the allocator.
*/
public final long pinnedHeapMemory() {
return pinnedMemory(heapArenas);
}
/**
* Returns the number of bytes of direct memory that is currently pinned to direct buffers allocated by a
* {@link ByteBufAllocator}, or {@code -1} if unknown.
* A buffer can pin more memory than its {@linkplain ByteBuf#capacity() capacity} might indicate,
* due to implementation details of the allocator.
*/
public final long pinnedDirectMemory() {
return pinnedMemory(directArenas);
}
private static long pinnedMemory(PoolArena<?>[] arenas) {
if (arenas == null) {
return -1;
}
long used = 0;
for (PoolArena<?> arena : arenas) {
used += arena.numPinnedBytes();
if (used < 0) {
return Long.MAX_VALUE;
}
}
return used;
}
final PoolThreadCache threadCache() {
PoolThreadCache cache = threadCache.get();
assert cache != null;
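
The two new PooledByteBufAllocator accessors above can be combined with the existing used-memory metric to estimate how much of the pool's committed memory is actually tied up in live buffers. A hypothetical monitoring helper (the class name and threshold semantics below are illustrative only, not part of this change) might look like:

import io.netty.buffer.PooledByteBufAllocator;

final class PoolPressureGauge {
    private PoolPressureGauge() {
    }

    /**
     * Fraction of the pool's committed ("used") direct memory that is currently
     * pinned to live buffers or thread-local caches; 0.0 when nothing is committed
     * or the figures are unavailable.
     */
    static double directPinnedRatio(PooledByteBufAllocator alloc) {
        long used = alloc.metric().usedDirectMemory();
        long pinned = alloc.pinnedDirectMemory();
        if (used <= 0 || pinned < 0) {
            return 0.0;
        }
        return (double) pinned / used;
    }
}

A ratio close to 1.0 suggests the pool is mostly occupied by circulating buffers, while a low ratio suggests the pool is holding on to chunks that are largely idle.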

AbstractByteBufAllocatorTest.java

@@ -139,4 +139,7 @@ public abstract class AbstractByteBufAllocatorTest<T extends AbstractByteBufAllo
protected long expectedUsedMemoryAfterRelease(T allocator, int capacity) {
return 0;
}
protected void trimCaches(T allocator) {
}
}

PooledByteBufAllocatorTest.java

@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Timeout;
import static io.netty.buffer.PoolChunk.runOffset;
import static io.netty.buffer.PoolChunk.runPages;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -67,6 +69,11 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
return allocator.metric().chunkSize();
}
@Override
protected void trimCaches(PooledByteBufAllocator allocator) {
allocator.trimCurrentThreadCache();
}
@Test
public void testTrim() {
PooledByteBufAllocator allocator = newAllocator(true);
@@ -679,4 +686,129 @@ public class PooledByteBufAllocatorTest extends AbstractByteBufAllocatorTest<Poo
assertTrue(beforeFreeBytes < afterFreeBytes);
}
@Override
@Test
public void testUsedDirectMemory() {
for (int power = 0; power < 8; power++) {
int initialCapacity = 1024 << power;
testUsedDirectMemory(initialCapacity);
}
}
private void testUsedDirectMemory(int initialCapacity) {
PooledByteBufAllocator allocator = newAllocator(true);
ByteBufAllocatorMetric metric = allocator.metric();
assertEquals(0, metric.usedDirectMemory());
assertEquals(0, allocator.pinnedDirectMemory());
ByteBuf buffer = allocator.directBuffer(initialCapacity, 4 * initialCapacity);
int capacity = buffer.capacity();
assertEquals(expectedUsedMemory(allocator, capacity), metric.usedDirectMemory());
assertThat(allocator.pinnedDirectMemory())
.isGreaterThanOrEqualTo(capacity)
.isLessThanOrEqualTo(metric.usedDirectMemory());
// Double the size of the buffer
buffer.capacity(capacity << 1);
capacity = buffer.capacity();
assertEquals(expectedUsedMemory(allocator, capacity), metric.usedDirectMemory(), buffer.toString());
assertThat(allocator.pinnedDirectMemory())
.isGreaterThanOrEqualTo(capacity)
.isLessThanOrEqualTo(metric.usedDirectMemory());
buffer.release();
assertEquals(expectedUsedMemoryAfterRelease(allocator, capacity), metric.usedDirectMemory());
assertThat(allocator.pinnedDirectMemory())
.isGreaterThanOrEqualTo(0)
.isLessThanOrEqualTo(metric.usedDirectMemory());
trimCaches(allocator);
assertEquals(0, allocator.pinnedDirectMemory());
int[] capacities = new int[30];
Random rng = new Random();
for (int i = 0; i < capacities.length; i++) {
capacities[i] = initialCapacity / 4 + rng.nextInt(8 * initialCapacity);
}
ByteBuf[] bufs = new ByteBuf[capacities.length];
for (int i = 0; i < 20; i++) {
bufs[i] = allocator.directBuffer(capacities[i], 2 * capacities[i]);
}
for (int i = 0; i < 10; i++) {
bufs[i].release();
}
for (int i = 20; i < 30; i++) {
bufs[i] = allocator.directBuffer(capacities[i], 2 * capacities[i]);
}
for (int i = 0; i < 10; i++) {
bufs[i] = allocator.directBuffer(capacities[i], 2 * capacities[i]);
}
for (int i = 0; i < 30; i++) {
bufs[i].release();
}
trimCaches(allocator);
assertEquals(0, allocator.pinnedDirectMemory());
}
@Override
@Test
public void testUsedHeapMemory() {
for (int power = 0; power < 8; power++) {
int initialCapacity = 1024 << power;
testUsedHeapMemory(initialCapacity);
}
}
private void testUsedHeapMemory(int initialCapacity) {
PooledByteBufAllocator allocator = newAllocator(true);
ByteBufAllocatorMetric metric = allocator.metric();
assertEquals(0, metric.usedHeapMemory());
assertEquals(0, allocator.pinnedHeapMemory());
ByteBuf buffer = allocator.heapBuffer(initialCapacity, 4 * initialCapacity);
int capacity = buffer.capacity();
assertEquals(expectedUsedMemory(allocator, capacity), metric.usedHeapMemory());
assertThat(allocator.pinnedHeapMemory())
.isGreaterThanOrEqualTo(capacity)
.isLessThanOrEqualTo(metric.usedHeapMemory());
// Double the size of the buffer
buffer.capacity(capacity << 1);
capacity = buffer.capacity();
assertEquals(expectedUsedMemory(allocator, capacity), metric.usedHeapMemory());
assertThat(allocator.pinnedHeapMemory())
.isGreaterThanOrEqualTo(capacity)
.isLessThanOrEqualTo(metric.usedHeapMemory());
buffer.release();
assertEquals(expectedUsedMemoryAfterRelease(allocator, capacity), metric.usedHeapMemory());
assertThat(allocator.pinnedHeapMemory())
.isGreaterThanOrEqualTo(0)
.isLessThanOrEqualTo(metric.usedHeapMemory());
trimCaches(allocator);
assertEquals(0, allocator.pinnedHeapMemory());
int[] capacities = new int[30];
Random rng = new Random();
for (int i = 0; i < capacities.length; i++) {
capacities[i] = initialCapacity / 4 + rng.nextInt(8 * initialCapacity);
}
ByteBuf[] bufs = new ByteBuf[capacities.length];
for (int i = 0; i < 20; i++) {
bufs[i] = allocator.heapBuffer(capacities[i], 2 * capacities[i]);
}
for (int i = 0; i < 10; i++) {
bufs[i].release();
}
for (int i = 20; i < 30; i++) {
bufs[i] = allocator.heapBuffer(capacities[i], 2 * capacities[i]);
}
for (int i = 0; i < 10; i++) {
bufs[i] = allocator.heapBuffer(capacities[i], 2 * capacities[i]);
}
for (int i = 0; i < 30; i++) {
bufs[i].release();
}
trimCaches(allocator);
assertEquals(0, allocator.pinnedHeapMemory());
}
}
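
The thread-local caches explain both the tests' calls to trimCaches(...) before asserting a zero pinned count and the slight over-counting mentioned in the Result section. A minimal sketch (assuming a single allocating thread and a freshly created allocator) of how trimming affects the pinned figure:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public final class TrimCacheExample {
    public static void main(String[] args) {
        // preferDirect = true; a private allocator so no other code interferes.
        PooledByteBufAllocator alloc = new PooledByteBufAllocator(true);

        ByteBuf buf = alloc.directBuffer(1024);
        buf.release();

        // The released run may still sit in this thread's PoolThreadCache,
        // so it can still be reported as pinned at this point.
        System.out.println("pinned after release: " + alloc.pinnedDirectMemory());

        // Dropping the current thread's cache hands the cached runs back to the chunk.
        alloc.trimCurrentThreadCache();
        System.out.println("pinned after trim:    " + alloc.pinnedDirectMemory());
    }
}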