Let PoolThreadCache work even if allocation and deallocation Thread are different

Motivation:

PoolThreadCache did only cache allocations if the allocation and deallocation Thread were the same. This is not optimal as often people write from differen thread then the actual EventLoop thread.

Modification:

- Add MpscArrayQueue which was forked from jctools and lightly modified.
- Use MpscArrayQueue for caches and always add buffer back to the cache that belongs to the allocation thread.

Result:

ThreadPoolCache is now also usable and so gives performance improvements when allocation and deallocation thread are different.

Performance when using same thread for allocation and deallocation is noticable worse then before.
This commit is contained in:
Norman Maurer 2015-05-19 15:03:29 +02:00
parent bac2e3a6d2
commit 81fee66c78
11 changed files with 828 additions and 116 deletions

View File

@ -98,6 +98,14 @@ a constant-size alphabet written by Yuta Mori. It can be obtained at:
* HOMEPAGE: * HOMEPAGE:
* https://code.google.com/p/libdivsufsort/ * https://code.google.com/p/libdivsufsort/
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'JZlib', a re-implementation of zlib in This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at: pure Java, which can be obtained at:

View File

@ -237,19 +237,17 @@ abstract class PoolArena<T> implements PoolArenaMetric {
buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity); buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
} }
void free(PoolChunk<T> chunk, long handle, int normCapacity, boolean sameThreads) { void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) { if (chunk.unpooled) {
allocationsHuge.decrement(); allocationsHuge.decrement();
destroyChunk(chunk); destroyChunk(chunk);
} else { } else {
SizeClass sizeClass = sizeClass(normCapacity); SizeClass sizeClass = sizeClass(normCapacity);
if (sameThreads) { if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
PoolThreadCache cache = parent.threadCache();
if (cache.add(this, chunk, handle, normCapacity, sizeClass)) {
// cached so not free it. // cached so not free it.
return; return;
} }
}
freeChunk(chunk, handle, sizeClass); freeChunk(chunk, handle, sizeClass);
} }
} }
@ -378,7 +376,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
buf.setIndex(readerIndex, writerIndex); buf.setIndex(readerIndex, writerIndex);
if (freeOldMemory) { if (freeOldMemory) {
free(oldChunk, oldHandle, oldMaxLength, buf.initThread == Thread.currentThread()); free(oldChunk, oldHandle, oldMaxLength, buf.cache);
} }
} }

View File

