Sync with upstream

This commit is contained in:
Trustin Lee 2010-10-20 11:33:23 +00:00
parent 35a2326e82
commit b85731e59f

View File

@ -63,7 +63,7 @@ import java.util.concurrent.locks.LockSupport;
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a> * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Doug Lea * @author Doug Lea
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$ (Upstream: 1.71) * @version $Rev$, $Date$ (Upstream: 1.79)
* *
* @param <E> the type of elements held in this collection * @param <E> the type of elements held in this collection
*/ */
@ -341,7 +341,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* situations in which we cannot guarantee to make node s * situations in which we cannot guarantee to make node s
* unreachable in this way: (1) If s is the trailing node of list * unreachable in this way: (1) If s is the trailing node of list
* (i.e., with null next), then it is pinned as the target node * (i.e., with null next), then it is pinned as the target node
* for appends, so can only be removed later when other nodes are * for appends, so can only be removed later after other nodes are
* appended. (2) We cannot necessarily unlink s given a * appended. (2) We cannot necessarily unlink s given a
* predecessor node that is matched (including the case of being * predecessor node that is matched (including the case of being
* cancelled): the predecessor may already be unspliced, in which * cancelled): the predecessor may already be unspliced, in which
@ -363,18 +363,18 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* When these cases arise, rather than always retraversing the * When these cases arise, rather than always retraversing the
* entire list to find an actual predecessor to unlink (which * entire list to find an actual predecessor to unlink (which
* won't help for case (1) anyway), we record a conservative * won't help for case (1) anyway), we record a conservative
* estimate of possible unsplice failures (in "sweepVotes"). We * estimate of possible unsplice failures (in "sweepVotes").
* trigger a full sweep when the estimate exceeds a threshold * We trigger a full sweep when the estimate exceeds a threshold
* indicating the maximum number of estimated removal failures to * ("SWEEP_THRESHOLD") indicating the maximum number of estimated
* tolerate before sweeping through, unlinking cancelled nodes * removal failures to tolerate before sweeping through, unlinking
* that were not unlinked upon initial removal. We perform sweeps * cancelled nodes that were not unlinked upon initial removal.
* by the thread hitting threshold (rather than background threads * We perform sweeps by the thread hitting threshold (rather than
* or by spreading work to other threads) because in the main * background threads or by spreading work to other threads)
* contexts in which removal occurs, the caller is already * because in the main contexts in which removal occurs, the
* timed-out, cancelled, or performing a potentially O(n) * caller is already timed-out, cancelled, or performing a
* operation (i.e., remove(x)), none of which are time-critical * potentially O(n) operation (e.g. remove(x)), none of which are
* enough to warrant the overhead that alternatives would impose * time-critical enough to warrant the overhead that alternatives
* on other threads. * would impose on other threads.
* *
* Because the sweepVotes estimate is conservative, and because * Because the sweepVotes estimate is conservative, and because
* nodes become unlinked "naturally" as they fall off the head of * nodes become unlinked "naturally" as they fall off the head of
@ -453,7 +453,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
final boolean casItem(Object cmp, Object val) { final boolean casItem(Object cmp, Object val) {
assert cmp == null || cmp.getClass() != Node.class; // assert cmp == null || cmp.getClass() != Node.class;
if (AtomicFieldUpdaterUtil.isAvailable()) { if (AtomicFieldUpdaterUtil.isAvailable()) {
return itemUpdater.compareAndSet(this, cmp, val); return itemUpdater.compareAndSet(this, cmp, val);
} else { } else {
@ -469,8 +469,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
/** /**
* Creates a new node. Uses relaxed write because item can only * Constructs a new node. Uses relaxed write because item can
* be seen if followed by CAS. * only be seen after publication via casNext.
*/ */
Node(Object item, boolean isData) { Node(Object item, boolean isData) {
this.item = item; this.item = item;
@ -530,7 +530,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* Tries to artificially match a data node -- used by remove. * Tries to artificially match a data node -- used by remove.
*/ */
final boolean tryMatchData() { final boolean tryMatchData() {
assert isData; // assert isData;
Object x = item; Object x = item;
if (x != null && x != this && casItem(x, null)) { if (x != null && x != this && casItem(x, null)) {
LockSupport.unpark(waiter); LockSupport.unpark(waiter);
@ -612,7 +612,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
static <E> E cast(Object item) { static <E> E cast(Object item) {
assert item == null || item.getClass() != Node.class; // assert item == null || item.getClass() != Node.class;
return (E) item; return (E) item;
} }
@ -638,7 +638,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
boolean isData = p.isData; boolean isData = p.isData;
Object item = p.item; Object item = p.item;
if (item != p && item != null == isData) { // unmatched if (item != p && item != null == isData) { // unmatched
if (isData == haveData) { if (isData == haveData) { // can't match
break; break;
} }
if (p.casItem(item, e)) { // match if (p.casItem(item, e)) { // match
@ -696,7 +696,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
else if (p.cannotPrecede(haveData)) { else if (p.cannotPrecede(haveData)) {
return null; // lost race vs opposite mode return null; // lost race vs opposite mode
} else if ((n = p.next) != null) { } else if ((n = p.next) != null) { // not last; keep traversing
p = p != t && t != (u = tail) ? (t = u) : // stale tail p = p != t && t != (u = tail) ? (t = u) : // stale tail
p != n ? n : null; // restart if off list p != n ? n : null; // restart if off list
} else if (!p.casNext(null, s)) { } else if (!p.casNext(null, s)) {
@ -736,7 +736,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
for (;;) { for (;;) {
Object item = s.item; Object item = s.item;
if (item != e) { // matched if (item != e) { // matched
assert item != s; // assert item != s;
s.forgetContents(); // avoid garbage s.forgetContents(); // avoid garbage
return LinkedTransferQueue.<E>cast(item); return LinkedTransferQueue.<E>cast(item);
} }
@ -779,13 +779,13 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
*/ */
private static int spinsFor(Node pred, boolean haveData) { private static int spinsFor(Node pred, boolean haveData) {
if (MP && pred != null) { if (MP && pred != null) {
if (pred.isData != haveData) { if (pred.isData != haveData) { // phase change
return FRONT_SPINS + CHAINED_SPINS; return FRONT_SPINS + CHAINED_SPINS;
} }
if (pred.isMatched()) { if (pred.isMatched()) { // probably at front
return FRONT_SPINS; return FRONT_SPINS;
} }
if (pred.waiter == null) { if (pred.waiter == null) { // pred apparently spinning
return CHAINED_SPINS; return CHAINED_SPINS;
} }
} }
@ -847,7 +847,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
if (p.isData != data) { if (p.isData != data) {
return 0; return 0;
} }
if (++count == Integer.MAX_VALUE) { if (++count == Integer.MAX_VALUE) { // saturated
break; break;
} }
} }
@ -978,16 +978,19 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
} }
/** /**
* Unlinks matched nodes encountered in a traversal from head. * Unlinks matched (typically cancelled) nodes encountered in a
* traversal from head.
*/ */
private void sweep() { private void sweep() {
for (Node p = head, s, n; p != null && (s = p.next) != null; ) { for (Node p = head, s, n; p != null && (s = p.next) != null; ) {
if (p == s) { if (!s.isMatched()) {
p = head; // Unmatched nodes are never self-linked
} else if (!s.isMatched()) {
p = s; p = s;
} else if ((n = s.next) == null) { } else if ((n = s.next) == null) { // trailing node is pinned
break; break;
} else if (s == n) { // stale
// No need to also check for p == s, since that implies s == n
p = head;
} else { } else {
p.casNext(s, n); p.casNext(s, n);
} }