From 974a18568ab5f5680da376c5ea109a5a39054739 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 21 Dec 2009 08:51:11 +0000 Subject: [PATCH] Backported the changes (~ 1.44) in LinkedTransferQueue from the upstream --- .../util/internal/LinkedTransferQueue.java | 436 +++++++++++------- 1 file changed, 264 insertions(+), 172 deletions(-) diff --git a/src/main/java/org/jboss/netty/util/internal/LinkedTransferQueue.java b/src/main/java/org/jboss/netty/util/internal/LinkedTransferQueue.java index 34e9942aaa..bd0f87e8c7 100644 --- a/src/main/java/org/jboss/netty/util/internal/LinkedTransferQueue.java +++ b/src/main/java/org/jboss/netty/util/internal/LinkedTransferQueue.java @@ -24,6 +24,7 @@ package org.jboss.netty.util.internal; import java.util.AbstractQueue; import java.util.Collection; +import java.util.ConcurrentModificationException; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; @@ -118,26 +119,29 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * garbage retention. Similarly, setting the next field to this is * used as sentinel that node is off list. */ - private static final class QNode extends AtomicReference { + private static final class Node extends AtomicReference { private static final long serialVersionUID = 5925596372370723938L; - transient volatile QNode next; + transient volatile Node next; transient volatile Thread waiter; // to control park/unpark final boolean isData; - QNode(Object item, boolean isData) { + Node(E item, boolean isData) { super(item); this.isData = isData; } - private static final AtomicReferenceFieldUpdater nextUpdater; + @SuppressWarnings("unchecked") + private static final AtomicReferenceFieldUpdater nextUpdater; static { - AtomicReferenceFieldUpdater tmp = null; + @SuppressWarnings("unchecked") + AtomicReferenceFieldUpdater tmp = null; try { tmp = AtomicReferenceFieldUpdater.newUpdater( - QNode.class, QNode.class, "next"); + Node.class, Node.class, "next"); // Test if AtomicReferenceFieldUpdater is really working. - QNode testNode = new QNode(null, false); + @SuppressWarnings("unchecked") + Node testNode = new Node(null, false); tmp.set(testNode, testNode); if (testNode.next != testNode) { // Not set as expected - fall back to the safe mode. @@ -150,7 +154,7 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking nextUpdater = tmp; } - final boolean casNext(QNode cmp, QNode val) { + final boolean casNext(Node cmp, Node val) { if (nextUpdater != null) { return nextUpdater.compareAndSet(this, cmp, val); } else { @@ -158,7 +162,7 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking } } - private synchronized final boolean alternativeCasNext(QNode cmp, QNode val) { + private synchronized final boolean alternativeCasNext(Node cmp, Node val) { if (next == cmp) { next = val; return true; @@ -169,7 +173,7 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking final void clearNext() { // nextUpdater.lazySet(this, this); - next = this; // allows run on java5 + next = this; // allows to run on java5 } } @@ -189,22 +193,22 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking /** head of the queue */ - private final PaddedAtomicReference head; + private final PaddedAtomicReference> head; /** tail of the queue */ - private final PaddedAtomicReference tail; + private final PaddedAtomicReference> tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it cancelled. */ - private final PaddedAtomicReference cleanMe; + private final PaddedAtomicReference> cleanMe; /** * Tries to cas nh as new head; if successful, unlink * old head's next node to avoid garbage retention. */ - private boolean advanceHead(QNode h, QNode nh) { + private boolean advanceHead(Node h, Node nh) { if (h == head.get() && head.compareAndSet(h, nh)) { h.clearNext(); // forget old next return true; @@ -222,21 +226,21 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * @param nanos timeout in nanosecs, used only if mode is TIMEOUT * @return an item, or null on failure */ - private Object xfer(Object e, int mode, long nanos) { + private E xfer(E e, int mode, long nanos) { boolean isData = e != null; - QNode s = null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + Node s = null; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); - if (t != null && (t == h || t.isData == isData)) { + if (t == h || t.isData == isData) { if (s == null) { - s = new QNode(e, isData); + s = new Node(e, isData); } - QNode last = t.next; + Node last = t.next; if (last != null) { if (t == tail.get()) { tail.compareAndSet(t, last); @@ -246,16 +250,14 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking tail.compareAndSet(t, s); return awaitFulfill(t, s, e, mode, nanos); } - } - - else if (h != null) { - QNode first = h.next; + } else { + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData? e : cast(x); } } } @@ -267,17 +269,17 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * Version of xfer for poll() and tryTransfer, which * simplifies control paths both here and in xfer. */ - private Object fulfill(Object e) { + private E fulfill(E e) { boolean isData = e != null; - final PaddedAtomicReference head = this.head; - final PaddedAtomicReference tail = this.tail; + final PaddedAtomicReference> head = this.head; + final PaddedAtomicReference> tail = this.tail; for (;;) { - QNode t = tail.get(); - QNode h = head.get(); + Node t = tail.get(); + Node h = head.get(); - if (t != null && (t == h || t.isData == isData)) { - QNode last = t.next; + if (t == h || t.isData == isData) { + Node last = t.next; if (t == tail.get()) { if (last != null) { tail.compareAndSet(t, last); @@ -285,16 +287,15 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking return null; } } - } - else if (h != null) { - QNode first = h.next; + } else { + Node first = h.next; if (t == tail.get() && first != null && advanceHead(h, first)) { Object x = first.get(); if (x != first && first.compareAndSet(x, e)) { LockSupport.unpark(first.waiter); - return isData? e : x; + return isData? e : cast(x); } } } @@ -310,9 +311,9 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * @param e the comparison value for checking match * @param mode mode * @param nanos timeout value - * @return matched item, or s if cancelled + * @return matched item, or null if cancelled */ - private Object awaitFulfill(QNode pred, QNode s, Object e, + private E awaitFulfill(Node pred, Node s, E e, int mode, long nanos) { if (mode == NOWAIT) { return null; @@ -334,7 +335,7 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking } else if (x != null) { s.set(s); // avoid garbage retention - return x; + return cast(x); } else { return e; } @@ -349,10 +350,10 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking } } if (spins < 0) { - QNode h = head.get(); // only spin if at head - spins = h != null && h.next == s ? - (mode == TIMEOUT? - maxTimedSpins : maxUntimedSpins) : 0; + Node h = head.get(); // only spin if at head + spins = h.next != s ? 0 : + mode == TIMEOUT ? maxTimedSpins : + maxUntimedSpins; } if (spins > 0) { --spins; @@ -376,16 +377,16 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking /** * Returns validated tail for use in cleaning methods. */ - private QNode getValidatedTail() { + private Node getValidatedTail() { for (;;) { - QNode h = head.get(); - QNode first = h.next; - if (first != null && first.next == first) { // help advance + Node h = head.get(); + Node first = h.next; + if (first != null && first.get() == first) { // help advance advanceHead(h, first); continue; } - QNode t = tail.get(); - QNode last = t.next; + Node t = tail.get(); + Node last = t.next; if (t == tail.get()) { if (last != null) { tail.compareAndSet(t, last); // help advance @@ -402,7 +403,7 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * @param pred predecessor of cancelled node * @param s the cancelled node */ - void clean(QNode pred, QNode s) { + void clean(Node pred, Node s) { Thread w = s.waiter; if (w != null) { // Wake up thread s.waiter = null; @@ -424,10 +425,10 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * processed, so this always terminates. */ while (pred.next == s) { - QNode oldpred = reclean(); // First, help get rid of cleanMe - QNode t = getValidatedTail(); + Node oldpred = reclean(); // First, help get rid of cleanMe + Node t = getValidatedTail(); if (s != t) { // If not tail, try to unsplice - QNode sn = s.next; // s.next == s means s already off list + Node sn = s.next; // s.next == s means s already off list if (sn == s || pred.casNext(s, sn)) { break; } @@ -445,7 +446,7 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * * @return current cleanMe node (or null) */ - private QNode reclean() { + private Node reclean() { /* * cleanMe is, or at one time was, predecessor of cancelled * node s that was the tail so could not be unspliced. If s @@ -457,12 +458,12 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * This can loop only due to contention on casNext or * clearing cleanMe. */ - QNode pred; + Node pred; while ((pred = cleanMe.get()) != null) { - QNode t = getValidatedTail(); - QNode s = pred.next; + Node t = getValidatedTail(); + Node s = pred.next; if (s != t) { - QNode sn; + Node sn; if (s == null || s == pred || s.get() != s || (sn = s.next) == s || pred.casNext(s, sn)) { cleanMe.compareAndSet(pred, null); @@ -483,10 +484,10 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * Creates an initially empty {@code LinkedTransferQueue}. */ public LinkedTransferQueue() { - QNode dummy = new QNode(null, false); - head = new PaddedAtomicReference(dummy); - tail = new PaddedAtomicReference(dummy); - cleanMe = new PaddedAtomicReference(null); + Node dummy = new Node(null, false); + head = new PaddedAtomicReference>(dummy); + tail = new PaddedAtomicReference>(dummy); + cleanMe = new PaddedAtomicReference>(null); } /** @@ -503,28 +504,36 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking addAll(c); } + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never block. + * + * @throws NullPointerException if the specified element is null + */ public void put(E e) throws InterruptedException { - if (e == null) { - throw new NullPointerException(); - } - if (Thread.interrupted()) { - throw new InterruptedException(); - } - xfer(e, NOWAIT, 0); + offer(e); } + /** + * Inserts the specified element at the tail of this queue + * As the queue is unbounded, this method will never block or + * return {@code false}. + * + * @return {@code true} (as specified by {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer}) + * @throws NullPointerException if the specified element is null + */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { - if (e == null) { - throw new NullPointerException(); - } - if (Thread.interrupted()) { - throw new InterruptedException(); - } - xfer(e, NOWAIT, 0); - return true; + return offer(e); } + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never return {@code false}. + * + * @return {@code true} (as specified by {@link BlockingQueue#offer(Object) BlockingQueue.offer}) + * @throws NullPointerException if the specified element is null + */ public boolean offer(E e) { if (e == null) { throw new NullPointerException(); @@ -533,15 +542,46 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking return true; } + /** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never throw + * {@link IllegalStateException} or return {@code false}. + * + * @return {@code true} (as specified by {@link Collection#add}) + * @throws NullPointerException if the specified element is null + */ @Override public boolean add(E e) { + return offer(e); + } + /** + * Transfers the element to a waiting consumer immediately, if possible. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * otherwise returning {@code false} without enqueuing the element. + * + * @throws NullPointerException if the specified element is null + */ + public boolean tryTransfer(E e) { if (e == null) { throw new NullPointerException(); } - xfer(e, NOWAIT, 0); - return true; + return fulfill(e) != null; } + /** + * Transfers the element to a consumer, waiting if necessary to do so. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * else inserts the specified element at the tail of this queue + * and waits until the element is received by a consumer. + * + * @throws NullPointerException if the specified element is null + */ public void transfer(E e) throws InterruptedException { if (e == null) { throw new NullPointerException(); @@ -552,6 +592,20 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking } } + /** + * Transfers the element to a consumer if it is possible to do so + * before the timeout elapses. + * + *

More precisely, transfers the specified element immediately + * if there exists a consumer already waiting to receive it (in + * {@link #take} or timed {@link #poll(long,TimeUnit) poll}), + * else inserts the specified element at the tail of this queue + * and waits until the element is received by a consumer, + * returning {@code false} if the specified wait time elapses + * before the element can be transferred. + * + * @throws NullPointerException if the specified element is null + */ public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) { @@ -566,32 +620,25 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking throw new InterruptedException(); } - public boolean tryTransfer(E e) { - if (e == null) { - throw new NullPointerException(); - } - return fulfill(e) != null; - } - public E take() throws InterruptedException { - Object e = xfer(null, WAIT, 0); + E e = xfer(null, WAIT, 0); if (e != null) { - return cast(e); + return e; } Thread.interrupted(); throw new InterruptedException(); } public E poll(long timeout, TimeUnit unit) throws InterruptedException { - Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); + E e = xfer(null, TIMEOUT, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) { - return cast(e); + return e; } throw new InterruptedException(); } public E poll() { - return cast(fulfill(null)); + return fulfill(null); } public int drainTo(Collection c) { @@ -631,32 +678,43 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking /** * Returns head after performing any outstanding helping steps. */ - QNode traversalHead() { + Node traversalHead() { for (;;) { - QNode t = tail.get(); - QNode h = head.get(); - if (h != null && t != null) { - QNode last = t.next; - QNode first = h.next; - if (t == tail.get()) { - if (last != null) { - tail.compareAndSet(t, last); - } else if (first != null) { - Object x = first.get(); - if (x == first) { - advanceHead(h, first); - } else { - return h; - } + Node t = tail.get(); + Node h = head.get(); + Node last = t.next; + Node first = h.next; + if (t == tail.get()) { + if (last != null) { + tail.compareAndSet(t, last); + } else if (first != null) { + Object x = first.get(); + if (x == first) { + advanceHead(h, first); } else { return h; } + } else { + return h; } } reclean(); } } + /** + * Returns an iterator over the elements in this queue in proper + * sequence, from head to tail. + * + *

The returned iterator is a "weakly consistent" iterator that + * will never throw + * {@link ConcurrentModificationException ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed + * to) reflect any modifications subsequent to construction. + * + * @return an iterator over the elements in this queue in proper sequence + */ @Override public Iterator iterator() { return new Itr(); @@ -670,44 +728,41 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking * if subsequently removed. */ class Itr implements Iterator { - QNode next; // node to return next - QNode pnext; // predecessor of next - QNode snext; // successor of next - QNode curr; // last returned node, for remove() - QNode pcurr; // predecessor of curr, for remove() - E nextItem; // Cache of next item, once commited to in next + Node next; // node to return next + Node pnext; // predecessor of next + Node curr; // last returned node, for remove() + Node pcurr; // predecessor of curr, for remove() + E nextItem; // Cache of next item, once committed to in next Itr() { - findNext(); + advance(); } /** - * Ensure next points to next valid node, or null if none. + * Moves to next valid node and returns item to return for + * next(), or null if no such. */ - void findNext() { + private E advance() { + pcurr = pnext; + curr = next; + E item = nextItem; for (;;) { - QNode pred = pnext; - QNode q = next; - if (pred == null || pred == q) { - pred = traversalHead(); - q = pred.next; - } - if (q == null || !q.isData) { + pnext = next == null ? traversalHead() : next; + next = pnext.next; + if (next == pnext) { next = null; - return; + continue; // restart } - Object x = q.get(); - QNode s = q.next; - if (x != null && q != x && q != s) { + if (next == null) { + break; + } + Object x = next.get(); + if (x != null && x != next) { nextItem = cast(x); - snext = s; - pnext = pred; - next = q; - return; + break; } - pnext = q; - next = s; } + return item; } public boolean hasNext() { @@ -718,17 +773,11 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking if (next == null) { throw new NoSuchElementException(); } - pcurr = pnext; - curr = next; - pnext = next; - next = snext; - E x = nextItem; - findNext(); - return x; + return advance(); } public void remove() { - QNode p = curr; + Node p = curr; if (p == null) { throw new IllegalStateException(); } @@ -741,8 +790,8 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking public E peek() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) { return null; } @@ -758,11 +807,16 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking } } + /** + * Returns {@code true} if this queue contains no elements. + * + * @return {@code true} if this queue contains no elements + */ @Override public boolean isEmpty() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) { return true; } @@ -780,8 +834,8 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking public boolean hasWaitingConsumer() { for (;;) { - QNode h = traversalHead(); - QNode p = h.next; + Node h = traversalHead(); + Node p = h.next; if (p == null) { return false; } @@ -806,51 +860,78 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking */ @Override public int size() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && p.isData; p = p.next) { - Object x = p.get(); - if (x != null && x != p) { - if (++count == Integer.MAX_VALUE) { + for (;;) { + int count = 0; + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) { // restart break; } + if (q == null || !q.isData) { + return count; + } + Object x = q.get(); + if (x != null && x != q) { + if (++count == Integer.MAX_VALUE) { // saturated + return count; + } + } + pred = q; } } - return count; } public int getWaitingConsumerCount() { - int count = 0; - QNode h = traversalHead(); - for (QNode p = h.next; p != null && !p.isData; p = p.next) { - if (p.get() == null) { - if (++count == Integer.MAX_VALUE) { + // converse of size -- count valid non-data nodes + for (;;) { + int count = 0; + Node pred = traversalHead(); + for (;;) { + Node q = pred.next; + if (q == pred) { // restart break; } + if (q == null || q.isData) { + return count; + } + Object x = q.get(); + if (x == null) { + if (++count == Integer.MAX_VALUE) { // saturated + return count; + } + } + pred = q; } } - return count; - } - - public int remainingCapacity() { - return Integer.MAX_VALUE; } + /** + * Removes a single instance of the specified element from this queue, + * if it is present. More formally, removes an element {@code e} such + * that {@code o.equals(e)}, if this queue contains one or more such + * elements. + * Returns {@code true} if this queue contained the specified element + * (or equivalently, if this queue changed as a result of the call). + * + * @param o element to be removed from this queue, if present + * @return {@code true} if this queue changed as a result of the call + */ @Override public boolean remove(Object o) { if (o == null) { return false; } for (;;) { - QNode pred = traversalHead(); + Node pred = traversalHead(); for (;;) { - QNode q = pred.next; - if (q == null || !q.isData) { - return false; - } + Node q = pred.next; if (q == pred) {// restart break; } + if (q == null || !q.isData) { + return false; + } Object x = q.get(); if (x != null && x != q && o.equals(x) && q.compareAndSet(x, q)) { @@ -861,4 +942,15 @@ public class LinkedTransferQueue extends AbstractQueue implements Blocking } } } + + /** + * Always returns {@code Integer.MAX_VALUE} because a + * {@code LinkedTransferQueue} is not capacity constrained. + * + * @return {@code Integer.MAX_VALUE} (as specified by + * {@link BlockingQueue#remainingCapacity()}) + */ + public int remainingCapacity() { + return Integer.MAX_VALUE; + } }