@ -358,7 +358,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
if (bitmapIdx == 0) { if (bitmapIdx == 0) {
byte val = value(memoryMapIdx); byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val); assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx)); buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else { } else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
} }
@ -379,7 +380,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
buf.init( buf.init(
this, handle, this, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize); runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
arena.parent.threadCache());
} }
private byte value(int id) { private byte value(int id) {

View File

@ -18,11 +18,15 @@ package io.netty.buffer;
import io.netty.buffer.PoolArena.SizeClass; import io.netty.buffer.PoolArena.SizeClass;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ThreadDeathWatcher; import io.netty.util.ThreadDeathWatcher;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Queue;
/** /**
* Acts a Thread cache for allocations. This implementation is moduled after * Acts a Thread cache for allocations. This implementation is moduled after
@ -116,11 +120,11 @@ final class PoolThreadCache {
ThreadDeathWatcher.watch(thread, freeTask); ThreadDeathWatcher.watch(thread, freeTask);
} }
private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches( private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) { int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0) { if (cacheSize > 0) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
SubPageMemoryRegionCache<T>[] cache = new SubPageMemoryRegionCache[numCaches]; MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) { for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length // TODO: maybe use cacheSize / cache.length
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass); cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
@ -131,14 +135,14 @@ final class PoolThreadCache {
} }
} }
private static <T> NormalMemoryRegionCache<T>[] createNormalCaches( private static <T> MemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) { int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0) { if (cacheSize > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity); int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
int arraySize = Math.max(1, log2(max / area.pageSize) + 1); int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
NormalMemoryRegionCache<T>[] cache = new NormalMemoryRegionCache[arraySize]; MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
for (int i = 0; i < cache.length; i++) { for (int i = 0; i < cache.length; i++) {
cache[i] = new NormalMemoryRegionCache<T>(cacheSize); cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
} }
@ -345,28 +349,15 @@ final class PoolThreadCache {
} }
} }
/**
* Cache of {@link PoolChunk} and handles which can be used to allocate a buffer without locking at all.
*
* The {@link MemoryRegionCache} uses a LIFO implementation as this way it is more likely that the
* cached memory is still in the loaded cache-line and so no new read must happen (compared to FIFO).
*/
private abstract static class MemoryRegionCache<T> { private abstract static class MemoryRegionCache<T> {
private final Entry<T>[] entries; private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass; private final SizeClass sizeClass;
private final int maxUnusedCached; private int allocations;
private int head;
private int tail;
private int maxEntriesInUse;
private int entriesInUse;
@SuppressWarnings("unchecked")
MemoryRegionCache(int size, SizeClass sizeClass) { MemoryRegionCache(int size, SizeClass sizeClass) {
entries = new Entry[powerOfTwo(size)]; this.size = powerOfTwo(size);
for (int i = 0; i < entries.length; i++) { queue = PlatformDependent.newFixedMpscQueue(this.size);
entries[i] = new Entry<T>();
}
maxUnusedCached = size / 2;
this.sizeClass = sizeClass; this.sizeClass = sizeClass;
} }
@ -393,115 +384,100 @@ final class PoolThreadCache {
/** /**
* Add to cache if not already full. * Add to cache if not already full.
*/ */
public boolean add(PoolChunk<T> chunk, long handle) { @SuppressWarnings("unchecked")
Entry<T> entry = entries[tail]; public final boolean add(PoolChunk<T> chunk, long handle) {
if (entry.chunk != null) { return queue.offer(newEntry(chunk, handle));
// cache is full
return false;
}
entriesInUse --;
entry.chunk = chunk;
entry.handle = handle;
tail = nextIdx(tail);
return true;
} }
/** /**
* Allocate something out of the cache if possible and remove the entry from the cache. * Allocate something out of the cache if possible and remove the entry from the cache.
*/ */
public boolean allocate(PooledByteBuf<T> buf, int reqCapacity) { public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
int index = prevIdx(tail); Entry<T> entry = queue.poll();
Entry<T> entry = entries[index]; if (entry == null) {
if (entry.chunk == null) {
return false; return false;
} }
entriesInUse ++;
if (maxEntriesInUse < entriesInUse) {
maxEntriesInUse = entriesInUse;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity); initBuf(entry.chunk, entry.handle, buf, reqCapacity);
// only null out the chunk as we only use the chunk to check if the buffer is full or not.
entry.chunk = null; // allocations is not thread-safe which is fine as this is only called from the same thread all time.
tail = index; ++ allocations;
return true; return true;
} }
/** /**
* Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s. * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
*/ */
public int free() { public final int free() {
return free(Integer.MAX_VALUE);
}
private int free(int max) {
int numFreed = 0; int numFreed = 0;
entriesInUse = 0; for (; numFreed < max; numFreed++) {
maxEntriesInUse = 0; Entry<T> entry = queue.poll();
for (int i = head;; i = nextIdx(i)) { if (entry != null) {
if (freeEntry(entries[i])) { freeEntry(entry);
numFreed++;
} else { } else {
// all cleared // all cleared
return numFreed; return numFreed;
} }
} }
return numFreed;
} }
/** /**
* Free up cached {@link PoolChunk}s if not allocated frequently enough. * Free up cached {@link PoolChunk}s if not allocated frequently enough.
*/ */
private void trim() { public final void trim() {
int free = size() - maxEntriesInUse; int free = size - allocations;
entriesInUse = 0; allocations = 0;
maxEntriesInUse = 0;
if (free <= maxUnusedCached) { // We not even allocated all the number that are
return; if (free > 0) {
free(free);
} }
int i = head;
for (; free > 0; free--) {
if (!freeEntry(entries[i])) {
// all freed
break;
}
i = nextIdx(i);
}
// Update head to point to te correct entry
// See https://github.com/netty/netty/issues/2924
head = i;
} }
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
private boolean freeEntry(Entry entry) { private void freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk; PoolChunk chunk = entry.chunk;
if (chunk == null) {
return false; // recycle now so PoolChunk can be GC'ed.
} entry.recycle();
chunk.arena.freeChunk(chunk, entry.handle, sizeClass); chunk.arena.freeChunk(chunk, entry.handle, sizeClass);
entry.chunk = null;
return true;
} }
/** static final class Entry<T> {
* Return the number of cached entries. final Handle recyclerHandle;
*/
private int size() {
return tail - head & entries.length - 1;
}
private int nextIdx(int index) {
// use bitwise operation as this is faster as using modulo.
return index + 1 & entries.length - 1;
}
private int prevIdx(int index) {
// use bitwise operation as this is faster as using modulo.
return index - 1 & entries.length - 1;
}
private static final class Entry<T> {
PoolChunk<T> chunk; PoolChunk<T> chunk;
long handle; long handle = -1;
Entry(Handle recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
chunk = null;
handle = -1;
RECYCLER.recycle(this, recyclerHandle);
} }
} }
@SuppressWarnings("rawtypes")
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.handle = handle;
return entry;
}
@SuppressWarnings("rawtypes")
private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@Override
protected Entry newObject(Handle handle) {
return new Entry(handle);
}
};
}
} }

