[#3654] No need to hold lock while destroy a chunk

Motiviation:

At the moment we sometimes hold the lock on the PoolArena during destroy a PoolChunk. This is not needed.

Modification:

- Ensure we not hold the lock during destroy a PoolChunk
- Move all synchronized usage in PoolArena
- Cleanup

Result:

Less condition.
This commit is contained in:
Norman Maurer 2015-05-25 21:00:24 +02:00
parent 0ce7c3b2a0
commit b8fee05f70
4 changed files with 105 additions and 61 deletions

View File

@ -27,6 +27,12 @@ import java.util.List;
abstract class PoolArena<T> implements PoolArenaMetric {
enum SizeClass {
Tiny,
Small,
Normal
}
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
@ -83,19 +89,19 @@ abstract class PoolArena<T> implements PoolArenaMetric {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE);
q075 = new PoolChunkList<T>(this, q100, 75, 100);
q050 = new PoolChunkList<T>(this, q075, 50, 100);
q025 = new PoolChunkList<T>(this, q050, 25, 75);
q000 = new PoolChunkList<T>(this, q025, 1, 50);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25);
q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE);
q075 = new PoolChunkList<T>(q100, 75, 100);
q050 = new PoolChunkList<T>(q075, 50, 100);
q025 = new PoolChunkList<T>(q050, 25, 75);
q000 = new PoolChunkList<T>(q025, 1, 50);
qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25);
q100.prevList = q075;
q075.prevList = q050;
q050.prevList = q025;
q025.prevList = q000;
q000.prevList = null;
qInit.prevList = qInit;
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
metrics.add(qInit);
@ -234,25 +240,46 @@ abstract class PoolArena<T> implements PoolArenaMetric {
allocationsHuge.decrement();
destroyChunk(chunk);
} else {
SizeClass sizeClass = sizeClass(normCapacity);
if (sameThreads) {
PoolThreadCache cache = parent.threadCache.get();
if (cache.add(this, chunk, handle, normCapacity)) {
PoolThreadCache cache = parent.threadCache();
if (cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
}
freeChunk(chunk, handle, sizeClass);
}
}
boolean tinyOrSmall = isTinyOrSmall(normCapacity);
private SizeClass sizeClass(int normCapacity) {
if (!isTinyOrSmall(normCapacity)) {
return SizeClass.Normal;
}
return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
final boolean destroyChunk;
synchronized (this) {
if (!tinyOrSmall) {
switch (sizeClass) {
case Normal:
++deallocationsNormal;
} else if (isTiny(normCapacity)) {
++deallocationsTiny;
} else {
break;
case Small:
++deallocationsSmall;
break;
case Tiny:
++deallocationsTiny;
break;
default:
throw new Error();
}
chunk.parent.free(chunk, handle);
destroyChunk = !chunk.parent.free(chunk, handle);
}
if (destroyChunk) {
// destroyChunk not need to be called while holding the synchronized lock.
destroyChunk(chunk);
}
}
@ -328,7 +355,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
allocate(parent.threadCache.get(), buf, newCapacity);
allocate(parent.threadCache(), buf, newCapacity);
if (newCapacity > oldCapacity) {
memoryCopy(
oldMemory, oldOffset,

View File

@ -25,25 +25,29 @@ import java.util.List;
final class PoolChunkList<T> implements PoolChunkListMetric {
private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.<PoolChunkMetric>emptyList().iterator();
private final PoolArena<T> arena;
private final PoolChunkList<T> nextList;
PoolChunkList<T> prevList;
private final int minUsage;
private final int maxUsage;
private PoolChunk<T> head;
// This is only update once when create the linked like list of PoolChunkList in PoolArena constructor.
private PoolChunkList<T> prevList;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolChunkList(PoolArena<T> arena, PoolChunkList<T> nextList, int minUsage, int maxUsage) {
this.arena = arena;
PoolChunkList(PoolChunkList<T> nextList, int minUsage, int maxUsage) {
this.nextList = nextList;
this.minUsage = minUsage;
this.maxUsage = maxUsage;
}
void prevList(PoolChunkList<T> prevList) {
assert this.prevList == null;
this.prevList = prevList;
}
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (head == null) {
return false;
@ -67,17 +71,19 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
}
}
void free(PoolChunk<T> chunk, long handle) {
boolean free(PoolChunk<T> chunk, long handle) {
chunk.free(handle);
if (chunk.usage() < minUsage) {
remove(chunk);
if (prevList == null) {
assert chunk.usage() == 0;
arena.destroyChunk(chunk);
return false;
} else {
prevList.add(chunk);
return true;
}
}
return true;
}
void add(PoolChunk<T> chunk) {

View File

@ -17,6 +17,7 @@
package io.netty.buffer;
import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.ThreadDeathWatcher;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -77,8 +78,10 @@ final class PoolThreadCache {
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools);
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
@ -92,8 +95,10 @@ final class PoolThreadCache {
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools);
smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools);
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
@ -111,13 +116,14 @@ final class PoolThreadCache {
ThreadDeathWatcher.watch(thread, freeTask);
}
private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {
private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0) {
@SuppressWarnings("unchecked")
SubPageMemoryRegionCache<T>[] cache = new SubPageMemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
@ -191,23 +197,27 @@ final class PoolThreadCache {
* Returns {@code true} if it fit into the cache {@code false} otherwise.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity) {
MemoryRegionCache<?> cache;
if (area.isTinyOrSmall(normCapacity)) {
if (PoolArena.isTiny(normCapacity)) {
cache = cacheForTiny(area, normCapacity);
} else {
cache = cacheForSmall(area, normCapacity);
}
} else {
cache = cacheForNormal(area, normCapacity);
}
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, handle);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
switch (sizeClass) {
case Normal:
return cacheForNormal(area, normCapacity);
case Small:
return cacheForSmall(area, normCapacity);
case Tiny:
return cacheForTiny(area, normCapacity);
default:
throw new Error();
}
}
/**
* Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
*/
@ -309,8 +319,8 @@ final class PoolThreadCache {
* Cache used for buffers which are backed by TINY or SMALL size.
*/
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size) {
super(size);
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
@ -325,7 +335,7 @@ final class PoolThreadCache {
*/
private static final class NormalMemoryRegionCache<T> extends MemoryRegionCache<T> {
NormalMemoryRegionCache(int size) {
super(size);
super(size, SizeClass.Normal);
}
@Override
@ -343,6 +353,7 @@ final class PoolThreadCache {
*/
private abstract static class MemoryRegionCache<T> {
private final Entry<T>[] entries;
private final SizeClass sizeClass;
private final int maxUnusedCached;
private int head;
private int tail;
@ -350,12 +361,13 @@ final class PoolThreadCache {
private int entriesInUse;
@SuppressWarnings("unchecked")
MemoryRegionCache(int size) {
MemoryRegionCache(int size, SizeClass sizeClass) {
entries = new Entry[powerOfTwo(size)];
for (int i = 0; i < entries.length; i++) {
entries[i] = new Entry<T>();
}
maxUnusedCached = size / 2;
this.sizeClass = sizeClass;
}
private static int powerOfTwo(int res) {
@ -460,15 +472,12 @@ final class PoolThreadCache {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static boolean freeEntry(Entry entry) {
private boolean freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
if (chunk == null) {
return false;
}
// need to synchronize on the area from which it was allocated before.
synchronized (chunk.arena) {
chunk.parent.free(chunk, entry.handle);
}
chunk.arena.freeChunk(chunk, entry.handle, sizeClass);
entry.chunk = null;
return true;
}

View File

@ -127,11 +127,9 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private final int tinyCacheSize;
private final int smallCacheSize;
private final int normalCacheSize;
private final List<PoolArenaMetric> heapArenaMetrics;
private final List<PoolArenaMetric> directArenaMetrics;
final PoolThreadLocalCache threadCache;
private final PoolThreadLocalCache threadCache;
public PooledByteBufAllocator() {
this(false);
@ -362,6 +360,10 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator {
return normalCacheSize;
}
final PoolThreadCache threadCache() {
return threadCache.get();
}
// Too noisy at the moment.
//
//public String toString() {