diff --git a/src/main/java/org/jboss/netty/util/LinkedTransferQueue.java b/src/main/java/org/jboss/netty/util/LinkedTransferQueue.java index 2ea7970501..ae410b9acb 100644 --- a/src/main/java/org/jboss/netty/util/LinkedTransferQueue.java +++ b/src/main/java/org/jboss/netty/util/LinkedTransferQueue.java @@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.LockSupport; /** - * An unbounded {@linkplain BlockingQueue} based on linked nodes. + * An unbounded TransferQueue based on linked nodes. * This queue orders elements FIFO (first-in-first-out) with respect * to any given producer. The head of the queue is that * element that has been on the queue the longest time for some @@ -58,9 +58,10 @@ import java.util.concurrent.locks.LockSupport; * *

Memory consistency effects: As with other concurrent * collections, actions in a thread prior to placing an object into a - * {@code LinkedTransferQueue} happen-before actions subsequent - * to the access or removal of that element from the - * {@code LinkedTransferQueue} in another thread. + * {@code LinkedTransferQueue} + * happen-before + * actions subsequent to the access or removal of that element from + * the {@code LinkedTransferQueue} in another thread. * * @author Doug Lea * @author The Netty Project (netty-dev@lists.jboss.org) @@ -72,19 +73,19 @@ import java.util.concurrent.locks.LockSupport; public class LinkedTransferQueue extends AbstractQueue implements BlockingQueue { /* - * This is still a work in progress... - * * This class extends the approach used in FIFO-mode * SynchronousQueues. See the internal documentation, as well as * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer, * Lea & Scott * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf) * - * The main extension is to provide different Wait modes - * for the main "xfer" method that puts or takes items. - * These do not impact the basic dual-queue logic, but instead - * control whether or how threads block upon insertion - * of request or data nodes into the dual queue. + * The main extension is to provide different Wait modes for the + * main "xfer" method that puts or takes items. These don't + * impact the basic dual-queue logic, but instead control whether + * or how threads block upon insertion of request or data nodes + * into the dual queue. It also uses slightly different + * conventions for tracking whether nodes are off-list or + * cancelled. */ // Wait modes for xfer method @@ -107,7 +108,7 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking /** * The number of times to spin before blocking in untimed waits. * This is greater than timed value because untimed waits spin - * faster since they do not need to check times on each spin. + * faster since they don't need to check times on each spin. */ private static final int maxUntimedSpins = maxTimedSpins * 16; @@ -118,19 +119,18 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking private static final long spinForTimeoutThreshold = 1000L; /** - * Node class for LinkedTransferQueue. Opportunistically subclasses from - * AtomicReference to represent item. Uses Object, not E, to allow - * setting item to "this" after use, to avoid garbage - * retention. Similarly, setting the next field to this is used as - * sentinel that node is off list. + * Node class for LinkedTransferQueue. Opportunistically + * subclasses from AtomicReference to represent item. Uses Object, + * not E, to allow setting item to "this" after use, to avoid + * garbage retention. Similarly, setting the next field to this is + * used as sentinel that node is off list. */ private static final class QNode extends AtomicReference { private static final long serialVersionUID = 5925596372370723938L; volatile QNode next; - transient volatile Thread waiter; // to control park/unpark + volatile Thread waiter; // to control park/unpark final boolean isData; - QNode(Object item, boolean isData) { super(item); this.isData = isData; @@ -185,7 +185,8 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking /** * Puts or takes an item. Used for most queue operations (except - * poll() and tryTransfer()) + * poll() and tryTransfer()). See the similar code in + * SynchronousQueue for detailed explanation. * @param e the item or if null, signifies that this is a take * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT * @param nanos timeout in nanosecs, used only if mode is TIMEOUT @@ -194,8 +195,8 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking private Object xfer(Object e, int mode, long nanos) { boolean isData = e != null; QNode s = null; - final AtomicReference head = this.head; - final AtomicReference tail = this.tail; + final PaddedAtomicReference head = this.head; + final PaddedAtomicReference tail = this.tail; for (;;) { QNode t = tail.get(); @@ -238,8 +239,8 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking */ private Object fulfill(Object e) { boolean isData = e != null; - final AtomicReference head = this.head; - final AtomicReference tail = this.tail; + final PaddedAtomicReference head = this.head; + final PaddedAtomicReference tail = this.tail; for (;;) { QNode t = tail.get(); @@ -297,13 +298,17 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking Object x = s.get(); if (x != e) { // Node was matched or cancelled advanceHead(pred, s); // unlink if head - if (x == s) { - return clean(pred, s); + if (x == s) { // was cancelled + clean(pred, s); + return null; + } + else if (x != null) { + s.set(s); // avoid garbage retention + return x; + } else { + return e; } - s.set(s); // mark as off-list - return x != null? x : e; } - if (mode == TIMEOUT) { long now = System.nanoTime(); nanos -= now - lastTime; @@ -339,10 +344,34 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking } /** - * Gets rid of cancelled node s with original predecessor pred. - * @return null (to simplify use by callers) + * Returns validated tail for use in cleaning methods */ - Object clean(final QNode pred, final QNode s) { + private QNode getValidatedTail() { + for (;;) { + QNode h = head.get(); + QNode first = h.next; + if (first != null && first.next == first) { // help advance + advanceHead(h, first); + continue; + } + QNode t = tail.get(); + QNode last = t.next; + if (t == tail.get()) { + if (last != null) { + tail.compareAndSet(t, last); // help advance + } else { + return t; + } + } + } + } + + /** + * Gets rid of cancelled node s with original predecessor pred. + * @param pred predecessor of cancelled node + * @param s the cancelled node + */ + void clean(QNode pred, QNode s) { Thread w = s.waiter; if (w != null) { // Wake up thread s.waiter = null; @@ -350,59 +379,69 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking LockSupport.unpark(w); } } - - for (;;) { - if (pred.next != s) { - return null; - } - QNode h = head.get(); - QNode hn = h.next; // Absorb cancelled first node as head - if (hn != null && hn.next == hn) { - advanceHead(h, hn); - continue; - } - QNode t = tail.get(); // Ensure consistent read for tail - if (t == h) { - return null; - } - QNode tn = t.next; - if (t != tail.get()) { - continue; - } - if (tn != null) { // Help advance tail - tail.compareAndSet(t, tn); - continue; - } - if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; + /* + * At any given time, exactly one node on list cannot be + * deleted -- the last inserted node. To accommodate this, if + * we cannot delete s, we save its predecessor as "cleanMe", + * processing the previously saved version first. At least one + * of node s or the node previously saved can always be + * processed, so this always terminates. + */ + while (pred.next == s) { + QNode oldpred = reclean(); // First, help get rid of cleanMe + QNode t = getValidatedTail(); + if (s != t) { // If not tail, try to unsplice + QNode sn = s.next; // s.next == s means s already off list if (sn == s || pred.casNext(s, sn)) { - return null; + break; } } - - QNode dp = cleanMe.get(); - if (dp != null) { // Try unlinking previous cancelled node - QNode d = dp.next; - QNode dn; - if (d == null || // d is gone or - d == dp || // d is off list or - d.get() != d || // d not cancelled or - d != t && // d not tail and - (dn = d.next) != null && // has successor - dn != d && // that is on list - dp.casNext(d, dn)) { - cleanMe.compareAndSet(dp, null); - } - if (dp == pred) { - return null; // s is already saved node - } - } - else if (cleanMe.compareAndSet(null, pred)) { - return null; // Postpone cleaning s + else if (oldpred == pred || // Already saved + oldpred == null && cleanMe.compareAndSet(null, pred)) { + break; // Postpone cleaning } } } + /** + * Tries to unsplice the cancelled node held in cleanMe that was + * previously uncleanable because it was at tail. + * @return current cleanMe node (or null) + */ + private QNode reclean() { + /* + * cleanMe is, or at one time was, predecessor of cancelled + * node s that was the tail so could not be unspliced. If s + * is no longer the tail, try to unsplice if necessary and + * make cleanMe slot available. This differs from similar + * code in clean() because we must check that pred still + * points to a cancelled node that must be unspliced -- if + * not, we can (must) clear cleanMe without unsplicing. + * This can loop only due to contention on casNext or + * clearing cleanMe. + */ + QNode pred; + while ((pred = cleanMe.get()) != null) { + QNode t = getValidatedTail(); + QNode s = pred.next; + if (s != t) { + QNode sn; + if (s == null || s == pred || s.get() != s || + (sn = s.next) == s || pred.casNext(s, sn)) { + cleanMe.compareAndSet(pred, null); + } + } else { + break; + } + } + return pred; + } + + @SuppressWarnings("unchecked") + E cast(Object e) { + return (E)e; + } + /** * Creates an initially empty LinkedTransferQueue. */ @@ -496,11 +535,6 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking throw new InterruptedException(); } - @SuppressWarnings("unchecked") - E cast(Object e) { - return (E) e; - } - public E poll(long timeout, TimeUnit unit) throws InterruptedException { Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) { @@ -575,7 +609,6 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking } } - @Override public Iterator iterator() { return new Itr(); @@ -588,18 +621,18 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * so that the next call to next() will return it even * if subsequently removed. */ - private class Itr implements Iterator { - private QNode nextNode; // Next node to return next - private QNode currentNode; // last returned node, for remove() - private QNode prevNode; // predecessor of last returned node - private E nextItem; // Cache of next item, once commited to in next + class Itr implements Iterator { + QNode nextNode; // Next node to return next + QNode currentNode; // last returned node, for remove() + QNode prevNode; // predecessor of last returned node + E nextItem; // Cache of next item, once commited to in next Itr() { nextNode = traversalHead(); advance(); } - private E advance() { + E advance() { prevNode = currentNode; currentNode = nextNode; E x = nextItem; @@ -742,4 +775,4 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking public int remainingCapacity() { return Integer.MAX_VALUE; } -} \ No newline at end of file +}