Code cleanup

This commit is contained in:
Trustin Lee 2008-10-06 13:25:27 +00:00
parent 7a004aa421
commit 87b4f886a6

View File

@ -28,6 +28,7 @@
*/ */
package org.jboss.netty.util; package org.jboss.netty.util;
import java.util.AbstractQueue; import java.util.AbstractQueue;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@ -68,9 +69,7 @@ import java.util.concurrent.locks.LockSupport;
* @param <E> the type of elements held in this collection * @param <E> the type of elements held in this collection
* *
*/ */
public class LinkedTransferQueue<E> extends AbstractQueue<E> public class LinkedTransferQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -3223113410248163686L;
/* /*
* This is still a work in progress... * This is still a work in progress...
@ -89,12 +88,12 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
*/ */
// Wait modes for xfer method // Wait modes for xfer method
static final int NOWAIT = 0; private static final int NOWAIT = 0;
static final int TIMEOUT = 1; private static final int TIMEOUT = 1;
static final int WAIT = 2; private static final int WAIT = 2;
/** The number of CPUs, for spin control */ /** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors(); private static final int NCPUS = Runtime.getRuntime().availableProcessors();
/** /**
* The number of times to spin before blocking in timed waits. * The number of times to spin before blocking in timed waits.
@ -103,20 +102,20 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* seems not to vary with number of CPUs (beyond 2) so is just * seems not to vary with number of CPUs (beyond 2) so is just
* a constant. * a constant.
*/ */
static final int maxTimedSpins = NCPUS < 2? 0 : 32; private static final int maxTimedSpins = NCPUS < 2? 0 : 32;
/** /**
* The number of times to spin before blocking in untimed waits. * The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin * This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin. * faster since they don't need to check times on each spin.
*/ */
static final int maxUntimedSpins = maxTimedSpins * 16; private static final int maxUntimedSpins = maxTimedSpins * 16;
/** /**
* The number of nanoseconds for which it is faster to spin * The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices. * rather than to use timed park. A rough estimate suffices.
*/ */
static final long spinForTimeoutThreshold = 1000L; private static final long spinForTimeoutThreshold = 1000L;
/** /**
* Node class for LinkedTransferQueue. Opportunistically subclasses from * Node class for LinkedTransferQueue. Opportunistically subclasses from
@ -125,18 +124,19 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* retention. Similarly, setting the next field to this is used as * retention. Similarly, setting the next field to this is used as
* sentinel that node is off list. * sentinel that node is off list.
*/ */
static final class QNode extends AtomicReference<Object> { private static final class QNode extends AtomicReference<Object> {
private static final long serialVersionUID = 5925596372370723938L; private static final long serialVersionUID = 5925596372370723938L;
volatile QNode next; volatile QNode next;
volatile Thread waiter; // to control park/unpark volatile Thread waiter; // to control park/unpark
final boolean isData; final boolean isData;
QNode(Object item, boolean isData) { QNode(Object item, boolean isData) {
super(item); super(item);
this.isData = isData; this.isData = isData;
} }
static final AtomicReferenceFieldUpdater<QNode, QNode> private static final AtomicReferenceFieldUpdater<QNode, QNode>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(QNode.class, QNode.class, "next"); (QNode.class, QNode.class, "next");
@ -150,26 +150,26 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* cleanMe, to alleviate contention across threads CASing one vs * cleanMe, to alleviate contention across threads CASing one vs
* the other. * the other.
*/ */
// static final class PaddedAtomicReference<T> extends AtomicReference<T> { private static final class PaddedAtomicReference<T> extends AtomicReference<T> {
// private static final long serialVersionUID = 4684288940772921317L; private static final long serialVersionUID = 4684288940772921317L;
//
// // enough padding for 64bytes with 4byte refs // enough padding for 64bytes with 4byte refs
// Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe; Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;
// PaddedAtomicReference(T r) { super(r); } PaddedAtomicReference(T r) { super(r); }
// } }
/** head of the queue */ /** head of the queue */
private transient final AtomicReference<QNode> head; private final PaddedAtomicReference<QNode> head;
/** tail of the queue */ /** tail of the queue */
private transient final AtomicReference<QNode> tail; private final PaddedAtomicReference<QNode> tail;
/** /**
* Reference to a cancelled node that might not yet have been * Reference to a cancelled node that might not yet have been
* unlinked from queue because it was the last inserted node * unlinked from queue because it was the last inserted node
* when it cancelled. * when it cancelled.
*/ */
private transient final AtomicReference<QNode> cleanMe; private final PaddedAtomicReference<QNode> cleanMe;
/** /**
* Tries to cas nh as new head; if successful, unlink * Tries to cas nh as new head; if successful, unlink
@ -410,9 +410,9 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
*/ */
public LinkedTransferQueue() { public LinkedTransferQueue() {
QNode dummy = new QNode(null, false); QNode dummy = new QNode(null, false);
head = new AtomicReference<QNode>(dummy); head = new PaddedAtomicReference<QNode>(dummy);
tail = new AtomicReference<QNode>(dummy); tail = new PaddedAtomicReference<QNode>(dummy);
cleanMe = new AtomicReference<QNode>(null); cleanMe = new PaddedAtomicReference<QNode>(null);
} }
/** /**
@ -498,7 +498,8 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
throw new InterruptedException(); throw new InterruptedException();
} }
@SuppressWarnings("unchecked") E cast(Object e) { @SuppressWarnings("unchecked")
E cast(Object e) {
return (E) e; return (E) e;
} }
@ -589,18 +590,18 @@ public class LinkedTransferQueue<E> extends AbstractQueue<E>
* so that the next call to next() will return it even * so that the next call to next() will return it even
* if subsequently removed. * if subsequently removed.
*/ */
class Itr implements Iterator<E> { private class Itr implements Iterator<E> {
QNode nextNode; // Next node to return next private QNode nextNode; // Next node to return next
QNode currentNode; // last returned node, for remove() private QNode currentNode; // last returned node, for remove()
QNode prevNode; // predecessor of last returned node private QNode prevNode; // predecessor of last returned node
E nextItem; // Cache of next item, once commited to in next private E nextItem; // Cache of next item, once commited to in next
Itr() { Itr() {
nextNode = traversalHead(); nextNode = traversalHead();
advance(); advance();
} }
E advance() { private E advance() {
prevNode = currentNode; prevNode = currentNode;
currentNode = nextNode; currentNode = nextNode;
E x = nextItem; E x = nextItem;