From 8fed94eee3ebd84b30b54a797b329296f738e07a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 13 May 2015 17:15:06 +0200 Subject: [PATCH] Expose metrics for PooledByteBufAllocator Motivation: The PooledByteBufAllocator is more or less a black box at the moment. We need to expose some metrics to allow the user to get a better idea of how to tune it. Modifications: - Expose different metrics via PooledByteBufAllocator - Add *Metric interfaces Result: It is now easy to gather metrics and details about the PooledByteBufAllocator and so get a better understanding of its resource usage. --- .../main/java/io/netty/buffer/PoolArena.java | 181 ++++++++- .../java/io/netty/buffer/PoolArenaMetric.java | 130 +++++++ .../main/java/io/netty/buffer/PoolChunk.java | 16 +- .../java/io/netty/buffer/PoolChunkList.java | 34 +- .../io/netty/buffer/PoolChunkListMetric.java | 32 ++ .../java/io/netty/buffer/PoolChunkMetric.java | 37 ++ .../java/io/netty/buffer/PoolSubpage.java | 23 +- .../io/netty/buffer/PoolSubpageMetric.java | 43 +++ .../netty/buffer/PooledByteBufAllocator.java | 126 +++++-- .../io/netty/util/internal/LongCounter.java | 26 ++ .../util/internal/PlatformDependent.java | 35 ++ .../util/internal/chmv8/LongAdderV8.java | 225 +++++++++++ .../netty/util/internal/chmv8/Striped64.java | 351 ++++++++++++++++++ 13 files changed, 1231 insertions(+), 28 deletions(-) create mode 100644 buffer/src/main/java/io/netty/buffer/PoolArenaMetric.java create mode 100644 buffer/src/main/java/io/netty/buffer/PoolChunkListMetric.java create mode 100644 buffer/src/main/java/io/netty/buffer/PoolChunkMetric.java create mode 100644 buffer/src/main/java/io/netty/buffer/PoolSubpageMetric.java create mode 100644 common/src/main/java/io/netty/util/internal/LongCounter.java create mode 100644 common/src/main/java/io/netty/util/internal/chmv8/LongAdderV8.java create mode 100644 common/src/main/java/io/netty/util/internal/chmv8/Striped64.java diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index 6275f9d9e6..00bf373cc4 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -16,12 +16,16 @@ package io.netty.buffer; +import io.netty.util.internal.LongCounter; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; -abstract class PoolArena { +abstract class PoolArena implements PoolArenaMetric { static final int numTinySubpagePools = 512 >>> 4; @@ -43,6 +47,21 @@ abstract class PoolArena { private final PoolChunkList q075; private final PoolChunkList q100; + private final List chunkListMetrics; + + // Metrics for allocations and deallocations + private long allocationsTiny; + private long allocationsSmall; + private long allocationsNormal; + // We need to use the LongCounter here as this is not guarded via synchronized block. + private final LongCounter allocationsHuge = PlatformDependent.newLongCounter(); + + private long deallocationsTiny; + private long deallocationsSmall; + private long deallocationsNormal; + // We need to use the LongCounter here as this is not guarded via synchronized block.
+ private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter(); + // TODO: Test if adding padding helps under contention //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; @@ -77,6 +96,15 @@ abstract class PoolArena { q025.prevList = q000; q000.prevList = null; qInit.prevList = qInit; + + List metrics = new ArrayList(6); + metrics.add(qInit); + metrics.add(q000); + metrics.add(q025); + metrics.add(q050); + metrics.add(q075); + metrics.add(q100); + chunkListMetrics = Collections.unmodifiableList(metrics); } private PoolSubpage newSubpagePoolHead(int pageSize) { @@ -128,7 +156,8 @@ abstract class PoolArena { if (isTinyOrSmall(normCapacity)) { // capacity < pageSize int tableIdx; PoolSubpage[] table; - if (isTiny(normCapacity)) { // < 512 + boolean tiny = isTiny(normCapacity); + if (tiny) { // < 512 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { // was able to allocate out of the cache so move on return; @@ -152,6 +181,12 @@ abstract class PoolArena { long handle = s.allocate(); assert handle >= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); + + if (tiny) { + ++allocationsTiny; + } else { + ++allocationsSmall; + } return; } allocateNormal(buf, reqCapacity, normCapacity); @@ -173,6 +208,8 @@ abstract class PoolArena { } private void allocateNormal(PooledByteBuf buf, int reqCapacity, int normCapacity) { + ++allocationsNormal; + if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity) || q100.allocate(buf, reqCapacity, normCapacity)) { @@ -188,11 +225,13 @@ abstract class PoolArena { } private void allocateHuge(PooledByteBuf buf, int reqCapacity) { + allocationsHuge.increment(); buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity); } void free(PoolChunk chunk, long handle, int normCapacity, boolean sameThreads) { if (chunk.unpooled) { + deallocationsHuge.increment(); destroyChunk(chunk); } else { if (sameThreads) { @@ -203,7 +242,15 @@ abstract class PoolArena { } } + boolean tinyOrSmall = isTinyOrSmall(normCapacity); synchronized (this) { + if (!tinyOrSmall) { + ++deallocationsNormal; + } else if (isTiny(normCapacity)) { + ++deallocationsTiny; + } else { + ++deallocationsSmall; + } chunk.parent.free(chunk, handle); } } @@ -306,12 +353,142 @@ abstract class PoolArena { } } + @Override + public int numTinySubpages() { + return tinySubpagePools.length; + } + + @Override + public int numSmallSubpages() { + return smallSubpagePools.length; + } + + @Override + public int numChunkLists() { + return chunkListMetrics.size(); + } + + @Override + public List tinySubpages() { + return subPageMetricList(tinySubpagePools); + } + + @Override + public List smallSubpages() { + return subPageMetricList(smallSubpagePools); + } + + @Override + public List chunkLists() { + return chunkListMetrics; + } + + private static List subPageMetricList(PoolSubpage[] pages) { + List metrics = new ArrayList(); + for (int i = 1; i < pages.length; i ++) { + PoolSubpage head = pages[i]; + if (head.next == head) { + continue; + } + PoolSubpage s = head.next; + for (;;) { + metrics.add(s); + s = s.next; + if (s == head) { + break; + } + } + } + return metrics; + } + + @Override + public long numAllocations() { + return allocationsTiny + allocationsSmall + allocationsNormal + allocationsHuge.value(); + } + + @Override + public long numTinyAllocations() { + return allocationsTiny; +
} + + @Override + public long numSmallAllocations() { + return allocationsSmall; + } + + @Override + public long numNormalAllocations() { + return allocationsNormal; + } + + @Override + public long numDeallocations() { + return deallocationsTiny + deallocationsSmall + deallocationsNormal + deallocationsHuge.value(); + } + + @Override + public long numTinyDeallocations() { + return deallocationsTiny; + } + + @Override + public long numSmallDeallocations() { + return deallocationsSmall; + } + + @Override + public long numNormalDeallocations() { + return deallocationsNormal; + } + + @Override + public long numHugeAllocations() { + return allocationsHuge.value(); + } + + @Override + public long numHugeDeallocations() { + return deallocationsHuge.value(); + } + + @Override + public long numActiveAllocations() { + long val = numAllocations() - numDeallocations(); + return val >= 0 ? val : 0; + } + + @Override + public long numActiveTinyAllocations() { + long val = numTinyAllocations() - numTinyDeallocations(); + return val >= 0 ? val : 0; + } + + @Override + public long numActiveSmallAllocations() { + long val = numSmallAllocations() - numSmallDeallocations(); + return val >= 0 ? val : 0; + } + + @Override + public long numActiveNormalAllocations() { + long val = numNormalAllocations() - numNormalDeallocations(); + return val >= 0 ? val : 0; + } + + @Override + public long numActiveHugeAllocations() { + long val = numHugeAllocations() - numHugeDeallocations(); + return val >= 0 ? val : 0; + } + protected abstract PoolChunk newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize); protected abstract PoolChunk newUnpooledChunk(int capacity); protected abstract PooledByteBuf newByteBuf(int maxCapacity); protected abstract void memoryCopy(T src, int srcOffset, T dst, int dstOffset, int length); protected abstract void destroyChunk(PoolChunk chunk); + @Override public synchronized String toString() { StringBuilder buf = new StringBuilder() .append("Chunk(s) at 0~25%:") diff --git a/buffer/src/main/java/io/netty/buffer/PoolArenaMetric.java b/buffer/src/main/java/io/netty/buffer/PoolArenaMetric.java new file mode 100644 index 0000000000..927d84a15c --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolArenaMetric.java @@ -0,0 +1,130 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.buffer; + +import java.util.List; + +/** + * Expose metrics for an arena. + */ +public interface PoolArenaMetric { + + /** + * Returns the number of tiny sub-pages for the arena. + */ + int numTinySubpages(); + + /** + * Returns the number of small sub-pages for the arena. + */ + int numSmallSubpages(); + + /** + * Returns the number of chunk lists for the arena. + */ + int numChunkLists(); + + /** + * Returns an unmodifiable {@link List} which holds {@link PoolSubpageMetric}s for tiny sub-pages.
+ */ + List tinySubpages(); + + /** + * Returns an unmodifiable {@link List} which holds {@link PoolSubpageMetric}s for small sub-pages. + */ + List smallSubpages(); + + /** + * Returns an unmodifiable {@link List} which holds {@link PoolChunkListMetric}s. + */ + List chunkLists(); + + /** + * Return the number of allocations done via the arena. This includes all sizes. + */ + long numAllocations(); + + /** + * Return the number of tiny allocations done via the arena. + */ + long numTinyAllocations(); + + /** + * Return the number of small allocations done via the arena. + */ + long numSmallAllocations(); + + /** + * Return the number of normal allocations done via the arena. + */ + long numNormalAllocations(); + + /** + * Return the number of huge allocations done via the arena. + */ + long numHugeAllocations(); + + /** + * Return the number of deallocations done via the arena. This includes all sizes. + */ + long numDeallocations(); + + /** + * Return the number of tiny deallocations done via the arena. + */ + long numTinyDeallocations(); + + /** + * Return the number of small deallocations done via the arena. + */ + long numSmallDeallocations(); + + /** + * Return the number of normal deallocations done via the arena. + */ + long numNormalDeallocations(); + + /** + * Return the number of huge deallocations done via the arena. + */ + long numHugeDeallocations(); + + /** + * Return the number of currently active allocations. + */ + long numActiveAllocations(); + + /** + * Return the number of currently active tiny allocations. + */ + long numActiveTinyAllocations(); + + /** + * Return the number of currently active small allocations. + */ + long numActiveSmallAllocations(); + + /** + * Return the number of currently active normal allocations. + */ + long numActiveNormalAllocations(); + + /** + * Return the number of currently active huge allocations. 
+ */ + long numActiveHugeAllocations(); +} diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunk.java b/buffer/src/main/java/io/netty/buffer/PoolChunk.java index 06713988bc..a5f0ee5e7d 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolChunk.java +++ b/buffer/src/main/java/io/netty/buffer/PoolChunk.java @@ -100,8 +100,7 @@ package io.netty.buffer; * where as per convention defined above * the second value (i.e, x) indicates that the first node which is free to be allocated is at depth x (from root) */ - -final class PoolChunk { +final class PoolChunk implements PoolChunkMetric { final PoolArena arena; final T memory; @@ -186,7 +185,8 @@ final class PoolChunk { return new PoolSubpage[size]; } - int usage() { + @Override + public int usage() { final int freeBytes = this.freeBytes; if (freeBytes == 0) { return 100; @@ -414,6 +414,16 @@ final class PoolChunk { return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset } + @Override + public int chunkSize() { + return chunkSize; + } + + @Override + public int freeBytes() { + return freeBytes; + } + @Override public String toString() { return new StringBuilder() diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunkList.java b/buffer/src/main/java/io/netty/buffer/PoolChunkList.java index 781054efa8..c7518ff8d8 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolChunkList.java +++ b/buffer/src/main/java/io/netty/buffer/PoolChunkList.java @@ -18,7 +18,13 @@ package io.netty.buffer; import io.netty.util.internal.StringUtil; -final class PoolChunkList { +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +final class PoolChunkList implements PoolChunkListMetric { + private static final Iterator EMPTY_METRICS = Collections.emptyList().iterator(); private final PoolArena arena; private final PoolChunkList nextList; PoolChunkList prevList; @@ -108,6 +114,32 @@ final class PoolChunkList { } } + @Override + public int minUsage() { + return minUsage; + } + + @Override + public int maxUsage() { + return maxUsage; + } + + @Override + public Iterator iterator() { + if (head == null) { + return EMPTY_METRICS; + } + List metrics = new ArrayList(); + for (PoolChunk cur = head;;) { + metrics.add(cur); + cur = cur.next; + if (cur == null) { + break; + } + } + return metrics.iterator(); + } + @Override public String toString() { if (head == null) { diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunkListMetric.java b/buffer/src/main/java/io/netty/buffer/PoolChunkListMetric.java new file mode 100644 index 0000000000..471f0de878 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolChunkListMetric.java @@ -0,0 +1,32 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +/** + * Metrics for a list of chunks. 
+ */ +public interface PoolChunkListMetric extends Iterable { + + /** + * Return the minimum usage of the chunk list before which chunks are promoted to the previous list. + */ + int minUsage(); + + /** + * Return the maximum usage of the chunk list after which chunks are promoted to the next list. + */ + int maxUsage(); +} diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunkMetric.java b/buffer/src/main/java/io/netty/buffer/PoolChunkMetric.java new file mode 100644 index 0000000000..b08ad06f0e --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolChunkMetric.java @@ -0,0 +1,37 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +/** + * Metrics for a chunk. + */ +public interface PoolChunkMetric { + + /** + * Return the percentage of the current usage of the chunk. + */ + int usage(); + + /** + * Return the size of the chunk in bytes; this is the maximum number of bytes that can be served out of the chunk. + */ + int chunkSize(); + + /** + * Return the number of free bytes in the chunk. + */ + int freeBytes(); +} diff --git a/buffer/src/main/java/io/netty/buffer/PoolSubpage.java b/buffer/src/main/java/io/netty/buffer/PoolSubpage.java index 89cb680f6b..993900bc68 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolSubpage.java +++ b/buffer/src/main/java/io/netty/buffer/PoolSubpage.java @@ -16,7 +16,7 @@ package io.netty.buffer; -final class PoolSubpage { +final class PoolSubpage implements PoolSubpageMetric { final PoolChunk chunk; private final int memoryMapIdx; @@ -202,6 +202,7 @@ final class PoolSubpage { return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx; } + @Override public String toString() { if (!doNotDestroy) { return "(" + memoryMapIdx + ": not in use)"; @@ -210,4 +211,24 @@ final class PoolSubpage { return String.valueOf('(') + memoryMapIdx + ": " + (maxNumElems - numAvail) + '/' + maxNumElems + ", offset: " + runOffset + ", length: " + pageSize + ", elemSize: " + elemSize + ')'; } + + @Override + public int maxNumElements() { + return maxNumElems; + } + + @Override + public int numAvailable() { + return numAvail; + } + + @Override + public int elementSize() { + return elemSize; + } + + @Override + public int pageSize() { + return pageSize; + } } diff --git a/buffer/src/main/java/io/netty/buffer/PoolSubpageMetric.java b/buffer/src/main/java/io/netty/buffer/PoolSubpageMetric.java new file mode 100644 index 0000000000..d674767462 --- /dev/null +++ b/buffer/src/main/java/io/netty/buffer/PoolSubpageMetric.java @@ -0,0 +1,43 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.buffer; + +/** + * Metrics for a sub-page. + */ +public interface PoolSubpageMetric { + + /** + * Return the number of maximal elements that can be allocated out of the sub-page. + */ + int maxNumElements(); + + /** + * Return the number of available elements to be allocated. + */ + int numAvailable(); + + /** + * Return the size (in bytes) of the elements that will be allocated. + */ + int elementSize(); + + /** + * Return the size (in bytes) of this page. + */ + int pageSize(); +} + diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java index 1e84aace45..7fa86d76ed 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java @@ -23,6 +23,9 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class PooledByteBufAllocator extends AbstractByteBufAllocator { @@ -125,6 +128,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { private final int smallCacheSize; private final int normalCacheSize; + private final List heapArenaMetrics; + private final List directArenaMetrics; + final PoolThreadLocalCache threadCache; public PooledByteBufAllocator() { @@ -164,20 +170,31 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { if (nHeapArena > 0) { heapArenas = newArenaArray(nHeapArena); + List metrics = new ArrayList(heapArenas.length); for (int i = 0; i < heapArenas.length; i ++) { - heapArenas[i] = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize); + PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize); + heapArenas[i] = arena; + metrics.add(arena); } + heapArenaMetrics = Collections.unmodifiableList(metrics); } else { heapArenas = null; + heapArenaMetrics = Collections.emptyList(); } if (nDirectArena > 0) { directArenas = newArenaArray(nDirectArena); + List metrics = new ArrayList(directArenas.length); for (int i = 0; i < directArenas.length; i ++) { - directArenas[i] = new PoolArena.DirectArena(this, pageSize, maxOrder, pageShifts, chunkSize); + PoolArena.DirectArena arena = new PoolArena.DirectArena( + this, pageSize, maxOrder, pageShifts, chunkSize); + directArenas[i] = arena; + metrics.add(arena); } + directArenaMetrics = Collections.unmodifiableList(metrics); } else { directArenas = null; + directArenaMetrics = Collections.emptyList(); } } @@ -283,9 +300,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { final class PoolThreadLocalCache extends FastThreadLocal { private final AtomicInteger index = new AtomicInteger(); + final AtomicInteger caches = new AtomicInteger(); @Override protected PoolThreadCache initialValue() { + caches.incrementAndGet(); final int idx = index.getAndIncrement(); final PoolArena heapArena; 
final PoolArena directArena; @@ -301,7 +320,6 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { } else { directArena = null; } - return new PoolThreadCache( heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize, DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL); @@ -310,25 +328,91 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator { @Override protected void onRemoval(PoolThreadCache value) { value.free(); + caches.decrementAndGet(); } } -// Too noisy at the moment. -// -// public String toString() { -// StringBuilder buf = new StringBuilder(); -// buf.append(heapArenas.length); -// buf.append(" heap arena(s):"); -// buf.append(StringUtil.NEWLINE); -// for (PoolArena a: heapArenas) { -// buf.append(a); -// } -// buf.append(directArenas.length); -// buf.append(" direct arena(s):"); -// buf.append(StringUtil.NEWLINE); -// for (PoolArena a: directArenas) { -// buf.append(a); -// } -// return buf.toString(); -// } + /** + * Return the number of heap arenas. + */ + public int numHeapArenas() { + return heapArenaMetrics.size(); + } + + /** + * Return the number of direct arenas. + */ + public int numDirectArenas() { + return directArenaMetrics.size(); + } + + /** + * Return a {@link List} of all heap {@link PoolArenaMetric}s that are provided by this pool. + */ + public List heapArenas() { + return heapArenaMetrics; + } + + /** + * Return a {@link List} of all direct {@link PoolArenaMetric}s that are provided by this pool. + */ + public List directArenas() { + return directArenaMetrics; + } + + /** + * Return the number of thread local caches used by this {@link PooledByteBufAllocator}. + */ + public int numThreadLocalCaches() { + return threadCache.caches.get(); + } + + /** + * Return the size of the tiny cache. + */ + public int tinyCacheSize() { + return tinyCacheSize; + } + + /** + * Return the size of the small cache. + */ + public int smallCacheSize() { + return smallCacheSize; + } + + /** + * Return the size of the normal cache. + */ + public int normalCacheSize() { + return normalCacheSize; + } + + // Too noisy at the moment. + // + //public String toString() { + // StringBuilder buf = new StringBuilder(); + // int heapArenasLen = heapArenas == null ? 0 : heapArenas.length; + // buf.append(heapArenasLen); + // buf.append(" heap arena(s):"); + // buf.append(StringUtil.NEWLINE); + // if (heapArenasLen > 0) { + // for (PoolArena a: heapArenas) { + // buf.append(a); + // } + // } + // + // int directArenasLen = directArenas == null ? 0 : directArenas.length; + // + // buf.append(directArenasLen); + // buf.append(" direct arena(s):"); + // buf.append(StringUtil.NEWLINE); + // if (directArenasLen > 0) { + // for (PoolArena a: directArenas) { + // buf.append(a); + // } + // } + // + // return buf.toString(); + //} } diff --git a/common/src/main/java/io/netty/util/internal/LongCounter.java b/common/src/main/java/io/netty/util/internal/LongCounter.java new file mode 100644 index 0000000000..b56938cb36 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/LongCounter.java @@ -0,0 +1,26 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +/** + * Counter for long. + */ +public interface LongCounter { + void add(long delta); + void increment(); + void decrement(); + long value(); +} diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 335446f7fa..4112b01895 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -17,6 +17,7 @@ package io.netty.util.internal; import io.netty.util.CharsetUtil; import io.netty.util.internal.chmv8.ConcurrentHashMapV8; +import io.netty.util.internal.chmv8.LongAdderV8; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -40,6 +41,7 @@ import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.regex.Matcher; @@ -228,6 +230,17 @@ public final class PlatformDependent { } } + /** + * Creates a new fastest {@link LongCounter} implementaion for the current platform. + */ + public static LongCounter newLongCounter() { + if (HAS_UNSAFE) { + return new LongAdderV8(); + } else { + return new AtomicLongCounter(); + } + } + /** * Creates a new fastest {@link ConcurrentMap} implementaion for the current platform. */ @@ -861,6 +874,28 @@ public final class PlatformDependent { return PlatformDependent0.addressSize(); } + private static final class AtomicLongCounter extends AtomicLong implements LongCounter { + @Override + public void add(long delta) { + addAndGet(delta); + } + + @Override + public void increment() { + incrementAndGet(); + } + + @Override + public void decrement() { + decrementAndGet(); + } + + @Override + public long value() { + return get(); + } + } + private PlatformDependent() { // only static method supported } diff --git a/common/src/main/java/io/netty/util/internal/chmv8/LongAdderV8.java b/common/src/main/java/io/netty/util/internal/chmv8/LongAdderV8.java new file mode 100644 index 0000000000..00f29c9a8d --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/chmv8/LongAdderV8.java @@ -0,0 +1,225 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ +package io.netty.util.internal.chmv8; + + +import io.netty.util.internal.LongCounter; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +/** + * One or more variables that together maintain an initially zero + * {@code long} sum. When updates (method {@link #add}) are contended + * across threads, the set of variables may grow dynamically to reduce + * contention. Method {@link #sum} (or, equivalently, {@link + * #longValue}) returns the current total combined across the + * variables maintaining the sum. + * + *

This class is usually preferable to {@link AtomicLong} when + * multiple threads update a common sum that is used for purposes such + * as collecting statistics, not for fine-grained synchronization + * control. Under low update contention, the two classes have similar + * characteristics. But under high contention, expected throughput of + * this class is significantly higher, at the expense of higher space + * consumption. + * + *

This class extends {@link Number}, but does not define + * methods such as {@code equals}, {@code hashCode} and {@code + * compareTo} because instances are expected to be mutated, and so are + * not useful as collection keys. + * + *

jsr166e note: This class is targeted to be placed in + * java.util.concurrent.atomic. + * + * @since 1.8 + * @author Doug Lea + */ +@SuppressWarnings("all") +public class LongAdderV8 extends Striped64 implements Serializable, LongCounter { + private static final long serialVersionUID = 7249069246863182397L; + + /** + * Version of plus for use in retryUpdate + */ + final long fn(long v, long x) { return v + x; } + + /** + * Creates a new adder with initial sum of zero. + */ + public LongAdderV8() { + } + + /** + * Adds the given value. + * + * @param x the value to add + */ + public void add(long x) { + Cell[] as; long b, v; int[] hc; Cell a; int n; + if ((as = cells) != null || !casBase(b = base, b + x)) { + boolean uncontended = true; + if ((hc = threadHashCode.get()) == null || + as == null || (n = as.length) < 1 || + (a = as[(n - 1) & hc[0]]) == null || + !(uncontended = a.cas(v = a.value, v + x))) + retryUpdate(x, hc, uncontended); + } + } + + /** + * Equivalent to {@code add(1)}. + */ + public void increment() { + add(1L); + } + + /** + * Equivalent to {@code add(-1)}. + */ + public void decrement() { + add(-1L); + } + + /** + * Returns the current sum. The returned value is NOT an + * atomic snapshot; invocation in the absence of concurrent + * updates returns an accurate result, but concurrent updates that + * occur while the sum is being calculated might not be + * incorporated. + * + * @return the sum + */ + public long sum() { + long sum = base; + Cell[] as = cells; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + sum += a.value; + } + } + return sum; + } + + /** + * Resets variables maintaining the sum to zero. This method may + * be a useful alternative to creating a new adder, but is only + * effective if there are no concurrent updates. Because this + * method is intrinsically racy, it should only be used when it is + * known that no threads are concurrently updating. + */ + public void reset() { + internalReset(0L); + } + + /** + * Equivalent in effect to {@link #sum} followed by {@link + * #reset}. This method may apply for example during quiescent + * points between multithreaded computations. If there are + * updates concurrent with this method, the returned value is + * not guaranteed to be the final value occurring before + * the reset. + * + * @return the sum + */ + public long sumThenReset() { + long sum = base; + Cell[] as = cells; + base = 0L; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) { + sum += a.value; + a.value = 0L; + } + } + } + return sum; + } + + /** + * Returns the String representation of the {@link #sum}. + * @return the String representation of the {@link #sum} + */ + public String toString() { + return Long.toString(sum()); + } + + /** + * Equivalent to {@link #sum}. + * + * @return the sum + */ + public long longValue() { + return sum(); + } + + /** + * Returns the {@link #sum} as an {@code int} after a narrowing + * primitive conversion. + */ + public int intValue() { + return (int)sum(); + } + + /** + * Returns the {@link #sum} as a {@code float} + * after a widening primitive conversion. + */ + public float floatValue() { + return (float)sum(); + } + + /** + * Returns the {@link #sum} as a {@code double} after a widening + * primitive conversion. 
+ */ + public double doubleValue() { + return (double)sum(); + } + + private void writeObject(ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + s.writeLong(sum()); + } + + private void readObject(ObjectInputStream s) + throws IOException, ClassNotFoundException { + s.defaultReadObject(); + busy = 0; + cells = null; + base = s.readLong(); + } + + @Override + public long value() { + return sum(); + } +} \ No newline at end of file diff --git a/common/src/main/java/io/netty/util/internal/chmv8/Striped64.java b/common/src/main/java/io/netty/util/internal/chmv8/Striped64.java new file mode 100644 index 0000000000..5c00610e36 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/chmv8/Striped64.java @@ -0,0 +1,351 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + */ +package io.netty.util.internal.chmv8; + + +import java.util.Random; + +/** + * A package-local class holding common representation and mechanics + * for classes supporting dynamic striping on 64bit values. The class + * extends Number so that concrete subclasses must publicly do so. + */ +@SuppressWarnings("all") +abstract class Striped64 extends Number { + /* + * This class maintains a lazily-initialized table of atomically + * updated variables, plus an extra "base" field. The table size + * is a power of two. Indexing uses masked per-thread hash codes. + * Nearly all declarations in this class are package-private, + * accessed directly by subclasses. + * + * Table entries are of class Cell; a variant of AtomicLong padded + * to reduce cache contention on most processors. Padding is + * overkill for most Atomics because they are usually irregularly + * scattered in memory and thus don't interfere much with each + * other. But Atomic objects residing in arrays will tend to be + * placed adjacent to each other, and so will most often share + * cache lines (with a huge negative performance impact) without + * this precaution. + * + * In part because Cells are relatively large, we avoid creating + * them until they are needed. When there is no contention, all + * updates are made to the base field. Upon first contention (a + * failed CAS on base update), the table is initialized to size 2. + * The table size is doubled upon further contention until + * reaching the nearest power of two greater than or equal to the + * number of CPUS. Table slots remain empty (null) until they are + * needed. + * + * A single spinlock ("busy") is used for initializing and + * resizing the table, as well as populating slots with new Cells. + * There is no need for a blocking lock; when the lock is not + * available, threads try other slots (or the base). 
During these + * retries, there is increased contention and reduced locality, + * which is still better than alternatives. + * + * Per-thread hash codes are initialized to random values. + * Contention and/or table collisions are indicated by failed + * CASes when performing an update operation (see method + * retryUpdate). Upon a collision, if the table size is less than + * the capacity, it is doubled in size unless some other thread + * holds the lock. If a hashed slot is empty, and lock is + * available, a new Cell is created. Otherwise, if the slot + * exists, a CAS is tried. Retries proceed by "double hashing", + * using a secondary hash (Marsaglia XorShift) to try to find a + * free slot. + * + * The table size is capped because, when there are more threads + * than CPUs, supposing that each thread were bound to a CPU, + * there would exist a perfect hash function mapping threads to + * slots that eliminates collisions. When we reach capacity, we + * search for this mapping by randomly varying the hash codes of + * colliding threads. Because search is random, and collisions + * only become known via CAS failures, convergence can be slow, + * and because threads are typically not bound to CPUS forever, + * may not occur at all. However, despite these limitations, + * observed contention rates are typically low in these cases. + * + * It is possible for a Cell to become unused when threads that + * once hashed to it terminate, as well as in the case where + * doubling the table causes no thread to hash to it under + * expanded mask. We do not try to detect or remove such cells, + * under the assumption that for long-running instances, observed + * contention levels will recur, so the cells will eventually be + * needed again; and for short-lived ones, it does not matter. + */ + + /** + * Padded variant of AtomicLong supporting only raw accesses plus CAS. + * The value field is placed between pads, hoping that the JVM doesn't + * reorder them. + * + * JVM intrinsics note: It would be possible to use a release-only + * form of CAS here, if it were provided. + */ + static final class Cell { + volatile long p0, p1, p2, p3, p4, p5, p6; + volatile long value; + volatile long q0, q1, q2, q3, q4, q5, q6; + Cell(long x) { value = x; } + + final boolean cas(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + static { + try { + UNSAFE = getUnsafe(); + Class ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + + } + + /** + * ThreadLocal holding a single-slot int array holding hash code. + * Unlike the JDK8 version of this class, we use a suboptimal + * int[] representation to avoid introducing a new type that can + * impede class-unloading when ThreadLocals are not removed. + */ + static final ThreadLocal threadHashCode = new ThreadLocal(); + + /** + * Generator of new random hash codes + */ + static final Random rng = new Random(); + + /** Number of CPUS, to place bound on table size */ + static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** + * Table of cells. When non-null, size is a power of 2. + */ + transient volatile Cell[] cells; + + /** + * Base value, used mainly when there is no contention, but also as + * a fallback during table initialization races. Updated via CAS. 
+ */ + transient volatile long base; + + /** + * Spinlock (locked via CAS) used when resizing and/or creating Cells. + */ + transient volatile int busy; + + /** + * Package-private default constructor + */ + Striped64() { + } + + /** + * CASes the base field. + */ + final boolean casBase(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val); + } + + /** + * CASes the busy field from 0 to 1 to acquire lock. + */ + final boolean casBusy() { + return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1); + } + + /** + * Computes the function of current and new value. Subclasses + * should open-code this update function for most uses, but the + * virtualized form is needed within retryUpdate. + * + * @param currentValue the current value (of either base or a cell) + * @param newValue the argument from a user update call + * @return result of the update function + */ + abstract long fn(long currentValue, long newValue); + + /** + * Handles cases of updates involving initialization, resizing, + * creating new Cells, and/or contention. See above for + * explanation. This method suffers the usual non-modularity + * problems of optimistic retry code, relying on rechecked sets of + * reads. + * + * @param x the value + * @param hc the hash code holder + * @param wasUncontended false if CAS failed before call + */ + final void retryUpdate(long x, int[] hc, boolean wasUncontended) { + int h; + if (hc == null) { + threadHashCode.set(hc = new int[1]); // Initialize randomly + int r = rng.nextInt(); // Avoid zero to allow xorShift rehash + h = hc[0] = (r == 0) ? 1 : r; + } + else + h = hc[0]; + boolean collide = false; // True if last slot nonempty + for (;;) { + Cell[] as; Cell a; int n; long v; + if ((as = cells) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) == null) { + if (busy == 0) { // Try to attach new Cell + Cell r = new Cell(x); // Optimistically create + if (busy == 0 && casBusy()) { + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + busy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } + } + collide = false; + } + else if (!wasUncontended) // CAS already known to fail + wasUncontended = true; // Continue after rehash + else if (a.cas(v = a.value, fn(v, x))) + break; + else if (n >= NCPU || cells != as) + collide = false; // At max size or stale + else if (!collide) + collide = true; + else if (busy == 0 && casBusy()) { + try { + if (cells == as) { // Expand table unless stale + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + busy = 0; + } + collide = false; + continue; // Retry with expanded table + } + h ^= h << 13; // Rehash + h ^= h >>> 17; + h ^= h << 5; + hc[0] = h; // Record index for next time + } + else if (busy == 0 && cells == as && casBusy()) { + boolean init = false; + try { // Initialize table + if (cells == as) { + Cell[] rs = new Cell[2]; + rs[h & 1] = new Cell(x); + cells = rs; + init = true; + } + } finally { + busy = 0; + } + if (init) + break; + } + else if (casBase(v = base, fn(v, x))) + break; // Fall back on using base + } + } + + + /** + * Sets base and all cells to the given value. 
+ */ + final void internalReset(long initialValue) { + Cell[] as = cells; + base = initialValue; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + a.value = initialValue; + } + } + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long baseOffset; + private static final long busyOffset; + static { + try { + UNSAFE = getUnsafe(); + Class sk = Striped64.class; + baseOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("base")); + busyOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("busy")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. + * Replace with a simple call to Unsafe.getUnsafe when integrating + * into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException tryReflectionInstead) {} + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + }}); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } +} \ No newline at end of file
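
Usage sketch (not part of the patch): the snippet below shows one way the metrics exposed by this change could be read at runtime. It assumes the shared PooledByteBufAllocator.DEFAULT instance that ships with Netty 4.x; the class name MetricsDump and the warm-up allocation are illustrative only.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PoolArenaMetric;
import io.netty.buffer.PoolChunkListMetric;
import io.netty.buffer.PoolChunkMetric;
import io.netty.buffer.PooledByteBufAllocator;

import java.util.List;

public final class MetricsDump {

    public static void main(String[] args) {
        PooledByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;

        // Perform one allocation so the counters are not all zero.
        ByteBuf buf = alloc.directBuffer(1024);
        buf.release();

        // Allocator-wide numbers added by this patch.
        System.out.println("heap arenas:         " + alloc.numHeapArenas());
        System.out.println("direct arenas:       " + alloc.numDirectArenas());
        System.out.println("thread-local caches: " + alloc.numThreadLocalCaches());

        dump("heap", alloc.heapArenas());
        dump("direct", alloc.directArenas());
    }

    private static void dump(String type, List<PoolArenaMetric> arenas) {
        for (PoolArenaMetric arena : arenas) {
            // Cumulative and active allocation counters per arena.
            System.out.println(type + " arena: allocations=" + arena.numAllocations()
                    + " (tiny=" + arena.numTinyAllocations()
                    + ", small=" + arena.numSmallAllocations()
                    + ", normal=" + arena.numNormalAllocations()
                    + ", huge=" + arena.numHugeAllocations()
                    + "), active=" + arena.numActiveAllocations());

            // Chunk usage, grouped by the qInit/q000/q025/q050/q075/q100 lists.
            for (PoolChunkListMetric chunkList : arena.chunkLists()) {
                for (PoolChunkMetric chunk : chunkList) {
                    System.out.println("  chunk in " + chunkList.minUsage() + "~" + chunkList.maxUsage()
                            + "% list: usage=" + chunk.usage() + "%, free=" + chunk.freeBytes()
                            + "/" + chunk.chunkSize() + " bytes");
                }
            }
        }
    }
}

Note that the per-size counters are updated while the arena lock is held but read without synchronization, and the huge counters go through a LongCounter, so the printed values are best-effort snapshots rather than an atomic view of the allocator.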