From c784271620cb94a4dce99057a257ada58ffdb829 Mon Sep 17 00:00:00 2001 From: Guido Medina Date: Sun, 27 Mar 2016 13:25:39 +0100 Subject: [PATCH] Use shaded dependency on JCTools instead of copy and paste Motivation: JCTools supports both non-unsafe, unsafe versions of queues and JDK6 which allows us to shade the library in netty-common allowing it to stay "zero dependency". Modifications: - Remove copy paste JCTools code and shade the library (dependencies that are shaded should be removed from the section of the generated POM). - Remove usage of OneTimeTask and remove it all together. Result: Less code to maintain and easier to update JCTools and less GC pressure as the queue implementation nt creates so much garbage --- .gitignore | 1 + .../websocketx/WebSocketClientHandshaker.java | 5 +- .../codec/compression/JZlibEncoder.java | 5 +- .../codec/compression/JdkZlibEncoder.java | 5 +- common/pom.xml | 32 ++ .../java/io/netty/util/HashedWheelTimer.java | 44 +- .../io/netty/util/ThreadDeathWatcher.java | 8 +- .../AbstractScheduledEventExecutor.java | 5 +- .../netty/util/concurrent/DefaultPromise.java | 12 +- .../concurrent/SingleThreadEventExecutor.java | 55 ++- .../util/internal/BaseLinkedAtomicQueue.java | 110 ----- .../netty/util/internal/BaseLinkedQueue.java | 158 -------- .../ConcurrentCircularArrayQueue.java | 207 ---------- .../util/internal/LinkedQueueAtomicNode.java | 71 ---- .../netty/util/internal/LinkedQueueNode.java | 80 ---- .../util/internal/MessagePassingQueue.java | 297 -------------- .../netty/util/internal/MpscArrayQueue.java | 331 --------------- .../netty/util/internal/MpscLinkedQueue.java | 378 ------------------ .../util/internal/MpscLinkedQueueHeadRef.java | 54 --- .../util/internal/MpscLinkedQueueNode.java | 65 --- .../util/internal/MpscLinkedQueuePad0.java | 22 - .../util/internal/MpscLinkedQueuePad1.java | 25 -- .../util/internal/MpscLinkedQueueTailRef.java | 54 --- .../io/netty/util/internal/OneTimeTask.java | 32 -- .../util/internal/PlatformDependent.java | 22 +- .../RecyclableMpscLinkedQueueNode.java | 45 --- .../util/internal/SpscLinkedAtomicQueue.java | 118 ------ .../netty/util/internal/SpscLinkedQueue.java | 215 ---------- .../java/io/netty/handler/ssl/SslHandler.java | 11 +- .../handler/stream/ChunkedWriteHandler.java | 3 +- .../handler/timeout/WriteTimeoutHandler.java | 3 +- .../traffic/ChannelTrafficShapingHandler.java | 3 +- .../traffic/GlobalTrafficShapingHandler.java | 3 +- pom.xml | 12 + .../channel/epoll/AbstractEpollChannel.java | 3 +- .../epoll/AbstractEpollStreamChannel.java | 26 +- .../epoll/EpollDomainSocketChannel.java | 3 +- .../io/netty/channel/rxtx/RxtxChannel.java | 3 +- .../channel/sctp/nio/NioSctpChannel.java | 5 +- .../sctp/nio/NioSctpServerChannel.java | 5 +- .../channel/sctp/oio/OioSctpChannel.java | 5 +- .../sctp/oio/OioSctpServerChannel.java | 5 +- .../io/netty/bootstrap/AbstractBootstrap.java | 3 +- .../java/io/netty/bootstrap/Bootstrap.java | 3 +- .../io/netty/bootstrap/ServerBootstrap.java | 3 +- .../io/netty/channel/AbstractChannel.java | 17 +- .../AbstractChannelHandlerContext.java | 39 +- .../netty/channel/ChannelOutboundBuffer.java | 3 +- .../channel/CombinedChannelDuplexHandler.java | 3 +- .../netty/channel/DefaultChannelPipeline.java | 19 +- .../io/netty/channel/local/LocalChannel.java | 8 +- .../netty/channel/nio/AbstractNioChannel.java | 3 +- .../netty/channel/pool/FixedChannelPool.java | 5 +- .../netty/channel/pool/SimpleChannelPool.java | 5 +- .../channel/socket/nio/NioSocketChannel.java | 5 +- .../channel/socket/oio/OioSocketChannel.java | 3 +- .../netty/channel/local/LocalChannelTest.java | 15 +- 57 files changed, 208 insertions(+), 2472 deletions(-) delete mode 100644 common/src/main/java/io/netty/util/internal/BaseLinkedAtomicQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/BaseLinkedQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/ConcurrentCircularArrayQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/LinkedQueueAtomicNode.java delete mode 100644 common/src/main/java/io/netty/util/internal/LinkedQueueNode.java delete mode 100644 common/src/main/java/io/netty/util/internal/MessagePassingQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/MpscArrayQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/MpscLinkedQueueHeadRef.java delete mode 100644 common/src/main/java/io/netty/util/internal/MpscLinkedQueueNode.java delete mode 100644 common/src/main/java/io/netty/util/internal/MpscLinkedQueuePad0.java delete mode 100644 common/src/main/java/io/netty/util/internal/MpscLinkedQueuePad1.java delete mode 100644 common/src/main/java/io/netty/util/internal/MpscLinkedQueueTailRef.java delete mode 100644 common/src/main/java/io/netty/util/internal/OneTimeTask.java delete mode 100644 common/src/main/java/io/netty/util/internal/RecyclableMpscLinkedQueueNode.java delete mode 100644 common/src/main/java/io/netty/util/internal/SpscLinkedAtomicQueue.java delete mode 100644 common/src/main/java/io/netty/util/internal/SpscLinkedQueue.java diff --git a/.gitignore b/.gitignore index 8a80d67d0d..a3c75313a2 100644 --- a/.gitignore +++ b/.gitignore @@ -30,3 +30,4 @@ # JVM crash logs hs_err_pid*.log +dependency-reduced-pom.xml diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java index a224313652..f222a7660e 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker.java @@ -33,7 +33,6 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.EmptyArrays; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.StringUtil; import java.net.URI; @@ -272,7 +271,7 @@ public abstract class WebSocketClientHandshaker { // Delay the removal of the decoder so the user can setup the pipeline if needed to handle // WebSocketFrame messages. // See https://github.com/netty/netty/issues/4533 - channel.eventLoop().execute(new OneTimeTask() { + channel.eventLoop().execute(new Runnable() { @Override public void run() { p.remove(codec); @@ -289,7 +288,7 @@ public abstract class WebSocketClientHandshaker { // Delay the removal of the decoder so the user can setup the pipeline if needed to handle // WebSocketFrame messages. // See https://github.com/netty/netty/issues/4533 - channel.eventLoop().execute(new OneTimeTask() { + channel.eventLoop().execute(new Runnable() { @Override public void run() { p.remove(context.handler()); diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java index 0e6f0c30d9..3868bfb868 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JZlibEncoder.java @@ -26,7 +26,6 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromiseNotifier; import io.netty.util.concurrent.EventExecutor; import io.netty.util.internal.EmptyArrays; -import io.netty.util.internal.OneTimeTask; import java.util.concurrent.TimeUnit; @@ -253,7 +252,7 @@ public class JZlibEncoder extends ZlibEncoder { return finishEncode(ctx, promise); } else { final ChannelPromise p = ctx.newPromise(); - executor.execute(new OneTimeTask() { + executor.execute(new Runnable() { @Override public void run() { ChannelFuture f = finishEncode(ctx(), p); @@ -352,7 +351,7 @@ public class JZlibEncoder extends ZlibEncoder { if (!f.isDone()) { // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(new OneTimeTask() { + ctx.executor().schedule(new Runnable() { @Override public void run() { ctx.close(promise); diff --git a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java index a75fb5c4b1..49be347890 100644 --- a/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/compression/JdkZlibEncoder.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromiseNotifier; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.internal.OneTimeTask; import java.util.concurrent.TimeUnit; import java.util.zip.CRC32; @@ -164,7 +163,7 @@ public class JdkZlibEncoder extends ZlibEncoder { return finishEncode(ctx, promise); } else { final ChannelPromise p = ctx.newPromise(); - executor.execute(new OneTimeTask() { + executor.execute(new Runnable() { @Override public void run() { ChannelFuture f = finishEncode(ctx(), p); @@ -260,7 +259,7 @@ public class JdkZlibEncoder extends ZlibEncoder { if (!f.isDone()) { // Ensure the channel is closed even if the write operation completes in time. - ctx.executor().schedule(new OneTimeTask() { + ctx.executor().schedule(new Runnable() { @Override public void run() { ctx.close(promise); diff --git a/common/pom.xml b/common/pom.xml index b18d5bf3b7..67d644531f 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -37,6 +37,12 @@ compile true + + org.jctools + jctools-core + + true + @@ -68,6 +74,31 @@ + + maven-shade-plugin + + + package + + shade + + + + + org.jctools + + + + + org.jctools. + io.netty.util.internal.shaded.org.jctools. + + + true + + + + org.apache.felix maven-bundle-plugin @@ -81,6 +112,7 @@ + * diff --git a/common/src/main/java/io/netty/util/HashedWheelTimer.java b/common/src/main/java/io/netty/util/HashedWheelTimer.java index 16f6759c19..8ed6b41c1a 100644 --- a/common/src/main/java/io/netty/util/HashedWheelTimer.java +++ b/common/src/main/java/io/netty/util/HashedWheelTimer.java @@ -15,7 +15,6 @@ */ package io.netty.util; -import io.netty.util.internal.MpscLinkedQueueNode; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -106,7 +105,7 @@ public class HashedWheelTimer implements Timer { private final int mask; private final CountDownLatch startTimeInitialized = new CountDownLatch(1); private final Queue timeouts = PlatformDependent.newMpscQueue(); - private final Queue cancelledTimeouts = PlatformDependent.newMpscQueue(); + private final Queue cancelledTimeouts = PlatformDependent.newMpscQueue(); private volatile long startTime; @@ -412,13 +411,13 @@ public class HashedWheelTimer implements Timer { private void processCancelledTasks() { for (;;) { - Runnable task = cancelledTimeouts.poll(); - if (task == null) { + HashedWheelTimeout timeout = cancelledTimeouts.poll(); + if (timeout == null) { // all processed break; } try { - task.run(); + timeout.remove(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); @@ -472,8 +471,7 @@ public class HashedWheelTimer implements Timer { } } - private static final class HashedWheelTimeout extends MpscLinkedQueueNode - implements Timeout { + private static final class HashedWheelTimeout implements Timeout { private static final int ST_INIT = 0; private static final int ST_CANCELLED = 1; @@ -530,25 +528,20 @@ public class HashedWheelTimer implements Timer { if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { return false; } - // If a task should be canceled we create a new Runnable for this to another queue which will - // be processed on each tick. So this means that we will have a GC latency of max. 1 tick duration - // which is good enough. This way we can make again use of our MpscLinkedQueue and so minimize the - // locking / overhead as much as possible. - // - // It is important that we not just add the HashedWheelTimeout itself again as it extends - // MpscLinkedQueueNode and so may still be used as tombstone. - timer.cancelledTimeouts.add(new Runnable() { - @Override - public void run() { - HashedWheelBucket bucket = HashedWheelTimeout.this.bucket; - if (bucket != null) { - bucket.remove(HashedWheelTimeout.this); - } - } - }); + // If a task should be canceled we put this to another queue which will be processed on each tick. + // So this means that we will have a GC latency of max. 1 tick duration which is good enough. This way + // we can make again use of our MpscLinkedQueue and so minimize the locking / overhead as much as possible. + timer.cancelledTimeouts.add(this); return true; } + void remove() { + HashedWheelBucket bucket = this.bucket; + if (bucket != null) { + bucket.remove(this); + } + } + public boolean compareAndSetState(int expected, int state) { return STATE_UPDATER.compareAndSet(this, expected, state); } @@ -567,11 +560,6 @@ public class HashedWheelTimer implements Timer { return state() == ST_EXPIRED; } - @Override - public HashedWheelTimeout value() { - return this; - } - public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; diff --git a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java index 0804269736..00ded85f8a 100644 --- a/common/src/main/java/io/netty/util/ThreadDeathWatcher.java +++ b/common/src/main/java/io/netty/util/ThreadDeathWatcher.java @@ -17,7 +17,6 @@ package io.netty.util; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.internal.MpscLinkedQueueNode; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.SystemPropertyUtil; @@ -213,7 +212,7 @@ public final class ThreadDeathWatcher { } } - private static final class Entry extends MpscLinkedQueueNode { + private static final class Entry { final Thread thread; final Runnable task; final boolean isWatch; @@ -224,11 +223,6 @@ public final class ThreadDeathWatcher { this.isWatch = isWatch; } - @Override - public Entry value() { - return this; - } - @Override public int hashCode() { return thread.hashCode() ^ task.hashCode(); diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index e5a5a8350d..ccedd9ce43 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -16,7 +16,6 @@ package io.netty.util.concurrent; import io.netty.util.internal.ObjectUtil; -import io.netty.util.internal.OneTimeTask; import java.util.PriorityQueue; import java.util.Queue; @@ -188,7 +187,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut if (inEventLoop()) { scheduledTaskQueue().add(task); } else { - execute(new OneTimeTask() { + execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); @@ -203,7 +202,7 @@ public abstract class AbstractScheduledEventExecutor extends AbstractEventExecut if (inEventLoop()) { scheduledTaskQueue().remove(task); } else { - execute(new OneTimeTask() { + execute(new Runnable() { @Override public void run() { removeScheduled(task); diff --git a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java index 71612b03df..f1c11c53ad 100644 --- a/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java +++ b/common/src/main/java/io/netty/util/concurrent/DefaultPromise.java @@ -18,7 +18,6 @@ package io.netty.util.concurrent; import io.netty.util.Signal; import io.netty.util.internal.EmptyArrays; import io.netty.util.internal.InternalThreadLocalMap; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -36,12 +35,14 @@ public class DefaultPromise extends AbstractFuture implements Promise { private static final InternalLogger rejectedExecutionLogger = InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution"); private static final int MAX_LISTENER_STACK_DEPTH = 8; + @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater RESULT_UPDATER; private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class.getName() + ".SUCCESS"); private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class.getName() + ".UNCANCELLABLE"); private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(new CancellationException()); static { + @SuppressWarnings("rawtypes") AtomicReferenceFieldUpdater updater = PlatformDependent.newAtomicReferenceFieldUpdater(DefaultPromise.class, "result"); RESULT_UPDATER = updater == null ? AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, @@ -302,6 +303,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } } + @SuppressWarnings("unchecked") @Override public V getNow() { Object result = this.result; @@ -436,7 +438,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } } - safeExecute(executor, new OneTimeTask() { + safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); @@ -466,7 +468,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } } - safeExecute(executor, new OneTimeTask() { + safeExecute(executor, new Runnable() { @Override public void run() { notifyListener0(future, listener); @@ -664,7 +666,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { if (listeners instanceof GenericProgressiveFutureListener[]) { final GenericProgressiveFutureListener[] array = (GenericProgressiveFutureListener[]) listeners; - safeExecute(executor, new OneTimeTask() { + safeExecute(executor, new Runnable() { @Override public void run() { notifyProgressiveListeners0(self, array, progress, total); @@ -673,7 +675,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { } else { final GenericProgressiveFutureListener> l = (GenericProgressiveFutureListener>) listeners; - safeExecute(executor, new OneTimeTask() { + safeExecute(executor, new Runnable() { @Override public void run() { notifyProgressiveListener0(self, l, progress, total); diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index ea2ef8a68c..3fac6bb84f 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -249,7 +249,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx } } - private void fetchFromScheduledTaskQueue() { + private boolean fetchFromScheduledTaskQueue() { if (hasScheduledTasks()) { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { @@ -257,9 +257,14 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx if (scheduledTask == null) { break; } - taskQueue.add(scheduledTask); + if (!taskQueue.offer(scheduledTask)) { + // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. + scheduledTaskQueue().add((ScheduledFutureTask) scheduledTask); + return false; + } } } + return true; } /** @@ -299,7 +304,12 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx if (isShutdown()) { reject(); } - taskQueue.add(task); + try { + taskQueue.add(task); + } catch (IllegalStateException e) { + // Just use add and catch the exception as this should happen only very rarely. + throw new RejectedExecutionException("Internal task queue is full", e); + } } /** @@ -318,25 +328,30 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx * @return {@code true} if and only if at least one task was run */ protected boolean runAllTasks() { - fetchFromScheduledTaskQueue(); - Runnable task = pollTask(); - if (task == null) { - return false; - } - - for (;;) { - try { - task.run(); - } catch (Throwable t) { - logger.warn("A task raised an exception.", t); - } - - task = pollTask(); + boolean fetchedAll; + do { + fetchedAll = fetchFromScheduledTaskQueue(); + Runnable task = pollTask(); if (task == null) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); - return true; + return false; } - } + + for (;;) { + try { + task.run(); + } catch (Throwable t) { + logger.warn("A task raised an exception.", t); + } + + task = pollTask(); + if (task == null) { + break; + } + } + } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. + + lastExecutionTime = ScheduledFutureTask.nanoTime(); + return true; } /** diff --git a/common/src/main/java/io/netty/util/internal/BaseLinkedAtomicQueue.java b/common/src/main/java/io/netty/util/internal/BaseLinkedAtomicQueue.java deleted file mode 100644 index 9936efbb9f..0000000000 --- a/common/src/main/java/io/netty/util/internal/BaseLinkedAtomicQueue.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2016 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - -import java.util.AbstractQueue; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Forked from JCTools. - */ -abstract class BaseLinkedAtomicQueue extends AbstractQueue { - private final AtomicReference> producerNode; - private final AtomicReference> consumerNode; - public BaseLinkedAtomicQueue() { - producerNode = new AtomicReference>(); - consumerNode = new AtomicReference>(); - } - protected final LinkedQueueAtomicNode lvProducerNode() { - return producerNode.get(); - } - protected final LinkedQueueAtomicNode lpProducerNode() { - return producerNode.get(); - } - protected final void spProducerNode(LinkedQueueAtomicNode node) { - producerNode.lazySet(node); - } - protected final LinkedQueueAtomicNode xchgProducerNode(LinkedQueueAtomicNode node) { - return producerNode.getAndSet(node); - } - protected final LinkedQueueAtomicNode lvConsumerNode() { - return consumerNode.get(); - } - - protected final LinkedQueueAtomicNode lpConsumerNode() { - return consumerNode.get(); - } - protected final void spConsumerNode(LinkedQueueAtomicNode node) { - consumerNode.lazySet(node); - } - @Override - public final Iterator iterator() { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * This is an O(n) operation as we run through all the nodes and count them.
- * - * @see java.util.Queue#size() - */ - @Override - public final int size() { - LinkedQueueAtomicNode chaserNode = lvConsumerNode(); - final LinkedQueueAtomicNode producerNode = lvProducerNode(); - int size = 0; - // must chase the nodes all the way to the producer node, but there's no need to chase a moving target. - while (chaserNode != producerNode && size < Integer.MAX_VALUE) { - LinkedQueueAtomicNode next; - while ((next = chaserNode.lvNext()) == null) { - continue; - } - chaserNode = next; - size++; - } - return size; - } - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe - * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to - * be null. - * - * @see MessagePassingQueue#isEmpty() - */ - @Override - public final boolean isEmpty() { - return lvConsumerNode() == lvProducerNode(); - } -} diff --git a/common/src/main/java/io/netty/util/internal/BaseLinkedQueue.java b/common/src/main/java/io/netty/util/internal/BaseLinkedQueue.java deleted file mode 100644 index 778a4747cf..0000000000 --- a/common/src/main/java/io/netty/util/internal/BaseLinkedQueue.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright 2016 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - -import java.util.AbstractQueue; -import java.util.Iterator; - -/** - * Forked from JCTools. - * - * A base data structure for concurrent linked queues. - * - * @param - */ -abstract class BaseLinkedQueue extends BaseLinkedQueueConsumerNodeRef { - long p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; - - @Override - public final Iterator iterator() { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * This is an O(n) operation as we run through all the nodes and count them.
- * - * @see java.util.Queue#size() - */ - @Override - public final int size() { - // Read consumer first, this is important because if the producer is node is 'older' than the consumer the - // consumer may overtake it (consume past it). This will lead to an infinite loop below. - LinkedQueueNode chaserNode = lvConsumerNode(); - final LinkedQueueNode producerNode = lvProducerNode(); - int size = 0; - // must chase the nodes all the way to the producer node, but there's no need to chase a moving target. - while (chaserNode != producerNode && size < Integer.MAX_VALUE) { - LinkedQueueNode next; - while ((next = chaserNode.lvNext()) == null) { - continue; - } - chaserNode = next; - size++; - } - return size; - } - - /** - * {@inheritDoc}
- *

- * IMPLEMENTATION NOTES:
- * Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe - * the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to - * be null. - * - * @see MessagePassingQueue#isEmpty() - */ - @Override - public final boolean isEmpty() { - return lvConsumerNode() == lvProducerNode(); - } - - @Override - public int capacity() { - return UNBOUNDED_CAPACITY; - } -} - -abstract class BaseLinkedQueuePad0 extends AbstractQueue implements MessagePassingQueue { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16; -} - -abstract class BaseLinkedQueueProducerNodeRef extends BaseLinkedQueuePad0 { - protected static final long P_NODE_OFFSET; - - static { - try { - P_NODE_OFFSET = PlatformDependent0.objectFieldOffset( - BaseLinkedQueueProducerNodeRef.class.getDeclaredField("producerNode")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - protected LinkedQueueNode producerNode; - protected final void spProducerNode(LinkedQueueNode node) { - producerNode = node; - } - - @SuppressWarnings("unchecked") - protected final LinkedQueueNode lvProducerNode() { - return (LinkedQueueNode) PlatformDependent0.getObjectVolatile(this, P_NODE_OFFSET); - } - - protected final LinkedQueueNode lpProducerNode() { - return producerNode; - } -} - -abstract class BaseLinkedQueuePad1 extends BaseLinkedQueueProducerNodeRef { - long p01, p02, p03, p04, p05, p06, p07; - long p10, p11, p12, p13, p14, p15, p16, p17; -} - -abstract class BaseLinkedQueueConsumerNodeRef extends BaseLinkedQueuePad1 { - protected static final long C_NODE_OFFSET; - - static { - try { - C_NODE_OFFSET = PlatformDependent0.objectFieldOffset( - BaseLinkedQueueConsumerNodeRef.class.getDeclaredField("consumerNode")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - protected LinkedQueueNode consumerNode; - protected final void spConsumerNode(LinkedQueueNode node) { - consumerNode = node; - } - - @SuppressWarnings("unchecked") - protected final LinkedQueueNode lvConsumerNode() { - return (LinkedQueueNode) PlatformDependent0.getObjectVolatile(this, C_NODE_OFFSET); - } - - protected final LinkedQueueNode lpConsumerNode() { - return consumerNode; - } -} diff --git a/common/src/main/java/io/netty/util/internal/ConcurrentCircularArrayQueue.java b/common/src/main/java/io/netty/util/internal/ConcurrentCircularArrayQueue.java deleted file mode 100644 index 5f3fd87180..0000000000 --- a/common/src/main/java/io/netty/util/internal/ConcurrentCircularArrayQueue.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - - -import java.util.AbstractQueue; -import java.util.Iterator; - -/** - * Forked from JCTools. - * - * A concurrent access enabling class used by circular array based queues this class exposes an offset computation - * method along with differently memory fenced load/store methods into the underlying array. The class is pre-padded and - * the array is padded on either side to help with False sharing prvention. It is expected theat subclasses handle post - * padding. - *

