diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java index 3e1af63d59..2f21f88ddd 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioSocketChannel.java @@ -25,6 +25,7 @@ package org.jboss.netty.channel.socket.nio; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.SocketChannel; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import org.jboss.netty.channel.AbstractChannel; @@ -34,7 +35,7 @@ import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.util.FastQueue; +import org.jboss.netty.util.LinkedTransferQueue; /** * @author The Netty Project (netty-dev@lists.jboss.org) @@ -52,7 +53,7 @@ abstract class NioSocketChannel extends AbstractChannel final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); final Runnable writeTask = new WriteTask(); final Object writeLock = new Object(); - final FastQueue writeBuffer = new FastQueue(); + final Queue writeBuffer = new LinkedTransferQueue(); MessageEvent currentWriteEvent; int currentWriteIndex; diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index d334ffb643..d1fde6260b 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -32,8 +32,8 @@ import java.nio.channels.ScatteringByteChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.Iterator; +import java.util.Queue; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -48,7 +48,7 @@ import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.FastQueue; +import org.jboss.netty.util.LinkedTransferQueue; import org.jboss.netty.util.ThreadRenamingRunnable; /** @@ -76,10 +76,8 @@ class NioWorker implements Runnable { private final AtomicBoolean wakenUp = new AtomicBoolean(); private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); private final Object shutdownLock = new Object(); - //private final FastQueue taskQueue = new FastQueue(); - //private final ConcurrentFastQueue taskQueue = new ConcurrentFastQueue(); - private final FastQueue registerTaskQueue = new FastQueue(); - private final ConcurrentLinkedQueue writeTaskQueue = new ConcurrentLinkedQueue(); + private final Queue registerTaskQueue = new LinkedTransferQueue(); + private final Queue writeTaskQueue = new LinkedTransferQueue(); NioWorker(int bossId, int id, Executor executor) { this.bossId = bossId; @@ -409,7 +407,7 @@ class NioWorker implements Runnable { int bufIdx; synchronized (channel.writeLock) { - FastQueue writeBuffer = channel.writeBuffer; + Queue writeBuffer = channel.writeBuffer; evt = channel.currentWriteEvent; for (;;) { if (evt == null) { @@ -485,7 +483,7 @@ class NioWorker implements Runnable { int writtenBytes = 0; synchronized (channel.writeLock) { - FastQueue writeBuffer = channel.writeBuffer; + Queue writeBuffer = channel.writeBuffer; evt = channel.currentWriteEvent; for (;;) { if (evt == null) { @@ -733,7 +731,7 @@ class NioWorker implements Runnable { fireExceptionCaught(channel, cause); } - FastQueue writeBuffer = channel.writeBuffer; + Queue writeBuffer = channel.writeBuffer; for (;;) { evt = writeBuffer.poll(); if (evt == null) { diff --git a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java index 4fa70a6f82..6e2f49a598 100644 --- a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java @@ -27,7 +27,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -38,6 +37,7 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.util.LinkedTransferQueue; /** * A {@link ThreadPoolExecutor} which blocks the task submission when there's @@ -162,7 +162,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { - super(corePoolSize, corePoolSize, keepAliveTime, unit, new LinkedBlockingQueue(), threadFactory); + super(corePoolSize, corePoolSize, keepAliveTime, unit, new LinkedTransferQueue(), threadFactory); if (objectSizeEstimator == null) { throw new NullPointerException("objectSizeEstimator"); diff --git a/src/main/java/org/jboss/netty/util/ConcurrentFastQueue.java b/src/main/java/org/jboss/netty/util/ConcurrentFastQueue.java deleted file mode 100644 index 0d53d14a4b..0000000000 --- a/src/main/java/org/jboss/netty/util/ConcurrentFastQueue.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * - * Copyright 2008, Red Hat Middleware LLC, and individual contributors - * by the @author tags. See the COPYRIGHT.txt in the distribution for a - * full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ -package org.jboss.netty.util; - - -/** - * @author The Netty Project (netty-dev@lists.jboss.org) - * @author Trustin Lee (tlee@redhat.com) - * - * @version $Rev$, $Date$ - * - */ -public class ConcurrentFastQueue { - - private static final int DEFAULT_CONCURRENCY_LEVEL = 16; - - final FastQueue[] segments; - private final int mask; - private int pollIndex; - - public ConcurrentFastQueue() { - this(DEFAULT_CONCURRENCY_LEVEL); - } - - @SuppressWarnings("unchecked") - public ConcurrentFastQueue(int concurrencyLevel) { - if (concurrencyLevel <= 0) { - throw new IllegalArgumentException( - "concurrencyLevel: " + concurrencyLevel); - } - - int actualConcurrencyLevel = 1; - while (actualConcurrencyLevel < concurrencyLevel) { - actualConcurrencyLevel <<= 1; - } - - mask = actualConcurrencyLevel - 1; - segments = new FastQueue[actualConcurrencyLevel]; - for (int i = 0; i < actualConcurrencyLevel; i ++) { - segments[i] = new FastQueue(); - } - } - - public void offer(E e) { - segments[hash(e)].offer(e); - } - - public E poll() { - int oldPollIndex = pollIndex; - while (pollIndex < segments.length) { - E v = segments[pollIndex].poll(); - if (v != null) { - return v; - } - - pollIndex ++; - } - - for (pollIndex = 0; pollIndex < oldPollIndex; pollIndex ++) { - E v = segments[pollIndex].poll(); - if (v != null) { - return v; - } - } - - return null; - } - - private int hash(Object o) { - int hash = System.identityHashCode(o); - hash = (hash << 1) - (hash << 8); - hash &= mask; - return hash; - } -} diff --git a/src/main/java/org/jboss/netty/util/FastQueue.java b/src/main/java/org/jboss/netty/util/FastQueue.java deleted file mode 100644 index 0b94b9054a..0000000000 --- a/src/main/java/org/jboss/netty/util/FastQueue.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * - * Copyright 2008, Red Hat Middleware LLC, and individual contributors - * by the @author tags. See the COPYRIGHT.txt in the distribution for a - * full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ -package org.jboss.netty.util; - -import java.util.LinkedList; -import java.util.Queue; - - -/** - * @author The Netty Project (netty-dev@lists.jboss.org) - * @author Trustin Lee (tlee@redhat.com) - * - * @version $Rev$, $Date$ - * - */ -public class FastQueue { - - // Put - private Queue offeredElements; - - // Take - private Queue drainedElements; - - public synchronized void offer(E e) { - if (offeredElements == null) { - offeredElements = new LinkedList(); - } - - offeredElements.offer(e); - } - - public E poll() { - for (;;) { - if (drainedElements == null) { - synchronized (this) { - drainedElements = offeredElements; - if (offeredElements == null) { - break; - } - offeredElements = null; - } - } - - E e = cast(drainedElements.poll()); - if (e != null) { - return e; - } - - drainedElements = null; - } - return null; - } - - public synchronized boolean isEmpty() { - return offeredElements == null && - (drainedElements == null || drainedElements.isEmpty()); - } - - @SuppressWarnings("unchecked") - private E cast(Object o) { - return (E) o; - } -} diff --git a/src/main/java/org/jboss/netty/util/LinkedTransferQueue.java b/src/main/java/org/jboss/netty/util/LinkedTransferQueue.java index 2b40a7fd82..5f2f66d5d8 100644 --- a/src/main/java/org/jboss/netty/util/LinkedTransferQueue.java +++ b/src/main/java/org/jboss/netty/util/LinkedTransferQueue.java @@ -150,26 +150,26 @@ public class LinkedTransferQueue extends AbstractQueue * cleanMe, to alleviate contention across threads CASing one vs * the other. */ - static final class PaddedAtomicReference extends AtomicReference { - private static final long serialVersionUID = 4684288940772921317L; - - // enough padding for 64bytes with 4byte refs - Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; - PaddedAtomicReference(T r) { super(r); } - } +// static final class PaddedAtomicReference extends AtomicReference { +// private static final long serialVersionUID = 4684288940772921317L; +// +// // enough padding for 64bytes with 4byte refs +// Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; +// PaddedAtomicReference(T r) { super(r); } +// } /** head of the queue */ - private transient final PaddedAtomicReference head; + private transient final AtomicReference head; /** tail of the queue */ - private transient final PaddedAtomicReference tail; + private transient final AtomicReference tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private transient final PaddedAtomicReference cleanMe; + private transient final AtomicReference cleanMe; /** * Tries to cas nh as new head; if successful, unlink @@ -194,8 +194,8 @@ public class LinkedTransferQueue extends AbstractQueue private Object xfer(Object e, int mode, long nanos) { boolean isData = e != null; QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + final AtomicReference head = this.head; + final AtomicReference tail = this.tail; for (;;) { QNode t = tail.get(); @@ -238,8 +238,8 @@ public class LinkedTransferQueue extends AbstractQueue */ private Object fulfill(Object e) { boolean isData = e != null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + final AtomicReference head = this.head; + final AtomicReference tail = this.tail; for (;;) { QNode t = tail.get(); @@ -410,9 +410,9 @@ public class LinkedTransferQueue extends AbstractQueue */ public LinkedTransferQueue() { QNode dummy = new QNode(null, false); - head = new PaddedAtomicReference(dummy); - tail = new PaddedAtomicReference(dummy); - cleanMe = new PaddedAtomicReference(null); + head = new AtomicReference(dummy); + tail = new AtomicReference(dummy); + cleanMe = new AtomicReference(null); } /**