Use unbounded queues from JCTools 2.0.2

Motivation:
JCTools 2.0.2 provides an unbounded MPSC linked queue. Before we shaded JCTools we had our own unbounded MPSC linked queue and used it in various places but gave this up because there was no public equivalent available in JCTools at the time.

Modifications:
- Use JCTool's MPSC linked queue when no upper bound is specified

Result:
Fixes https://github.com/netty/netty/issues/5951
This commit is contained in:
Scott Mitchell 2016-12-06 09:52:13 -08:00
parent 24263c2bd8
commit 7cfe416182
5 changed files with 25 additions and 17 deletions

View File

@ -19,9 +19,11 @@ import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
import org.jctools.queues.MpscArrayQueue; import org.jctools.queues.MpscArrayQueue;
import org.jctools.queues.MpscChunkedArrayQueue; import org.jctools.queues.MpscChunkedArrayQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;
import org.jctools.queues.SpscLinkedQueue; import org.jctools.queues.SpscLinkedQueue;
import org.jctools.queues.atomic.MpscAtomicArrayQueue; import org.jctools.queues.atomic.MpscAtomicArrayQueue;
import org.jctools.queues.atomic.MpscLinkedAtomicQueue; import org.jctools.queues.atomic.MpscGrowableAtomicArrayQueue;
import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue;
import org.jctools.queues.atomic.SpscLinkedAtomicQueue; import org.jctools.queues.atomic.SpscLinkedAtomicQueue;
import org.jctools.util.Pow2; import org.jctools.util.Pow2;
import org.jctools.util.UnsafeAccess; import org.jctools.util.UnsafeAccess;
@ -51,6 +53,8 @@ import static io.netty.util.internal.PlatformDependent0.HASH_CODE_C1;
import static io.netty.util.internal.PlatformDependent0.HASH_CODE_C2; import static io.netty.util.internal.PlatformDependent0.HASH_CODE_C2;
import static io.netty.util.internal.PlatformDependent0.hashCodeAsciiSanitize; import static io.netty.util.internal.PlatformDependent0.hashCodeAsciiSanitize;
import static io.netty.util.internal.PlatformDependent0.unalignedAccess; import static io.netty.util.internal.PlatformDependent0.unalignedAccess;
import static java.lang.Math.max;
import static java.lang.Math.min;
/** /**
* Utility that detects various properties specific to the current runtime * Utility that detects various properties specific to the current runtime
@ -79,7 +83,6 @@ public final class PlatformDependent {
private static final int MPSC_CHUNK_SIZE = 1024; private static final int MPSC_CHUNK_SIZE = 1024;
private static final int MIN_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * 2; private static final int MIN_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * 2;
private static final int DEFAULT_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * MPSC_CHUNK_SIZE;
private static final int MAX_ALLOWED_MPSC_CAPACITY = Pow2.MAX_POW2; private static final int MAX_ALLOWED_MPSC_CAPACITY = Pow2.MAX_POW2;
private static final long BYTE_ARRAY_BASE_OFFSET = byteArrayBaseOffset0(); private static final long BYTE_ARRAY_BASE_OFFSET = byteArrayBaseOffset0();
@ -826,25 +829,27 @@ public final class PlatformDependent {
} }
static <T> Queue<T> newMpscQueue(final int maxCapacity) { static <T> Queue<T> newMpscQueue(final int maxCapacity) {
if (USE_MPSC_CHUNKED_ARRAY_QUEUE) { // Calculate the max capacity which can not be bigger then MAX_ALLOWED_MPSC_CAPACITY.
// Calculate the max capacity which can not be bigger then MAX_ALLOWED_MPSC_CAPACITY. // This is forced by the MpscChunkedArrayQueue implementation as will try to round it
// This is forced by the MpscChunkedArrayQueue implementation as will try to round it // up to the next power of two and so will overflow otherwise.
// up to the next power of two and so will overflow otherwise. final int capacity = max(min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY);
final int capacity = return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscChunkedArrayQueue<T>(MPSC_CHUNK_SIZE, capacity)
Math.max(Math.min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY); : new MpscGrowableAtomicArrayQueue<T>(MPSC_CHUNK_SIZE, capacity);
return new MpscChunkedArrayQueue<T>(MPSC_CHUNK_SIZE, capacity); }
} else {
return new MpscLinkedAtomicQueue<T>(); static <T> Queue<T> newMpscQueue() {
} return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
: new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
} }
} }
/** /**
* Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single * Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single
* consumer (one thread!). * consumer (one thread!).
* @return A MPSC queue which may be unbounded.
*/ */
public static <T> Queue<T> newMpscQueue() { public static <T> Queue<T> newMpscQueue() {
return newMpscQueue(DEFAULT_MAX_MPSC_CAPACITY); return Mpsc.newMpscQueue();
} }
/** /**

View File

@ -370,7 +370,7 @@
<dependency> <dependency>
<groupId>org.jctools</groupId> <groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId> <artifactId>jctools-core</artifactId>
<version>2.0.1</version> <version>2.0.2</version>
</dependency> </dependency>
<dependency> <dependency>

View File

@ -171,7 +171,8 @@ final class EpollEventLoop extends SingleThreadEventLoop {
@Override @Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask() // This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks); return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
} }
@Override @Override

View File

@ -298,7 +298,8 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
@Override @Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask() // This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks); return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
} }
@Override @Override

View File

@ -256,7 +256,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override @Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask() // This event loop never calls takeTask()
return PlatformDependent.newMpscQueue(maxPendingTasks); return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
} }
@Override @Override