View File

@ -32,7 +32,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
protected int offset; protected int offset;
protected int length; protected int length;
int maxLength; int maxLength;
Thread initThread; PoolThreadCache cache;
private ByteBuffer tmpNioBuf; private ByteBuffer tmpNioBuf;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -41,7 +41,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle; this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
} }
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength) { void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0; assert handle >= 0;
assert chunk != null; assert chunk != null;
@ -54,7 +54,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
setIndex(0, 0); setIndex(0, 0);
discardMarks(); discardMarks();
tmpNioBuf = null; tmpNioBuf = null;
initThread = Thread.currentThread(); this.cache = cache;
} }
void initUnpooled(PoolChunk<T> chunk, int length) { void initUnpooled(PoolChunk<T> chunk, int length) {
@ -67,7 +67,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
this.length = maxLength = length; this.length = maxLength = length;
setIndex(0, 0); setIndex(0, 0);
tmpNioBuf = null; tmpNioBuf = null;
initThread = Thread.currentThread(); cache = null;
} }
@Override @Override
@ -155,9 +155,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
final long handle = this.handle; final long handle = this.handle;
this.handle = -1; this.handle = -1;
memory = null; memory = null;
boolean sameThread = initThread == Thread.currentThread(); chunk.arena.free(chunk, handle, maxLength, cache);
initThread = null;
chunk.arena.free(chunk, handle, maxLength, sameThread);
recycle(); recycle();
} }
} }

View File

@ -16,6 +16,7 @@
package io.netty.buffer; package io.netty.buffer;
import io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache;
import io.netty.util.Recycler; import io.netty.util.Recycler;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -53,8 +54,9 @@ final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
} }
@Override @Override
void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength) { void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
super.init(chunk, handle, offset, length, maxLength); PoolThreadCache cache) {
super.init(chunk, handle, offset, length, maxLength, cache);
initMemoryAddress(); initMemoryAddress();
} }

View File

