Backported the changes (~ 1.44) in LinkedTransferQueue from the upstream

This commit is contained in:
Trustin Lee 2009-12-21 08:51:11 +00:00
parent ad4d99c764
commit 974a18568a

View File

@ -24,6 +24,7 @@ package org.jboss.netty.util.internal;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
@ -118,26 +119,29 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* garbage retention. Similarly, setting the next field to this is
* used as sentinel that node is off list.
*/
private static final class QNode extends AtomicReference<Object> {
private static final class Node<E> extends AtomicReference<Object> {
private static final long serialVersionUID = 5925596372370723938L;
transient volatile QNode next;
transient volatile Node<E> next;
transient volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
Node(E item, boolean isData) {
super(item);
this.isData = isData;
}
private static final AtomicReferenceFieldUpdater<QNode, QNode> nextUpdater;
@SuppressWarnings("unchecked")
private static final AtomicReferenceFieldUpdater<Node, Node> nextUpdater;
static {
AtomicReferenceFieldUpdater<QNode, QNode> tmp = null;
@SuppressWarnings("unchecked")
AtomicReferenceFieldUpdater<Node, Node> tmp = null;
try {
tmp = AtomicReferenceFieldUpdater.newUpdater(
QNode.class, QNode.class, "next");
Node.class, Node.class, "next");
// Test if AtomicReferenceFieldUpdater is really working.
QNode testNode = new QNode(null, false);
@SuppressWarnings("unchecked")
Node testNode = new Node(null, false);
tmp.set(testNode, testNode);
if (testNode.next != testNode) {
// Not set as expected - fall back to the safe mode.
@ -150,7 +154,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
nextUpdater = tmp;
}
final boolean casNext(QNode cmp, QNode val) {
final boolean casNext(Node<E> cmp, Node<E> val) {
if (nextUpdater != null) {
return nextUpdater.compareAndSet(this, cmp, val);
} else {
@ -158,7 +162,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
}
private synchronized final boolean alternativeCasNext(QNode cmp, QNode val) {
private synchronized final boolean alternativeCasNext(Node<E> cmp, Node<E> val) {
if (next == cmp) {
next = val;
return true;
@ -169,7 +173,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
final void clearNext() {
// nextUpdater.lazySet(this, this);
next = this; // allows run on java5
next = this; // allows to run on java5
}
}
@ -189,22 +193,22 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/** head of the queue */
private final PaddedAtomicReference<QNode> head;
private final PaddedAtomicReference<Node<E>> head;
/** tail of the queue */
private final PaddedAtomicReference<QNode> tail;
private final PaddedAtomicReference<Node<E>> tail;
/**
* Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node
* when it cancelled.
*/
private final PaddedAtomicReference<QNode> cleanMe;
private final PaddedAtomicReference<Node<E>> cleanMe;
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
private boolean advanceHead(QNode h, QNode nh) {
private boolean advanceHead(Node<E> h, Node<E> nh) {
if (h == head.get() && head.compareAndSet(h, nh)) {
h.clearNext(); // forget old next
return true;
@ -222,21 +226,21 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* @param nanos timeout in nanosecs, used only if mode is TIMEOUT
* @return an item, or null on failure
*/
private Object xfer(Object e, int mode, long nanos) {
private E xfer(E e, int mode, long nanos) {
boolean isData = e != null;
QNode s = null;
final PaddedAtomicReference<QNode> head = this.head;
final PaddedAtomicReference<QNode> tail = this.tail;
Node<E> s = null;
final PaddedAtomicReference<Node<E>> head = this.head;
final PaddedAtomicReference<Node<E>> tail = this.tail;
for (;;) {
QNode t = tail.get();
QNode h = head.get();
Node<E> t = tail.get();
Node<E> h = head.get();
if (t != null && (t == h || t.isData == isData)) {
if (t == h || t.isData == isData) {
if (s == null) {
s = new QNode(e, isData);
s = new Node<E>(e, isData);
}
QNode last = t.next;
Node<E> last = t.next;
if (last != null) {
if (t == tail.get()) {
tail.compareAndSet(t, last);
@ -246,16 +250,14 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
tail.compareAndSet(t, s);
return awaitFulfill(t, s, e, mode, nanos);
}
}
else if (h != null) {
QNode first = h.next;
} else {
Node<E> first = h.next;
if (t == tail.get() && first != null &&
advanceHead(h, first)) {
Object x = first.get();
if (x != first && first.compareAndSet(x, e)) {
LockSupport.unpark(first.waiter);
return isData? e : x;
return isData? e : cast(x);
}
}
}
@ -267,17 +269,17 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* Version of xfer for poll() and tryTransfer, which
* simplifies control paths both here and in xfer.
*/
private Object fulfill(Object e) {
private E fulfill(E e) {
boolean isData = e != null;
final PaddedAtomicReference<QNode> head = this.head;
final PaddedAtomicReference<QNode> tail = this.tail;
final PaddedAtomicReference<Node<E>> head = this.head;
final PaddedAtomicReference<Node<E>> tail = this.tail;
for (;;) {
QNode t = tail.get();
QNode h = head.get();
Node<E> t = tail.get();
Node<E> h = head.get();
if (t != null && (t == h || t.isData == isData)) {
QNode last = t.next;
if (t == h || t.isData == isData) {
Node<E> last = t.next;
if (t == tail.get()) {
if (last != null) {
tail.compareAndSet(t, last);
@ -285,16 +287,15 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
return null;
}
}
}
else if (h != null) {
QNode first = h.next;
} else {
Node<E> first = h.next;
if (t == tail.get() &&
first != null &&
advanceHead(h, first)) {
Object x = first.get();
if (x != first && first.compareAndSet(x, e)) {
LockSupport.unpark(first.waiter);
return isData? e : x;
return isData? e : cast(x);
}
}
}
@ -310,9 +311,9 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* @param e the comparison value for checking match
* @param mode mode
* @param nanos timeout value
* @return matched item, or s if cancelled
* @return matched item, or null if cancelled
*/
private Object awaitFulfill(QNode pred, QNode s, Object e,
private E awaitFulfill(Node<E> pred, Node<E> s, E e,
int mode, long nanos) {
if (mode == NOWAIT) {
return null;
@ -334,7 +335,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
else if (x != null) {
s.set(s); // avoid garbage retention
return x;
return cast(x);
} else {
return e;
}
@ -349,10 +350,10 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
}
if (spins < 0) {
QNode h = head.get(); // only spin if at head
spins = h != null && h.next == s ?
(mode == TIMEOUT?
maxTimedSpins : maxUntimedSpins) : 0;
Node<E> h = head.get(); // only spin if at head
spins = h.next != s ? 0 :
mode == TIMEOUT ? maxTimedSpins :
maxUntimedSpins;
}
if (spins > 0) {
--spins;
@ -376,16 +377,16 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/**
* Returns validated tail for use in cleaning methods.
*/
private QNode getValidatedTail() {
private Node<E> getValidatedTail() {
for (;;) {
QNode h = head.get();
QNode first = h.next;
if (first != null && first.next == first) { // help advance
Node<E> h = head.get();
Node<E> first = h.next;
if (first != null && first.get() == first) { // help advance
advanceHead(h, first);
continue;
}
QNode t = tail.get();
QNode last = t.next;
Node<E> t = tail.get();
Node<E> last = t.next;
if (t == tail.get()) {
if (last != null) {
tail.compareAndSet(t, last); // help advance
@ -402,7 +403,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* @param pred predecessor of cancelled node
* @param s the cancelled node
*/
void clean(QNode pred, QNode s) {
void clean(Node<E> pred, Node<E> s) {
Thread w = s.waiter;
if (w != null) { // Wake up thread
s.waiter = null;
@ -424,10 +425,10 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* processed, so this always terminates.
*/
while (pred.next == s) {
QNode oldpred = reclean(); // First, help get rid of cleanMe
QNode t = getValidatedTail();
Node<E> oldpred = reclean(); // First, help get rid of cleanMe
Node<E> t = getValidatedTail();
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next; // s.next == s means s already off list
Node<E> sn = s.next; // s.next == s means s already off list
if (sn == s || pred.casNext(s, sn)) {
break;
}
@ -445,7 +446,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
*
* @return current cleanMe node (or null)
*/
private QNode reclean() {
private Node<E> reclean() {
/*
* cleanMe is, or at one time was, predecessor of cancelled
* node s that was the tail so could not be unspliced. If s
@ -457,12 +458,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* This can loop only due to contention on casNext or
* clearing cleanMe.
*/
QNode pred;
Node<E> pred;
while ((pred = cleanMe.get()) != null) {
QNode t = getValidatedTail();
QNode s = pred.next;
Node<E> t = getValidatedTail();
Node<E> s = pred.next;
if (s != t) {
QNode sn;
Node<E> sn;
if (s == null || s == pred || s.get() != s ||
(sn = s.next) == s || pred.casNext(s, sn)) {
cleanMe.compareAndSet(pred, null);
@ -483,10 +484,10 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* Creates an initially empty {@code LinkedTransferQueue}.
*/
public LinkedTransferQueue() {
QNode dummy = new QNode(null, false);
head = new PaddedAtomicReference<QNode>(dummy);
tail = new PaddedAtomicReference<QNode>(dummy);
cleanMe = new PaddedAtomicReference<QNode>(null);
Node<E> dummy = new Node<E>(null, false);
head = new PaddedAtomicReference<Node<E>>(dummy);
tail = new PaddedAtomicReference<Node<E>>(dummy);
cleanMe = new PaddedAtomicReference<Node<E>>(null);
}
/**
@ -503,28 +504,36 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
addAll(c);
}
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never block.
*
* @throws NullPointerException if the specified element is null
*/
public void put(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
xfer(e, NOWAIT, 0);
offer(e);
}
/**
* Inserts the specified element at the tail of this queue
* As the queue is unbounded, this method will never block or
* return {@code false}.
*
* @return {@code true} (as specified by {@link BlockingQueue#offer(Object,long,TimeUnit) BlockingQueue.offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) {
throw new NullPointerException();
}
if (Thread.interrupted()) {
throw new InterruptedException();
}
xfer(e, NOWAIT, 0);
return true;
return offer(e);
}
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never return {@code false}.
*
* @return {@code true} (as specified by {@link BlockingQueue#offer(Object) BlockingQueue.offer})
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) {
throw new NullPointerException();
@ -533,15 +542,46 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
return true;
}
/**
* Inserts the specified element at the tail of this queue.
* As the queue is unbounded, this method will never throw
* {@link IllegalStateException} or return {@code false}.
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
@Override
public boolean add(E e) {
return offer(e);
}
/**
* Transfers the element to a waiting consumer immediately, if possible.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* otherwise returning {@code false} without enqueuing the element.
*
* @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e) {
if (e == null) {
throw new NullPointerException();
}
xfer(e, NOWAIT, 0);
return true;
return fulfill(e) != null;
}
/**
* Transfers the element to a consumer, waiting if necessary to do so.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* else inserts the specified element at the tail of this queue
* and waits until the element is received by a consumer.
*
* @throws NullPointerException if the specified element is null
*/
public void transfer(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
@ -552,6 +592,20 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
}
/**
* Transfers the element to a consumer if it is possible to do so
* before the timeout elapses.
*
* <p>More precisely, transfers the specified element immediately
* if there exists a consumer already waiting to receive it (in
* {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
* else inserts the specified element at the tail of this queue
* and waits until the element is received by a consumer,
* returning {@code false} if the specified wait time elapses
* before the element can be transferred.
*
* @throws NullPointerException if the specified element is null
*/
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) {
@ -566,32 +620,25 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
throw new InterruptedException();
}
public boolean tryTransfer(E e) {
if (e == null) {
throw new NullPointerException();
}
return fulfill(e) != null;
}
public E take() throws InterruptedException {
Object e = xfer(null, WAIT, 0);
E e = xfer(null, WAIT, 0);
if (e != null) {
return cast(e);
return e;
}
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
E e = xfer(null, TIMEOUT, unit.toNanos(timeout));
if (e != null || !Thread.interrupted()) {
return cast(e);
return e;
}
throw new InterruptedException();
}
public E poll() {
return cast(fulfill(null));
return fulfill(null);
}
public int drainTo(Collection<? super E> c) {
@ -631,32 +678,43 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/**
* Returns head after performing any outstanding helping steps.
*/
QNode traversalHead() {
Node<E> traversalHead() {
for (;;) {
QNode t = tail.get();
QNode h = head.get();
if (h != null && t != null) {
QNode last = t.next;
QNode first = h.next;
if (t == tail.get()) {
if (last != null) {
tail.compareAndSet(t, last);
} else if (first != null) {
Object x = first.get();
if (x == first) {
advanceHead(h, first);
} else {
return h;
}
Node<E> t = tail.get();
Node<E> h = head.get();
Node<E> last = t.next;
Node<E> first = h.next;
if (t == tail.get()) {
if (last != null) {
tail.compareAndSet(t, last);
} else if (first != null) {
Object x = first.get();
if (x == first) {
advanceHead(h, first);
} else {
return h;
}
} else {
return h;
}
}
reclean();
}
}
/**
* Returns an iterator over the elements in this queue in proper
* sequence, from head to tail.
*
* <p>The returned iterator is a "weakly consistent" iterator that
* will never throw
* {@link ConcurrentModificationException ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed
* to) reflect any modifications subsequent to construction.
*
* @return an iterator over the elements in this queue in proper sequence
*/
@Override
public Iterator<E> iterator() {
return new Itr();
@ -670,44 +728,41 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* if subsequently removed.
*/
class Itr implements Iterator<E> {
QNode next; // node to return next
QNode pnext; // predecessor of next
QNode snext; // successor of next
QNode curr; // last returned node, for remove()
QNode pcurr; // predecessor of curr, for remove()
E nextItem; // Cache of next item, once commited to in next
Node<E> next; // node to return next
Node<E> pnext; // predecessor of next
Node<E> curr; // last returned node, for remove()
Node<E> pcurr; // predecessor of curr, for remove()
E nextItem; // Cache of next item, once committed to in next
Itr() {
findNext();
advance();
}
/**
* Ensure next points to next valid node, or null if none.
* Moves to next valid node and returns item to return for
* next(), or null if no such.
*/
void findNext() {
private E advance() {
pcurr = pnext;
curr = next;
E item = nextItem;
for (;;) {
QNode pred = pnext;
QNode q = next;
if (pred == null || pred == q) {
pred = traversalHead();
q = pred.next;
}
if (q == null || !q.isData) {
pnext = next == null ? traversalHead() : next;
next = pnext.next;
if (next == pnext) {
next = null;
return;
continue; // restart
}
Object x = q.get();
QNode s = q.next;
if (x != null && q != x && q != s) {
if (next == null) {
break;
}
Object x = next.get();
if (x != null && x != next) {
nextItem = cast(x);
snext = s;
pnext = pred;
next = q;
return;
break;
}
pnext = q;
next = s;
}
return item;
}
public boolean hasNext() {
@ -718,17 +773,11 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
if (next == null) {
throw new NoSuchElementException();
}
pcurr = pnext;
curr = next;
pnext = next;
next = snext;
E x = nextItem;
findNext();
return x;
return advance();
}
public void remove() {
QNode p = curr;
Node<E> p = curr;
if (p == null) {
throw new IllegalStateException();
}
@ -741,8 +790,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
public E peek() {
for (;;) {
QNode h = traversalHead();
QNode p = h.next;
Node<E> h = traversalHead();
Node<E> p = h.next;
if (p == null) {
return null;
}
@ -758,11 +807,16 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
}
/**
* Returns {@code true} if this queue contains no elements.
*
* @return {@code true} if this queue contains no elements
*/
@Override
public boolean isEmpty() {
for (;;) {
QNode h = traversalHead();
QNode p = h.next;
Node<E> h = traversalHead();
Node<E> p = h.next;
if (p == null) {
return true;
}
@ -780,8 +834,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
public boolean hasWaitingConsumer() {
for (;;) {
QNode h = traversalHead();
QNode p = h.next;
Node<E> h = traversalHead();
Node<E> p = h.next;
if (p == null) {
return false;
}
@ -806,51 +860,78 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
*/
@Override
public int size() {
int count = 0;
QNode h = traversalHead();
for (QNode p = h.next; p != null && p.isData; p = p.next) {
Object x = p.get();
if (x != null && x != p) {
if (++count == Integer.MAX_VALUE) {
for (;;) {
int count = 0;
Node<E> pred = traversalHead();
for (;;) {
Node<E> q = pred.next;
if (q == pred) { // restart
break;
}
if (q == null || !q.isData) {
return count;
}
Object x = q.get();
if (x != null && x != q) {
if (++count == Integer.MAX_VALUE) { // saturated
return count;
}
}
pred = q;
}
}
return count;
}
public int getWaitingConsumerCount() {
int count = 0;
QNode h = traversalHead();
for (QNode p = h.next; p != null && !p.isData; p = p.next) {
if (p.get() == null) {
if (++count == Integer.MAX_VALUE) {
// converse of size -- count valid non-data nodes
for (;;) {
int count = 0;
Node<E> pred = traversalHead();
for (;;) {
Node<E> q = pred.next;
if (q == pred) { // restart
break;
}
if (q == null || q.isData) {
return count;
}
Object x = q.get();
if (x == null) {
if (++count == Integer.MAX_VALUE) { // saturated
return count;
}
}
pred = q;
}
}
return count;
}
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
@Override
public boolean remove(Object o) {
if (o == null) {
return false;
}
for (;;) {
QNode pred = traversalHead();
Node<E> pred = traversalHead();
for (;;) {
QNode q = pred.next;
if (q == null || !q.isData) {
return false;
}
Node<E> q = pred.next;
if (q == pred) {// restart
break;
}
if (q == null || !q.isData) {
return false;
}
Object x = q.get();
if (x != null && x != q && o.equals(x) &&
q.compareAndSet(x, q)) {
@ -861,4 +942,15 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
}
}
/**
* Always returns {@code Integer.MAX_VALUE} because a
* {@code LinkedTransferQueue} is not capacity constrained.
*
* @return {@code Integer.MAX_VALUE} (as specified by
* {@link BlockingQueue#remainingCapacity()})
*/
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
}