* Added alternative implementations of ConcurrentHashMap and replaced existing references to java.util.concurrent.ConcurrentHashMap with them

* Added ReusableIterator to avoid Iterator creation overhead
* Optimized HashedWheelTimer
This commit is contained in:
Trustin Lee 2009-01-20 07:57:45 +00:00
parent 546ac3260c
commit b2d27d3b69
12 changed files with 1720 additions and 661 deletions

View File

@ -24,7 +24,7 @@ package org.jboss.netty.channel;
import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.util.ConcurrentWeakHashMap;
import org.jboss.netty.util.ConcurrentIdentityWeakHashMap;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
@ -34,7 +34,8 @@ import org.jboss.netty.util.ConcurrentWeakHashMap;
* @apiviz.stereotype utility
*/
public class ChannelLocal<T> {
private final ConcurrentMap<Channel, T> map = new ConcurrentWeakHashMap<Channel, T>();
private final ConcurrentMap<Channel, T> map =
new ConcurrentIdentityWeakHashMap<Channel, T>();
/**
* Creates a {@link Channel} local variable.

View File

@ -22,9 +22,10 @@
*/
package org.jboss.netty.channel.group;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.util.ConcurrentHashMap;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)

View File

@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.channel.Channel;
@ -38,6 +37,7 @@ import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ServerChannel;
import org.jboss.netty.util.CombinedIterator;
import org.jboss.netty.util.ConcurrentHashMap;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)

View File