@ -0,0 +1,207 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.util.internal;
import java.util.AbstractQueue;
import java.util.Iterator;
/**
* Forked from <a href="https://github.com/JCTools/JCTools">JCTools</a>.
*
* A concurrent access enabling class used by circular array based queues this class exposes an offset computation
* method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
* the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post
* padding.
* <p>
* Offset calculation is separate from access to enable the reuse of a give compute offset.
* <p>
* Load/Store methods using a <i>buffer</i> parameter are provided to allow the prevention of final field reload after a
* LoadLoad barrier.
* <p>
*
* @param <E>
*/
abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E> {
protected static final int REF_BUFFER_PAD;
private static final long REF_ARRAY_BASE;
private static final int REF_ELEMENT_SHIFT;
static {
final int scale = PlatformDependent0.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
REF_ELEMENT_SHIFT = 2;
} else if (8 == scale) {
REF_ELEMENT_SHIFT = 3;
} else {
throw new IllegalStateException("Unknown pointer size");
}
// 2 cache lines pad
// TODO: replace 64 with the value we can detect
REF_BUFFER_PAD = (64 * 2) / scale;
// Including the buffer pad in the array base offset
REF_ARRAY_BASE = PlatformDependent0.UNSAFE.arrayBaseOffset(Object[].class) + (REF_BUFFER_PAD * scale);
}
protected final long mask;
// @Stable :(
protected final E[] buffer;
@SuppressWarnings("unchecked")
public ConcurrentCircularArrayQueue(int capacity) {
int actualCapacity = roundToPowerOfTwo(capacity);
mask = actualCapacity - 1;
// pad data on either end with some empty slots.
buffer = (E[]) new Object[actualCapacity + REF_BUFFER_PAD * 2];
}
private static int roundToPowerOfTwo(final int value) {
return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
}
/**
* @param index desirable element index
* @return the offset in bytes within the array for a given index.
*/
protected final long calcElementOffset(long index) {
return calcElementOffset(index, mask);
}
/**
* @param index desirable element index
* @param mask
* @return the offset in bytes within the array for a given index.
*/
protected static final long calcElementOffset(long index, long mask) {
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
}
/**
* A plain store (no ordering/fences) of an element to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e a kitty
*/
protected final void spElement(long offset, E e) {
spElement(buffer, offset, e);
}
/**
* A plain store (no ordering/fences) of an element to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected static final <E> void spElement(E[] buffer, long offset, E e) {
PlatformDependent0.UNSAFE.putObject(buffer, offset, e);
}
/**
* An ordered store(store + StoreStore barrier) of an element to a given offset
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected final void soElement(long offset, E e) {
soElement(buffer, offset, e);
}
/**
* An ordered store(store + StoreStore barrier) of an element to a given offset
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @param e an orderly kitty
*/
protected static final <E> void soElement(E[] buffer, long offset, E e) {
PlatformDependent0.UNSAFE.putOrderedObject(buffer, offset, e);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final E lpElement(long offset) {
return lpElement(buffer, offset);
}
/**
* A plain load (no ordering/fences) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected static final <E> E lpElement(E[] buffer, long offset) {
return (E) PlatformDependent0.UNSAFE.getObject(buffer, offset);
}
/**
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
*
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
protected final E lvElement(long offset) {
return lvElement(buffer, offset);
}
/**
* A volatile load (load + LoadLoad barrier) of an element from a given offset.
*
* @param buffer this.buffer
* @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)}
* @return the element at the offset
*/
@SuppressWarnings("unchecked")
protected static final <E> E lvElement(E[] buffer, long offset) {
return (E) PlatformDependent0.UNSAFE.getObjectVolatile(buffer, offset);
}
@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
while (poll() != null || !isEmpty()) {
// looping
}
}
public int capacity() {
return (int) (mask + 1);
}
}
abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}

View File

