Fixed issue: NETTY-65 (Intermittent high CPU consumption in LinkedTransferQueue)

* Applied the latest upstream fix
This commit is contained in:
Trustin Lee 2008-11-18 09:43:53 +00:00
parent 580f6f2284
commit 57b2d9a443

View File

@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
/** /**
* An unbounded {@linkplain BlockingQueue} based on linked nodes. * An unbounded <tt>TransferQueue</tt> based on linked nodes.
* This queue orders elements FIFO (first-in-first-out) with respect * This queue orders elements FIFO (first-in-first-out) with respect
* to any given producer. The <em>head</em> of the queue is that * to any given producer. The <em>head</em> of the queue is that
* element that has been on the queue the longest time for some * element that has been on the queue the longest time for some
@ -58,9 +58,10 @@ import java.util.concurrent.locks.LockSupport;
* *
* <p>Memory consistency effects: As with other concurrent * <p>Memory consistency effects: As with other concurrent
* collections, actions in a thread prior to placing an object into a * collections, actions in a thread prior to placing an object into a
* {@code LinkedTransferQueue} <i>happen-before</i> actions subsequent * {@code LinkedTransferQueue}
* to the access or removal of that element from the * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* {@code LinkedTransferQueue} in another thread. * actions subsequent to the access or removal of that element from
* the {@code LinkedTransferQueue} in another thread.
* *
* @author Doug Lea * @author Doug Lea
* @author The Netty Project (netty-dev@lists.jboss.org) * @author The Netty Project (netty-dev@lists.jboss.org)
@ -72,19 +73,19 @@ import java.util.concurrent.locks.LockSupport;
public class LinkedTransferQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> { public class LinkedTransferQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
/* /*
* This is still a work in progress...
*
* This class extends the approach used in FIFO-mode * This class extends the approach used in FIFO-mode
* SynchronousQueues. See the internal documentation, as well as * SynchronousQueues. See the internal documentation, as well as
* the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer, * the PPoPP 2006 paper "Scalable Synchronous Queues" by Scherer,
* Lea & Scott * Lea & Scott
* (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf) * (http://www.cs.rice.edu/~wns1/papers/2006-PPoPP-SQ.pdf)
* *
* The main extension is to provide different Wait modes * The main extension is to provide different Wait modes for the
* for the main "xfer" method that puts or takes items. * main "xfer" method that puts or takes items. These don't
* These do not impact the basic dual-queue logic, but instead * impact the basic dual-queue logic, but instead control whether
* control whether or how threads block upon insertion * or how threads block upon insertion of request or data nodes
* of request or data nodes into the dual queue. * into the dual queue. It also uses slightly different
* conventions for tracking whether nodes are off-list or
* cancelled.
*/ */
// Wait modes for xfer method // Wait modes for xfer method
@ -107,7 +108,7 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/** /**
* The number of times to spin before blocking in untimed waits. * The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin * This is greater than timed value because untimed waits spin
* faster since they do not need to check times on each spin. * faster since they don't need to check times on each spin.
*/ */
private static final int maxUntimedSpins = maxTimedSpins * 16; private static final int maxUntimedSpins = maxTimedSpins * 16;
@ -118,19 +119,18 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
private static final long spinForTimeoutThreshold = 1000L; private static final long spinForTimeoutThreshold = 1000L;
/** /**
* Node class for LinkedTransferQueue. Opportunistically subclasses from * Node class for LinkedTransferQueue. Opportunistically
* AtomicReference to represent item. Uses Object, not E, to allow * subclasses from AtomicReference to represent item. Uses Object,
* setting item to "this" after use, to avoid garbage * not E, to allow setting item to "this" after use, to avoid
* retention. Similarly, setting the next field to this is used as * garbage retention. Similarly, setting the next field to this is
* sentinel that node is off list. * used as sentinel that node is off list.
*/ */
private static final class QNode extends AtomicReference<Object> { private static final class QNode extends AtomicReference<Object> {
private static final long serialVersionUID = 5925596372370723938L; private static final long serialVersionUID = 5925596372370723938L;
volatile QNode next; volatile QNode next;
transient volatile Thread waiter; // to control park/unpark volatile Thread waiter; // to control park/unpark
final boolean isData; final boolean isData;
QNode(Object item, boolean isData) { QNode(Object item, boolean isData) {
super(item); super(item);
this.isData = isData; this.isData = isData;
@ -185,7 +185,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
/** /**
* Puts or takes an item. Used for most queue operations (except * Puts or takes an item. Used for most queue operations (except
* poll() and tryTransfer()) * poll() and tryTransfer()). See the similar code in
* SynchronousQueue for detailed explanation.
* @param e the item or if null, signifies that this is a take * @param e the item or if null, signifies that this is a take
* @param mode the wait mode: NOWAIT, TIMEOUT, WAIT * @param mode the wait mode: NOWAIT, TIMEOUT, WAIT
* @param nanos timeout in nanosecs, used only if mode is TIMEOUT * @param nanos timeout in nanosecs, used only if mode is TIMEOUT
@ -194,8 +195,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
private Object xfer(Object e, int mode, long nanos) { private Object xfer(Object e, int mode, long nanos) {
boolean isData = e != null; boolean isData = e != null;
QNode s = null; QNode s = null;
final AtomicReference<QNode> head = this.head; final PaddedAtomicReference<QNode> head = this.head;
final AtomicReference<QNode> tail = this.tail; final PaddedAtomicReference<QNode> tail = this.tail;
for (;;) { for (;;) {
QNode t = tail.get(); QNode t = tail.get();
@ -238,8 +239,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
*/ */
private Object fulfill(Object e) { private Object fulfill(Object e) {
boolean isData = e != null; boolean isData = e != null;
final AtomicReference<QNode> head = this.head; final PaddedAtomicReference<QNode> head = this.head;
final AtomicReference<QNode> tail = this.tail; final PaddedAtomicReference<QNode> tail = this.tail;
for (;;) { for (;;) {
QNode t = tail.get(); QNode t = tail.get();
@ -297,13 +298,17 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
Object x = s.get(); Object x = s.get();
if (x != e) { // Node was matched or cancelled if (x != e) { // Node was matched or cancelled
advanceHead(pred, s); // unlink if head advanceHead(pred, s); // unlink if head
if (x == s) { if (x == s) { // was cancelled
return clean(pred, s); clean(pred, s);
return null;
}
else if (x != null) {
s.set(s); // avoid garbage retention
return x;
} else {
return e;
} }
s.set(s); // mark as off-list
return x != null? x : e;
} }
if (mode == TIMEOUT) { if (mode == TIMEOUT) {
long now = System.nanoTime(); long now = System.nanoTime();
nanos -= now - lastTime; nanos -= now - lastTime;
@ -339,10 +344,34 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
} }
/** /**
* Gets rid of cancelled node s with original predecessor pred. * Returns validated tail for use in cleaning methods
* @return null (to simplify use by callers)
*/ */
Object clean(final QNode pred, final QNode s) { private QNode getValidatedTail() {
for (;;) {
QNode h = head.get();
QNode first = h.next;
if (first != null && first.next == first) { // help advance
advanceHead(h, first);
continue;
}
QNode t = tail.get();
QNode last = t.next;
if (t == tail.get()) {
if (last != null) {
tail.compareAndSet(t, last); // help advance
} else {
return t;
}
}
}
}
/**
* Gets rid of cancelled node s with original predecessor pred.
* @param pred predecessor of cancelled node
* @param s the cancelled node
*/
void clean(QNode pred, QNode s) {
Thread w = s.waiter; Thread w = s.waiter;
if (w != null) { // Wake up thread if (w != null) { // Wake up thread
s.waiter = null; s.waiter = null;
@ -350,59 +379,69 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
LockSupport.unpark(w); LockSupport.unpark(w);
} }
} }
/*
for (;;) { * At any given time, exactly one node on list cannot be
if (pred.next != s) { * deleted -- the last inserted node. To accommodate this, if
return null; * we cannot delete s, we save its predecessor as "cleanMe",
} * processing the previously saved version first. At least one
QNode h = head.get(); * of node s or the node previously saved can always be
QNode hn = h.next; // Absorb cancelled first node as head * processed, so this always terminates.
if (hn != null && hn.next == hn) { */
advanceHead(h, hn); while (pred.next == s) {
continue; QNode oldpred = reclean(); // First, help get rid of cleanMe
} QNode t = getValidatedTail();
QNode t = tail.get(); // Ensure consistent read for tail if (s != t) { // If not tail, try to unsplice
if (t == h) { QNode sn = s.next; // s.next == s means s already off list
return null;
}
QNode tn = t.next;
if (t != tail.get()) {
continue;
}
if (tn != null) { // Help advance tail
tail.compareAndSet(t, tn);
continue;
}
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn)) { if (sn == s || pred.casNext(s, sn)) {
return null; break;
} }
} }
else if (oldpred == pred || // Already saved
QNode dp = cleanMe.get(); oldpred == null && cleanMe.compareAndSet(null, pred)) {
if (dp != null) { // Try unlinking previous cancelled node break; // Postpone cleaning
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
d.get() != d || // d not cancelled or
d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn)) {
cleanMe.compareAndSet(dp, null);
}
if (dp == pred) {
return null; // s is already saved node
}
}
else if (cleanMe.compareAndSet(null, pred)) {
return null; // Postpone cleaning s
} }
} }
} }
/**
* Tries to unsplice the cancelled node held in cleanMe that was
* previously uncleanable because it was at tail.
* @return current cleanMe node (or null)
*/
private QNode reclean() {
/*
* cleanMe is, or at one time was, predecessor of cancelled
* node s that was the tail so could not be unspliced. If s
* is no longer the tail, try to unsplice if necessary and
* make cleanMe slot available. This differs from similar
* code in clean() because we must check that pred still
* points to a cancelled node that must be unspliced -- if
* not, we can (must) clear cleanMe without unsplicing.
* This can loop only due to contention on casNext or
* clearing cleanMe.
*/
QNode pred;
while ((pred = cleanMe.get()) != null) {
QNode t = getValidatedTail();
QNode s = pred.next;
if (s != t) {
QNode sn;
if (s == null || s == pred || s.get() != s ||
(sn = s.next) == s || pred.casNext(s, sn)) {
cleanMe.compareAndSet(pred, null);
}
} else {
break;
}
}
return pred;
}
@SuppressWarnings("unchecked")
E cast(Object e) {
return (E)e;
}
/** /**
* Creates an initially empty <tt>LinkedTransferQueue</tt>. * Creates an initially empty <tt>LinkedTransferQueue</tt>.
*/ */
@ -496,11 +535,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
throw new InterruptedException(); throw new InterruptedException();
} }
@SuppressWarnings("unchecked")
E cast(Object e) {
return (E) e;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException { public E poll(long timeout, TimeUnit unit) throws InterruptedException {
Object e = xfer(null, TIMEOUT, unit.toNanos(timeout)); Object e = xfer(null, TIMEOUT, unit.toNanos(timeout));
if (e != null || !Thread.interrupted()) { if (e != null || !Thread.interrupted()) {
@ -575,7 +609,6 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
} }
} }
@Override @Override
public Iterator<E> iterator() { public Iterator<E> iterator() {
return new Itr(); return new Itr();
@ -588,18 +621,18 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
* so that the next call to next() will return it even * so that the next call to next() will return it even
* if subsequently removed. * if subsequently removed.
*/ */
private class Itr implements Iterator<E> { class Itr implements Iterator<E> {
private QNode nextNode; // Next node to return next QNode nextNode; // Next node to return next
private QNode currentNode; // last returned node, for remove() QNode currentNode; // last returned node, for remove()
private QNode prevNode; // predecessor of last returned node QNode prevNode; // predecessor of last returned node
private E nextItem; // Cache of next item, once commited to in next E nextItem; // Cache of next item, once commited to in next
Itr() { Itr() {
nextNode = traversalHead(); nextNode = traversalHead();
advance(); advance();
} }
private E advance() { E advance() {
prevNode = currentNode; prevNode = currentNode;
currentNode = nextNode; currentNode = nextNode;
E x = nextItem; E x = nextItem;
@ -742,4 +775,4 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E> implements Blocking
public int remainingCapacity() { public int remainingCapacity() {
return Integer.MAX_VALUE; return Integer.MAX_VALUE;
} }
} }