@ -27,11 +27,11 @@ import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.ConcurrentHashMap;
/**
* The default {@link ObjectSizeEstimator} implementation for general purpose.

View File

@ -23,7 +23,6 @@
package org.jboss.netty.handler.execution;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@ -41,6 +40,7 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ConcurrentHashMap;
import org.jboss.netty.util.LinkedTransferQueue;
/**

View File

@ -23,7 +23,6 @@
package org.jboss.netty.handler.execution;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
@ -33,6 +32,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.util.ConcurrentHashMap;
/**
* A {@link MemoryAwareThreadPoolExecutor} which maintains the

View File

@ -26,16 +26,18 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ConcurrentIdentityHashMap;
import org.jboss.netty.util.ExecutorUtil;
import org.jboss.netty.util.MapBackedSet;
import org.jboss.netty.util.ReusableIterator;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
@ -47,18 +49,20 @@ public class HashedWheelTimer implements Timer {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(HashedWheelTimer.class);
private final Executor executor;
private final Worker worker = new Worker();
final Executor executor;
final Worker worker = new Worker();
final AtomicInteger activeTimeouts = new AtomicInteger();
final long tickDuration;
final long wheelDuration;
final long roundDuration;
final Set<HashedWheelTimeout>[] wheel;
final ReusableIterator<HashedWheelTimeout>[] iterators;
final int mask;
final ReadWriteLock lock = new ReentrantReadWriteLock();
volatile int wheelCursor;
public HashedWheelTimer(Executor executor) {
this(executor, 1, TimeUnit.SECONDS, 64);
this(executor, 100, TimeUnit.MILLISECONDS, 512); // about 50 sec
}
public HashedWheelTimer(
@ -80,6 +84,7 @@ public class HashedWheelTimer implements Timer {
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
iterators = createIterators(wheel);
mask = wheel.length - 1;
// Convert checkInterval to nanoseconds.
@ -93,8 +98,7 @@ public class HashedWheelTimer implements Timer {
tickDuration + ' ' + unit);
}
wheelDuration = tickDuration * wheel.length;
executor.execute(worker);
roundDuration = tickDuration * wheel.length;
}
@SuppressWarnings("unchecked")
@ -109,11 +113,20 @@ public class HashedWheelTimer implements Timer {
}
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
Set<HashedWheelTimeout>[] buckets = new Set[ticksPerWheel];
for (int i = 0; i < buckets.length; i ++) {
buckets[i] = new MapBackedSet<HashedWheelTimeout>(new ConcurrentHashMap<HashedWheelTimeout, Boolean>());
Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new MapBackedSet<HashedWheelTimeout>(new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>());
}
return buckets;
return wheel;
}
@SuppressWarnings("unchecked")
private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
for (int i = 0; i < wheel.length; i ++) {
iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
}
return iterators;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
@ -139,14 +152,19 @@ public class HashedWheelTimer implements Timer {
initialDelay = unit.toNanos(initialDelay);
checkDelay(initialDelay);
// Add the timeout to the wheel.
HashedWheelTimeout timeout;
lock.readLock().lock();
try {
timeout = new HashedWheelTimeout(
task, wheelCursor, System.nanoTime(), initialDelay);
wheel[schedule(timeout)].add(timeout);
// Start the worker if necessary.
if (activeTimeouts.getAndIncrement() == 0) {
executor.execute(worker);
}
} finally {
lock.readLock().unlock();
}
@ -163,33 +181,41 @@ public class HashedWheelTimer implements Timer {
final long oldCumulativeDelay = timeout.cumulativeDelay;
final long newCumulativeDelay = oldCumulativeDelay + additionalDelay;
final long lastWheelDelay = newCumulativeDelay % wheelDuration;
final long lastRoundDelay = newCumulativeDelay % roundDuration;
final long lastTickDelay = newCumulativeDelay % tickDuration;
final int relativeIndex =
(int) (lastWheelDelay / tickDuration) + (lastTickDelay != 0? 1 : 0);
final long relativeIndex =
lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
timeout.deadline = timeout.startTime + newCumulativeDelay;
timeout.cumulativeDelay = newCumulativeDelay;
timeout.remainingRounds =
additionalDelay / wheelDuration -
(additionalDelay % wheelDuration == 0? 1:0) - timeout.slippedRounds;
additionalDelay / roundDuration -
(additionalDelay % roundDuration == 0? 1:0) - timeout.slippedRounds;
timeout.slippedRounds = 0;
return timeout.stopIndex = timeout.startIndex + relativeIndex & mask;
return timeout.stopIndex = (int) (timeout.startIndex + relativeIndex & mask);
}
}
boolean isWheelEmpty() {
for (Set<HashedWheelTimeout> bucket: wheel) {
if (!bucket.isEmpty()) {
return false;
}
}
return true;
}
void checkDelay(long delay) {
if (delay < tickDuration) {
throw new IllegalArgumentException(
"delay must be greater than " +
tickDuration + " nanoseconds");
"delay must be greater than " + tickDuration + " nanoseconds");
}
}
private final class Worker implements Runnable {
private long startTime;
private volatile long threadSafeStartTime;
private long tick;
Worker() {
@ -200,17 +226,18 @@ public class HashedWheelTimer implements Timer {
List<HashedWheelTimeout> expiredTimeouts =
new ArrayList<HashedWheelTimeout>();
startTime = System.nanoTime();
long startTime = threadSafeStartTime;
tick = 1;
for (;;) {
waitForNextTick();
fetchExpiredTimeouts(expiredTimeouts);
boolean continueTheLoop;
do {
startTime = waitForNextTick(startTime);
continueTheLoop = fetchExpiredTimeouts(expiredTimeouts);
notifyExpiredTimeouts(expiredTimeouts);
}
} while (continueTheLoop && !ExecutorUtil.isShutdown(executor));
}
private void fetchExpiredTimeouts(
private boolean fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts) {
// Find the expired timeouts and decrease the round counter
@ -219,30 +246,49 @@ public class HashedWheelTimer implements Timer {
// an exclusive lock.
lock.writeLock().lock();
try {
long currentTime = System.nanoTime();
int newBucketHead = wheelCursor + 1 & mask;
Set<HashedWheelTimeout> bucket = wheel[wheelCursor];
int oldBucketHead = wheelCursor;
int newBucketHead = oldBucketHead + 1 & mask;
wheelCursor = newBucketHead;
for (Iterator<HashedWheelTimeout> i = bucket.iterator(); i.hasNext();) {
HashedWheelTimeout timeout = i.next();
synchronized (timeout) {
if (timeout.remainingRounds <= 0) {
if (timeout.deadline <= currentTime) {
i.remove();
expiredTimeouts.add(timeout);
} else {
// A rare case where a timeout is put for the next
// round: just wait for the next round.
timeout.slippedRounds ++;
}
} else {
timeout.remainingRounds --;
}
}
ReusableIterator<HashedWheelTimeout> i = iterators[oldBucketHead];
i.rewind();
fetchExpiredTimeouts(expiredTimeouts, i);
if (activeTimeouts.get() == 0) {
// Exit the loop.
return false;
}
} finally {
lock.writeLock().unlock();
}
// Continue the loop.
return true;
}
private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts,
Iterator<HashedWheelTimeout> i) {
long currentTime = System.nanoTime();
while (i.hasNext()) {
HashedWheelTimeout timeout = i.next();
synchronized (timeout) {
if (timeout.remainingRounds <= 0) {
if (timeout.deadline <= currentTime) {
i.remove();
expiredTimeouts.add(timeout);
activeTimeouts.getAndDecrement();
} else {
// A rare case where a timeout is put for the next
// round: just wait for the next round.
timeout.slippedRounds ++;
}
} else {
timeout.remainingRounds --;
}
}
}
}
private void notifyExpiredTimeouts(
@ -256,7 +302,7 @@ public class HashedWheelTimer implements Timer {
expiredTimeouts.clear();
}
private void waitForNextTick() {
private long waitForNextTick(long startTime) {
for (;;) {
final long currentTime = System.nanoTime();
final long sleepTime = tickDuration * tick - (currentTime - startTime);
@ -268,7 +314,9 @@ public class HashedWheelTimer implements Timer {
try {
Thread.sleep(sleepTime / 1000000, (int) (sleepTime % 1000000));
} catch (InterruptedException e) {
// FIXME: must exit the loop if necessary
if (ExecutorUtil.isShutdown(executor) || isWheelEmpty()) {
return startTime;
}
}
}
@ -280,6 +328,8 @@ public class HashedWheelTimer implements Timer {
// Increase the tick if overflow is not likely to happen.
tick ++;
}
return startTime;
}
}
@ -318,10 +368,13 @@ public class HashedWheelTimer implements Timer {
return;
}
boolean removed;
synchronized (this) {
if (!wheel[stopIndex].remove(this)) {
return;
}
removed = wheel[stopIndex].remove(this);
}
if (removed) {
activeTimeouts.getAndDecrement();
}
}
@ -341,11 +394,19 @@ public class HashedWheelTimer implements Timer {
lock.readLock().lock();
try {
// Reinsert the timeout to the appropriate bucket.
int newStopIndex;
synchronized (this) {
newStopIndex = stopIndex = schedule(this, additionalDelay);
}
wheel[newStopIndex].add(this);
if (wheel[newStopIndex].add(this)) {
// Start the worker if necessary.
if (activeTimeouts.getAndIncrement() == 0) {
executor.execute(worker);
}
}
} finally {
extensionCount ++;
lock.readLock().unlock();

File diff suppressed because it is too large Load Diff

View File

@ -27,118 +27,24 @@
*/
package org.jboss.netty.util;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.AbstractCollection;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.EnumSet;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
/**
* An advanced hash table supporting configurable garbage collection semantics
* of keys and values, optional referential-equality, full concurrency of
* retrievals, and adjustable expected concurrency for updates.
*
* This table is designed around specific advanced use-cases. If there is any
* doubt whether this table is for you, you most likely should be using
* {@link ConcurrentHashMap} instead.
*
* This table supports strong, weak, and soft keys and values. By default keys
* are weak, and values are strong. Such a configuration offers similar behavior
* to {@link WeakHashMap}, entries of this table are periodically removed once
* their corresponding keys are no longer referenced outside of this table. In
* other words, this table will not prevent a key from being discarded by the
* garbage collector. Once a key has been discarded by the collector, the
* corresponding entry is no longer visible to this table; however, the entry
* may occupy space until a future table operation decides to reclaim it. For
* this reason, summary functions such as <tt>size</tt> and <tt>isEmpty</tt>
* might return a value greater than the observed number of entries. In order to
* support a high level of concurrency, stale entries are only reclaimed during
* blocking (usually mutating) operations.
*
* Enabling soft keys allows entries in this table to remain until their space
* is absolutely needed by the garbage collector. This is unlike weak keys which
* can be reclaimed as soon as they are no longer referenced by a normal strong
* reference. The primary use case for soft keys is a cache, which ideally
* occupies memory that is not in use for as long as possible.
*
* By default, values are held using a normal strong reference. This provides
* the commonly desired guarantee that a value will always have at least the
* same life-span as it's key. For this reason, care should be taken to ensure
* that a value never refers, either directly or indirectly, to its key, thereby
* preventing reclamation. If this is unavoidable, then it is recommended to use
* the same reference type in use for the key. However, it should be noted that
* non-strong values may disappear before their corresponding key.
*
* While this table does allow the use of both strong keys and values, it is
* recommended to use {@link ConcurrentHashMap} for such a configuration, since
* it is optimized for that case.
*
* Just like {@link ConcurrentHashMap}, this class obeys the same functional
* specification as {@link Hashtable}, and includes versions of methods
* corresponding to each method of <tt>Hashtable</tt>. However, even though all
* operations are thread-safe, retrieval operations do <em>not</em> entail
* locking, and there is <em>not</em> any support for locking the entire table
* in a way that prevents all access. This class is fully interoperable with
* <tt>Hashtable</tt> in programs that rely on its thread safety but not on its
* synchronization details.
*
* <p>
* Retrieval operations (including <tt>get</tt>) generally do not block, so may
* overlap with update operations (including <tt>put</tt> and <tt>remove</tt>).
* Retrievals reflect the results of the most recently <em>completed</em> update
* operations holding upon their onset. For aggregate operations such as
* <tt>putAll</tt> and <tt>clear</tt>, concurrent retrievals may reflect
* insertion or removal of only some entries. Similarly, Iterators and
* Enumerations return elements reflecting the state of the hash table at some
* point at or since the creation of the iterator/enumeration. They do
* <em>not</em> throw {@link ConcurrentModificationException}. However,
* iterators are designed to be used by only one thread at a time.
*
* <p>
* The allowed concurrency among update operations is guided by the optional
* <tt>concurrencyLevel</tt> constructor argument (default <tt>16</tt>), which
* is used as a hint for internal sizing. The table is internally partitioned to
* try to permit the indicated number of concurrent updates without contention.
* Because placement in hash tables is essentially random, the actual
* concurrency will vary. Ideally, you should choose a value to accommodate as
* many threads as will ever concurrently modify the table. Using a
* significantly higher value than you need can waste space and time, and a
* significantly lower value can lead to thread contention. But overestimates
* and underestimates within an order of magnitude do not usually have much
* noticeable impact. A value of one is appropriate when it is known that only
* one thread will modify and all others will only read. Also, resizing this or
* any other kind of hash table is a relatively slow operation, so, when
* possible, it is a good idea to provide estimates of expected table sizes in
* constructors.
*
* <p>
* This class and its views and iterators implement all of the <em>optional</em>
* methods of the {@link Map} and {@link Iterator} interfaces.
*
* <p>
* Like {@link Hashtable} but unlike {@link HashMap}, this class does
* <em>not</em> allow <tt>null</tt> to be used as a key or value.
* An alternative identity-comparing {@link ConcurrentMap} which is similar to
* {@link java.util.concurrent.ConcurrentHashMap}.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Doug Lea
@ -149,42 +55,8 @@ import java.util.concurrent.locks.ReentrantLock;
* @param <K> the type of keys maintained by this map
* @param <V> the type of mapped values
*/
public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
/*
* The basic strategy is to subdivide the table among Segments,
* each of which itself is a concurrently readable hash table.
*/
/**
* An option specifying which Java reference type should be used to refer
* to a key and/or value.
*/
public static enum ReferenceType {
/** Indicates a normal Java strong reference should be used */
STRONG,
/** Indicates a {@link WeakReference} should be used */
WEAK,
/** Indicates a {@link SoftReference} should be used */
SOFT
}
public static enum Option {
/**
* Indicates that referential-equality (== instead of .equals()) should
* be used when locating keys. This offers similar behavior to
* {@link IdentityHashMap}
*/
IDENTITY_COMPARISONS
}
/* ---------------- Constants -------------- */
static final ReferenceType DEFAULT_KEY_TYPE = ReferenceType.WEAK;
static final ReferenceType DEFAULT_VALUE_TYPE = ReferenceType.STRONG;
public final class ConcurrentIdentityHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>{
/**
* The default initial capacity for this table, used when not otherwise
@ -243,11 +115,9 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
*/
final Segment<K, V>[] segments;
boolean identityComparisons;
transient Set<K> keySet;
transient Set<Map.Entry<K, V>> entrySet;
transient Collection<V> values;
Set<K> keySet;
Set<Map.Entry<K, V>> entrySet;
Collection<V> values;
/* ---------------- Small Utilities -------------- */
@ -280,103 +150,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
}
private int hashOf(Object key) {
return hash(identityComparisons?
System.identityHashCode(key) : key.hashCode());
}
/* ---------------- Inner Classes -------------- */
static interface KeyReference {
int keyHash();
Object keyRef();
}
/**
* A weak-key reference which stores the key hash needed for reclamation.
*/
static final class WeakKeyReference<K>
extends WeakReference<K> implements KeyReference {
final int hash;
WeakKeyReference(K key, int hash, ReferenceQueue<Object> refQueue) {
super(key, refQueue);
this.hash = hash;
}
public final int keyHash() {
return hash;
}
public final Object keyRef() {
return this;
}
}
/**
* A soft-key reference which stores the key hash needed for reclamation.
*/
static final class SoftKeyReference<K>
extends SoftReference<K> implements KeyReference {
final int hash;
SoftKeyReference(K key, int hash, ReferenceQueue<Object> refQueue) {
super(key, refQueue);
this.hash = hash;
}
public final int keyHash() {
return hash;
}
public final Object keyRef() {
return this;
}
}
static final class WeakValueReference<V>
extends WeakReference<V> implements KeyReference {
final Object keyRef;
final int hash;
WeakValueReference(
V value, Object keyRef, int hash, ReferenceQueue<Object> refQueue) {
super(value, refQueue);
this.keyRef = keyRef;
this.hash = hash;
}
public final int keyHash() {
return hash;
}
public final Object keyRef() {
return keyRef;
}
}
static final class SoftValueReference<V>
extends SoftReference<V> implements KeyReference {
final Object keyRef;
final int hash;
SoftValueReference(
V value, Object keyRef, int hash, ReferenceQueue<Object> refQueue) {
super(value, refQueue);
this.keyRef = keyRef;
this.hash = hash;
}
public final int keyHash() {
return hash;
}
public final Object keyRef() {
return keyRef;
}
return hash(System.identityHashCode(key));
}
/**
@ -392,70 +166,31 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
* an unsynchronized access method.
*/
static final class HashEntry<K, V> {
final Object keyRef;
final Object key;
final int hash;
volatile Object valueRef;
volatile Object value;
final HashEntry<K, V> next;
HashEntry(
K key, int hash, HashEntry<K, V> next, V value,
ReferenceType keyType, ReferenceType valueType,
ReferenceQueue<Object> refQueue) {
K key, int hash, HashEntry<K, V> next, V value) {
this.hash = hash;
this.next = next;
this.keyRef = newKeyReference(key, keyType, refQueue);
this.valueRef = newValueReference(value, valueType, refQueue);
}
final Object newKeyReference(
K key, ReferenceType keyType, ReferenceQueue<Object> refQueue) {
if (keyType == ReferenceType.WEAK) {
return new WeakKeyReference<K>(key, hash, refQueue);
}
if (keyType == ReferenceType.SOFT) {
return new SoftKeyReference<K>(key, hash, refQueue);
}
return key;
}
final Object newValueReference(
V value, ReferenceType valueType, ReferenceQueue<Object> refQueue) {
if (valueType == ReferenceType.WEAK) {
return new WeakValueReference<V>(value, keyRef, hash, refQueue);
}
if (valueType == ReferenceType.SOFT) {
return new SoftValueReference<V>(value, keyRef, hash, refQueue);
}
return value;
this.key = key;
this.value = value;
}
@SuppressWarnings("unchecked")
final K key() {
if (keyRef instanceof KeyReference) {
return ((Reference<K>) keyRef).get();
}
return (K) keyRef;
}
final V value() {
return dereferenceValue(valueRef);
return (K) key;
}
@SuppressWarnings("unchecked")
final V dereferenceValue(Object value) {
if (value instanceof KeyReference) {
return ((Reference<V>) value).get();
}
final V value() {
return (V) value;
}
final void setValue(
V value, ReferenceType valueType, ReferenceQueue<Object> refQueue) {
this.valueRef = newValueReference(value, valueType, refQueue);
final void setValue(V value) {
this.value = value;
}
@SuppressWarnings("unchecked")
@ -510,7 +245,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
/**
* The number of elements in this segment's region.
*/
transient volatile int count;
volatile int count;
/**
* Number of updates that alter the size of the table. This is used
@ -519,18 +254,18 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
* checking containsValue, then we might have an inconsistent view of
* state so (usually) must retry.
*/
transient int modCount;
int modCount;
/**
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(capacity * loadFactor)</tt>.)
*/
transient int threshold;
int threshold;
/**
* The per-segment table.
*/
transient volatile HashEntry<K, V>[] table;
volatile HashEntry<K, V>[] table;
/**
* The load factor for the hash table. Even though this value is same
@ -541,24 +276,8 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
*/
final float loadFactor;
/**
* The collected weak-key reference queue for this segment. This should
* be (re)initialized whenever table is assigned,
*/
transient volatile ReferenceQueue<Object> refQueue;
final ReferenceType keyType;
final ReferenceType valueType;
final boolean identityComparisons;
Segment(int initialCapacity, float lf, ReferenceType keyType,
ReferenceType valueType, boolean identityComparisons) {
Segment(int initialCapacity, float lf) {
loadFactor = lf;
this.keyType = keyType;
this.valueType = valueType;
this.identityComparisons = identityComparisons;
setTable(HashEntry.<K, V> newArray(initialCapacity));
}
@ -568,7 +287,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
}
private boolean keyEq(Object src, Object dest) {
return identityComparisons? src == dest : src.equals(dest);
return src == dest;
}
/**
@ -578,7 +297,6 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
void setTable(HashEntry<K, V>[] newTable) {
threshold = (int) (newTable.length * loadFactor);
table = newTable;
refQueue = new ReferenceQueue<Object>();
}
/**
@ -591,8 +309,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
HashEntry<K, V> newHashEntry(
K key, int hash, HashEntry<K, V> next, V value) {
return new HashEntry<K, V>(
key, hash, next, value, keyType, valueType, refQueue);
return new HashEntry<K, V>(key, hash, next, value);
}
/**
@ -604,7 +321,6 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
V readValueUnderLock(HashEntry<K, V> e) {
lock();
try {
removeStale();
return e.value();
} finally {
unlock();
@ -618,9 +334,9 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
HashEntry<K, V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && keyEq(key, e.key())) {
Object opaque = e.valueRef;
V opaque = e.value();
if (opaque != null) {
return e.dereferenceValue(opaque);
return opaque;
}
return readValueUnderLock(e); // recheck
@ -650,13 +366,13 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
int len = tab.length;
for (int i = 0; i < len; i ++) {
for (HashEntry<K, V> e = tab[i]; e != null; e = e.next) {
Object opaque = e.valueRef;
V opaque = e.value();
V v;
if (opaque == null) {
v = readValueUnderLock(e); // recheck
} else {
v = e.dereferenceValue(opaque);
v = opaque;
}
if (value.equals(v)) {
@ -671,7 +387,6 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
boolean replace(K key, int hash, V oldValue, V newValue) {
lock();
try {
removeStale();
HashEntry<K, V> e = getFirst(hash);
while (e != null && (e.hash != hash || !keyEq(key, e.key()))) {
e = e.next;
@ -680,7 +395,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
boolean replaced = false;
if (e != null && oldValue.equals(e.value())) {
replaced = true;
e.setValue(newValue, valueType, refQueue);
e.setValue(newValue);
}
return replaced;
} finally {
@ -691,7 +406,6 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
V replace(K key, int hash, V newValue) {
lock();
try {
removeStale();
HashEntry<K, V> e = getFirst(hash);
while (e != null && (e.hash != hash || !keyEq(key, e.key()))) {
e = e.next;
@ -700,7 +414,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
V oldValue = null;
if (e != null) {
oldValue = e.value();
e.setValue(newValue, valueType, refQueue);
e.setValue(newValue);
}
return oldValue;
} finally {
@ -711,7 +425,6 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();
try {
removeStale();
int c = count;
if (c ++ > threshold) { // ensure capacity
int reduced = rehash();
@ -732,7 +445,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
if (e != null) {
oldValue = e.value();
if (!onlyIfAbsent) {
e.setValue(value, valueType, refQueue);
e.setValue(value);
}
} else {
oldValue = null;
@ -819,16 +532,13 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
V remove(Object key, int hash, Object value, boolean refRemove) {
lock();
try {
if (!refRemove) {
removeStale();
}
int c = count - 1;
HashEntry<K, V>[] tab = table;
int index = hash & tab.length - 1;
HashEntry<K, V> first = tab[index];
HashEntry<K, V> e = first;
// a reference remove operation compares the Reference instance
while (e != null && key != e.keyRef &&
while (e != null && key != e.key &&
(refRemove || hash != e.hash || !keyEq(key, e.key()))) {
e = e.next;
}
@ -862,13 +572,6 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
}
}
final void removeStale() {
KeyReference ref;
while ((ref = (KeyReference) refQueue.poll()) != null) {
remove(ref.keyRef(), ref.keyHash(), null, true);
}
}
void clear() {
if (count != 0) {
lock();
@ -878,9 +581,6 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
tab[i] = null;
}
++ modCount;
// replace the reference queue to avoid unnecessary stale
// cleanups
refQueue = new ReferenceQueue<Object>();
count = 0; // write-volatile
} finally {
unlock();
@ -892,31 +592,24 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
/* ---------------- Public operations -------------- */
/**
* Creates a new, empty map with the specified initial capacity, reference
* types, load factor and concurrency level.
*
* Behavioral changing options such as {@link Option#IDENTITY_COMPARISONS}
* can also be specified.
* Creates a new, empty map with the specified initial capacity, load factor
* and concurrency level.
*
* @param initialCapacity the initial capacity. The implementation performs
* internal sizing to accommodate this many elements.
* @param loadFactor the load factor threshold, used to control resizing.
* Resizing may be performed when the average number of
* elements per bin exceeds this threshold.
* @param loadFactor the load factor threshold, used to control resizing.
* Resizing may be performed when the average number of
* elements per bin exceeds this threshold.
* @param concurrencyLevel the estimated number of concurrently updating
* threads. The implementation performs internal
* sizing to try to accommodate this many threads.
* @param keyType the reference type to use for keys
* @param valueType the reference type to use for values
* @param options the behavioral options
* @throws IllegalArgumentException if the initial capacity is negative or
* the load factor or concurrencyLevel are
* nonpositive.
*/
public ConcurrentReferenceHashMap(
public ConcurrentIdentityHashMap(
int initialCapacity, float loadFactor,
int concurrencyLevel, ReferenceType keyType,
ReferenceType valueType, EnumSet<Option> options) {
int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
@ -948,36 +641,11 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
cap <<= 1;
}
identityComparisons =
options != null && options.contains(Option.IDENTITY_COMPARISONS);
for (int i = 0; i < this.segments.length; ++ i) {
this.segments[i] = new Segment<K, V>(
cap, loadFactor, keyType, valueType, identityComparisons);
this.segments[i] = new Segment<K, V>(cap, loadFactor);
}
}
/**
* Creates a new, empty map with the specified initial capacity, load factor
* and concurrency level.
*
* @param initialCapacity the initial capacity. The implementation performs
* internal sizing to accommodate this many elements.
* @param loadFactor the load factor threshold, used to control resizing.
* Resizing may be performed when the average number of
* elements per bin exceeds this threshold.
* @param concurrencyLevel the estimated number of concurrently updating
* threads. The implementation performs internal
* sizing to try to accommodate this many threads.
* @throws IllegalArgumentException if the initial capacity is negative or
* the load factor or concurrencyLevel are
* nonpositive.
*/
public ConcurrentReferenceHashMap(
int initialCapacity, float loadFactor, int concurrencyLevel) {
this(initialCapacity, loadFactor, concurrencyLevel,
DEFAULT_KEY_TYPE, DEFAULT_VALUE_TYPE, null);
}
/**
* Creates a new, empty map with the specified initial capacity and load
@ -993,27 +661,10 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
* negative or the load factor is
* nonpositive
*/
public ConcurrentReferenceHashMap(int initialCapacity, float loadFactor) {
public ConcurrentIdentityHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* Creates a new, empty map with the specified initial capacity, reference
* types and with default load factor (0.75) and concurrencyLevel (16).
*
* @param initialCapacity the initial capacity. The implementation performs
* internal sizing to accommodate this many elements.
* @param keyType the reference type to use for keys
* @param valueType the reference type to use for values
* @throws IllegalArgumentException if the initial capacity of elements is
* negative.
*/
public ConcurrentReferenceHashMap(
int initialCapacity, ReferenceType keyType, ReferenceType valueType) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL,
keyType, valueType, null);
}
/**
* Creates a new, empty map with the specified initial capacity, and with
* default reference types (weak keys, strong values), load factor (0.75)
@ -1024,7 +675,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
* @throws IllegalArgumentException if the initial capacity of elements is
* negative.
*/
public ConcurrentReferenceHashMap(int initialCapacity) {
public ConcurrentIdentityHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
@ -1033,7 +684,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
* types (weak keys, strong values), default load factor (0.75) and
* concurrencyLevel (16).
*/
public ConcurrentReferenceHashMap() {
public ConcurrentIdentityHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
@ -1045,7 +696,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
*
* @param m the map
*/
public ConcurrentReferenceHashMap(Map<? extends K, ? extends V> m) {
public ConcurrentIdentityHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL);
@ -1375,23 +1026,6 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
}
}
/**
* Removes any stale entries whose keys have been finalized. Use of this
* method is normally not necessary since stale entries are automatically
* removed lazily, when blocking operations are required. However, there are
* some cases where this operation should be performed eagerly, such as
* cleaning up old references to a ClassLoader in a multi-classloader
* environment.
*
* Note: this method will acquire locks, one at a time, across all segments
* of this table, so if it is to be used, it should be used sparingly.
*/
public void purgeStaleEntries() {
for (int i = 0; i < segments.length; ++ i) {
segments[i].removeStale();
}
}
/**
* Returns a {@link Set} view of the keys contained in this map. The set is
* backed by the map, so changes to the map are reflected in the set, and
@ -1491,6 +1125,15 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
advance();
}
public void rewind() {
nextSegmentIndex = segments.length - 1;
nextTableIndex = -1;
currentTable = null;
nextEntry = null;
lastReturned = null;
advance();
}
public boolean hasMoreElements() {
return hasNext();
}
@ -1549,13 +1192,13 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
if (lastReturned == null) {
throw new IllegalStateException();
}
ConcurrentReferenceHashMap.this.remove(currentKey);
ConcurrentIdentityHashMap.this.remove(currentKey);
lastReturned = null;
}
}
final class KeyIterator
extends HashIterator implements Iterator<K>, Enumeration<K> {
extends HashIterator implements ReusableIterator<K>, Enumeration<K> {
public K next() {
return super.nextEntry().key();
@ -1567,7 +1210,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
}
final class ValueIterator
extends HashIterator implements Iterator<V>, Enumeration<V> {
extends HashIterator implements ReusableIterator<V>, Enumeration<V> {
public V next() {
return super.nextEntry().value();
@ -1665,14 +1308,14 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
throw new NullPointerException();
}
V v = super.setValue(value);
ConcurrentReferenceHashMap.this.put(getKey(), value);
ConcurrentIdentityHashMap.this.put(getKey(), value);
return v;
}
}
final class EntryIterator extends HashIterator implements
Iterator<Entry<K, V>> {
ReusableIterator<Entry<K, V>> {
public Map.Entry<K, V> next() {
HashEntry<K, V> e = super.nextEntry();
return new WriteThroughEntry(e.key(), e.value());
@ -1688,28 +1331,28 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
@Override
public int size() {
return ConcurrentReferenceHashMap.this.size();
return ConcurrentIdentityHashMap.this.size();
}
@Override
public boolean isEmpty() {
return ConcurrentReferenceHashMap.this.isEmpty();
return ConcurrentIdentityHashMap.this.isEmpty();
}
@Override
public boolean contains(Object o) {
return ConcurrentReferenceHashMap.this.containsKey(o);
return ConcurrentIdentityHashMap.this.containsKey(o);
}
@Override
public boolean remove(Object o) {
return ConcurrentReferenceHashMap.this.remove(o) != null;
return ConcurrentIdentityHashMap.this.remove(o) != null;
}
@Override
public void clear() {
ConcurrentReferenceHashMap.this.clear();
ConcurrentIdentityHashMap.this.clear();
}
}
@ -1721,22 +1364,22 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
@Override
public int size() {
return ConcurrentReferenceHashMap.this.size();
return ConcurrentIdentityHashMap.this.size();
}
@Override
public boolean isEmpty() {
return ConcurrentReferenceHashMap.this.isEmpty();
return ConcurrentIdentityHashMap.this.isEmpty();
}
@Override
public boolean contains(Object o) {
return ConcurrentReferenceHashMap.this.containsValue(o);
return ConcurrentIdentityHashMap.this.containsValue(o);
}
@Override
public void clear() {
ConcurrentReferenceHashMap.this.clear();
ConcurrentIdentityHashMap.this.clear();
}
}
@ -1752,7 +1395,7 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
return false;
}
Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
V v = ConcurrentReferenceHashMap.this.get(e.getKey());
V v = ConcurrentIdentityHashMap.this.get(e.getKey());
return v != null && v.equals(e.getValue());
}
@ -1762,85 +1405,22 @@ public class ConcurrentReferenceHashMap<K, V> extends AbstractMap<K, V>
return false;
}
Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
return ConcurrentReferenceHashMap.this.remove(e.getKey(), e.getValue());
return ConcurrentIdentityHashMap.this.remove(e.getKey(), e.getValue());
}
@Override
public int size() {
return ConcurrentReferenceHashMap.this.size();
return ConcurrentIdentityHashMap.this.size();
}
@Override
public boolean isEmpty() {
return ConcurrentReferenceHashMap.this.isEmpty();
return ConcurrentIdentityHashMap.this.isEmpty();
}
@Override
public void clear() {
ConcurrentReferenceHashMap.this.clear();
}
}
/* ---------------- Serialization Support -------------- */
/**
* Save the state of the <tt>ConcurrentReferenceHashMap</tt> instance to a
* stream (i.e., serialize it).
* @param s the stream
* @serialData the key (Object) and value (Object) for each key-value
* mapping, followed by a null pair. The key-value mappings
* are emitted in no particular order.
*/
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
for (int k = 0; k < segments.length; ++ k) {
Segment<K, V> seg = segments[k];
seg.lock();
try {
HashEntry<K, V>[] tab = seg.table;
for (int i = 0; i < tab.length; ++ i) {
for (HashEntry<K, V> e = tab[i]; e != null; e = e.next) {
K key = e.key();
if (key == null) {
continue;
}
s.writeObject(key);
s.writeObject(e.value());
}
}
} finally {
seg.unlock();
}
}
s.writeObject(null);
s.writeObject(null);
}
/**
* Reconstitute the <tt>ConcurrentReferenceHashMap</tt> instance from a
* stream (i.e. deserialize it).
*
* @param s the stream
*/
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
// Initialize each segment to be minimally sized, and let grow.
for (int i = 0; i < segments.length; ++ i) {
segments[i].setTable(new HashEntry[1]);
}
// Read the keys and values, and put the mappings in the table
for (;;) {
K key = (K) s.readObject();
V value = (V) s.readObject();
if (key == null) {
break;
}
put(key, value);
ConcurrentIdentityHashMap.this.clear();
}
}
}