@ -0,0 +1,331 @@
/*
* Copyright 2015 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.util.internal;
/**
* Forked from <a href="https://github.com/JCTools/JCTools">JCTools</a>.
*
* A Multi-Producer-Single-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
* any thread may call the offer method, but only a single thread may call poll/peek for correctness to
* maintained. <br>
* This implementation follows patterns documented on the package level for False Sharing protection.<br>
* This implementation is using the <a href="http://sourceforge.net/projects/mc-fastflow/">Fast Flow</a>
* method for polling from the queue (with minor change to correctly publish the index) and an extension of
* the Leslie Lamport concurrent queue algorithm (originated by Martin Thompson) on the producer side.<br>
*
* @param <E>
*/
final class MpscArrayQueue<E> extends MpscArrayQueueConsumerField<E> {
long p40, p41, p42, p43, p44, p45, p46;
long p30, p31, p32, p33, p34, p35, p36, p37;
public MpscArrayQueue(final int capacity) {
super(capacity);
}
/**
* {@inheritDoc} <br>
*
* IMPLEMENTATION NOTES:<br>
* Lock free offer using a single CAS. As class name suggests access is permitted to many threads
* concurrently.
*
* @see java.util.Queue#offer(java.lang.Object)
*/
@Override
public boolean offer(final E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// use a cached view on consumer index (potentially updated in loop)
final long mask = this.mask;
final long capacity = mask + 1;
long consumerIndexCache = lvConsumerIndexCache(); // LoadLoad
long currentProducerIndex;
do {
currentProducerIndex = lvProducerIndex(); // LoadLoad
final long wrapPoint = currentProducerIndex - capacity;
if (consumerIndexCache <= wrapPoint) {
final long currHead = lvConsumerIndex(); // LoadLoad
if (currHead <= wrapPoint) {
return false; // FULL :(
} else {
// update shared cached value of the consumerIndex
svConsumerIndexCache(currHead); // StoreLoad
// update on stack copy, we might need this value again if we lose the CAS.
consumerIndexCache = currHead;
}
}
} while (!casProducerIndex(currentProducerIndex, currentProducerIndex + 1));
/*
* NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on
* the index visibility to poll() we would need to handle the case where the element is not visible.
*/
// Won CAS, move on to storing
final long offset = calcElementOffset(currentProducerIndex, mask);
soElement(offset, e); // StoreStore
return true; // AWESOME :)
}
/**
* A wait free alternative to offer which fails on CAS failure.
*
* @param e new element, not null
* @return 1 if next element cannot be filled, -1 if CAS failed, 0 if successful
*/
public int weakOffer(final E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
final long mask = this.mask;
final long capacity = mask + 1;
final long currentTail = lvProducerIndex(); // LoadLoad
final long consumerIndexCache = lvConsumerIndexCache(); // LoadLoad
final long wrapPoint = currentTail - capacity;
if (consumerIndexCache <= wrapPoint) {
long currHead = lvConsumerIndex(); // LoadLoad
if (currHead <= wrapPoint) {
return 1; // FULL :(
} else {
svConsumerIndexCache(currHead); // StoreLoad
}
}
// look Ma, no loop!
if (!casProducerIndex(currentTail, currentTail + 1)) {
return -1; // CAS FAIL :(
}
// Won CAS, move on to storing
final long offset = calcElementOffset(currentTail, mask);
soElement(offset, e);
return 0; // AWESOME :)
}
/**
* {@inheritDoc}
* <p>
* IMPLEMENTATION NOTES:<br>
* Lock free poll using ordered loads/stores. As class name suggests access is limited to a single thread.
*
* @see java.util.Queue#poll()
*/
@Override
public E poll() {
final long consumerIndex = lvConsumerIndex(); // LoadLoad
final long offset = calcElementOffset(consumerIndex);
// Copy field to avoid re-reading after volatile load
final E[] buffer = this.buffer;
// If we can't see the next available element we can't poll
E e = lvElement(buffer, offset); // LoadLoad
if (null == e) {
/*
* NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
* winning the CAS on offer but before storing the element in the queue. Other producers may go on
* to fill up the queue after this element.
*/
if (consumerIndex != lvProducerIndex()) {
do {
e = lvElement(buffer, offset);
} while (e == null);
} else {
return null;
}
}
spElement(buffer, offset, null);
soConsumerIndex(consumerIndex + 1); // StoreStore
return e;
}
/**
* {@inheritDoc}
* <p>
* IMPLEMENTATION NOTES:<br>
* Lock free peek using ordered loads. As class name suggests access is limited to a single thread.
*
* @see java.util.Queue#poll()
*/
@Override
public E peek() {
// Copy field to avoid re-reading after volatile load
final E[] buffer = this.buffer;
final long consumerIndex = lvConsumerIndex(); // LoadLoad
final long offset = calcElementOffset(consumerIndex);
E e = lvElement(buffer, offset);
if (null == e) {
/*
* NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
* winning the CAS on offer but before storing the element in the queue. Other producers may go on
* to fill up the queue after this element.
*/
if (consumerIndex != lvProducerIndex()) {
do {
e = lvElement(buffer, offset);
} while (e == null);
} else {
return null;
}
}
return e;
}
/**
* {@inheritDoc}
* <p>
*
*/
@Override
public int size() {
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and
* consumer indices, therefore protection is required to ensure size is within valid range. In the
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
* index BEFORE the producer index.
*/
long after = lvConsumerIndex();
while (true) {
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after) {
return (int) (currentProducerIndex - after);
}
}
}
@Override
public boolean isEmpty() {
// Order matters!
// Loading consumer before producer allows for producer increments after consumer index is read.
// This ensures the correctness of this method at least for the consumer thread. Other threads POV is
// not really
// something we can fix here.
return lvConsumerIndex() == lvProducerIndex();
}
}
abstract class MpscArrayQueueL1Pad<E> extends ConcurrentCircularArrayQueue<E> {
long p10, p11, p12, p13, p14, p15, p16;
long p30, p31, p32, p33, p34, p35, p36, p37;
public MpscArrayQueueL1Pad(int capacity) {
super(capacity);
}
}
abstract class MpscArrayQueueTailField<E> extends MpscArrayQueueL1Pad<E> {
private static final long P_INDEX_OFFSET;
static {
try {
P_INDEX_OFFSET = PlatformDependent0.UNSAFE.objectFieldOffset(MpscArrayQueueTailField.class
.getDeclaredField("producerIndex"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private volatile long producerIndex;
public MpscArrayQueueTailField(int capacity) {
super(capacity);
}
protected final long lvProducerIndex() {
return producerIndex;
}
protected final boolean casProducerIndex(long expect, long newValue) {
return PlatformDependent0.UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}
}
abstract class MpscArrayQueueMidPad<E> extends MpscArrayQueueTailField<E> {
long p20, p21, p22, p23, p24, p25, p26;
long p30, p31, p32, p33, p34, p35, p36, p37;
public MpscArrayQueueMidPad(int capacity) {
super(capacity);
}
}
abstract class MpscArrayQueueHeadCacheField<E> extends MpscArrayQueueMidPad<E> {
private volatile long headCache;
public MpscArrayQueueHeadCacheField(int capacity) {
super(capacity);
}
protected final long lvConsumerIndexCache() {
return headCache;
}
protected final void svConsumerIndexCache(long v) {
headCache = v;
}
}
abstract class MpscArrayQueueL2Pad<E> extends MpscArrayQueueHeadCacheField<E> {
long p20, p21, p22, p23, p24, p25, p26;
long p30, p31, p32, p33, p34, p35, p36, p37;
public MpscArrayQueueL2Pad(int capacity) {
super(capacity);
}
}
abstract class MpscArrayQueueConsumerField<E> extends MpscArrayQueueL2Pad<E> {
private static final long C_INDEX_OFFSET;
static {
try {
C_INDEX_OFFSET = PlatformDependent0.UNSAFE.objectFieldOffset(MpscArrayQueueConsumerField.class
.getDeclaredField("consumerIndex"));
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
private volatile long consumerIndex;
public MpscArrayQueueConsumerField(int capacity) {
super(capacity);
}
protected final long lvConsumerIndex() {
return consumerIndex;
}
protected void soConsumerIndex(long l) {
PlatformDependent0.UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, l);
}
}

View File

@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -442,6 +443,18 @@ public final class PlatformDependent {
return new MpscLinkedQueue<T>(); return new MpscLinkedQueue<T>();
} }
/**
* Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single
* consumer (one thread!) with the given fixes {@code capacity}.
*/
public static <T> Queue<T> newFixedMpscQueue(int capacity) {
if (hasUnsafe()) {
return new MpscArrayQueue<T>(capacity);
} else {
return new LinkedBlockingQueue<T>(capacity);
}
}
/** /**
* Return the {@link ClassLoader} for the given {@link Class}. * Return the {@link ClassLoader} for the given {@link Class}.
*/ */

View File

@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
final class PlatformDependent0 { final class PlatformDependent0 {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PlatformDependent0.class); private static final InternalLogger logger = InternalLoggerFactory.getInstance(PlatformDependent0.class);
private static final Unsafe UNSAFE; static final Unsafe UNSAFE;
private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN;
private static final long ADDRESS_FIELD_OFFSET; private static final long ADDRESS_FIELD_OFFSET;
private static final long ARRAY_BASE_OFFSET; private static final long ARRAY_BASE_OFFSET;

177
license/LICENSE.jctools.txt Normal file
View File

@ -0,0 +1,177 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS