diff --git a/NOTICE.txt b/NOTICE.txt index ef811d15d0..7da06ae4a5 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -119,3 +119,12 @@ by Google Inc, which can be obtained at: * license/LICENSE.snappy.txt (New BSD License) * HOMEPAGE: * http://code.google.com/p/snappy/ + +This product contains a modified version of Roland Kuhn's ASL2 +AbstractNodeQueue, which is based on Dmitriy Vyukov's non-intrusive MPSC queue. +It can be obtained at: + + * LICENSE: + * license/LICENSE.abstractnodequeue.txt (Public Domain) + * HOMEPAGE: + * https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java \ No newline at end of file diff --git a/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java new file mode 100644 index 0000000000..349b03f26e --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java @@ -0,0 +1,245 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package io.netty.util.internal; + + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A lock-free concurrent {@link java.util.Queue} implementations for single-consumer multiple-producer pattern. + * It's important is is only used for this as otherwise it is not thread-safe. + * + * This implementation is based on: + * + * + */ +@SuppressWarnings("serial") +final class MpscLinkedQueue extends AtomicReference implements Queue { + private static final long tailOffset; + + static { + try { + tailOffset = PlatformDependent.objectFieldOffset( + MpscLinkedQueue.class.getDeclaredField("tail")); + } catch (Throwable t) { + throw new ExceptionInInitializerError(t); + } + } + + // Extends AtomicReference for the "head" slot (which is the one that is appended to) + // since Unsafe does not expose XCHG operation intrinsically + @SuppressWarnings({ "unused", "FieldMayBeFinal" }) + private volatile OneTimeTask tail; + + MpscLinkedQueue() { + final OneTimeTask task = new OneTimeTaskAdapter(null); + tail = task; + set(task); + } + + @Override + public boolean add(Runnable runnable) { + if (runnable instanceof OneTimeTask) { + OneTimeTask node = (OneTimeTask) runnable; + node.setNext(null); + getAndSet(node).setNext(node); + } else { + final OneTimeTask n = new OneTimeTaskAdapter(runnable); + getAndSet(n).setNext(n); + } + return true; + } + + @Override + public boolean offer(Runnable runnable) { + return add(runnable); + } + + @Override + public Runnable remove() { + Runnable task = poll(); + if (task == null) { + throw new NoSuchElementException(); + } + return task; + } + + @Override + public Runnable poll() { + final OneTimeTask next = peekTask(); + if (next == null) { + return null; + } + final OneTimeTask ret = next; + PlatformDependent.putOrderedObject(this, tailOffset, next); + return unwrapIfNeeded(ret); + } + + @Override + public Runnable element() { + final OneTimeTask next = peekTask(); + if (next == null) { + throw new NoSuchElementException(); + } + return unwrapIfNeeded(next); + } + + @Override + public Runnable peek() { + final OneTimeTask next = peekTask(); + if (next == null) { + return null; + } + return unwrapIfNeeded(next); + } + + @Override + public int size() { + int count = 0; + OneTimeTask n = peekTask(); + for (;;) { + if (n == null) { + break; + } + count++; + n = n.next(); + } + return count; + } + + @SuppressWarnings("unchecked") + private OneTimeTask peekTask() { + for (;;) { + final OneTimeTask tail = (OneTimeTask) PlatformDependent.getObjectVolatile(this, tailOffset); + final OneTimeTask next = tail.next(); + if (next != null || get() == tail) { + return next; + } + } + } + + @Override + public boolean isEmpty() { + return peek() == null; + } + + @Override + public boolean contains(Object o) { + OneTimeTask n = peekTask(); + for (;;) { + if (n == null) { + break; + } + if (unwrapIfNeeded(n) == o) { + return true; + } + n = n.next(); + } + return false; + } + + @Override + public Iterator iterator() { + throw new UnsupportedOperationException(); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + return false; + } + + @Override + public boolean containsAll(Collection c) { + for (Object o: c) { + if (!contains(o)) { + return false; + } + } + return true; + } + + @Override + public boolean addAll(Collection c) { + for (Runnable r: c) { + add(r); + } + return false; + } + + @Override + public boolean removeAll(Collection c) { + return false; + } + + @Override + public boolean retainAll(Collection c) { + return false; + } + + @Override + public void clear() { + for (;;) { + if (poll() == null) { + break; + } + } + } + + /** + * Unwrap {@link OneTimeTask} if needed and so return the proper queued task. + */ + private static Runnable unwrapIfNeeded(OneTimeTask task) { + if (task instanceof OneTimeTaskAdapter) { + return ((OneTimeTaskAdapter) task).task; + } + return task; + } + + private static final class OneTimeTaskAdapter extends OneTimeTask { + private final Runnable task; + + OneTimeTaskAdapter(Runnable task) { + this.task = task; + } + + @Override + public void run() { + task.run(); + } + } +} diff --git a/common/src/main/java/io/netty/util/internal/OneTimeTask.java b/common/src/main/java/io/netty/util/internal/OneTimeTask.java new file mode 100644 index 0000000000..b3f46cbfc1 --- /dev/null +++ b/common/src/main/java/io/netty/util/internal/OneTimeTask.java @@ -0,0 +1,56 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.internal; + +import io.netty.util.concurrent.EventExecutor; + +/** + * {@link Runnable} which represent a one time task which may allow the {@link EventExecutor} to reduce the amount of + * produced garbage when queue it for execution. + * + * It is important this will not be reused. After submitted it is not allowed to get submitted again! + */ +public abstract class OneTimeTask implements Runnable { + + private static final long nextOffset; + + static { + if (PlatformDependent0.hasUnsafe()) { + try { + nextOffset = PlatformDependent.objectFieldOffset( + OneTimeTask.class.getDeclaredField("tail")); + } catch (Throwable t) { + throw new ExceptionInInitializerError(t); + } + } else { + nextOffset = -1; + } + } + + @SuppressWarnings("unused") + private volatile OneTimeTask tail; + + // Only use from MpscLinkedQueue and so we are sure Unsafe is present + @SuppressWarnings("unchecked") + final OneTimeTask next() { + return (OneTimeTask) PlatformDependent.getObjectVolatile(this, nextOffset); + } + + // Only use from MpscLinkedQueue and so we are sure Unsafe is present + final void setNext(final OneTimeTask newNext) { + PlatformDependent.putOrderedObject(this, nextOffset, newNext); + } +} diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent.java b/common/src/main/java/io/netty/util/internal/PlatformDependent.java index 2b2de647a3..f26fea64b6 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent.java @@ -32,8 +32,10 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -266,6 +268,10 @@ public final class PlatformDependent { return PlatformDependent0.getObject(object, fieldOffset); } + public static Object getObjectVolatile(Object object, long fieldOffset) { + return PlatformDependent0.getObjectVolatile(object, fieldOffset); + } + public static int getInt(Object object, long fieldOffset) { return PlatformDependent0.getInt(object, fieldOffset); } @@ -290,6 +296,10 @@ public final class PlatformDependent { return PlatformDependent0.getLong(address); } + public static void putOrderedObject(Object object, long address, Object value) { + PlatformDependent0.putOrderedObject(object, address, value); + } + public static void putByte(long address, byte value) { PlatformDependent0.putByte(address, value); } @@ -369,6 +379,18 @@ public final class PlatformDependent { return null; } + /** + * Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single + * consumer (one thread!). + */ + public static Queue newMpscQueue() { + if (hasUnsafe()) { + return new MpscLinkedQueue(); + } else { + return new ConcurrentLinkedQueue(); + } + } + private static boolean isAndroid0() { boolean android; try { diff --git a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java index bf3abc1084..d38f196e33 100644 --- a/common/src/main/java/io/netty/util/internal/PlatformDependent0.java +++ b/common/src/main/java/io/netty/util/internal/PlatformDependent0.java @@ -185,6 +185,10 @@ final class PlatformDependent0 { return UNSAFE.getObject(object, fieldOffset); } + static Object getObjectVolatile(Object object, long fieldOffset) { + return UNSAFE.getObjectVolatile(object, fieldOffset); + } + static int getInt(Object object, long fieldOffset) { return UNSAFE.getInt(object, fieldOffset); } @@ -251,6 +255,10 @@ final class PlatformDependent0 { } } + static void putOrderedObject(Object object, long address, Object value) { + UNSAFE.putOrderedObject(object, address, value); + } + static void putByte(long address, byte value) { UNSAFE.putByte(address, value); } diff --git a/license/LICENSE.abstractnodequeue.txt b/license/LICENSE.abstractnodequeue.txt new file mode 100644 index 0000000000..b3fdc8f507 --- /dev/null +++ b/license/LICENSE.abstractnodequeue.txt @@ -0,0 +1,15 @@ +This software is licensed under the Apache 2 license, quoted below. + +Copyright 2009-2013 Typesafe Inc. [http://www.typesafe.com] + +Licensed under the Apache License, Version 2.0 (the "License"); you may not +use this file except in compliance with the License. You may obtain a copy of +the License at + + [http://www.apache.org/licenses/LICENSE-2.0] + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +License for the specific language governing permissions and limitations under +the License. \ No newline at end of file diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 6adb90aff1..8d110146a3 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -29,7 +29,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -156,7 +155,7 @@ final class EpollEventLoop extends SingleThreadEventLoop { @Override protected Queue newTaskQueue() { // This event loop never calls takeTask() - return new ConcurrentLinkedQueue(); + return PlatformDependent.newMpscQueue(); } /** diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index f153a652c8..c61fef6848 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.util.DefaultAttributeMap; import io.netty.util.ReferenceCountUtil; import io.netty.util.internal.EmptyArrays; +import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -422,7 +423,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha register0(promise); } else { try { - eventLoop.execute(new Runnable() { + eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); @@ -490,7 +491,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } if (!wasActive && isActive()) { - invokeLater(new Runnable() { + invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelActive(); @@ -517,7 +518,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } if (wasActive && !isActive()) { - invokeLater(new Runnable() { + invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelInactive(); @@ -536,7 +537,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } if (inFlush0) { - invokeLater(new Runnable() { + invokeLater(new OneTimeTask() { @Override public void run() { close(promise); @@ -571,7 +572,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } finally { if (wasActive && !isActive()) { - invokeLater(new Runnable() { + invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelInactive(); @@ -610,7 +611,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } finally { if (registered) { registered = false; - invokeLater(new Runnable() { + invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireChannelUnregistered(); @@ -635,7 +636,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doBeginRead(); } catch (final Exception e) { - invokeLater(new Runnable() { + invokeLater(new OneTimeTask() { @Override public void run() { pipeline.fireExceptionCaught(e); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 1aa469b269..53afe53614 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -23,6 +23,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; +import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.StringUtil; import java.net.SocketAddress; @@ -158,7 +159,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { - executor.execute(new Runnable() { + executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRegistered(); @@ -183,7 +184,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeChannelUnregistered(); } else { - executor.execute(new Runnable() { + executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelUnregistered(); @@ -209,7 +210,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeChannelActive(); } else { - executor.execute(new Runnable() { + executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelActive(); @@ -234,7 +235,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeChannelInactive(); } else { - executor.execute(new Runnable() { + executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelInactive(); @@ -265,7 +266,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou next.invokeExceptionCaught(cause); } else { try { - executor.execute(new Runnable() { + executor.execute(new OneTimeTask() { @Override public void run() { next.invokeExceptionCaught(cause); @@ -306,7 +307,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeUserEventTriggered(event); } else { - executor.execute(new Runnable() { + executor.execute(new OneTimeTask() { @Override public void run() { next.invokeUserEventTriggered(event); @@ -336,7 +337,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { - executor.execute(new Runnable() { + executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); @@ -454,7 +455,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { - safeExecute(executor, new Runnable() { + safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeBind(localAddress, promise); @@ -492,7 +493,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { - safeExecute(executor, new Runnable() { + safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeConnect(remoteAddress, localAddress, promise); @@ -526,7 +527,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou next.invokeDisconnect(promise); } } else { - safeExecute(executor, new Runnable() { + safeExecute(executor, new OneTimeTask() { @Override public void run() { if (!channel().metadata().hasDisconnect()) { @@ -558,7 +559,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeClose(promise); } else { - safeExecute(executor, new Runnable() { + safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeClose(promise); @@ -586,7 +587,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { next.invokeDeregister(promise); } else { - safeExecute(executor, new Runnable() { + safeExecute(executor, new OneTimeTask() { @Override public void run() { next.invokeDeregister(promise); @@ -883,7 +884,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou } } - abstract static class AbstractWriteTask> implements Runnable { + abstract static class AbstractWriteTask extends OneTimeTask { private final Recycler.Handle handle; private DefaultChannelHandlerContext ctx; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 62961c11d4..1ea06db141 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -23,6 +23,7 @@ import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelPromise; import io.netty.channel.ConnectTimeoutException; import io.netty.channel.EventLoop; +import io.netty.util.internal.OneTimeTask; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -199,7 +200,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { + connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index e6817bc565..a09b95bc93 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelException; import io.netty.channel.EventLoopException; import io.netty.channel.SingleThreadEventLoop; import io.netty.channel.nio.AbstractNioChannel.NioUnsafe; +import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.SystemPropertyUtil; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -38,7 +39,6 @@ import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -165,7 +165,7 @@ public final class NioEventLoop extends SingleThreadEventLoop { @Override protected Queue newTaskQueue() { // This event loop never calls takeTask() - return new ConcurrentLinkedQueue(); + return PlatformDependent.newMpscQueue(); } /** diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index cd5e39373c..83bc5d0bf3 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -28,6 +28,7 @@ import io.netty.channel.nio.AbstractNioByteChannel; import io.netty.channel.socket.DefaultSocketChannelConfig; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannelConfig; +import io.netty.util.internal.OneTimeTask; import java.io.IOException; import java.net.InetSocketAddress; @@ -140,7 +141,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty promise.setFailure(t); } } else { - loop.execute(new Runnable() { + loop.execute(new OneTimeTask() { @Override public void run() { shutdownOutput(promise);