View File

@ -27,9 +27,6 @@
*/
package org.jboss.netty.util;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
@ -48,23 +45,20 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.netty.util.ConcurrentReferenceHashMap.KeyReference;
/**
* A {@link ConcurrentMap} with weak keys which is written based on
* {@link ConcurrentReferenceHashMap}.
* An alternative weak-key identity-comparing {@link ConcurrentMap} which is
* similar to {@link java.util.concurrent.ConcurrentHashMap}.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Doug Lea
* @author Jason T. Greene
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
* @version $Rev: 557 $, $Date: 2008-12-03 09:39:33 +0900 (Wed, 03 Dec 2008) $
*
* @param <K> the type of keys maintained by this map
* @param <V> the type of mapped values
*/
public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
public final class ConcurrentIdentityWeakHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {
private static final long serialVersionUID = 1332231209347394089L;
@ -130,9 +124,9 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
*/
final Segment<K, V>[] segments;
transient Set<K> keySet;
transient Set<Map.Entry<K, V>> entrySet;
transient Collection<V> values;
Set<K> keySet;
Set<Map.Entry<K, V>> entrySet;
Collection<V> values;
/* ---------------- Small Utilities -------------- */
@ -165,7 +159,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
}
private int hashOf(Object key) {
return hash(key.hashCode());
return hash(System.identityHashCode(key));
}
/* ---------------- Inner Classes -------------- */
@ -229,7 +223,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
@SuppressWarnings("unchecked")
final V dereferenceValue(Object value) {
if (value instanceof KeyReference) {
if (value instanceof WeakKeyReference) {
return ((Reference<V>) value).get();
}
@ -292,7 +286,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
/**
* The number of elements in this segment's region.
*/
transient volatile int count;
volatile int count;
/**
* Number of updates that alter the size of the table. This is used
@ -301,18 +295,18 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
* checking containsValue, then we might have an inconsistent view of
* state so (usually) must retry.
*/
transient int modCount;
int modCount;
/**
* The table is rehashed when its size exceeds this threshold.
* (The value of this field is always <tt>(capacity * loadFactor)</tt>.)
*/
transient int threshold;
int threshold;
/**
* The per-segment table.
*/
transient volatile HashEntry<K, V>[] table;
volatile HashEntry<K, V>[] table;
/**
* The load factor for the hash table. Even though this value is same
@ -327,7 +321,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
* The collected weak-key reference queue for this segment. This should
* be (re)initialized whenever table is assigned,
*/
transient volatile ReferenceQueue<Object> refQueue;
volatile ReferenceQueue<Object> refQueue;
Segment(int initialCapacity, float lf) {
loadFactor = lf;
@ -340,7 +334,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
}
private boolean keyEq(Object src, Object dest) {
return src.equals(dest);
return src == dest;
}
/**
@ -634,9 +628,10 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
}
}
@SuppressWarnings("unchecked")
final void removeStale() {
KeyReference ref;
while ((ref = (KeyReference) refQueue.poll()) != null) {
WeakKeyReference ref;
while ((ref = (WeakKeyReference) refQueue.poll()) != null) {
remove(ref.keyRef(), ref.keyHash(), null, true);
}
}
@ -679,7 +674,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
* the load factor or concurrencyLevel are
* nonpositive.
*/
public ConcurrentWeakHashMap(
public ConcurrentIdentityWeakHashMap(
int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
@ -731,7 +726,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
* negative or the load factor is
* nonpositive
*/
public ConcurrentWeakHashMap(int initialCapacity, float loadFactor) {
public ConcurrentIdentityWeakHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
@ -745,7 +740,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
* @throws IllegalArgumentException if the initial capacity of elements is
* negative.
*/
public ConcurrentWeakHashMap(int initialCapacity) {
public ConcurrentIdentityWeakHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
@ -754,7 +749,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
* types (weak keys, strong values), default load factor (0.75) and
* concurrencyLevel (16).
*/
public ConcurrentWeakHashMap() {
public ConcurrentIdentityWeakHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
@ -766,7 +761,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
*
* @param m the map
*/
public ConcurrentWeakHashMap(Map<? extends K, ? extends V> m) {
public ConcurrentIdentityWeakHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL);
@ -1212,6 +1207,15 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
advance();
}
public void rewind() {
nextSegmentIndex = segments.length - 1;
nextTableIndex = -1;
currentTable = null;
nextEntry = null;
lastReturned = null;
advance();
}
public boolean hasMoreElements() {
return hasNext();
}
@ -1270,13 +1274,13 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
if (lastReturned == null) {
throw new IllegalStateException();
}
ConcurrentWeakHashMap.this.remove(currentKey);
ConcurrentIdentityWeakHashMap.this.remove(currentKey);
lastReturned = null;
}
}
final class KeyIterator
extends HashIterator implements Iterator<K>, Enumeration<K> {
extends HashIterator implements ReusableIterator<K>, Enumeration<K> {
public K next() {
return super.nextEntry().key();
@ -1288,7 +1292,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
}
final class ValueIterator
extends HashIterator implements Iterator<V>, Enumeration<V> {
extends HashIterator implements ReusableIterator<V>, Enumeration<V> {
public V next() {
return super.nextEntry().value();
@ -1386,14 +1390,14 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
throw new NullPointerException();
}
V v = super.setValue(value);
ConcurrentWeakHashMap.this.put(getKey(), value);
ConcurrentIdentityWeakHashMap.this.put(getKey(), value);
return v;
}
}
final class EntryIterator extends HashIterator implements
Iterator<Entry<K, V>> {
ReusableIterator<Entry<K, V>> {
public Map.Entry<K, V> next() {
HashEntry<K, V> e = super.nextEntry();
return new WriteThroughEntry(e.key(), e.value());
@ -1409,28 +1413,28 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
@Override
public int size() {
return ConcurrentWeakHashMap.this.size();
return ConcurrentIdentityWeakHashMap.this.size();
}
@Override
public boolean isEmpty() {
return ConcurrentWeakHashMap.this.isEmpty();
return ConcurrentIdentityWeakHashMap.this.isEmpty();
}
@Override
public boolean contains(Object o) {
return ConcurrentWeakHashMap.this.containsKey(o);
return ConcurrentIdentityWeakHashMap.this.containsKey(o);
}
@Override
public boolean remove(Object o) {
return ConcurrentWeakHashMap.this.remove(o) != null;
return ConcurrentIdentityWeakHashMap.this.remove(o) != null;
}
@Override
public void clear() {
ConcurrentWeakHashMap.this.clear();
ConcurrentIdentityWeakHashMap.this.clear();
}
}
@ -1442,22 +1446,22 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
@Override
public int size() {
return ConcurrentWeakHashMap.this.size();
return ConcurrentIdentityWeakHashMap.this.size();
}
@Override
public boolean isEmpty() {
return ConcurrentWeakHashMap.this.isEmpty();
return ConcurrentIdentityWeakHashMap.this.isEmpty();
}
@Override
public boolean contains(Object o) {
return ConcurrentWeakHashMap.this.containsValue(o);
return ConcurrentIdentityWeakHashMap.this.containsValue(o);
}
@Override
public void clear() {
ConcurrentWeakHashMap.this.clear();
ConcurrentIdentityWeakHashMap.this.clear();
}
}
@ -1473,7 +1477,7 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
return false;
}
Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
V v = ConcurrentWeakHashMap.this.get(e.getKey());
V v = ConcurrentIdentityWeakHashMap.this.get(e.getKey());
return v != null && v.equals(e.getValue());
}
@ -1483,85 +1487,22 @@ public class ConcurrentWeakHashMap<K, V> extends AbstractMap<K, V>
return false;
}
Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
return ConcurrentWeakHashMap.this.remove(e.getKey(), e.getValue());
return ConcurrentIdentityWeakHashMap.this.remove(e.getKey(), e.getValue());
}
@Override
public int size() {
return ConcurrentWeakHashMap.this.size();
return ConcurrentIdentityWeakHashMap.this.size();
}
@Override
public boolean isEmpty() {
return ConcurrentWeakHashMap.this.isEmpty();
return ConcurrentIdentityWeakHashMap.this.isEmpty();
}
@Override
public void clear() {
ConcurrentWeakHashMap.this.clear();
}
}
/* ---------------- Serialization Support -------------- */
/**
* Save the state of the <tt>ConcurrentReferenceHashMap</tt> instance to a
* stream (i.e., serialize it).
* @param s the stream
* @serialData the key (Object) and value (Object) for each key-value
* mapping, followed by a null pair. The key-value mappings
* are emitted in no particular order.
*/
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
for (int k = 0; k < segments.length; ++ k) {
Segment<K, V> seg = segments[k];
seg.lock();
try {
HashEntry<K, V>[] tab = seg.table;
for (int i = 0; i < tab.length; ++ i) {
for (HashEntry<K, V> e = tab[i]; e != null; e = e.next) {
K key = e.key();
if (key == null) {
continue;
}
s.writeObject(key);
s.writeObject(e.value());
}
}
} finally {
seg.unlock();
}
}
s.writeObject(null);
s.writeObject(null);
}
/**
* Reconstitute the <tt>ConcurrentReferenceHashMap</tt> instance from a
* stream (i.e. deserialize it).
*
* @param s the stream
*/
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
// Initialize each segment to be minimally sized, and let grow.
for (int i = 0; i < segments.length; ++ i) {
segments[i].setTable(new HashEntry[1]);
}
// Read the keys and values, and put the mappings in the table
for (;;) {
K key = (K) s.readObject();
V value = (V) s.readObject();
if (key == null) {
break;
}
put(key, value);
ConcurrentIdentityWeakHashMap.this.clear();
}
}
}

View File

@ -38,6 +38,21 @@ import java.util.concurrent.TimeUnit;
*/
public class ExecutorUtil {
/**
* Returns {@code true} if and only if the specified {@code executor}
* is an {@link ExecutorService} and is shut down. Please note that this
* method returns {@code false} if the specified {@code executor} is not an
* {@link ExecutorService}.
*/
public static boolean isShutdown(Executor executor) {
if (executor instanceof ExecutorService) {
if (((ExecutorService) executor).isShutdown()) {
return true;
}
}
return false;
}
/**
* Shuts down the specified executors.
*/

View File

@ -0,0 +1,34 @@
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2009, Red Hat Middleware LLC, and individual contributors
* by the @author tags. See the COPYRIGHT.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.util;
import java.util.Iterator;
/**
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$
*/
public interface ReusableIterator<E> extends Iterator<E> {
void rewind();
}