[#3654] No need to hold lock while destroy a chunk

Motiviation:

At the moment we sometimes hold the lock on the PoolArena during destroy a PoolChunk. This is not needed.

Modification:

- Ensure we not hold the lock during destroy a PoolChunk
- Move all synchronized usage in PoolArena
- Cleanup

Result:

Less condition.
This commit is contained in:
Norman Maurer 2015-05-25 21:00:24 +02:00
parent d5f1dc66aa
commit dce0dd9b78
4 changed files with 105 additions and 61 deletions

View File

@ -27,6 +27,12 @@ import java.util.List;
abstract class PoolArena<T> implements PoolArenaMetric { abstract class PoolArena<T> implements PoolArenaMetric {
enum SizeClass {
Tiny,
Small,
Normal
}
static final int numTinySubpagePools = 512 >>> 4; static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent; final PooledByteBufAllocator parent;
@ -83,19 +89,19 @@ abstract class PoolArena<T> implements PoolArenaMetric {
smallSubpagePools[i] = newSubpagePoolHead(pageSize); smallSubpagePools[i] = newSubpagePoolHead(pageSize);
} }
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE); q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE);
q075 = new PoolChunkList<T>(this, q100, 75, 100); q075 = new PoolChunkList<T>(q100, 75, 100);
q050 = new PoolChunkList<T>(this, q075, 50, 100); q050 = new PoolChunkList<T>(q075, 50, 100);
q025 = new PoolChunkList<T>(this, q050, 25, 75); q025 = new PoolChunkList<T>(q050, 25, 75);
q000 = new PoolChunkList<T>(this, q025, 1, 50); q000 = new PoolChunkList<T>(q025, 1, 50);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25); qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25);
q100.prevList = q075; q100.prevList(q075);
q075.prevList = q050; q075.prevList(q050);
q050.prevList = q025; q050.prevList(q025);
q025.prevList = q000; q025.prevList(q000);
q000.prevList = null; q000.prevList(null);
qInit.prevList = qInit; qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6); List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
metrics.add(qInit); metrics.add(qInit);
@ -234,25 +240,46 @@ abstract class PoolArena<T> implements PoolArenaMetric {
allocationsHuge.decrement(); allocationsHuge.decrement();
destroyChunk(chunk); destroyChunk(chunk);
} else { } else {
SizeClass sizeClass = sizeClass(normCapacity);
if (sameThreads) { if (sameThreads) {
PoolThreadCache cache = parent.threadCache.get(); PoolThreadCache cache = parent.threadCache();
if (cache.add(this, chunk, handle, normCapacity)) { if (cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it. // cached so not free it.
return; return;
} }
} }
freeChunk(chunk, handle, sizeClass);
}
}
boolean tinyOrSmall = isTinyOrSmall(normCapacity); private SizeClass sizeClass(int normCapacity) {
if (!isTinyOrSmall(normCapacity)) {
return SizeClass.Normal;
}
return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
final boolean destroyChunk;
synchronized (this) { synchronized (this) {
if (!tinyOrSmall) { switch (sizeClass) {
case Normal:
++deallocationsNormal; ++deallocationsNormal;
} else if (isTiny(normCapacity)) { break;
++deallocationsTiny; case Small:
} else {
++deallocationsSmall; ++deallocationsSmall;
break;
case Tiny:
++deallocationsTiny;
break;
default:
throw new Error();
} }
chunk.parent.free(chunk, handle); destroyChunk = !chunk.parent.free(chunk, handle);
} }
if (destroyChunk) {
// destroyChunk not need to be called while holding the synchronized lock.
destroyChunk(chunk);
} }
} }
@ -328,7 +355,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
int readerIndex = buf.readerIndex(); int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex(); int writerIndex = buf.writerIndex();
allocate(parent.threadCache.get(), buf, newCapacity); allocate(parent.threadCache(), buf, newCapacity);
if (newCapacity > oldCapacity) { if (newCapacity > oldCapacity) {
memoryCopy( memoryCopy(
oldMemory, oldOffset, oldMemory, oldOffset,

View File

@ -25,25 +25,29 @@ import java.util.List;
final class PoolChunkList<T> implements PoolChunkListMetric { final class PoolChunkList<T> implements PoolChunkListMetric {
private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.<PoolChunkMetric>emptyList().iterator(); private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.<PoolChunkMetric>emptyList().iterator();
private final PoolArena<T> arena;
private final PoolChunkList<T> nextList; private final PoolChunkList<T> nextList;
PoolChunkList<T> prevList;
private final int minUsage; private final int minUsage;
private final int maxUsage; private final int maxUsage;
private PoolChunk<T> head; private PoolChunk<T> head;
// This is only update once when create the linked like list of PoolChunkList in PoolArena constructor.
private PoolChunkList<T> prevList;
// TODO: Test if adding padding helps under contention // TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7; //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolChunkList(PoolArena<T> arena, PoolChunkList<T> nextList, int minUsage, int maxUsage) { PoolChunkList(PoolChunkList<T> nextList, int minUsage, int maxUsage) {
this.arena = arena;
this.nextList = nextList; this.nextList = nextList;
this.minUsage = minUsage; this.minUsage = minUsage;
this.maxUsage = maxUsage; this.maxUsage = maxUsage;
} }
void prevList(PoolChunkList<T> prevList) {
assert this.prevList == null;
this.prevList = prevList;
}
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (head == null) { if (head == null) {
return false; return false;
@ -67,17 +71,19 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
} }
} }
void free(PoolChunk<T> chunk, long handle) { boolean free(PoolChunk<T> chunk, long handle) {
chunk.free(handle); chunk.free(handle);
if (chunk.usage() < minUsage) { if (chunk.usage() < minUsage) {
remove(chunk); remove(chunk);
if (prevList == null) { if (prevList == null) {
assert chunk.usage() == 0; assert chunk.usage() == 0;
arena.destroyChunk(chunk); return false;
} else { } else {
prevList.add(chunk); prevList.add(chunk);
return true;
} }
} }
return true;
} }
void add(PoolChunk<T> chunk) { void add(PoolChunk<T> chunk) {

View File

@ -17,6 +17,7 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.ThreadDeathWatcher; import io.netty.util.ThreadDeathWatcher;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -77,8 +78,10 @@ final class PoolThreadCache {
this.heapArena = heapArena; this.heapArena = heapArena;
this.directArena = directArena; this.directArena = directArena;
if (directArena != null) { if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); tinySubPageDirectCaches = createSubPageCaches(
smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools); tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize); numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches( normalDirectCaches = createNormalCaches(
@ -92,8 +95,10 @@ final class PoolThreadCache {
} }
if (heapArena != null) { if (heapArena != null) {
// Create the caches for the heap allocations // Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools); tinySubPageHeapCaches = createSubPageCaches(
smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools); tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize); numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches( normalHeapCaches = createNormalCaches(
@ -111,13 +116,14 @@ final class PoolThreadCache {
ThreadDeathWatcher.watch(thread, freeTask); ThreadDeathWatcher.watch(thread, freeTask);
} }
private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) { private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0) { if (cacheSize > 0) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
SubPageMemoryRegionCache<T>[] cache = new SubPageMemoryRegionCache[numCaches]; SubPageMemoryRegionCache<T>[] cache = new SubPageMemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) { for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length // TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize); cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
} }
return cache; return cache;
} else { } else {
@ -191,23 +197,27 @@ final class PoolThreadCache {
* Returns {@code true} if it fit into the cache {@code false} otherwise. * Returns {@code true} if it fit into the cache {@code false} otherwise.
*/ */
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity) { boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
MemoryRegionCache<?> cache; MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (area.isTinyOrSmall(normCapacity)) {
if (PoolArena.isTiny(normCapacity)) {
cache = cacheForTiny(area, normCapacity);
} else {
cache = cacheForSmall(area, normCapacity);
}
} else {
cache = cacheForNormal(area, normCapacity);
}
if (cache == null) { if (cache == null) {
return false; return false;
} }
return cache.add(chunk, handle); return cache.add(chunk, handle);
} }
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
/** /**
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache * Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
*/ */
@ -309,8 +319,8 @@ final class PoolThreadCache {
* Cache used for buffers which are backed by TINY or SMALL size. * Cache used for buffers which are backed by TINY or SMALL size.
*/ */
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> { private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size) { SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size); super(size, sizeClass);
} }
@Override @Override
@ -325,7 +335,7 @@ final class PoolThreadCache {
*/ */
private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> { private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
NormalMemoryRegionCache(int size) { NormalMemoryRegionCache(int size) {
super(size); super(size, SizeClass.Normal);
} }
@Override @Override
@ -343,6 +353,7 @@ final class PoolThreadCache {
*/ */
private abstract static class MemoryRegionCache<T> { private abstract static class MemoryRegionCache<T> {
private final Entry<T>[] entries; private final Entry<T>[] entries;
private final SizeClass sizeClass;
private final int maxUnusedCached; private final int maxUnusedCached;
private int head; private int head;
private int tail; private int tail;
@ -350,12 +361,13 @@ final class PoolThreadCache {
private int entriesInUse; private int entriesInUse;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
MemoryRegionCache(int size) { MemoryRegionCache(int size, SizeClass sizeClass) {
entries = new Entry[powerOfTwo(size)]; entries = new Entry[powerOfTwo(size)];
for (int i = 0; i < entries.length; i++) { for (int i = 0; i < entries.length; i++) {
entries[i] = new Entry<T>(); entries[i] = new Entry<T>();
} }
maxUnusedCached = size / 2; maxUnusedCached = size / 2;
this.sizeClass = sizeClass;
} }
private static int powerOfTwo(int res) { private static int powerOfTwo(int res) {
@ -460,15 +472,12 @@ final class PoolThreadCache {
} }
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
private static boolean freeEntry(Entry entry) { private boolean freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk; PoolChunk chunk = entry.chunk;
if (chunk == null) { if (chunk == null) {
return false; return false;
} }
// need to synchronize on the area from which it was allocated before. chunk.arena.freeChunk(chunk, entry.handle, sizeClass);
synchronized (chunk.arena) {
chunk.parent.free(chunk, entry.handle);
}
entry.chunk = null; entry.chunk = null;
return true; return true;
} }

View File

@ -127,11 +127,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private final int tinyCacheSize; private final int tinyCacheSize;
private final int smallCacheSize; private final int smallCacheSize;
private final int normalCacheSize; private final int normalCacheSize;
private final List<PoolArenaMetric> heapArenaMetrics; private final List<PoolArenaMetric> heapArenaMetrics;
private final List<PoolArenaMetric> directArenaMetrics; private final List<PoolArenaMetric> directArenaMetrics;
private final PoolThreadLocalCache threadCache;
final PoolThreadLocalCache threadCache;
public PooledByteBufAllocator() { public PooledByteBufAllocator() {
this(false); this(false);
@ -379,6 +377,10 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
return normalCacheSize; return normalCacheSize;
} }
final PoolThreadCache threadCache() {
return threadCache.get();
}
// Too noisy at the moment. // Too noisy at the moment.
// //
//public String toString() { //public String toString() {