Backported upstream updates for LinkedTransferQueue (NETTY-181)

This commit is contained in:
Trustin Lee 2009-06-23 16:08:15 +00:00
parent 590ebcc394
commit d963f4c046

View File

@ -47,7 +47,7 @@ import java.util.concurrent.locks.LockSupport;
* producer. The <em>tail</em> of the queue is that element that has
* been on the queue the shortest time for some producer.
*
* <p>Beware that, unlike in most collections, the <tt>size</tt>
* <p>Beware that, unlike in most collections, the {@code size}
* method is <em>NOT</em> a constant-time operation. Because of the
* asynchronous nature of these queues, determining the current number
* of elements requires a traversal of the elements.
@ -157,7 +157,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
nextUpdater = tmp;
}
boolean casNext(QNode cmp, QNode val) {
final boolean casNext(QNode cmp, QNode val) {
if (nextUpdater != null) {
return nextUpdater.compareAndSet(this, cmp, val);
} else {
@ -165,7 +165,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
}
private synchronized boolean alternativeCasNext(QNode cmp, QNode val) {
private synchronized final boolean alternativeCasNext(QNode cmp, QNode val) {
if (next == cmp) {
next = val;
return true;
@ -173,6 +173,11 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
return false;
}
}
final void clearNext() {
// nextUpdater.lazySet(this, this);
next = this; // allows run on java5
}
}
/**
@ -208,7 +213,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
*/
private boolean advanceHead(QNode h, QNode nh) {
if (h == head.get() && head.compareAndSet(h, nh)) {
h.next = h; // forget old next
h.clearNext(); // forget old next
return true;
}
return false;
@ -218,6 +223,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* Puts or takes an item. Used for most queue operations (except
* poll() and tryTransfer()). See the similar code in
* SynchronousQueue for detailed explanation.
*
* @param e the item or if null, signifies that this is a take
* @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
* @param nanos timeout in nanosecs, used only if mode is TIMEOUT
@ -266,7 +272,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/**
* Version of xfer for poll() and tryTransfer, which
* simplifies control paths both here and in xfer
* simplifies control paths both here and in xfer.
*/
private Object fulfill(Object e) {
boolean isData = e != null;
@ -375,7 +381,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
/**
* Returns validated tail for use in cleaning methods
* Returns validated tail for use in cleaning methods.
*/
private QNode getValidatedTail() {
for (;;) {
@ -399,6 +405,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/**
* Gets rid of cancelled node s with original predecessor pred.
*
* @param pred predecessor of cancelled node
* @param s the cancelled node
*/
@ -410,6 +417,11 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
LockSupport.unpark(w);
}
}
if (pred == null) {
return;
}
/*
* At any given time, exactly one node on list cannot be
* deleted -- the last inserted node. To accommodate this, if
@ -437,6 +449,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/**
* Tries to unsplice the cancelled node held in cleanMe that was
* previously uncleanable because it was at tail.
*
* @return current cleanMe node (or null)
*/
private QNode reclean() {
@ -474,7 +487,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
/**
* Creates an initially empty <tt>LinkedTransferQueue</tt>.
* Creates an initially empty {@code LinkedTransferQueue}.
*/
public LinkedTransferQueue() {
QNode dummy = new QNode(null, false);
@ -484,9 +497,10 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
/**
* Creates a <tt>LinkedTransferQueue</tt>
* Creates a {@code LinkedTransferQueue}
* initially containing the elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
@ -526,6 +540,15 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
return true;
}
@Override
public boolean add(E e) {
if (e == null) {
throw new NullPointerException();
}
xfer(e, NOWAIT, 0);
return true;
}
public void transfer(E e) throws InterruptedException {
if (e == null) {
throw new NullPointerException();
@ -613,7 +636,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
// Traversal-based methods
/**
* Return head after performing any outstanding helping steps
* Returns head after performing any outstanding helping steps.
*/
QNode traversalHead() {
for (;;) {
@ -637,6 +660,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
}
}
}
reclean();
}
}
@ -653,59 +677,71 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* if subsequently removed.
*/
class Itr implements Iterator<E> {
QNode nextNode; // Next node to return next
QNode currentNode; // last returned node, for remove()
QNode prevNode; // predecessor of last returned node
QNode next; // node to return next
QNode pnext; // predecessor of next
QNode snext; // successor of next
QNode curr; // last returned node, for remove()
QNode pcurr; // predecessor of curr, for remove()
E nextItem; // Cache of next item, once commited to in next
Itr() {
nextNode = traversalHead();
advance();
findNext();
}
E advance() {
prevNode = currentNode;
currentNode = nextNode;
E x = nextItem;
QNode p = nextNode.next;
/**
* Ensure next points to next valid node, or null if none.
*/
void findNext() {
for (;;) {
if (p == null || !p.isData) {
nextNode = null;
nextItem = null;
return x;
QNode pred = pnext;
QNode q = next;
if (pred == null || pred == q) {
pred = traversalHead();
q = pred.next;
}
Object item = p.get();
if (item != p && item != null) {
nextNode = p;
nextItem = cast(item);
return x;
if (q == null || !q.isData) {
next = null;
return;
}
prevNode = p;
p = p.next;
Object x = q.get();
QNode s = q.next;
if (x != null && q != x && q != s) {
nextItem = cast(x);
snext = s;
pnext = pred;
next = q;
return;
}
pnext = q;
next = s;
}
}
public boolean hasNext() {
return nextNode != null;
return next != null;
}
public E next() {
if (nextNode == null) {
if (next == null) {
throw new NoSuchElementException();
}
return advance();
pcurr = pnext;
curr = next;
pnext = next;
next = snext;
E x = nextItem;
findNext();
return x;
}
public void remove() {
QNode p = currentNode;
QNode prev = prevNode;
if (prev == null || p == null) {
QNode p = curr;
if (p == null) {
throw new IllegalStateException();
}
Object x = p.get();
if (x != null && x != p && p.compareAndSet(x, p)) {
clean(prev, p);
clean(pcurr, p);
}
}
}
@ -765,8 +801,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/**
* Returns the number of elements in this queue. If this queue
* contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
* <tt>Integer.MAX_VALUE</tt>.
* contains more than {@code Integer.MAX_VALUE} elements, returns
* {@code Integer.MAX_VALUE}.
*
* <p>Beware that, unlike in most collections, this method is
* <em>NOT</em> a constant-time operation. Because of the
@ -806,4 +842,30 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
@Override
public boolean remove(Object o) {
if (o == null) {
return false;
}
for (;;) {
QNode pred = traversalHead();
for (;;) {
QNode q = pred.next;
if (q == null || !q.isData) {
return false;
}
if (q == pred) {// restart
break;
}
Object x = q.get();
if (x != null && x != q && o.equals(x) &&
q.compareAndSet(x, q)) {
clean(pred, q);
return true;
}
pred = q;
}
}
}
}