[#1259] Add optimized queue for SCMP pattern and use it in NIO and native transport

This queue also produces less GC then CLQ when make use of OneTimeTask
This commit is contained in:
Norman Maurer 2014-02-26 12:00:04 +01:00
parent ac5592fc05
commit d3ffa1b02b
12 changed files with 384 additions and 26 deletions

View File

@ -119,3 +119,12 @@ by Google Inc, which can be obtained at:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* http://code.google.com/p/snappy/
This product contains a modified version of Roland Kuhn's ASL2
AbstractNodeQueue, which is based on Dmitriy Vyukov's non-intrusive MPSC queue.
It can be obtained at:
* LICENSE:
* license/LICENSE.abstractnodequeue.txt (Public Domain)
* HOMEPAGE:
* https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java

View File

@ -0,0 +1,245 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package io.netty.util.internal;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
/**
* A lock-free concurrent {@link java.util.Queue} implementations for single-consumer multiple-producer pattern.
* <strong>It's important is is only used for this as otherwise it is not thread-safe.</strong>
*
* This implementation is based on:
* <ul>
* <li><a href="https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/
* AbstractNodeQueue.java">AbstractNodeQueue</a></li>
* <li><a href="http://www.1024cores.net/home/lock-free-algorithms/
* queues/non-intrusive-mpsc-node-based-queue">Non intrusive MPSC node based queue</a></li>
* </ul>
*
*/
@SuppressWarnings("serial")
final class MpscLinkedQueue extends AtomicReference<OneTimeTask> implements Queue<Runnable> {
private static final long tailOffset;
static {
try {
tailOffset = PlatformDependent.objectFieldOffset(
MpscLinkedQueue.class.getDeclaredField("tail"));
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
}
// Extends AtomicReference for the "head" slot (which is the one that is appended to)
// since Unsafe does not expose XCHG operation intrinsically
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile OneTimeTask tail;
MpscLinkedQueue() {
final OneTimeTask task = new OneTimeTaskAdapter(null);
tail = task;
set(task);
}
@Override
public boolean add(Runnable runnable) {
if (runnable instanceof OneTimeTask) {
OneTimeTask node = (OneTimeTask) runnable;
node.setNext(null);
getAndSet(node).setNext(node);
} else {
final OneTimeTask n = new OneTimeTaskAdapter(runnable);
getAndSet(n).setNext(n);
}
return true;
}
@Override
public boolean offer(Runnable runnable) {
return add(runnable);
}
@Override
public Runnable remove() {
Runnable task = poll();
if (task == null) {
throw new NoSuchElementException();
}
return task;
}
@Override
public Runnable poll() {
final OneTimeTask next = peekTask();
if (next == null) {
return null;
}
final OneTimeTask ret = next;
PlatformDependent.putOrderedObject(this, tailOffset, next);
return unwrapIfNeeded(ret);
}
@Override
public Runnable element() {
final OneTimeTask next = peekTask();
if (next == null) {
throw new NoSuchElementException();
}
return unwrapIfNeeded(next);
}
@Override
public Runnable peek() {
final OneTimeTask next = peekTask();
if (next == null) {
return null;
}
return unwrapIfNeeded(next);
}
@Override
public int size() {
int count = 0;
OneTimeTask n = peekTask();
for (;;) {
if (n == null) {
break;
}
count++;
n = n.next();
}
return count;
}
@SuppressWarnings("unchecked")
private OneTimeTask peekTask() {
for (;;) {
final OneTimeTask tail = (OneTimeTask) PlatformDependent.getObjectVolatile(this, tailOffset);
final OneTimeTask next = tail.next();
if (next != null || get() == tail) {
return next;
}
}
}
@Override
public boolean isEmpty() {
return peek() == null;
}
@Override
public boolean contains(Object o) {
OneTimeTask n = peekTask();
for (;;) {
if (n == null) {
break;
}
if (unwrapIfNeeded(n) == o) {
return true;
}
n = n.next();
}
return false;
}
@Override
public Iterator<Runnable> iterator() {
throw new UnsupportedOperationException();
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}
@Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean containsAll(Collection<?> c) {
for (Object o: c) {
if (!contains(o)) {
return false;
}
}
return true;
}
@Override
public boolean addAll(Collection<? extends Runnable> c) {
for (Runnable r: c) {
add(r);
}
return false;
}
@Override
public boolean removeAll(Collection<?> c) {
return false;
}
@Override
public boolean retainAll(Collection<?> c) {
return false;
}
@Override
public void clear() {
for (;;) {
if (poll() == null) {
break;
}
}
}
/**
* Unwrap {@link OneTimeTask} if needed and so return the proper queued task.
*/
private static Runnable unwrapIfNeeded(OneTimeTask task) {
if (task instanceof OneTimeTaskAdapter) {
return ((OneTimeTaskAdapter) task).task;
}
return task;
}
private static final class OneTimeTaskAdapter extends OneTimeTask {
private final Runnable task;
OneTimeTaskAdapter(Runnable task) {
this.task = task;
}
@Override
public void run() {
task.run();
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;
import io.netty.util.concurrent.EventExecutor;
/**
* {@link Runnable} which represent a one time task which may allow the {@link EventExecutor} to reduce the amount of
* produced garbage when queue it for execution.
*
* <strong>It is important this will not be reused. After submitted it is not allowed to get submitted again!</strong>
*/
public abstract class OneTimeTask implements Runnable {
private static final long nextOffset;
static {
if (PlatformDependent0.hasUnsafe()) {
try {
nextOffset = PlatformDependent.objectFieldOffset(
OneTimeTask.class.getDeclaredField("tail"));
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
} else {
nextOffset = -1;
}
}
@SuppressWarnings("unused")
private volatile OneTimeTask tail;
// Only use from MpscLinkedQueue and so we are sure Unsafe is present
@SuppressWarnings("unchecked")
final OneTimeTask next() {
return (OneTimeTask) PlatformDependent.getObjectVolatile(this, nextOffset);
}
// Only use from MpscLinkedQueue and so we are sure Unsafe is present
final void setNext(final OneTimeTask newNext) {
PlatformDependent.putOrderedObject(this, nextOffset, newNext);
}
}

View File

@ -32,8 +32,10 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@ -266,6 +268,10 @@ public final class PlatformDependent {
return PlatformDependent0.getObject(object, fieldOffset);
}
public static Object getObjectVolatile(Object object, long fieldOffset) {
return PlatformDependent0.getObjectVolatile(object, fieldOffset);
}
public static int getInt(Object object, long fieldOffset) {
return PlatformDependent0.getInt(object, fieldOffset);
}
@ -290,6 +296,10 @@ public final class PlatformDependent {
return PlatformDependent0.getLong(address);
}
public static void putOrderedObject(Object object, long address, Object value) {
PlatformDependent0.putOrderedObject(object, address, value);
}
public static void putByte(long address, byte value) {
PlatformDependent0.putByte(address, value);
}
@ -369,6 +379,18 @@ public final class PlatformDependent {
return null;
}
/**
* Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single
* consumer (one thread!).
*/
public static Queue<Runnable> newMpscQueue() {
if (hasUnsafe()) {
return new MpscLinkedQueue();
} else {
return new ConcurrentLinkedQueue<Runnable>();
}
}
private static boolean isAndroid0() {
boolean android;
try {

View File

@ -185,6 +185,10 @@ final class PlatformDependent0 {
return UNSAFE.getObject(object, fieldOffset);
}
static Object getObjectVolatile(Object object, long fieldOffset) {
return UNSAFE.getObjectVolatile(object, fieldOffset);
}
static int getInt(Object object, long fieldOffset) {
return UNSAFE.getInt(object, fieldOffset);
}
@ -251,6 +255,10 @@ final class PlatformDependent0 {
}
}
static void putOrderedObject(Object object, long address, Object value) {
UNSAFE.putOrderedObject(object, address, value);
}
static void putByte(long address, byte value) {
UNSAFE.putByte(address, value);
}

View File

@ -0,0 +1,15 @@
This software is licensed under the Apache 2 license, quoted below.
Copyright 2009-2013 Typesafe Inc. [http://www.typesafe.com]
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
[http://www.apache.org/licenses/LICENSE-2.0]
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.

View File

@ -29,7 +29,6 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@ -156,7 +155,7 @@ final class EpollEventLoop extends SingleThreadEventLoop {
@Override
protected Queue<Runnable> newTaskQueue() {
// This event loop never calls takeTask()
return new ConcurrentLinkedQueue<Runnable>();
return PlatformDependent.newMpscQueue();
}
/**

View File

@ -19,6 +19,7 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.util.DefaultAttributeMap;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -422,7 +423,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
@ -490,7 +491,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelActive();
@ -517,7 +518,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelInactive();
@ -536,7 +537,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
if (inFlush0) {
invokeLater(new Runnable() {
invokeLater(new OneTimeTask() {
@Override
public void run() {
close(promise);
@ -571,7 +572,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} finally {
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelInactive();
@ -610,7 +611,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} finally {
if (registered) {
registered = false;
invokeLater(new Runnable() {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireChannelUnregistered();
@ -635,7 +636,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
invokeLater(new OneTimeTask() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);

View File

@ -23,6 +23,7 @@ import io.netty.util.ReferenceCountUtil;
import io.netty.util.ResourceLeakHint;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.StringUtil;
import java.net.SocketAddress;
@ -158,7 +159,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRegistered();
@ -183,7 +184,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeChannelUnregistered();
} else {
executor.execute(new Runnable() {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelUnregistered();
@ -209,7 +210,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelActive();
@ -234,7 +235,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeChannelInactive();
} else {
executor.execute(new Runnable() {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelInactive();
@ -265,7 +266,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
next.invokeExceptionCaught(cause);
} else {
try {
executor.execute(new Runnable() {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeExceptionCaught(cause);
@ -306,7 +307,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
executor.execute(new Runnable() {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeUserEventTriggered(event);
@ -336,7 +337,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new Runnable() {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(msg);
@ -454,7 +455,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
@ -492,7 +493,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
@ -526,7 +527,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
next.invokeDisconnect(promise);
}
} else {
safeExecute(executor, new Runnable() {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
if (!channel().metadata().hasDisconnect()) {
@ -558,7 +559,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeClose(promise);
} else {
safeExecute(executor, new Runnable() {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeClose(promise);
@ -586,7 +587,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) {
next.invokeDeregister(promise);
} else {
safeExecute(executor, new Runnable() {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeDeregister(promise);
@ -883,7 +884,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
}
}
abstract static class AbstractWriteTask<T extends AbstractWriteTask<T>> implements Runnable {
abstract static class AbstractWriteTask<T> extends OneTimeTask {
private final Recycler.Handle<T> handle;
private DefaultChannelHandlerContext ctx;

View File

@ -23,6 +23,7 @@ import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.util.internal.OneTimeTask;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -199,7 +200,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelException;
import io.netty.channel.EventLoopException;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.channel.nio.AbstractNioChannel.NioUnsafe;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -38,7 +39,6 @@ import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@ -165,7 +165,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected Queue<Runnable> newTaskQueue() {
// This event loop never calls takeTask()
return new ConcurrentLinkedQueue<Runnable>();
return PlatformDependent.newMpscQueue();
}
/**

View File

@ -28,6 +28,7 @@ import io.netty.channel.nio.AbstractNioByteChannel;
import io.netty.channel.socket.DefaultSocketChannelConfig;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.util.internal.OneTimeTask;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -140,7 +141,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
promise.setFailure(t);
}
} else {
loop.execute(new Runnable() {
loop.execute(new OneTimeTask() {
@Override
public void run() {
shutdownOutput(promise);