* Changed HashedWheelTimer to accept ThreadFactory instead of Executor - this change simplifies the code significantly

* Added Timer.stop() method
* Timer does not implement ExternalResourceReleasable anymore
This commit is contained in:
Trustin Lee 2009-01-20 11:04:27 +00:00
parent 84c943573b
commit 78f8e5b0c9
2 changed files with 51 additions and 78 deletions

View File

@ -25,16 +25,15 @@ package org.jboss.netty.handler.timeout;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ConcurrentIdentityHashMap; import org.jboss.netty.util.ConcurrentIdentityHashMap;
import org.jboss.netty.util.ExecutorUtil;
import org.jboss.netty.util.MapBackedSet; import org.jboss.netty.util.MapBackedSet;
import org.jboss.netty.util.ReusableIterator; import org.jboss.netty.util.ReusableIterator;
@ -48,28 +47,28 @@ public class HashedWheelTimer implements Timer {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(HashedWheelTimer.class); InternalLoggerFactory.getInstance(HashedWheelTimer.class);
final Executor executor; private final Worker worker = new Worker();
final Worker worker = new Worker(); private final Thread workerThread;
final AtomicInteger activeTimeouts = new AtomicInteger(); final AtomicBoolean shutdown = new AtomicBoolean();
private final long roundDuration;
final long tickDuration; final long tickDuration;
final long roundDuration;
final Set<HashedWheelTimeout>[] wheel; final Set<HashedWheelTimeout>[] wheel;
final ReusableIterator<HashedWheelTimeout>[] iterators; final ReusableIterator<HashedWheelTimeout>[] iterators;
final int mask; final int mask;
final ReadWriteLock lock = new ReentrantReadWriteLock(); final ReadWriteLock lock = new ReentrantReadWriteLock();
volatile int wheelCursor; volatile int wheelCursor;
public HashedWheelTimer(Executor executor) { public HashedWheelTimer(ThreadFactory threadFactory) {
this(executor, 100, TimeUnit.MILLISECONDS, 512); // about 50 sec this(threadFactory, 100, TimeUnit.MILLISECONDS, 512); // about 50 sec
} }
public HashedWheelTimer( public HashedWheelTimer(
Executor executor, ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) { long tickDuration, TimeUnit unit, int ticksPerWheel) {
if (executor == null) { if (threadFactory == null) {
throw new NullPointerException("executor"); throw new NullPointerException("threadFactory");
} }
if (unit == null) { if (unit == null) {
throw new NullPointerException("unit"); throw new NullPointerException("unit");
@ -79,8 +78,6 @@ public class HashedWheelTimer implements Timer {
"tickDuration must be greater than 0: " + tickDuration); "tickDuration must be greater than 0: " + tickDuration);
} }
this.executor = executor;
// Normalize ticksPerWheel to power of two and initialize the wheel. // Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel); wheel = createWheel(ticksPerWheel);
iterators = createIterators(wheel); iterators = createIterators(wheel);
@ -98,6 +95,8 @@ public class HashedWheelTimer implements Timer {
} }
roundDuration = tickDuration * wheel.length; roundDuration = tickDuration * wheel.length;
workerThread = threadFactory.newThread(worker);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -114,7 +113,8 @@ public class HashedWheelTimer implements Timer {
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel]; Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
for (int i = 0; i < wheel.length; i ++) { for (int i = 0; i < wheel.length; i ++) {
wheel[i] = new MapBackedSet<HashedWheelTimeout>(new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>()); wheel[i] = new MapBackedSet<HashedWheelTimeout>(
new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
} }
return wheel; return wheel;
} }
@ -136,8 +136,22 @@ public class HashedWheelTimer implements Timer {
return normalizedTicksPerWheel; return normalizedTicksPerWheel;
} }
public void releaseExternalResources() { public void start() {
ExecutorUtil.terminate(executor); workerThread.start();
}
public void stop() {
if (!shutdown.compareAndSet(false, true)) {
return;
}
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException e) {
// Ignore
}
}
} }
public Timeout newTimeout(TimerTask task, long initialDelay, TimeUnit unit) { public Timeout newTimeout(TimerTask task, long initialDelay, TimeUnit unit) {
@ -151,6 +165,10 @@ public class HashedWheelTimer implements Timer {
initialDelay = unit.toNanos(initialDelay); initialDelay = unit.toNanos(initialDelay);
checkDelay(initialDelay); checkDelay(initialDelay);
if (!workerThread.isAlive()) {
start();
}
// Add the timeout to the wheel. // Add the timeout to the wheel.
HashedWheelTimeout timeout; HashedWheelTimeout timeout;
long currentTime = System.nanoTime(); long currentTime = System.nanoTime();
@ -160,7 +178,6 @@ public class HashedWheelTimer implements Timer {
task, wheelCursor, currentTime, initialDelay); task, wheelCursor, currentTime, initialDelay);
wheel[schedule(timeout)].add(timeout); wheel[schedule(timeout)].add(timeout);
increaseActiveTimeouts();
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
@ -168,13 +185,6 @@ public class HashedWheelTimer implements Timer {
return timeout; return timeout;
} }
void increaseActiveTimeouts() {
// Start the worker if necessary.
if (activeTimeouts.getAndIncrement() == 0) {
executor.execute(worker);
}
}
private int schedule(HashedWheelTimeout timeout) { private int schedule(HashedWheelTimeout timeout) {
return schedule(timeout, timeout.initialDelay); return schedule(timeout, timeout.initialDelay);
} }
@ -218,8 +228,6 @@ public class HashedWheelTimer implements Timer {
private final class Worker implements Runnable { private final class Worker implements Runnable {
private volatile long threadSafeStartTime;
private volatile long threadSafeTick;
private long startTime; private long startTime;
private long tick; private long tick;
@ -227,31 +235,21 @@ public class HashedWheelTimer implements Timer {
super(); super();
} }
public void run() { public synchronized void run() {
List<HashedWheelTimeout> expiredTimeouts = List<HashedWheelTimeout> expiredTimeouts =
new ArrayList<HashedWheelTimeout>(); new ArrayList<HashedWheelTimeout>();
startTime = threadSafeStartTime; startTime = System.nanoTime();
tick = threadSafeTick; tick = 1;
if (startTime == 0) {
startTime = System.nanoTime();
tick = 1;
}
try { while (!shutdown.get()) {
boolean continueTheLoop; waitForNextTick();
do { fetchExpiredTimeouts(expiredTimeouts);
startTime = waitForNextTick(); notifyExpiredTimeouts(expiredTimeouts);
continueTheLoop = fetchExpiredTimeouts(expiredTimeouts);
notifyExpiredTimeouts(expiredTimeouts);
} while (continueTheLoop && !ExecutorUtil.isShutdown(executor));
} finally{
threadSafeStartTime = startTime;
threadSafeTick = tick;
} }
} }
private boolean fetchExpiredTimeouts( private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts) { List<HashedWheelTimeout> expiredTimeouts) {
// Find the expired timeouts and decrease the round counter // Find the expired timeouts and decrease the round counter
@ -266,23 +264,9 @@ public class HashedWheelTimer implements Timer {
ReusableIterator<HashedWheelTimeout> i = iterators[oldBucketHead]; ReusableIterator<HashedWheelTimeout> i = iterators[oldBucketHead];
fetchExpiredTimeouts(expiredTimeouts, i); fetchExpiredTimeouts(expiredTimeouts, i);
if (activeTimeouts.get() == 0) {
// Exit the loop - the worker will be executed again if
// there are more timeouts to expire. Please note that
// this block is protected by a write lock where all
// scheduling operations are protected by a read lock,
// which means they are mutually exclusive and there's
// no risk of race conditions (i.e. no stalled timeouts,
// no two running workers.)
return false;
}
} finally { } finally {
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
// Continue the loop.
return true;
} }
private void fetchExpiredTimeouts( private void fetchExpiredTimeouts(
@ -298,7 +282,6 @@ public class HashedWheelTimer implements Timer {
if (timeout.deadline <= currentTime) { if (timeout.deadline <= currentTime) {
i.remove(); i.remove();
expiredTimeouts.add(timeout); expiredTimeouts.add(timeout);
activeTimeouts.getAndDecrement();
} else { } else {
// A rare case where a timeout is put for the next // A rare case where a timeout is put for the next
// round: just wait for the next round. // round: just wait for the next round.
@ -322,7 +305,7 @@ public class HashedWheelTimer implements Timer {
expiredTimeouts.clear(); expiredTimeouts.clear();
} }
private long waitForNextTick() { private void waitForNextTick() {
for (;;) { for (;;) {
final long currentTime = System.nanoTime(); final long currentTime = System.nanoTime();
final long sleepTime = tickDuration * tick - (currentTime - startTime); final long sleepTime = tickDuration * tick - (currentTime - startTime);
@ -334,8 +317,8 @@ public class HashedWheelTimer implements Timer {
try { try {
Thread.sleep(sleepTime / 1000000, (int) (sleepTime % 1000000)); Thread.sleep(sleepTime / 1000000, (int) (sleepTime % 1000000));
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (ExecutorUtil.isShutdown(executor) || isWheelEmpty()) { if (shutdown.get()) {
return startTime; return;
} }
} }
} }
@ -348,8 +331,6 @@ public class HashedWheelTimer implements Timer {
// Increase the tick if overflow is not likely to happen. // Increase the tick if overflow is not likely to happen.
tick ++; tick ++;
} }
return startTime;
} }
} }
@ -388,13 +369,8 @@ public class HashedWheelTimer implements Timer {
return; return;
} }
boolean removed;
synchronized (this) { synchronized (this) {
removed = wheel[stopIndex].remove(this); wheel[stopIndex].remove(this);
}
if (removed) {
activeTimeouts.getAndDecrement();
} }
} }
@ -419,10 +395,7 @@ public class HashedWheelTimer implements Timer {
synchronized (this) { synchronized (this) {
newStopIndex = stopIndex = schedule(this, additionalDelay); newStopIndex = stopIndex = schedule(this, additionalDelay);
} }
wheel[newStopIndex].add(this);
if (wheel[newStopIndex].add(this)) {
increaseActiveTimeouts();
}
} finally { } finally {
extensionCount ++; extensionCount ++;
lock.readLock().unlock(); lock.readLock().unlock();

View File

@ -24,13 +24,13 @@ package org.jboss.netty.handler.timeout;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.jboss.netty.util.ExternalResourceReleasable;
/** /**
* @author The Netty Project (netty-dev@lists.jboss.org) * @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com) * @author Trustin Lee (tlee@redhat.com)
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
public interface Timer extends ExternalResourceReleasable { public interface Timer {
Timeout newTimeout(TimerTask task, long timeout, TimeUnit unit); Timeout newTimeout(TimerTask task, long timeout, TimeUnit unit);
// XXX Should we make stop() return the list of unfinished Timeouts?
void stop();
} }