diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 2fd3c985a9..e56ba5817c 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -19,9 +19,11 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.jctools.queues.MpscArrayQueue; import org.jctools.queues.MpscChunkedArrayQueue; +import org.jctools.queues.MpscUnboundedArrayQueue; import org.jctools.queues.SpscLinkedQueue; import org.jctools.queues.atomic.MpscAtomicArrayQueue; -import org.jctools.queues.atomic.MpscLinkedAtomicQueue; +import org.jctools.queues.atomic.MpscGrowableAtomicArrayQueue; +import org.jctools.queues.atomic.MpscUnboundedAtomicArrayQueue; import org.jctools.queues.atomic.SpscLinkedAtomicQueue; import org.jctools.util.Pow2; import org.jctools.util.UnsafeAccess; @@ -51,6 +53,8 @@ import static io.netty.util.internal.PlatformDependent0.HASH_CODE_C1; import static io.netty.util.internal.PlatformDependent0.HASH_CODE_C2; import static io.netty.util.internal.PlatformDependent0.hashCodeAsciiSanitize; import static io.netty.util.internal.PlatformDependent0.unalignedAccess; +import static java.lang.Math.max; +import static java.lang.Math.min; /** * Utility that detects various properties specific to the current runtime @@ -79,7 +83,6 @@ public final class PlatformDependent { private static final int MPSC_CHUNK_SIZE = 1024; private static final int MIN_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * 2; - private static final int DEFAULT_MAX_MPSC_CAPACITY = MPSC_CHUNK_SIZE * MPSC_CHUNK_SIZE; private static final int MAX_ALLOWED_MPSC_CAPACITY = Pow2.MAX_POW2; private static final long BYTE_ARRAY_BASE_OFFSET = byteArrayBaseOffset0(); @@ -826,25 +829,27 @@ public final class PlatformDependent { } static Queue newMpscQueue(final int maxCapacity) { - if (USE_MPSC_CHUNKED_ARRAY_QUEUE) { - // Calculate the max capacity which can not be bigger then MAX_ALLOWED_MPSC_CAPACITY. - // This is forced by the MpscChunkedArrayQueue implementation as will try to round it - // up to the next power of two and so will overflow otherwise. - final int capacity = - Math.max(Math.min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY); - return new MpscChunkedArrayQueue(MPSC_CHUNK_SIZE, capacity); - } else { - return new MpscLinkedAtomicQueue(); - } + // Calculate the max capacity which can not be bigger then MAX_ALLOWED_MPSC_CAPACITY. + // This is forced by the MpscChunkedArrayQueue implementation as will try to round it + // up to the next power of two and so will overflow otherwise. + final int capacity = max(min(maxCapacity, MAX_ALLOWED_MPSC_CAPACITY), MIN_MAX_MPSC_CAPACITY); + return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscChunkedArrayQueue(MPSC_CHUNK_SIZE, capacity) + : new MpscGrowableAtomicArrayQueue(MPSC_CHUNK_SIZE, capacity); + } + + static Queue newMpscQueue() { + return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue(MPSC_CHUNK_SIZE) + : new MpscUnboundedAtomicArrayQueue(MPSC_CHUNK_SIZE); } } /** * Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single * consumer (one thread!). + * @return A MPSC queue which may be unbounded. */ public static Queue newMpscQueue() { - return newMpscQueue(DEFAULT_MAX_MPSC_CAPACITY); + return Mpsc.newMpscQueue(); } /** diff --git a/pom.xml b/pom.xml index ce90816c20..af3002c121 100644 --- a/pom.xml +++ b/pom.xml @@ -370,7 +370,7 @@ org.jctools jctools-core - 2.0.1 + 2.0.2 diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 2607615117..16f0adff74 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -171,7 +171,8 @@ final class EpollEventLoop extends SingleThreadEventLoop { @Override protected Queue newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() - return PlatformDependent.newMpscQueue(maxPendingTasks); + return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() + : PlatformDependent.newMpscQueue(maxPendingTasks); } @Override diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index fea086693b..ccb91173ee 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -298,7 +298,8 @@ final class KQueueEventLoop extends SingleThreadEventLoop { @Override protected Queue newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() - return PlatformDependent.newMpscQueue(maxPendingTasks); + return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() + : PlatformDependent.newMpscQueue(maxPendingTasks); } @Override diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index 3de80fca12..331182e0e7 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -256,7 +256,8 @@ public final class NioEventLoop extends SingleThreadEventLoop { @Override protected Queue newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() - return PlatformDependent.newMpscQueue(maxPendingTasks); + return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue() + : PlatformDependent.newMpscQueue(maxPendingTasks); } @Override