- * Offset calculation is separate from access to enable the reuse of a give compute offset. - *

- * Load/Store methods using a buffer parameter are provided to allow the prevention of final field reload after a - * LoadLoad barrier. - *

- * - * @param - */ -abstract class ConcurrentCircularArrayQueue extends ConcurrentCircularArrayQueueL0Pad { - protected static final int REF_BUFFER_PAD; - private static final long REF_ARRAY_BASE; - private static final int REF_ELEMENT_SHIFT; - static { - final int scale = PlatformDependent0.UNSAFE.arrayIndexScale(Object[].class); - if (4 == scale) { - REF_ELEMENT_SHIFT = 2; - } else if (8 == scale) { - REF_ELEMENT_SHIFT = 3; - } else { - throw new IllegalStateException("Unknown pointer size"); - } - // 2 cache lines pad - // TODO: replace 64 with the value we can detect - REF_BUFFER_PAD = (64 * 2) / scale; - // Including the buffer pad in the array base offset - REF_ARRAY_BASE = PlatformDependent0.UNSAFE.arrayBaseOffset(Object[].class) + (REF_BUFFER_PAD * scale); - } - protected final long mask; - // @Stable :( - protected final E[] buffer; - - @SuppressWarnings("unchecked") - public ConcurrentCircularArrayQueue(int capacity) { - int actualCapacity = roundToPowerOfTwo(capacity); - mask = actualCapacity - 1; - // pad data on either end with some empty slots. - buffer = (E[]) new Object[actualCapacity + REF_BUFFER_PAD * 2]; - } - - private static int roundToPowerOfTwo(final int value) { - return 1 << (32 - Integer.numberOfLeadingZeros(value - 1)); - } - /** - * @param index desirable element index - * @return the offset in bytes within the array for a given index. - */ - protected final long calcElementOffset(long index) { - return calcElementOffset(index, mask); - } - /** - * @param index desirable element index - * @param mask - * @return the offset in bytes within the array for a given index. - */ - protected static final long calcElementOffset(long index, long mask) { - return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT); - } - /** - * A plain store (no ordering/fences) of an element to a given offset - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e a kitty - */ - protected final void spElement(long offset, E e) { - spElement(buffer, offset, e); - } - - /** - * A plain store (no ordering/fences) of an element to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected static final void spElement(E[] buffer, long offset, E e) { - PlatformDependent0.UNSAFE.putObject(buffer, offset, e); - } - - /** - * An ordered store(store + StoreStore barrier) of an element to a given offset - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected final void soElement(long offset, E e) { - soElement(buffer, offset, e); - } - - /** - * An ordered store(store + StoreStore barrier) of an element to a given offset - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @param e an orderly kitty - */ - protected static final void soElement(E[] buffer, long offset, E e) { - PlatformDependent0.UNSAFE.putOrderedObject(buffer, offset, e); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - protected final E lpElement(long offset) { - return lpElement(buffer, offset); - } - - /** - * A plain load (no ordering/fences) of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - protected static final E lpElement(E[] buffer, long offset) { - return (E) PlatformDependent0.UNSAFE.getObject(buffer, offset); - } - - /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. - * - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - protected final E lvElement(long offset) { - return lvElement(buffer, offset); - } - - /** - * A volatile load (load + LoadLoad barrier) of an element from a given offset. - * - * @param buffer this.buffer - * @param offset computed via {@link ConcurrentCircularArrayQueue#calcElementOffset(long)} - * @return the element at the offset - */ - @SuppressWarnings("unchecked") - protected static final E lvElement(E[] buffer, long offset) { - return (E) PlatformDependent0.UNSAFE.getObjectVolatile(buffer, offset); - } - - @Override - public Iterator iterator() { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - while (poll() != null || !isEmpty()) { - // looping - } - } - - public int capacity() { - return (int) (mask + 1); - } -} - -abstract class ConcurrentCircularArrayQueueL0Pad extends AbstractQueue { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; -} - diff --git a/common/src/main/java/io/netty/util/internal/LinkedQueueAtomicNode.java b/common/src/main/java/io/netty/util/internal/LinkedQueueAtomicNode.java deleted file mode 100644 index be7b07d351..0000000000 --- a/common/src/main/java/io/netty/util/internal/LinkedQueueAtomicNode.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright 2016 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - -import java.util.concurrent.atomic.AtomicReference; - -/** - * Forked from JCTools. - */ -public final class LinkedQueueAtomicNode extends AtomicReference> { - /** */ - private static final long serialVersionUID = 2404266111789071508L; - private E value; - LinkedQueueAtomicNode() { - } - LinkedQueueAtomicNode(E val) { - spValue(val); - } - /** - * Gets the current value and nulls out the reference to it from this node. - * - * @return value - */ - public E getAndNullValue() { - E temp = lpValue(); - spValue(null); - return temp; - } - - public E lpValue() { - return value; - } - - public void spValue(E newValue) { - value = newValue; - } - - public void soNext(LinkedQueueAtomicNode n) { - lazySet(n); - } - - public LinkedQueueAtomicNode lvNext() { - return get(); - } -} diff --git a/common/src/main/java/io/netty/util/internal/LinkedQueueNode.java b/common/src/main/java/io/netty/util/internal/LinkedQueueNode.java deleted file mode 100644 index cdec73cecd..0000000000 --- a/common/src/main/java/io/netty/util/internal/LinkedQueueNode.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2016 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - -/** - * Forked from JCTools. - */ -final class LinkedQueueNode { - private static final long NEXT_OFFSET; - static { - try { - NEXT_OFFSET = PlatformDependent0.objectFieldOffset(LinkedQueueNode.class.getDeclaredField("next")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private E value; - private volatile LinkedQueueNode next; - - LinkedQueueNode() { - this(null); - } - - LinkedQueueNode(E val) { - spValue(val); - } - - /** - * Gets the current value and nulls out the reference to it from this node. - * - * @return value - */ - public E getAndNullValue() { - E temp = lpValue(); - spValue(null); - return temp; - } - - public E lpValue() { - return value; - } - - public void spValue(E newValue) { - value = newValue; - } - - public void soNext(LinkedQueueNode n) { - PlatformDependent0.putOrderedObject(this, NEXT_OFFSET, n); - } - - public LinkedQueueNode lvNext() { - return next; - } -} diff --git a/common/src/main/java/io/netty/util/internal/MessagePassingQueue.java b/common/src/main/java/io/netty/util/internal/MessagePassingQueue.java deleted file mode 100644 index 63aa559da5..0000000000 --- a/common/src/main/java/io/netty/util/internal/MessagePassingQueue.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Copyright 2016 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - -import java.util.Queue; - -/** - * Forked from JCTools. - * - * This is a tagging interface for the queues in this library which implement a subset of the {@link Queue} - * interface sufficient for concurrent message passing.
- * Message passing queues provide happens before semantics to messages passed through, namely that writes made - * by the producer before offering the message are visible to the consuming thread after the message has been - * polled out of the queue. - * - * @param - * the event/message type - */ -public interface MessagePassingQueue { - int UNBOUNDED_CAPACITY = -1; - - interface Supplier { - /** - * This method will return the next value to be written to the queue. As such the queue - * implementations are commited to insert the value once the call is made. - *

- * Users should be aware that underlying queue implementations may upfront claim parts of the queue - * for batch operations and this will effect the view on the queue from the supplier method. In - * particular size and any offer methods may take the view that the full batch has already happened. - * - * @return new element, NEVER null - */ - T get(); - } - - interface Consumer { - /** - * This method will process an element already removed from the queue. This method is expected to - * never throw an exception. - *

- * Users should be aware that underlying queue implementations may upfront claim parts of the queue - * for batch operations and this will effect the view on the queue from the accept method. In - * particular size and any poll/peek methods may take the view that the full batch has already - * happened. - * - * @param e not null - */ - void accept(T e); - } - - interface WaitStrategy { - /** - * This method can implement static or dynamic backoff. Dynamic backoff will rely on the counter for - * estimating how long the caller has been idling. The expected usage is: - * - *

-         * 
-         * int ic = 0;
-         * while(true) {
-         *   if(!isGodotArrived()) {
-         *     ic = w.idle(ic);
-         *     continue;
-         *   }
-         *   ic = 0;
-         *   // party with Godot until he goes again
-         * }
-         * 
-         * 
- * - * @param idleCounter idle calls counter, managed by the idle method until reset - * @return new counter value to be used on subsequent idle cycle - */ - int idle(int idleCounter); - } - - interface ExitCondition { - - /** - * This method should be implemented such that the flag read or determination cannot be hoisted out of - * a loop which notmally means a volatile load, but with JDK9 VarHandles may mean getOpaque. - * - * @return true as long as we should keep running - */ - boolean keepRunning(); - } - - /** - * Called from a producer thread subject to the restrictions appropriate to the implementation and - * according to the {@link Queue#offer(Object)} interface. - * - * @param e not null, will throw NPE if it is - * @return true if element was inserted into the queue, false iff full - */ - boolean offer(T e); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and - * according to the {@link Queue#poll()} interface. - * - * @return a message from the queue if one is available, null iff empty - */ - T poll(); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation and - * according to the {@link Queue#peek()} interface. - * - * @return a message from the queue if one is available, null iff empty - */ - T peek(); - - /** - * This method's accuracy is subject to concurrent modifications happening as the size is estimated and as - * such is a best effort rather than absolute value. For some implementations this method may be O(n) - * rather than O(1). - * - * @return number of messages in the queue, between 0 and {@link Integer#MAX_VALUE} but less or equals to - * capacity (if bounded). - */ - int size(); - - /** - * Removes all items from the queue. Called from the consumer thread subject to the restrictions - * appropriate to the implementation and according to the {@link Queue#clear()} interface. - */ - void clear(); - - /** - * This method's accuracy is subject to concurrent modifications happening as the observation is carried - * out. - * - * @return true if empty, false otherwise - */ - boolean isEmpty(); - - /** - * @return the capacity of this queue or UNBOUNDED_CAPACITY if not bounded - */ - int capacity(); - - /** - * Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed - * to {@link Queue#offer(Object)} this method may return false without the queue being full. - * - * @param e not null, will throw NPE if it is - * @return true if element was inserted into the queue, false if unable to offer - */ - boolean relaxedOffer(T e); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation. As - * opposed to {@link Queue#poll()} this method may return null without the queue being empty. - * - * @return a message from the queue if one is available, null if unable to poll - */ - T relaxedPoll(); - - /** - * Called from the consumer thread subject to the restrictions appropriate to the implementation. As - * opposed to {@link Queue#peek()} this method may return null without the queue being empty. - * - * @return a message from the queue if one is available, null if unable to peek - */ - T relaxedPeek(); - - /** - * Remove all available item from the queue and hand to consume. This should be semantically similar to: - *

-     * M m;
-     * while((m = relaxedPoll()) != null){
-     *  c.accept(m);
-     * }
-     * 
- * There's no strong commitment to the queue being empty at the end of a drain. Called from a - * consumer thread subject to the restrictions appropriate to the implementation. - * - * @return the number of polled elements - */ - int drain(Consumer c); - - /** - * Stuff the queue with elements from the supplier. Semantically similar to: - *

-     * while(relaxedOffer(s.get());
-     * 
- * There's no strong commitment to the queue being full at the end of a fill. Called from a - * producer thread subject to the restrictions appropriate to the implementation. - * - * @return the number of offered elements - */ - int fill(Supplier s); - - /** - * Remove up to limit elements from the queue and hand to consume. This should be semantically - * similar to: - * - *

-     *   M m;
-     *   while((m = relaxedPoll()) != null){
-     *     c.accept(m);
-     *   }
-     * 
- * - * There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer - * thread subject to the restrictions appropriate to the implementation. - * - * @return the number of polled elements - */ - int drain(Consumer c, int limit); - - /** - * Stuff the queue with up to limit elements from the supplier. Semantically similar to: - * - *
-     * 
-     *   for(int i=0; i < limit && relaxedOffer(s.get(); i++);
-     * 
-     * 
- * - * There's no strong commitment to the queue being full at the end of a fill. Called from a producer - * thread subject to the restrictions appropriate to the implementation. - * - * @return the number of offered elements - */ - int fill(Supplier s, int limit); - - /** - * Remove elements from the queue and hand to consume forever. Semantically similar to: - * - *
-     * 
-     *  int idleCounter = 0;
-     *  while (exit.keepRunning()) {
-     *      E e = relaxedPoll();
-     *      if(e==null){
-     *          idleCounter = wait.idle(idleCounter);
-     *          continue;
-     *      }
-     *      idleCounter = 0;
-     *      c.accept(e);
-     *  }
-     * 
-     * 
- * - * Called from a consumer thread subject to the restrictions appropriate to the implementation. - * - */ - void drain(Consumer c, WaitStrategy wait, ExitCondition exit); - - /** - * Stuff the queue with elements from the supplier forever. Semantically similar to: - * - *
-     * 
-     *  int idleCounter = 0;
-     *  while (exit.keepRunning()) {
-     *      E e = s.get();
-     *      while (!relaxedOffer(e)) {
-     *          idleCounter = wait.idle(idleCounter);
-     *          continue;
-     *      }
-     *      idleCounter = 0;
-     *  }
-     * 
-     * 
- * - * Called from a producer thread subject to the restrictions appropriate to the implementation. - * - */ - void fill(Supplier s, WaitStrategy wait, ExitCondition exit); -} diff --git a/common/src/main/java/io/netty/util/internal/MpscArrayQueue.java b/common/src/main/java/io/netty/util/internal/MpscArrayQueue.java deleted file mode 100644 index cda283d440..0000000000 --- a/common/src/main/java/io/netty/util/internal/MpscArrayQueue.java +++ /dev/null @@ -1,331 +0,0 @@ -/* - * Copyright 2015 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - -/** - * Forked from JCTools. - * - * A Multi-Producer-Single-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that - * any thread may call the offer method, but only a single thread may call poll/peek for correctness to - * maintained.
- * This implementation follows patterns documented on the package level for False Sharing protection.
- * This implementation is using the Fast Flow - * method for polling from the queue (with minor change to correctly publish the index) and an extension of - * the Leslie Lamport concurrent queue algorithm (originated by Martin Thompson) on the producer side.
- * - * @param - */ -final class MpscArrayQueue extends MpscArrayQueueConsumerField { - long p40, p41, p42, p43, p44, p45, p46; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpscArrayQueue(final int capacity) { - super(capacity); - } - - /** - * {@inheritDoc}
- * - * IMPLEMENTATION NOTES:
- * Lock free offer using a single CAS. As class name suggests access is permitted to many threads - * concurrently. - * - * @see java.util.Queue#offer(java.lang.Object) - */ - @Override - public boolean offer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - - // use a cached view on consumer index (potentially updated in loop) - final long mask = this.mask; - final long capacity = mask + 1; - long consumerIndexCache = lvConsumerIndexCache(); // LoadLoad - long currentProducerIndex; - do { - currentProducerIndex = lvProducerIndex(); // LoadLoad - final long wrapPoint = currentProducerIndex - capacity; - if (consumerIndexCache <= wrapPoint) { - final long currHead = lvConsumerIndex(); // LoadLoad - if (currHead <= wrapPoint) { - return false; // FULL :( - } else { - // update shared cached value of the consumerIndex - svConsumerIndexCache(currHead); // StoreLoad - // update on stack copy, we might need this value again if we lose the CAS. - consumerIndexCache = currHead; - } - } - } while (!casProducerIndex(currentProducerIndex, currentProducerIndex + 1)); - /* - * NOTE: the new producer index value is made visible BEFORE the element in the array. If we relied on - * the index visibility to poll() we would need to handle the case where the element is not visible. - */ - - // Won CAS, move on to storing - final long offset = calcElementOffset(currentProducerIndex, mask); - soElement(offset, e); // StoreStore - return true; // AWESOME :) - } - - /** - * A wait free alternative to offer which fails on CAS failure. - * - * @param e new element, not null - * @return 1 if next element cannot be filled, -1 if CAS failed, 0 if successful - */ - public int weakOffer(final E e) { - if (null == e) { - throw new NullPointerException("Null is not a valid element"); - } - final long mask = this.mask; - final long capacity = mask + 1; - final long currentTail = lvProducerIndex(); // LoadLoad - final long consumerIndexCache = lvConsumerIndexCache(); // LoadLoad - final long wrapPoint = currentTail - capacity; - if (consumerIndexCache <= wrapPoint) { - long currHead = lvConsumerIndex(); // LoadLoad - if (currHead <= wrapPoint) { - return 1; // FULL :( - } else { - svConsumerIndexCache(currHead); // StoreLoad - } - } - - // look Ma, no loop! - if (!casProducerIndex(currentTail, currentTail + 1)) { - return -1; // CAS FAIL :( - } - - // Won CAS, move on to storing - final long offset = calcElementOffset(currentTail, mask); - soElement(offset, e); - return 0; // AWESOME :) - } - - /** - * {@inheritDoc} - *

- * IMPLEMENTATION NOTES:
- * Lock free poll using ordered loads/stores. As class name suggests access is limited to a single thread. - * - * @see java.util.Queue#poll() - */ - @Override - public E poll() { - final long consumerIndex = lvConsumerIndex(); // LoadLoad - final long offset = calcElementOffset(consumerIndex); - // Copy field to avoid re-reading after volatile load - final E[] buffer = this.buffer; - - // If we can't see the next available element we can't poll - E e = lvElement(buffer, offset); // LoadLoad - if (null == e) { - /* - * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after - * winning the CAS on offer but before storing the element in the queue. Other producers may go on - * to fill up the queue after this element. - */ - if (consumerIndex != lvProducerIndex()) { - do { - e = lvElement(buffer, offset); - } while (e == null); - } else { - return null; - } - } - - spElement(buffer, offset, null); - soConsumerIndex(consumerIndex + 1); // StoreStore - return e; - } - - /** - * {@inheritDoc} - *

- * IMPLEMENTATION NOTES:
- * Lock free peek using ordered loads. As class name suggests access is limited to a single thread. - * - * @see java.util.Queue#poll() - */ - @Override - public E peek() { - // Copy field to avoid re-reading after volatile load - final E[] buffer = this.buffer; - - final long consumerIndex = lvConsumerIndex(); // LoadLoad - final long offset = calcElementOffset(consumerIndex); - E e = lvElement(buffer, offset); - if (null == e) { - /* - * NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after - * winning the CAS on offer but before storing the element in the queue. Other producers may go on - * to fill up the queue after this element. - */ - if (consumerIndex != lvProducerIndex()) { - do { - e = lvElement(buffer, offset); - } while (e == null); - } else { - return null; - } - } - return e; - } - - /** - * {@inheritDoc} - *

- * - */ - @Override - public int size() { - /* - * It is possible for a thread to be interrupted or reschedule between the read of the producer and - * consumer indices, therefore protection is required to ensure size is within valid range. In the - * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer - * index BEFORE the producer index. - */ - long after = lvConsumerIndex(); - while (true) { - final long before = after; - final long currentProducerIndex = lvProducerIndex(); - after = lvConsumerIndex(); - if (before == after) { - return (int) (currentProducerIndex - after); - } - } - } - - @Override - public boolean isEmpty() { - // Order matters! - // Loading consumer before producer allows for producer increments after consumer index is read. - // This ensures the correctness of this method at least for the consumer thread. Other threads POV is - // not really - // something we can fix here. - return lvConsumerIndex() == lvProducerIndex(); - } -} - -abstract class MpscArrayQueueL1Pad extends ConcurrentCircularArrayQueue { - long p10, p11, p12, p13, p14, p15, p16; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpscArrayQueueL1Pad(int capacity) { - super(capacity); - } -} - -abstract class MpscArrayQueueTailField extends MpscArrayQueueL1Pad { - private static final long P_INDEX_OFFSET; - - static { - try { - P_INDEX_OFFSET = PlatformDependent0.UNSAFE.objectFieldOffset(MpscArrayQueueTailField.class - .getDeclaredField("producerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long producerIndex; - - public MpscArrayQueueTailField(int capacity) { - super(capacity); - } - - protected final long lvProducerIndex() { - return producerIndex; - } - - protected final boolean casProducerIndex(long expect, long newValue) { - return PlatformDependent0.UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); - } -} - -abstract class MpscArrayQueueMidPad extends MpscArrayQueueTailField { - long p20, p21, p22, p23, p24, p25, p26; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpscArrayQueueMidPad(int capacity) { - super(capacity); - } -} - -abstract class MpscArrayQueueHeadCacheField extends MpscArrayQueueMidPad { - private volatile long headCache; - - public MpscArrayQueueHeadCacheField(int capacity) { - super(capacity); - } - - protected final long lvConsumerIndexCache() { - return headCache; - } - - protected final void svConsumerIndexCache(long v) { - headCache = v; - } -} - -abstract class MpscArrayQueueL2Pad extends MpscArrayQueueHeadCacheField { - long p20, p21, p22, p23, p24, p25, p26; - long p30, p31, p32, p33, p34, p35, p36, p37; - - public MpscArrayQueueL2Pad(int capacity) { - super(capacity); - } -} - -abstract class MpscArrayQueueConsumerField extends MpscArrayQueueL2Pad { - private static final long C_INDEX_OFFSET; - static { - try { - C_INDEX_OFFSET = PlatformDependent0.UNSAFE.objectFieldOffset(MpscArrayQueueConsumerField.class - .getDeclaredField("consumerIndex")); - } catch (NoSuchFieldException e) { - throw new RuntimeException(e); - } - } - private volatile long consumerIndex; - - public MpscArrayQueueConsumerField(int capacity) { - super(capacity); - } - - protected final long lvConsumerIndex() { - return consumerIndex; - } - - protected void soConsumerIndex(long l) { - PlatformDependent0.UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, l); - } -} - diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java deleted file mode 100644 index a1a8c2c9d1..0000000000 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java +++ /dev/null @@ -1,378 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/** - * Copyright (C) 2009-2013 Typesafe Inc. - */ -package io.netty.util.internal; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; -import java.util.Queue; - -/** - * A lock-free concurrent single-consumer multi-producer {@link Queue}. - * It allows multiple producer threads to perform the following operations simultaneously: - *

    - *
  • {@link #offer(Object)}, {@link #add(Object)}, {@link #addAll(Collection)}
  • - *
  • {@link #isEmpty()}
  • - *
- * .. while only one consumer thread is allowed to perform the following operations exclusively: - *
    - *
  • {@link #poll()} and {@link #remove()}
  • - *
  • {@link #element()}, {@link #peek()}
  • - *
  • {@link #remove(Object)}, {@link #removeAll(Collection)}, and {@link #retainAll(Collection)}
  • - *
  • {@link #clear()}
  • {@link #} - *
  • {@link #iterator()}
  • - *
  • {@link #toArray()} and {@link #toArray(Object[])}
  • - *
  • {@link #contains(Object)} and {@link #containsAll(Collection)}
  • - *
  • {@link #size()}
  • - *
- * - * The behavior of this implementation is undefined if you perform the operations for a consumer thread only - * from multiple threads. - * - * The initial implementation is based on: - * - * and adopted padded head node changes from: - * - * data structure modified to avoid false sharing between head and tail Ref as per implementation of MpscLinkedQueue - * on JCTools project. - */ -final class MpscLinkedQueue extends MpscLinkedQueueTailRef implements Queue { - - private static final long serialVersionUID = -1878402552271506449L; - - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; - - // offer() occurs at the tail of the linked list. - // poll() occurs at the head of the linked list. - // - // Resulting layout is: - // - // head --next--> 1st element --next--> 2nd element --next--> ... tail (last element) - // - // where the head is a dummy node whose value is null. - // - // offer() appends a new node next to the tail using AtomicReference.getAndSet() - // poll() removes head from the linked list and promotes the 1st element to the head, - // setting its value to null if possible. - // - // Also note that this class extends AtomicReference for the "tail" slot (which is the one that is appended to) - // since Unsafe does not expose XCHG operation intrinsically. - MpscLinkedQueue() { - MpscLinkedQueueNode tombstone = new DefaultNode(null); - setHeadRef(tombstone); - setTailRef(tombstone); - } - - /** - * Returns the node right next to the head, which contains the first element of this queue. - */ - private MpscLinkedQueueNode peekNode() { - MpscLinkedQueueNode head = headRef(); - MpscLinkedQueueNode next = head.next(); - if (next == null && head != tailRef()) { - // if tail != head this is not going to change until consumer makes progress - // we can avoid reading the head and just spin on next until it shows up - // - // See https://github.com/akka/akka/pull/15596 - do { - next = head.next(); - } while (next == null); - } - return next; - } - - @Override - @SuppressWarnings("unchecked") - public boolean offer(E value) { - if (value == null) { - throw new NullPointerException("value"); - } - - final MpscLinkedQueueNode newTail; - if (value instanceof MpscLinkedQueueNode) { - newTail = (MpscLinkedQueueNode) value; - newTail.setNext(null); - } else { - newTail = new DefaultNode(value); - } - - MpscLinkedQueueNode oldTail = getAndSetTailRef(newTail); - oldTail.setNext(newTail); - return true; - } - - @Override - public E poll() { - final MpscLinkedQueueNode next = peekNode(); - if (next == null) { - return null; - } - - // next becomes a new head. - MpscLinkedQueueNode oldHead = headRef(); - // Similar to 'headRef.node = next', but slightly faster (storestore vs loadstore) - // See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html - // See: http://psy-lob-saw.blogspot.com/2012/12/atomiclazyset-is-performance-win-for.html - lazySetHeadRef(next); - - // Break the linkage between the old head and the new head. - oldHead.unlink(); - - return next.clearMaybe(); - } - - @Override - public E peek() { - final MpscLinkedQueueNode next = peekNode(); - if (next == null) { - return null; - } - return next.value(); - } - - @Override - public int size() { - int count = 0; - MpscLinkedQueueNode n = peekNode(); - for (;;) { - // If value == null it means that clearMaybe() was called on the MpscLinkedQueueNode. - if (n == null || n.value() == null) { - break; - } - MpscLinkedQueueNode next = n.next(); - if (n == next) { - break; - } - n = next; - if (++ count == Integer.MAX_VALUE) { - // Guard against overflow of integer. - break; - } - } - return count; - } - - @Override - public boolean isEmpty() { - return headRef() == tailRef(); - } - - @Override - public boolean contains(Object o) { - MpscLinkedQueueNode n = peekNode(); - for (;;) { - if (n == null) { - break; - } - E value = n.value(); - // If value == null it means that clearMaybe() was called on the MpscLinkedQueueNode. - if (value == null) { - return false; - } - if (value == o) { - return true; - } - MpscLinkedQueueNode next = n.next(); - if (n == next) { - break; - } - n = next; - } - return false; - } - - @Override - public Iterator iterator() { - return new ReadOnlyIterator(toList().iterator()); - } - - @Override - public boolean add(E e) { - if (offer(e)) { - return true; - } - throw new IllegalStateException("queue full"); - } - - @Override - public E remove() { - E e = poll(); - if (e != null) { - return e; - } - throw new NoSuchElementException(); - } - - @Override - public E element() { - E e = peek(); - if (e != null) { - return e; - } - throw new NoSuchElementException(); - } - - private List toList(int initialCapacity) { - return toList(new ArrayList(initialCapacity)); - } - - private List toList() { - return toList(new ArrayList()); - } - - private List toList(List elements) { - MpscLinkedQueueNode n = peekNode(); - for (;;) { - if (n == null) { - break; - } - E value = n.value(); - if (value == null) { - break; - } - if (!elements.add(value)) { - // Seems like there is no space left, break here. - break; - } - MpscLinkedQueueNode next = n.next(); - if (n == next) { - break; - } - n = next; - } - return elements; - } - - @Override - public Object[] toArray() { - return toList().toArray(); - } - - @Override - @SuppressWarnings("unchecked") - public T[] toArray(T[] a) { - return toList(a.length).toArray(a); - } - - @Override - public boolean remove(Object o) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsAll(Collection c) { - for (Object e: c) { - if (!contains(e)) { - return false; - } - } - return true; - } - - @Override - public boolean addAll(Collection c) { - if (c == null) { - throw new NullPointerException("c"); - } - if (c == this) { - throw new IllegalArgumentException("c == this"); - } - - boolean modified = false; - for (E e: c) { - add(e); - modified = true; - } - return modified; - } - - @Override - public boolean removeAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean retainAll(Collection c) { - throw new UnsupportedOperationException(); - } - - @Override - public void clear() { - while (poll() != null) { - continue; - } - } - - private void writeObject(ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - for (E e: this) { - out.writeObject(e); - } - out.writeObject(null); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - final MpscLinkedQueueNode tombstone = new DefaultNode(null); - setHeadRef(tombstone); - setTailRef(tombstone); - - for (;;) { - @SuppressWarnings("unchecked") - E e = (E) in.readObject(); - if (e == null) { - break; - } - add(e); - } - } - - private static final class DefaultNode extends MpscLinkedQueueNode { - - private T value; - - DefaultNode(T value) { - this.value = value; - } - - @Override - public T value() { - return value; - } - - @Override - protected T clearMaybe() { - T value = this.value; - this.value = null; - return value; - } - } -} diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueueHeadRef.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueueHeadRef.java deleted file mode 100644 index fabcf42ed4..0000000000 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueueHeadRef.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.util.internal; - -import java.io.Serializable; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - - -abstract class MpscLinkedQueueHeadRef extends MpscLinkedQueuePad0 implements Serializable { - - private static final long serialVersionUID = 8467054865577874285L; - - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater UPDATER; - - static { - @SuppressWarnings("rawtypes") - AtomicReferenceFieldUpdater updater; - updater = PlatformDependent.newAtomicReferenceFieldUpdater(MpscLinkedQueueHeadRef.class, "headRef"); - if (updater == null) { - updater = AtomicReferenceFieldUpdater.newUpdater( - MpscLinkedQueueHeadRef.class, MpscLinkedQueueNode.class, "headRef"); - } - UPDATER = updater; - } - - private transient volatile MpscLinkedQueueNode headRef; - - protected final MpscLinkedQueueNode headRef() { - return headRef; - } - - protected final void setHeadRef(MpscLinkedQueueNode headRef) { - this.headRef = headRef; - } - - protected final void lazySetHeadRef(MpscLinkedQueueNode headRef) { - UPDATER.lazySet(this, headRef); - } -} diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueueNode.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueueNode.java deleted file mode 100644 index 050f604bc1..0000000000 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueueNode.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.util.internal; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -public abstract class MpscLinkedQueueNode { - - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater nextUpdater; - - static { - @SuppressWarnings("rawtypes") - AtomicReferenceFieldUpdater u; - - u = PlatformDependent.newAtomicReferenceFieldUpdater(MpscLinkedQueueNode.class, "next"); - if (u == null) { - u = AtomicReferenceFieldUpdater.newUpdater(MpscLinkedQueueNode.class, MpscLinkedQueueNode.class, "next"); - } - nextUpdater = u; - } - - @SuppressWarnings("unused") - private volatile MpscLinkedQueueNode next; - - final MpscLinkedQueueNode next() { - return next; - } - - final void setNext(final MpscLinkedQueueNode newNext) { - // Similar to 'next = newNext', but slightly faster (storestore vs loadstore) - // See: http://robsjava.blogspot.com/2013/06/a-faster-volatile.html - nextUpdater.lazySet(this, newNext); - } - - public abstract T value(); - - /** - * Sets the element this node contains to {@code null} so that the node can be used as a tombstone. - */ - protected T clearMaybe() { - return value(); - } - - /** - * Unlink to allow GC'ed - */ - void unlink() { - setNext(null); - } -} diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueuePad0.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueuePad0.java deleted file mode 100644 index 04631e78e4..0000000000 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueuePad0.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.util.internal; - -abstract class MpscLinkedQueuePad0 { - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; -} diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueuePad1.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueuePad1.java deleted file mode 100644 index 66b486b5cf..0000000000 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueuePad1.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.util.internal; - -abstract class MpscLinkedQueuePad1 extends MpscLinkedQueueHeadRef { - - private static final long serialVersionUID = 2886694927079691637L; - - long p00, p01, p02, p03, p04, p05, p06, p07; - long p30, p31, p32, p33, p34, p35, p36, p37; -} diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueueTailRef.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueueTailRef.java deleted file mode 100644 index 4dd99c26aa..0000000000 --- a/common/src/main/java/io/netty/util/internal/MpscLinkedQueueTailRef.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.util.internal; - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -abstract class MpscLinkedQueueTailRef extends MpscLinkedQueuePad1 { - - private static final long serialVersionUID = 8717072462993327429L; - - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater UPDATER; - - static { - @SuppressWarnings("rawtypes") - AtomicReferenceFieldUpdater updater; - updater = PlatformDependent.newAtomicReferenceFieldUpdater(MpscLinkedQueueTailRef.class, "tailRef"); - if (updater == null) { - updater = AtomicReferenceFieldUpdater.newUpdater( - MpscLinkedQueueTailRef.class, MpscLinkedQueueNode.class, "tailRef"); - } - UPDATER = updater; - } - - private transient volatile MpscLinkedQueueNode tailRef; - - protected final MpscLinkedQueueNode tailRef() { - return tailRef; - } - - protected final void setTailRef(MpscLinkedQueueNode tailRef) { - this.tailRef = tailRef; - } - - @SuppressWarnings("unchecked") - protected final MpscLinkedQueueNode getAndSetTailRef(MpscLinkedQueueNode tailRef) { - // LOCK XCHG in JDK8, a CAS loop in JDK 7/6 - return (MpscLinkedQueueNode) UPDATER.getAndSet(this, tailRef); - } -} diff --git a/common/src/main/java/io/netty/util/internal/OneTimeTask.java b/common/src/main/java/io/netty/util/internal/OneTimeTask.java deleted file mode 100644 index d9473bd45c..0000000000 --- a/common/src/main/java/io/netty/util/internal/OneTimeTask.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2014 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.internal; - -import io.netty.util.concurrent.EventExecutor; - -/** - * {@link Runnable} which represent a one time task which may allow the {@link EventExecutor} to reduce the amount of - * produced garbage when queue it for execution. - * - * It is important this will not be reused. After submitted it is not allowed to get submitted again! - */ -public abstract class OneTimeTask extends MpscLinkedQueueNode implements Runnable { - - @Override - public Runnable value() { - return this; - } -} diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index fbc6ecfcd9..314011821c 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -20,6 +20,12 @@ import io.netty.util.internal.chmv8.ConcurrentHashMapV8; import io.netty.util.internal.chmv8.LongAdderV8; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import org.jctools.queues.MpscArrayQueue; +import org.jctools.queues.MpscChunkedArrayQueue; +import org.jctools.queues.SpscLinkedQueue; +import org.jctools.queues.atomic.MpscAtomicArrayQueue; +import org.jctools.queues.atomic.MpscLinkedAtomicQueue; +import org.jctools.queues.atomic.SpscLinkedAtomicQueue; import java.io.BufferedReader; import java.io.File; @@ -41,7 +47,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -78,6 +83,7 @@ public final class PlatformDependent { private static final boolean DIRECT_BUFFER_PREFERRED = HAS_UNSAFE && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false); private static final long MAX_DIRECT_MEMORY = maxDirectMemory0(); + private static final int MAX_MPSC_CAPACITY = 1024 * 1024; // TODO: Maybe make this configurable ? private static final long ARRAY_BASE_OFFSET = arrayBaseOffset0(); @@ -580,7 +586,8 @@ public final class PlatformDependent { * consumer (one thread!). */ public static Queue newMpscQueue() { - return new MpscLinkedQueue(); + return hasUnsafe() ? new MpscChunkedArrayQueue(1024, MAX_MPSC_CAPACITY, true) + : new MpscLinkedAtomicQueue(); } /** @@ -588,10 +595,7 @@ public final class PlatformDependent { * consumer (one thread!). */ public static Queue newSpscQueue() { - if (hasUnsafe()) { - return new SpscLinkedQueue(); - } - return new SpscLinkedAtomicQueue(); + return hasUnsafe() ? new SpscLinkedQueue() : new SpscLinkedAtomicQueue(); } /** @@ -599,11 +603,7 @@ public final class PlatformDependent { * consumer (one thread!) with the given fixes {@code capacity}. */ public static Queue newFixedMpscQueue(int capacity) { - if (hasUnsafe()) { - return new MpscArrayQueue(capacity); - } else { - return new LinkedBlockingQueue(capacity); - } + return hasUnsafe() ? new MpscArrayQueue(capacity) : new MpscAtomicArrayQueue(capacity); } /** diff --git a/common/src/main/java/io/netty/util/internal/RecyclableMpscLinkedQueueNode.java b/common/src/main/java/io/netty/util/internal/RecyclableMpscLinkedQueueNode.java deleted file mode 100644 index ad27b248ee..0000000000 --- a/common/src/main/java/io/netty/util/internal/RecyclableMpscLinkedQueueNode.java +++ /dev/null @@ -1,45 +0,0 @@ -/* -* Copyright 2014 The Netty Project -* -* The Netty Project licenses this file to you under the Apache License, -* version 2.0 (the "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at: -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -* License for the specific language governing permissions and limitations -* under the License. -*/ - -package io.netty.util.internal; - -import io.netty.util.Recycler; - -/** - * {@link MpscLinkedQueueNode} that will automatically call {@link #recycle(Recycler.Handle)} when the node was - * unlinked. - */ -public abstract class RecyclableMpscLinkedQueueNode extends MpscLinkedQueueNode { - private final Recycler.Handle handle; - - protected RecyclableMpscLinkedQueueNode(Recycler.Handle handle) { - if (handle == null) { - throw new NullPointerException("handle"); - } - this.handle = handle; - } - - @Override - final void unlink() { - super.unlink(); - recycle(handle); - } - - /** - * Called once unliked and so ready to recycled. - */ - protected abstract void recycle(Recycler.Handle handle); -} diff --git a/common/src/main/java/io/netty/util/internal/SpscLinkedAtomicQueue.java b/common/src/main/java/io/netty/util/internal/SpscLinkedAtomicQueue.java deleted file mode 100644 index 2d9d35670b..0000000000 --- a/common/src/main/java/io/netty/util/internal/SpscLinkedAtomicQueue.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2016 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - -/** - * Forked from JCTools. - * - * This is a weakened version of the MPSC algorithm as presented on 1024 - * Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory model and - * layout: - *
    - *
  1. As this is an SPSC we have no need for XCHG, an ordered store is enough. - *
- * The queue is initialized with a stub node which is set to both the producer and consumer node references. From this - * point follow the notes on offer/poll. - * - * @param - */ -public final class SpscLinkedAtomicQueue extends BaseLinkedAtomicQueue { - - public SpscLinkedAtomicQueue() { - super(); - LinkedQueueAtomicNode node = new LinkedQueueAtomicNode(); - spProducerNode(node); - spConsumerNode(node); - node.soNext(null); // this ensures correct construction: StoreStore - } - - /** - * {@inheritDoc}
- * - * IMPLEMENTATION NOTES:
- * Offer is allowed from a SINGLE thread.
- * Offer allocates a new node (holding the offered value) and: - *
    - *
  1. Sets that node as the producerNode.next - *
  2. Sets the new node as the producerNode - *
- * From this follows that producerNode.next is always null and for all other nodes node.next is not null. - * - * @see MessagePassingQueue#offer(Object) - * @see java.util.Queue#offer(java.lang.Object) - */ - @Override - public boolean offer(final E nextValue) { - if (nextValue == null) { - throw new IllegalArgumentException("null elements not allowed"); - } - final LinkedQueueAtomicNode nextNode = new LinkedQueueAtomicNode(nextValue); - lpProducerNode().soNext(nextNode); - spProducerNode(nextNode); - return true; - } - - /** - * {@inheritDoc}
- * - * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and: - *
    - *
  1. If it is null, the queue is empty. - *
  2. If it is not null set it as the consumer node and return it's now evacuated value. - *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. Because null - * values are not allowed to be offered this is the only node with it's value set to null at any one time. - * - */ - @Override - public E poll() { - final LinkedQueueAtomicNode nextNode = lpConsumerNode().lvNext(); - if (nextNode != null) { - // we have to null out the value because we are going to hang on to the node - final E nextValue = nextNode.getAndNullValue(); - spConsumerNode(nextNode); - return nextValue; - } - return null; - } - - @Override - public E peek() { - final LinkedQueueAtomicNode nextNode = lpConsumerNode().lvNext(); - if (nextNode != null) { - return nextNode.lpValue(); - } else { - return null; - } - } - -} diff --git a/common/src/main/java/io/netty/util/internal/SpscLinkedQueue.java b/common/src/main/java/io/netty/util/internal/SpscLinkedQueue.java deleted file mode 100644 index b5a844c2f2..0000000000 --- a/common/src/main/java/io/netty/util/internal/SpscLinkedQueue.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Copyright 2016 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.netty.util.internal; - -/** - * Forked from JCTools. - * - * This is a weakened version of the MPSC algorithm as presented - * on - * 1024 Cores by D. Vyukov. The original has been adapted to Java and it's quirks with regards to memory - * model and layout: - *
    - *
  1. Use inheritance to ensure no false sharing occurs between producer/consumer node reference fields. - *
  2. As this is an SPSC we have no need for XCHG, an ordered store is enough. - *
- * The queue is initialized with a stub node which is set to both the producer and consumer node references. - * From this point follow the notes on offer/poll. - * - * @param - */ -public class SpscLinkedQueue extends BaseLinkedQueue { - - public SpscLinkedQueue() { - spProducerNode(new LinkedQueueNode()); - spConsumerNode(producerNode); - consumerNode.soNext(null); // this ensures correct construction: StoreStore - } - - /** - * {@inheritDoc}
- * - * IMPLEMENTATION NOTES:
- * Offer is allowed from a SINGLE thread.
- * Offer allocates a new node (holding the offered value) and: - *
    - *
  1. Sets that node as the producerNode.next - *
  2. Sets the new node as the producerNode - *
- * From this follows that producerNode.next is always null and for all other nodes node.next is not null. - * - * @see MessagePassingQueue#offer(Object) - * @see java.util.Queue#offer(java.lang.Object) - */ - @Override - public boolean offer(final E nextValue) { - if (nextValue == null) { - throw new IllegalArgumentException("null elements not allowed"); - } - final LinkedQueueNode nextNode = new LinkedQueueNode(nextValue); - producerNode.soNext(nextNode); - producerNode = nextNode; - return true; - } - - /** - * {@inheritDoc}
- * - * IMPLEMENTATION NOTES:
- * Poll is allowed from a SINGLE thread.
- * Poll reads the next node from the consumerNode and: - *
    - *
  1. If it is null, the queue is empty. - *
  2. If it is not null set it as the consumer node and return it's now evacuated value. - *
- * This means the consumerNode.value is always null, which is also the starting point for the queue. - * Because null values are not allowed to be offered this is the only node with it's value set to null at - * any one time. - * - */ - @Override - public E poll() { - final LinkedQueueNode nextNode = consumerNode.lvNext(); - if (nextNode != null) { - // we have to null out the value because we are going to hang on to the node - final E nextValue = nextNode.getAndNullValue(); - consumerNode = nextNode; - return nextValue; - } - return null; - } - - @Override - public E peek() { - final LinkedQueueNode nextNode = consumerNode.lvNext(); - if (nextNode != null) { - return nextNode.lpValue(); - } else { - return null; - } - } - - @Override - public boolean relaxedOffer(E e) { - return offer(e); - } - - @Override - public E relaxedPoll() { - return poll(); - } - - @Override - public E relaxedPeek() { - return peek(); - } - - @Override - public int drain(Consumer c) { - long result = 0; // use long to force safepoint into loop below - int drained; - do { - drained = drain(c, 4096); - result += drained; - } while (drained == 4096 && result <= Integer.MAX_VALUE - 4096); - return (int) result; - } - - @Override - public int fill(Supplier s) { - long result = 0; // result is a long because we want to have a safepoint check at regular intervals - do { - fill(s, 4096); - result += 4096; - } while (result <= Integer.MAX_VALUE - 4096); - return (int) result; - } - - @Override - public int drain(Consumer c, int limit) { - LinkedQueueNode chaserNode = this.consumerNode; - for (int i = 0; i < limit; i++) { - chaserNode = chaserNode.lvNext(); - if (chaserNode == null) { - return i; - } - // we have to null out the value because we are going to hang on to the node - final E nextValue = chaserNode.getAndNullValue(); - this.consumerNode = chaserNode; - c.accept(nextValue); - } - return limit; - } - - @Override - public int fill(Supplier s, int limit) { - LinkedQueueNode chaserNode = producerNode; - for (int i = 0; i < limit; i++) { - final LinkedQueueNode nextNode = new LinkedQueueNode(s.get()); - chaserNode.soNext(nextNode); - chaserNode = nextNode; - this.producerNode = chaserNode; - } - return limit; - } - - @Override - public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) { - LinkedQueueNode chaserNode = this.consumerNode; - int idleCounter = 0; - while (exit.keepRunning()) { - for (int i = 0; i < 4096; i++) { - final LinkedQueueNode next = chaserNode.lvNext(); - if (next == null) { - idleCounter = wait.idle(idleCounter); - continue; - } - chaserNode = next; - idleCounter = 0; - // we have to null out the value because we are going to hang on to the node - final E nextValue = chaserNode.getAndNullValue(); - this.consumerNode = chaserNode; - c.accept(nextValue); - } - } - } - - @Override - public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) { - LinkedQueueNode chaserNode = producerNode; - while (exit.keepRunning()) { - for (int i = 0; i < 4096; i++) { - final LinkedQueueNode nextNode = new LinkedQueueNode(s.get()); - chaserNode.soNext(nextNode); - chaserNode = nextNode; - this.producerNode = chaserNode; - } - } - } -} diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 2d1634fbeb..8b0abff358 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -41,7 +41,6 @@ import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateExecutor; import io.netty.util.concurrent.Promise; import io.netty.util.internal.EmptyArrays; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -389,7 +388,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH */ public ChannelFuture close(final ChannelPromise future) { final ChannelHandlerContext ctx = this.ctx; - ctx.executor().execute(new OneTimeTask() { + ctx.executor().execute(new Runnable() { @Override public void run() { outboundClosed = true; @@ -1139,7 +1138,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } final CountDownLatch latch = new CountDownLatch(1); - delegatedTaskExecutor.execute(new OneTimeTask() { + delegatedTaskExecutor.execute(new Runnable() { @Override public void run() { try { @@ -1304,7 +1303,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH EventExecutor executor = ctx.executor(); if (!executor.inEventLoop()) { - executor.execute(new OneTimeTask() { + executor.execute(new Runnable() { @Override public void run() { handshake(promise); @@ -1371,7 +1370,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH return; } - final ScheduledFuture timeoutFuture = ctx.executor().schedule(new OneTimeTask() { + final ScheduledFuture timeoutFuture = ctx.executor().schedule(new Runnable() { @Override public void run() { if (p.isDone()) { @@ -1413,7 +1412,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH final ScheduledFuture timeoutFuture; if (closeNotifyTimeoutMillis > 0) { // Force-close the connection if close_notify is not fully sent in time. - timeoutFuture = ctx.executor().schedule(new OneTimeTask() { + timeoutFuture = ctx.executor().schedule(new Runnable() { @Override public void run() { logger.warn("{} Last write attempt timed out; force-closing the connection.", ctx.channel()); diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 0ff57826e8..48f402c875 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -28,7 +28,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -114,7 +113,7 @@ public class ChunkedWriteHandler } } else { // let the transfer resume on the next event loop round - ctx.executor().execute(new OneTimeTask() { + ctx.executor().execute(new Runnable() { @Override public void run() { diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index bacee04bc5..f395992aba 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; -import io.netty.util.internal.OneTimeTask; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -180,7 +179,7 @@ public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter { } } - private final class WriteTimeoutTask extends OneTimeTask implements ChannelFutureListener { + private final class WriteTimeoutTask implements Runnable, ChannelFutureListener { private final ChannelHandlerContext ctx; private final ChannelPromise promise; diff --git a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java index f5fd5d3599..8074a6a344 100644 --- a/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/ChannelTrafficShapingHandler.java @@ -18,7 +18,6 @@ package io.netty.handler.traffic; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.util.internal.OneTimeTask; import java.util.ArrayDeque; import java.util.concurrent.TimeUnit; @@ -193,7 +192,7 @@ public class ChannelTrafficShapingHandler extends AbstractTrafficShapingHandler checkWriteSuspend(ctx, delay, queueSize); } final long futureNow = newToSend.relativeTimeAction; - ctx.executor().schedule(new OneTimeTask() { + ctx.executor().schedule(new Runnable() { @Override public void run() { sendAllValid(ctx, futureNow); diff --git a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java index 3bbbf01574..6c4fc58291 100644 --- a/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java +++ b/handler/src/main/java/io/netty/handler/traffic/GlobalTrafficShapingHandler.java @@ -21,7 +21,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelHandler.Sharable; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import java.util.ArrayDeque; @@ -361,7 +360,7 @@ public class GlobalTrafficShapingHandler extends AbstractTrafficShapingHandler { } final long futureNow = newToSend.relativeTimeAction; final PerChannel forSchedule = perChannel; - ctx.executor().schedule(new OneTimeTask() { + ctx.executor().schedule(new Runnable() { @Override public void run() { sendAllValid(ctx, forSchedule, futureNow); diff --git a/pom.xml b/pom.xml index 1a3afe8a72..cca334f5f3 100644 --- a/pom.xml +++ b/pom.xml @@ -321,6 +321,13 @@ 1.1.3
+ + + org.jctools + jctools-core + 1.2 + + org.rxtx rxtx @@ -1087,6 +1094,11 @@ exec-maven-plugin 1.0.0.Final + + org.apache.maven.plugins + maven-shade-plugin + 2.4.3 + diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index ccdddd7d17..1bcd182cd0 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -29,7 +29,6 @@ import io.netty.channel.socket.ChannelInputShutdownEvent; import io.netty.channel.unix.Socket; import io.netty.channel.unix.UnixChannel; import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.OneTimeTask; import java.io.IOException; import java.net.InetSocketAddress; @@ -149,7 +148,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann unsafe.clearEpollIn0(); } else { // schedule a task to clear the EPOLLIN as it is not safe to modify it directly - loop.execute(new OneTimeTask() { + loop.execute(new Runnable() { @Override public void run() { if (!config().isAutoRead() && !unsafe.readPending) { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 947986097b..c33429e2e9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -33,8 +33,6 @@ import io.netty.channel.socket.DuplexChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; import io.netty.util.internal.EmptyArrays; -import io.netty.util.internal.MpscLinkedQueueNode; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -238,7 +236,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im // Seems like the Channel was closed in the meantime try to fail the promise to prevent any // cases where a future may not be notified otherwise. if (promise.tryFailure(CLOSED_CHANNEL_EXCEPTION)) { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { // Call this via the EventLoop as it is a MPSC queue. @@ -558,7 +556,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im public ChannelFuture shutdownOutput(final ChannelPromise promise) { Executor closeExecutor = ((EpollStreamUnsafe) unsafe()).prepareToClose(); if (closeExecutor != null) { - closeExecutor.execute(new OneTimeTask() { + closeExecutor.execute(new Runnable() { @Override public void run() { shutdownOutput0(promise); @@ -569,7 +567,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im if (loop.inEventLoop()) { shutdownOutput0(promise); } else { - loop.execute(new OneTimeTask() { + loop.execute(new Runnable() { @Override public void run() { shutdownOutput0(promise); @@ -702,7 +700,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractEpollStreamChannel.this.connectPromise; @@ -928,7 +926,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im if (!closed) { // trigger a read again as there may be something left to read and because of epoll ET we // will not get notified again until we read everything from the socket - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { epollInReady(); @@ -954,7 +952,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im if (eventLoop.inEventLoop()) { addToSpliceQueue0(task); } else { - eventLoop.execute(new OneTimeTask() { + eventLoop.execute(new Runnable() { @Override public void run() { addToSpliceQueue0(task); @@ -970,7 +968,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im spliceQueue.add(task); } - protected abstract class SpliceInTask extends MpscLinkedQueueNode { + protected abstract class SpliceInTask { final ChannelPromise promise; int len; @@ -979,11 +977,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im this.len = len; } - @Override - public SpliceInTask value() { - return this; - } - abstract boolean spliceIn(RecvByteBufAllocator.Handle handle); protected final int spliceIn(FileDescriptor pipeOut, RecvByteBufAllocator.Handle handle) throws IOException { @@ -1126,11 +1119,6 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im this.offset = offset; } - @Override - public SpliceFdTask value() { - return this; - } - @Override public boolean spliceIn(RecvByteBufAllocator.Handle handle) { assert eventLoop().inEventLoop(); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index 5ccdaf4a83..d4f09064c6 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -23,7 +23,6 @@ import io.netty.channel.unix.DomainSocketAddress; import io.netty.channel.unix.DomainSocketChannel; import io.netty.channel.unix.FileDescriptor; import io.netty.channel.unix.Socket; -import io.netty.util.internal.OneTimeTask; import java.net.SocketAddress; @@ -201,7 +200,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i pipeline.fireExceptionCaught(t); // trigger a read again as there may be something left to read and because of epoll ET we // will not get notified again until we read everything from the socket - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { epollInReady(); diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java index 07628a944a..8df3d10cc8 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannel.java @@ -20,7 +20,6 @@ import gnu.io.CommPortIdentifier; import gnu.io.SerialPort; import io.netty.channel.ChannelPromise; import io.netty.channel.oio.OioByteStreamChannel; -import io.netty.util.internal.OneTimeTask; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; @@ -144,7 +143,7 @@ public class RxtxChannel extends OioByteStreamChannel { int waitTime = config().getOption(WAIT_TIME); if (waitTime > 0) { - eventLoop().schedule(new OneTimeTask() { + eventLoop().schedule(new Runnable() { @Override public void run() { try { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java index 7e4e4a342b..8ce914a2b6 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpChannel.java @@ -34,7 +34,6 @@ import io.netty.channel.sctp.SctpChannelConfig; import io.netty.channel.sctp.SctpMessage; import io.netty.channel.sctp.SctpNotificationHandler; import io.netty.channel.sctp.SctpServerChannel; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -360,7 +359,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett promise.setFailure(t); } } else { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { bindAddress(localAddress, promise); @@ -385,7 +384,7 @@ public class NioSctpChannel extends AbstractNioMessageChannel implements io.nett promise.setFailure(t); } } else { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { unbindAddress(localAddress, promise); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java index 6aa5f4dbc9..1a0d50b8de 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/nio/NioSctpServerChannel.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.nio.AbstractNioMessageChannel; import io.netty.channel.sctp.DefaultSctpServerChannelConfig; import io.netty.channel.sctp.SctpServerChannelConfig; -import io.netty.util.internal.OneTimeTask; import java.io.IOException; import java.net.InetAddress; @@ -160,7 +159,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel promise.setFailure(t); } } else { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { bindAddress(localAddress, promise); @@ -185,7 +184,7 @@ public class NioSctpServerChannel extends AbstractNioMessageChannel promise.setFailure(t); } } else { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { unbindAddress(localAddress, promise); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java index 77ded22112..d84ba706cd 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpChannel.java @@ -33,7 +33,6 @@ import io.netty.channel.sctp.SctpChannelConfig; import io.netty.channel.sctp.SctpMessage; import io.netty.channel.sctp.SctpNotificationHandler; import io.netty.channel.sctp.SctpServerChannel; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; @@ -429,7 +428,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel promise.setFailure(t); } } else { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { bindAddress(localAddress, promise); @@ -454,7 +453,7 @@ public class OioSctpChannel extends AbstractOioMessageChannel promise.setFailure(t); } } else { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { unbindAddress(localAddress, promise); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java index a0972dcb19..fc592b5c9f 100755 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/oio/OioSctpServerChannel.java @@ -25,7 +25,6 @@ import io.netty.channel.ChannelPromise; import io.netty.channel.oio.AbstractOioMessageChannel; import io.netty.channel.sctp.DefaultSctpServerChannelConfig; import io.netty.channel.sctp.SctpServerChannelConfig; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -235,7 +234,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel promise.setFailure(t); } } else { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { bindAddress(localAddress, promise); @@ -260,7 +259,7 @@ public class OioSctpServerChannel extends AbstractOioMessageChannel promise.setFailure(t); } } else { - eventLoop().execute(new OneTimeTask() { + eventLoop().execute(new Runnable() { @Override public void run() { unbindAddress(localAddress, promise); diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index ab24748044..39e85a7d55 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -29,7 +29,6 @@ import io.netty.channel.EventLoopGroup; import io.netty.util.AttributeKey; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.StringUtil; import java.net.InetAddress; @@ -343,7 +342,7 @@ public abstract class AbstractBootstrap, C ext // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. - channel.eventLoop().execute(new OneTimeTask() { + channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index 3861bf698b..1b24e58744 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -23,7 +23,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.util.AttributeKey; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -159,7 +158,7 @@ public class Bootstrap extends AbstractBootstrap { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. - channel.eventLoop().execute(new OneTimeTask() { + channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index 0eef81a245..19981cf484 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -28,7 +28,6 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.util.AttributeKey; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.StringUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -275,7 +274,7 @@ public class ServerBootstrap extends AbstractBootstrap implements Runnable { + abstract static class AbstractWriteTask implements Runnable { private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT = SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true); @@ -981,13 +979,14 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme private static final int WRITE_TASK_OVERHEAD = SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48); + private final Recycler.Handle handle; private AbstractChannelHandlerContext ctx; private Object msg; private ChannelPromise promise; private int size; private AbstractWriteTask(Recycler.Handle handle) { - super(handle); + this.handle = handle; } protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx, @@ -1025,17 +1024,15 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap impleme ctx = null; msg = null; promise = null; + recycle(handle); } } - @Override - public Runnable value() { - return this; - } - protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ctx.invokeWrite(msg, promise); } + + protected abstract void recycle(Recycler.Handle handle); } static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index 5b9a3f5911..261d5e701b 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -24,7 +24,6 @@ import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.internal.InternalThreadLocalMap; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -633,7 +632,7 @@ public final class ChannelOutboundBuffer { void close(final ClosedChannelException cause) { if (inFail) { - channel.eventLoop().execute(new OneTimeTask() { + channel.eventLoop().execute(new Runnable() { @Override public void run() { close(cause); diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java index a09b32e3c3..0f74a33905 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java @@ -19,7 +19,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.util.Attribute; import io.netty.util.AttributeKey; import io.netty.util.concurrent.EventExecutor; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -579,7 +578,7 @@ public class CombinedChannelDuplexHandler 0) { - connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; diff --git a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java index a3fca61c8a..6bacb1d27e 100644 --- a/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java +++ b/transport/src/main/java/io/netty/channel/pool/FixedChannelPool.java @@ -22,7 +22,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.internal.EmptyArrays; -import io.netty.util.internal.OneTimeTask; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; @@ -202,7 +201,7 @@ public final class FixedChannelPool extends SimpleChannelPool { if (executor.inEventLoop()) { acquire0(promise); } else { - executor.execute(new OneTimeTask() { + executor.execute(new Runnable() { @Override public void run() { acquire0(promise); @@ -398,7 +397,7 @@ public final class FixedChannelPool extends SimpleChannelPool { @Override public void close() { - executor.execute(new OneTimeTask() { + executor.execute(new Runnable() { @Override public void run() { if (!closed) { diff --git a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java index 72cd7cf9b8..62023823db 100644 --- a/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java +++ b/transport/src/main/java/io/netty/channel/pool/SimpleChannelPool.java @@ -26,7 +26,6 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; import io.netty.util.internal.EmptyArrays; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import java.util.Deque; @@ -143,7 +142,7 @@ public class SimpleChannelPool implements ChannelPool { if (loop.inEventLoop()) { doHealthCheck(ch, promise); } else { - loop.execute(new OneTimeTask() { + loop.execute(new Runnable() { @Override public void run() { doHealthCheck(ch, promise); @@ -226,7 +225,7 @@ public class SimpleChannelPool implements ChannelPool { if (loop.inEventLoop()) { doReleaseChannel(channel, promise); } else { - loop.execute(new OneTimeTask() { + loop.execute(new Runnable() { @Override public void run() { doReleaseChannel(channel, promise); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 4f949eff19..1aeb805487 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -29,7 +29,6 @@ import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannelConfig; import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.internal.OneTimeTask; import java.io.IOException; import java.net.InetSocketAddress; @@ -152,7 +151,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty public ChannelFuture shutdownOutput(final ChannelPromise promise) { Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).prepareToClose(); if (closeExecutor != null) { - closeExecutor.execute(new OneTimeTask() { + closeExecutor.execute(new Runnable() { @Override public void run() { shutdownOutput0(promise); @@ -163,7 +162,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty if (loop.inEventLoop()) { shutdownOutput0(promise); } else { - loop.execute(new OneTimeTask() { + loop.execute(new Runnable() { @Override public void run() { shutdownOutput0(promise); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index a96c0255b3..4ad6624546 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -25,7 +25,6 @@ import io.netty.channel.EventLoop; import io.netty.channel.oio.OioByteStreamChannel; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -153,7 +152,7 @@ public class OioSocketChannel extends OioByteStreamChannel future.setFailure(t); } } else { - loop.execute(new OneTimeTask() { + loop.execute(new Runnable() { @Override public void run() { shutdownOutput(future); diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index 8eac9f7415..a366ef0918 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -34,7 +34,6 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; import org.junit.AfterClass; @@ -365,7 +364,7 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new OneTimeTask() { + cc.pipeline().lastContext().executor().execute(new Runnable() { @Override public void run() { ChannelPromise promise = ccCpy.newPromise(); @@ -429,7 +428,7 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new OneTimeTask() { + cc.pipeline().lastContext().executor().execute(new Runnable() { @Override public void run() { ChannelPromise promise = ccCpy.newPromise(); @@ -511,7 +510,7 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new OneTimeTask() { + cc.pipeline().lastContext().executor().execute(new Runnable() { @Override public void run() { ChannelPromise promise = ccCpy.newPromise(); @@ -593,7 +592,7 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new OneTimeTask() { + cc.pipeline().lastContext().executor().execute(new Runnable() { @Override public void run() { ChannelPromise promise = ccCpy.newPromise(); @@ -673,7 +672,7 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new OneTimeTask() { + cc.pipeline().lastContext().executor().execute(new Runnable() { @Override public void run() { ChannelPromise promise = ccCpy.newPromise(); @@ -754,14 +753,14 @@ public class LocalChannelTest { ccCpy.closeFuture().addListener(clientChannelCloseLatch); // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new OneTimeTask() { + cc.pipeline().lastContext().executor().execute(new Runnable() { @Override public void run() { ccCpy.writeAndFlush(data.duplicate().retain(), ccCpy.newPromise()) .addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - serverChannelCpy.eventLoop().execute(new OneTimeTask() { + serverChannelCpy.eventLoop().execute(new Runnable() { @Override public void run() { // The point of this test is to write while the peer is closed, so we should