diff --git a/NOTICE.txt b/NOTICE.txt index 2ea45edfa2..dd1d88c3e9 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -98,6 +98,14 @@ a constant-size alphabet written by Yuta Mori. It can be obtained at: * HOMEPAGE: * https://code.google.com/p/libdivsufsort/ +This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM, + which can be obtained at: + + * LICENSE: + * license/LICENSE.jctools.txt (ASL2 License) + * HOMEPAGE: + * https://github.com/JCTools/JCTools + This product optionally depends on 'JZlib', a re-implementation of zlib in pure Java, which can be obtained at: diff --git a/buffer/src/main/java/io/netty/buffer/PoolArena.java b/buffer/src/main/java/io/netty/buffer/PoolArena.java index d2e4411604..778a9eee51 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolArena.java +++ b/buffer/src/main/java/io/netty/buffer/PoolArena.java @@ -237,19 +237,17 @@ abstract class PoolArena implements PoolArenaMetric { buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity); } - void free(PoolChunk chunk, long handle, int normCapacity, boolean sameThreads) { + void free(PoolChunk chunk, long handle, int normCapacity, PoolThreadCache cache) { if (chunk.unpooled) { allocationsHuge.decrement(); destroyChunk(chunk); } else { SizeClass sizeClass = sizeClass(normCapacity); - if (sameThreads) { - PoolThreadCache cache = parent.threadCache(); - if (cache.add(this, chunk, handle, normCapacity, sizeClass)) { - // cached so not free it. - return; - } + if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) { + // cached so not free it. + return; } + freeChunk(chunk, handle, sizeClass); } } @@ -378,7 +376,7 @@ abstract class PoolArena implements PoolArenaMetric { buf.setIndex(readerIndex, writerIndex); if (freeOldMemory) { - free(oldChunk, oldHandle, oldMaxLength, buf.initThread == Thread.currentThread()); + free(oldChunk, oldHandle, oldMaxLength, buf.cache); } } diff --git a/buffer/src/main/java/io/netty/buffer/PoolChunk.java b/buffer/src/main/java/io/netty/buffer/PoolChunk.java index a5f0ee5e7d..bf2108cacf 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolChunk.java +++ b/buffer/src/main/java/io/netty/buffer/PoolChunk.java @@ -358,7 +358,8 @@ final class PoolChunk implements PoolChunkMetric { if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); - buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx)); + buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), + arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); } @@ -379,7 +380,8 @@ final class PoolChunk implements PoolChunkMetric { buf.init( this, handle, - runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize); + runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize, + arena.parent.threadCache()); } private byte value(int id) { diff --git a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java index eb06ec9016..3b92a47afb 100644 --- a/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java +++ b/buffer/src/main/java/io/netty/buffer/PoolThreadCache.java @@ -18,11 +18,15 @@ package io.netty.buffer; import io.netty.buffer.PoolArena.SizeClass; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; import io.netty.util.ThreadDeathWatcher; +import 
io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import java.nio.ByteBuffer; +import java.util.Queue; /** * Acts a Thread cache for allocations. This implementation is moduled after @@ -116,11 +120,11 @@ final class PoolThreadCache { ThreadDeathWatcher.watch(thread, freeTask); } - private static SubPageMemoryRegionCache[] createSubPageCaches( + private static MemoryRegionCache[] createSubPageCaches( int cacheSize, int numCaches, SizeClass sizeClass) { if (cacheSize > 0) { @SuppressWarnings("unchecked") - SubPageMemoryRegionCache[] cache = new SubPageMemoryRegionCache[numCaches]; + MemoryRegionCache[] cache = new MemoryRegionCache[numCaches]; for (int i = 0; i < cache.length; i++) { // TODO: maybe use cacheSize / cache.length cache[i] = new SubPageMemoryRegionCache(cacheSize, sizeClass); @@ -131,14 +135,14 @@ final class PoolThreadCache { } } - private static NormalMemoryRegionCache[] createNormalCaches( + private static MemoryRegionCache[] createNormalCaches( int cacheSize, int maxCachedBufferCapacity, PoolArena area) { if (cacheSize > 0) { int max = Math.min(area.chunkSize, maxCachedBufferCapacity); int arraySize = Math.max(1, log2(max / area.pageSize) + 1); @SuppressWarnings("unchecked") - NormalMemoryRegionCache[] cache = new NormalMemoryRegionCache[arraySize]; + MemoryRegionCache[] cache = new MemoryRegionCache[arraySize]; for (int i = 0; i < cache.length; i++) { cache[i] = new NormalMemoryRegionCache(cacheSize); } @@ -345,28 +349,15 @@ final class PoolThreadCache { } } - /** - * Cache of {@link PoolChunk} and handles which can be used to allocate a buffer without locking at all. - * - * The {@link MemoryRegionCache} uses a LIFO implementation as this way it is more likely that the - * cached memory is still in the loaded cache-line and so no new read must happen (compared to FIFO). - */ private abstract static class MemoryRegionCache { - private final Entry[] entries; + private final int size; + private final Queue> queue; private final SizeClass sizeClass; - private final int maxUnusedCached; - private int head; - private int tail; - private int maxEntriesInUse; - private int entriesInUse; + private int allocations; - @SuppressWarnings("unchecked") MemoryRegionCache(int size, SizeClass sizeClass) { - entries = new Entry[powerOfTwo(size)]; - for (int i = 0; i < entries.length; i++) { - entries[i] = new Entry(); - } - maxUnusedCached = size / 2; + this.size = powerOfTwo(size); + queue = PlatformDependent.newFixedMpscQueue(this.size); this.sizeClass = sizeClass; } @@ -393,115 +384,100 @@ final class PoolThreadCache { /** * Add to cache if not already full. */ - public boolean add(PoolChunk chunk, long handle) { - Entry entry = entries[tail]; - if (entry.chunk != null) { - // cache is full - return false; - } - entriesInUse --; - - entry.chunk = chunk; - entry.handle = handle; - tail = nextIdx(tail); - return true; + @SuppressWarnings("unchecked") + public final boolean add(PoolChunk chunk, long handle) { + return queue.offer(newEntry(chunk, handle)); } /** * Allocate something out of the cache if possible and remove the entry from the cache. 
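The hunk above replaces MemoryRegionCache's hand-rolled ring buffer with a fixed-size MPSC queue of Recycler-pooled entries: a region can be offered back to the cache that owns it from any releasing thread, while allocate() and trim() still run only on the owning thread. The following stand-alone sketch illustrates that pattern only; the names (RegionCacheSketch, Entry) are hypothetical and a plain ArrayBlockingQueue stands in for the MPSC queue used by the patch.

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;

    // Hypothetical, simplified model of the queue-backed region cache (not the Netty class).
    // add() may be called from any releasing thread; allocate()/trim() only from the owner thread.
    final class RegionCacheSketch {
        static final class Entry {                            // stands in for MemoryRegionCache.Entry
            final Object chunk;
            final long handle;
            Entry(Object chunk, long handle) { this.chunk = chunk; this.handle = handle; }
        }

        private final int size;
        private final Queue<Entry> queue;                     // the patch uses PlatformDependent.newFixedMpscQueue(size)
        private int allocations;                              // only touched by the owner thread

        RegionCacheSketch(int size) {
            this.size = size;
            this.queue = new ArrayBlockingQueue<Entry>(size); // MPSC-safe stand-in for this sketch
        }

        boolean add(Object chunk, long handle) {              // producer side: any thread
            return queue.offer(new Entry(chunk, handle));     // a full queue means the caller frees directly
        }

        Entry allocate() {                                    // consumer side: owner thread only
            Entry e = queue.poll();
            if (e != null) {
                ++allocations;                                // single-threaded counter, as in the patch
            }
            return e;
        }

        void trim() {                                         // owner thread: drop what was not re-used
            int free = size - allocations;
            allocations = 0;
            for (; free > 0 && queue.poll() != null; free--) {
                // a real implementation would hand the polled entry back to the arena here
            }
        }
    }

In the actual patch the queue comes from PlatformDependent.newFixedMpscQueue(size), added later in this diff, and a failed offer simply means the chunk is returned to the arena instead of being cached.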
*/ - public boolean allocate(PooledByteBuf buf, int reqCapacity) { - int index = prevIdx(tail); - Entry entry = entries[index]; - if (entry.chunk == null) { + public final boolean allocate(PooledByteBuf buf, int reqCapacity) { + Entry entry = queue.poll(); + if (entry == null) { return false; } - - entriesInUse ++; - if (maxEntriesInUse < entriesInUse) { - maxEntriesInUse = entriesInUse; - } initBuf(entry.chunk, entry.handle, buf, reqCapacity); - // only null out the chunk as we only use the chunk to check if the buffer is full or not. - entry.chunk = null; - tail = index; + + // allocations is not thread-safe which is fine as this is only called from the same thread all time. + ++ allocations; return true; } /** * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s. */ - public int free() { + public final int free() { + return free(Integer.MAX_VALUE); + } + + private int free(int max) { int numFreed = 0; - entriesInUse = 0; - maxEntriesInUse = 0; - for (int i = head;; i = nextIdx(i)) { - if (freeEntry(entries[i])) { - numFreed++; + for (; numFreed < max; numFreed++) { + Entry entry = queue.poll(); + if (entry != null) { + freeEntry(entry); } else { // all cleared return numFreed; } } + return numFreed; } /** * Free up cached {@link PoolChunk}s if not allocated frequently enough. */ - private void trim() { - int free = size() - maxEntriesInUse; - entriesInUse = 0; - maxEntriesInUse = 0; + public final void trim() { + int free = size - allocations; + allocations = 0; - if (free <= maxUnusedCached) { - return; + // We not even allocated all the number that are + if (free > 0) { + free(free); } - - int i = head; - for (; free > 0; free--) { - if (!freeEntry(entries[i])) { - // all freed - break; - } - i = nextIdx(i); - } - - // Update head to point to te correct entry - // See https://github.com/netty/netty/issues/2924 - head = i; } @SuppressWarnings({ "unchecked", "rawtypes" }) - private boolean freeEntry(Entry entry) { + private void freeEntry(Entry entry) { PoolChunk chunk = entry.chunk; - if (chunk == null) { - return false; - } + + // recycle now so PoolChunk can be GC'ed. + entry.recycle(); + chunk.arena.freeChunk(chunk, entry.handle, sizeClass); - entry.chunk = null; - return true; } - /** - * Return the number of cached entries. - */ - private int size() { - return tail - head & entries.length - 1; - } - - private int nextIdx(int index) { - // use bitwise operation as this is faster as using modulo. - return index + 1 & entries.length - 1; - } - - private int prevIdx(int index) { - // use bitwise operation as this is faster as using modulo. 
- return index - 1 & entries.length - 1; - } - - private static final class Entry { + static final class Entry { + final Handle recyclerHandle; PoolChunk chunk; - long handle; + long handle = -1; + + Entry(Handle recyclerHandle) { + this.recyclerHandle = recyclerHandle; + } + + void recycle() { + chunk = null; + handle = -1; + RECYCLER.recycle(this, recyclerHandle); + } } + + @SuppressWarnings("rawtypes") + private static Entry newEntry(PoolChunk chunk, long handle) { + Entry entry = RECYCLER.get(); + entry.chunk = chunk; + entry.handle = handle; + return entry; + } + + @SuppressWarnings("rawtypes") + private static final Recycler RECYCLER = new Recycler() { + @Override + protected Entry newObject(Handle handle) { + return new Entry(handle); + } + }; } } diff --git a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java index 52115a1d00..5430a622c6 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledByteBuf.java @@ -32,7 +32,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { protected int offset; protected int length; int maxLength; - Thread initThread; + PoolThreadCache cache; private ByteBuffer tmpNioBuf; @SuppressWarnings("unchecked") @@ -41,7 +41,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { this.recyclerHandle = (Handle>) recyclerHandle; } - void init(PoolChunk chunk, long handle, int offset, int length, int maxLength) { + void init(PoolChunk chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { assert handle >= 0; assert chunk != null; @@ -54,7 +54,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { setIndex(0, 0); discardMarks(); tmpNioBuf = null; - initThread = Thread.currentThread(); + this.cache = cache; } void initUnpooled(PoolChunk chunk, int length) { @@ -67,7 +67,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { this.length = maxLength = length; setIndex(0, 0); tmpNioBuf = null; - initThread = Thread.currentThread(); + cache = null; } @Override @@ -155,9 +155,7 @@ abstract class PooledByteBuf extends AbstractReferenceCountedByteBuf { final long handle = this.handle; this.handle = -1; memory = null; - boolean sameThread = initThread == Thread.currentThread(); - initThread = null; - chunk.arena.free(chunk, handle, maxLength, sameThread); + chunk.arena.free(chunk, handle, maxLength, cache); recycle(); } } diff --git a/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java b/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java index 6e35214cf7..407c8bee83 100644 --- a/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java +++ b/buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java @@ -16,6 +16,7 @@ package io.netty.buffer; +import io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache; import io.netty.util.Recycler; import io.netty.util.internal.PlatformDependent; @@ -53,8 +54,9 @@ final class PooledUnsafeDirectByteBuf extends PooledByteBuf { } @Override - void init(PoolChunk chunk, long handle, int offset, int length, int maxLength) { - super.init(chunk, handle, offset, length, maxLength); + void init(PoolChunk chunk, long handle, int offset, int length, int maxLength, + PoolThreadCache cache) { + super.init(chunk, handle, offset, length, maxLength, cache); initMemoryAddress(); } diff --git 
a/common/src/main/java/io/netty/util/internal/ConcurrentCircularArrayQueue.java b/common/src/main/java/io/netty/util/internal/ConcurrentCircularArrayQueue.java
new file mode 100644
index 0000000000..5f3fd87180
--- /dev/null
+++ b/common/src/main/java/io/netty/util/internal/ConcurrentCircularArrayQueue.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright 2015 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.util.internal;
+
+
+import java.util.AbstractQueue;
+import java.util.Iterator;
+
+/**
+ * Forked from JCTools.
+ *
+ * A concurrent access enabling class used by circular array based queues; this class exposes an offset computation
+ * method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and
+ * the array is padded on either side to help with false sharing prevention. It is expected that subclasses handle post
+ * padding.
+ *

+ * Offset calculation is separate from access to enable the reuse of a given computed offset.
+ *

+ * Load/Store methods using a buffer parameter are provided to allow the prevention of final field reload after a
+ * LoadLoad barrier.
+ *

+ * + * @param + */ +abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { + protected static final int REF_BUFFER_PAD; + private static final long REF_ARRAY_BASE; + private static final int REF_ELEMENT_SHIFT; + static { + final int scale = PlatformDependent0.UNSAFE.arrayIndexScale(Object[].class); + if (4 == scale) { + REF_ELEMENT_SHIFT = 2; + } else if (8 == scale) { + REF_ELEMENT_SHIFT = 3; + } else { + throw new IllegalStateException("Unknown pointer size"); + } + // 2 cache lines pad + // TODO: replace 64 with the value we can detect + REF_BUFFER_PAD = (64 * 2) / scale; + // Including the buffer pad in the array base offset + REF_ARRAY_BASE = PlatformDependent0.UNSAFE.arrayBaseOffset(Object[].class) + (REF_BUFFER_PAD * scale); + } + protected final long mask; + // @Stable :( + protected final E[] buffer; + + @SuppressWarnings("unchecked") + public ConcurrentCircularArrayQueue(int capacity) { + int actualCapacity = roundToPowerOfTwo(capacity); + mask = actualCapacity - 1; + // pad data on either end with some empty slots. + buffer = (E[]) new Object[actualCapacity + REF_BUFFER_PAD * 2]; + } + + private static int roundToPowerOfTwo(final int value) { + return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); + } + /** + * @param index desirable element index + * @return the offset in bytes within the array for a given index. + */ + protected final long calcElementOffset(long index) { + return calcElementOffset(index, mask); + } + /** + * @param index desirable element index + * @param mask + * @return the offset in bytes within the array for a given index. + */ + protected static final long calcElementOffset(long index, long mask) { + return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); + } + /** + * A plain store (no ordering/fences) of an element to a given offset + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @param e a kitty + */ + protected final void spElement(long offset, E e) { + spElement(buffer, offset, e); + } + + /** + * A plain store (no ordering/fences) of an element to a given offset + * + * @param buffer this.buffer + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @param e an orderly kitty + */ + protected static final void spElement(E[] buffer, long offset, E e) { + PlatformDependent0.UNSAFE.putObject(buffer, offset, e); + } + + /** + * An ordered store(store + StoreStore barrier) of an element to a given offset + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @param e an orderly kitty + */ + protected final void soElement(long offset, E e) { + soElement(buffer, offset, e); + } + + /** + * An ordered store(store + StoreStore barrier) of an element to a given offset + * + * @param buffer this.buffer + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @param e an orderly kitty + */ + protected static final void soElement(E[] buffer, long offset, E e) { + PlatformDependent0.UNSAFE.putOrderedObject(buffer, offset, e); + } + + /** + * A plain load (no ordering/fences) of an element from a given offset. + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + protected final E lpElement(long offset) { + return lpElement(buffer, offset); + } + + /** + * A plain load (no ordering/fences) of an element from a given offset. 
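The sp/so/lp/lv prefixes used above follow the JCTools convention: store-plain, store-ordered, load-plain and load-volatile. As a rough analogue only (the forked code itself uses sun.misc.Unsafe, not VarHandle), the same four access levels can be expressed with VarHandle access modes on Java 9+:

    import java.lang.invoke.MethodHandles;
    import java.lang.invoke.VarHandle;

    // Illustration of the four memory-ordering levels; not the code this patch adds.
    final class ElementAccessSketch {
        private static final VarHandle ARRAY = MethodHandles.arrayElementVarHandle(Object[].class);

        static void spElement(Object[] buffer, int index, Object e) {
            ARRAY.set(buffer, index, e);                  // plain store, no ordering guarantees
        }

        static void soElement(Object[] buffer, int index, Object e) {
            ARRAY.setRelease(buffer, index, e);           // ordered/release store, like putOrderedObject
        }

        static Object lpElement(Object[] buffer, int index) {
            return ARRAY.get(buffer, index);              // plain load
        }

        static Object lvElement(Object[] buffer, int index) {
            return ARRAY.getVolatile(buffer, index);      // volatile load, pairs with the release store
        }
    }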
+ * + * @param buffer this.buffer + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + @SuppressWarnings("unchecked") + protected static final E lpElement(E[] buffer, long offset) { + return (E) PlatformDependent0.UNSAFE.getObject(buffer, offset); + } + + /** + * A volatile load (load + LoadLoad barrier) of an element from a given offset. + * + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + protected final E lvElement(long offset) { + return lvElement(buffer, offset); + } + + /** + * A volatile load (load + LoadLoad barrier) of an element from a given offset. + * + * @param buffer this.buffer + * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} + * @return the element at the offset + */ + @SuppressWarnings("unchecked") + protected static final E lvElement(E[] buffer, long offset) { + return (E) PlatformDependent0.UNSAFE.getObjectVolatile(buffer, offset); + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + while (poll() != null || !isEmpty()) { + // looping + } + } + + public int capacity() { + return (int) (mask + 1); + } +} + +abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue { + long p00, p01, p02, p03, p04, p05, p06, p07; + long p30, p31, p32, p33, p34, p35, p36, p37; +} + diff --git a/common/src/main/java/io/netty/util/internal/MpscArrayQueue.java b/common/src/main/java/io/netty/util/internal/MpscArrayQueue.java new file mode 100644 index 0000000000..cda283d440 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/MpscArrayQueue.java @@ -0,0 +1,331 @@ +/* + * Copyright 2015 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.netty.util.internal; + +/** + * Forked from JCTools. + * + * A Multi-Producer-Single-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that + * any thread may call the offer method, but only a single thread may call poll/peek for correctness to + * maintained.
+ * This implementation follows patterns documented on the package level for False Sharing protection.
+ * This implementation uses the Fast Flow
+ * method for polling from the queue (with a minor change to correctly publish the index) and an extension of
+ * the Leslie Lamport concurrent queue algorithm (originated by Martin Thompson) on the producer side.
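Per the javadoc above, offer() may be called concurrently from any number of threads, but poll() and peek() must stay on a single consumer thread. MpscArrayQueue itself is package-private, so the sketch below goes through PlatformDependent.newFixedMpscQueue(), which this patch adds; the producer count, element count and Integer payload are illustrative only.

    import io.netty.util.internal.PlatformDependent;

    import java.util.Queue;
    import java.util.concurrent.CountDownLatch;

    public final class MpscUsageSketch {
        public static void main(String[] args) throws InterruptedException {
            // Falls back to a LinkedBlockingQueue when sun.misc.Unsafe is not available.
            final Queue<Integer> queue = PlatformDependent.newFixedMpscQueue(1024);
            final int producers = 4;
            final int perProducer = 100;
            final CountDownLatch done = new CountDownLatch(producers);

            for (int p = 0; p < producers; p++) {           // many threads may offer()
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < perProducer; i++) {
                            while (!queue.offer(i)) {       // bounded queue: retry if momentarily full
                                Thread.yield();
                            }
                        }
                        done.countDown();
                    }
                }).start();
            }

            int received = 0;                               // exactly one thread may poll()
            while (received < producers * perProducer) {
                if (queue.poll() != null) {
                    received++;
                } else {
                    Thread.yield();                         // queue momentarily empty
                }
            }
            done.await();
            System.out.println("drained " + received + " elements");
        }
    }

Note that offer() returns false when the bounded queue is full, so producers must be prepared to retry or fall back, much as the buffer pool falls back to freeing a chunk directly when a thread cache cannot accept it.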
+ * + * @param + */ +final class MpscArrayQueue extends MpscArrayQueueConsumerField { + long p40, p41, p42, p43, p44, p45, p46; + long p30, p31, p32, p33, p34, p35, p36, p37; + + public MpscArrayQueue(final int capacity) { + super(capacity); + } + + /** + * {@inheritDoc}
+ *
+ * IMPLEMENTATION NOTES:
+ * Lock free offer using a single CAS. As class name suggests access is permitted to many threads + * concurrently. + * + * @see java.util.Queue#offer(java.lang.Object) + */ + @Override + public boolean offer(final E e) { + if (null == e) { + throw new NullPointerException("Null is not a valid element"); + } + + // use a cached view on consumer index (potentially updated in loop) + final long mask = this.mask; + final long capacity = mask + 1; + long consumerIndexCache = lvConsumerIndexCache(); // LoadLoad + long currentProducerIndex; + do { + currentProducerIndex = lvProducerIndex(); // LoadLoad + final long wrapPoint = currentProducerIndex - capacity; + if (consumerIndexCache <= wrapPoint) { + final long currHead = lvConsumerIndex(); // LoadLoad + if (currHead <= wrapPoint) { + return false; // FULL :( + } else { + // update shared cached value of the consumerIndex + svConsumerIndexCache(currHead); // StoreLoad + // update on stack copy, we might need this value again if we lose the CAS. + consumerIndexCache = currHead; + } + } + } while (!casProducerIndex(currentProducerIndex, currentProducerIndex + 1)); + /* + * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on + * the index visibility to poll() we would need to handle the case where the element is not visible. + */ + + // Won CAS, move on to storing + final long offset = calcElementOffset(currentProducerIndex, mask); + soElement(offset, e); // StoreStore + return true; // AWESOME :) + } + + /** + * A wait free alternative to offer which fails on CAS failure. + * + * @param e new element, not null + * @return 1 if next element cannot be filled, -1 if CAS failed, 0 if successful + */ + public int weakOffer(final E e) { + if (null == e) { + throw new NullPointerException("Null is not a valid element"); + } + final long mask = this.mask; + final long capacity = mask + 1; + final long currentTail = lvProducerIndex(); // LoadLoad + final long consumerIndexCache = lvConsumerIndexCache(); // LoadLoad + final long wrapPoint = currentTail - capacity; + if (consumerIndexCache <= wrapPoint) { + long currHead = lvConsumerIndex(); // LoadLoad + if (currHead <= wrapPoint) { + return 1; // FULL :( + } else { + svConsumerIndexCache(currHead); // StoreLoad + } + } + + // look Ma, no loop! + if (!casProducerIndex(currentTail, currentTail + 1)) { + return -1; // CAS FAIL :( + } + + // Won CAS, move on to storing + final long offset = calcElementOffset(currentTail, mask); + soElement(offset, e); + return 0; // AWESOME :) + } + + /** + * {@inheritDoc} + *

+ * IMPLEMENTATION NOTES:
+ * Lock free poll using ordered loads/stores. As class name suggests access is limited to a single thread. + * + * @see java.util.Queue#poll() + */ + @Override + public E poll() { + final long consumerIndex = lvConsumerIndex(); // LoadLoad + final long offset = calcElementOffset(consumerIndex); + // Copy field to avoid re-reading after volatile load + final E[] buffer = this.buffer; + + // If we can't see the next available element we can't poll + E e = lvElement(buffer, offset); // LoadLoad + if (null == e) { + /* + * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after + * winning the CAS on offer but before storing the element in the queue. Other producers may go on + * to fill up the queue after this element. + */ + if (consumerIndex != lvProducerIndex()) { + do { + e = lvElement(buffer, offset); + } while (e == null); + } else { + return null; + } + } + + spElement(buffer, offset, null); + soConsumerIndex(consumerIndex + 1); // StoreStore + return e; + } + + /** + * {@inheritDoc} + *

+ * IMPLEMENTATION NOTES:
+ * Lock free peek using ordered loads. As class name suggests access is limited to a single thread. + * + * @see java.util.Queue#poll() + */ + @Override + public E peek() { + // Copy field to avoid re-reading after volatile load + final E[] buffer = this.buffer; + + final long consumerIndex = lvConsumerIndex(); // LoadLoad + final long offset = calcElementOffset(consumerIndex); + E e = lvElement(buffer, offset); + if (null == e) { + /* + * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after + * winning the CAS on offer but before storing the element in the queue. Other producers may go on + * to fill up the queue after this element. + */ + if (consumerIndex != lvProducerIndex()) { + do { + e = lvElement(buffer, offset); + } while (e == null); + } else { + return null; + } + } + return e; + } + + /** + * {@inheritDoc} + *

+ * + */ + @Override + public int size() { + /* + * It is possible for a thread to be interrupted or reschedule between the read of the producer and + * consumer indices, therefore protection is required to ensure size is within valid range. In the + * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer + * index BEFORE the producer index. + */ + long after = lvConsumerIndex(); + while (true) { + final long before = after; + final long currentProducerIndex = lvProducerIndex(); + after = lvConsumerIndex(); + if (before == after) { + return (int) (currentProducerIndex - after); + } + } + } + + @Override + public boolean isEmpty() { + // Order matters! + // Loading consumer before producer allows for producer increments after consumer index is read. + // This ensures the correctness of this method at least for the consumer thread. Other threads POV is + // not really + // something we can fix here. + return lvConsumerIndex() == lvProducerIndex(); + } +} + +abstract class MpscArrayQueueL1Pad extends ConcurrentCircularArrayQueue { + long p10, p11, p12, p13, p14, p15, p16; + long p30, p31, p32, p33, p34, p35, p36, p37; + + public MpscArrayQueueL1Pad(int capacity) { + super(capacity); + } +} + +abstract class MpscArrayQueueTailField extends MpscArrayQueueL1Pad { + private static final long P_INDEX_OFFSET; + + static { + try { + P_INDEX_OFFSET = PlatformDependent0.UNSAFE.objectFieldOffset(MpscArrayQueueTailField.class + .getDeclaredField("producerIndex")); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + private volatile long producerIndex; + + public MpscArrayQueueTailField(int capacity) { + super(capacity); + } + + protected final long lvProducerIndex() { + return producerIndex; + } + + protected final boolean casProducerIndex(long expect, long newValue) { + return PlatformDependent0.UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); + } +} + +abstract class MpscArrayQueueMidPad extends MpscArrayQueueTailField { + long p20, p21, p22, p23, p24, p25, p26; + long p30, p31, p32, p33, p34, p35, p36, p37; + + public MpscArrayQueueMidPad(int capacity) { + super(capacity); + } +} + +abstract class MpscArrayQueueHeadCacheField extends MpscArrayQueueMidPad { + private volatile long headCache; + + public MpscArrayQueueHeadCacheField(int capacity) { + super(capacity); + } + + protected final long lvConsumerIndexCache() { + return headCache; + } + + protected final void svConsumerIndexCache(long v) { + headCache = v; + } +} + +abstract class MpscArrayQueueL2Pad extends MpscArrayQueueHeadCacheField { + long p20, p21, p22, p23, p24, p25, p26; + long p30, p31, p32, p33, p34, p35, p36, p37; + + public MpscArrayQueueL2Pad(int capacity) { + super(capacity); + } +} + +abstract class MpscArrayQueueConsumerField extends MpscArrayQueueL2Pad { + private static final long C_INDEX_OFFSET; + static { + try { + C_INDEX_OFFSET = PlatformDependent0.UNSAFE.objectFieldOffset(MpscArrayQueueConsumerField.class + .getDeclaredField("consumerIndex")); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + } + private volatile long consumerIndex; + + public MpscArrayQueueConsumerField(int capacity) { + super(capacity); + } + + protected final long lvConsumerIndex() { + return consumerIndex; + } + + protected void soConsumerIndex(long l) { + PlatformDependent0.UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, l); + } +} + diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java 
b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index ad68145054..767d80eb65 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -442,6 +443,18 @@ public final class PlatformDependent { return new MpscLinkedQueue(); } + /** + * Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single + * consumer (one thread!) with the given fixes {@code capacity}. + */ + public static Queue newFixedMpscQueue(int capacity) { + if (hasUnsafe()) { + return new MpscArrayQueue(capacity); + } else { + return new LinkedBlockingQueue(capacity); + } + } + /** * Return the {@link ClassLoader} for the given {@link Class}. */ diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java index 3109c171e1..e4670e9d28 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java @@ -36,7 +36,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; final class PlatformDependent0 { private static final InternalLogger logger = InternalLoggerFactory.getInstance(PlatformDependent0.class); - private static final Unsafe UNSAFE; + static final Unsafe UNSAFE; private static final boolean BIG_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN; private static final long ADDRESS_FIELD_OFFSET; private static final long ARRAY_BASE_OFFSET; diff --git a/license/LICENSE.jctools.txt b/license/LICENSE.jctools.txt new file mode 100644 index 0000000000..66a27ec5ff --- /dev/null +++ b/license/LICENSE.jctools.txt @@ -0,0 +1,177 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. 
+ + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS +
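Returning to MpscArrayQueue.size() earlier in this patch: it re-reads the consumer index until that index is unchanged across a read of the producer index, so the reported size is computed from a consistent pair of indices even while producers and the consumer keep running. A hypothetical stand-alone sketch of that snapshot loop, with AtomicLong fields standing in for the Unsafe-backed volatile indices:

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the snapshot technique only; the patch reads its index fields via Unsafe.
    final class IndexSnapshotSketch {
        final AtomicLong producerIndex = new AtomicLong();
        final AtomicLong consumerIndex = new AtomicLong();

        int size() {
            long after = consumerIndex.get();
            for (;;) {
                final long before = after;
                final long producer = producerIndex.get();
                after = consumerIndex.get();
                if (before == after) {
                    // the consumer index did not move while the producer index was read
                    return (int) (producer - after);
                }
            }
        }
    }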