diff --git a/src/main/java/org/jboss/netty/handler/timeout/HashedWheelTimer.java b/src/main/java/org/jboss/netty/handler/timeout/HashedWheelTimer.java index 50c1e1e9ba..ecf28dfad1 100644 --- a/src/main/java/org/jboss/netty/handler/timeout/HashedWheelTimer.java +++ b/src/main/java/org/jboss/netty/handler/timeout/HashedWheelTimer.java @@ -25,16 +25,15 @@ package org.jboss.netty.handler.timeout; import java.util.ArrayList; import java.util.List; import java.util.Set; -import java.util.concurrent.Executor; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ConcurrentIdentityHashMap; -import org.jboss.netty.util.ExecutorUtil; import org.jboss.netty.util.MapBackedSet; import org.jboss.netty.util.ReusableIterator; @@ -48,28 +47,28 @@ public class HashedWheelTimer implements Timer { static final InternalLogger logger = InternalLoggerFactory.getInstance(HashedWheelTimer.class); - final Executor executor; - final Worker worker = new Worker(); - final AtomicInteger activeTimeouts = new AtomicInteger(); + private final Worker worker = new Worker(); + private final Thread workerThread; + final AtomicBoolean shutdown = new AtomicBoolean(); + private final long roundDuration; final long tickDuration; - final long roundDuration; final Set[] wheel; final ReusableIterator[] iterators; final int mask; final ReadWriteLock lock = new ReentrantReadWriteLock(); volatile int wheelCursor; - public HashedWheelTimer(Executor executor) { - this(executor, 100, TimeUnit.MILLISECONDS, 512); // about 50 sec + public HashedWheelTimer(ThreadFactory threadFactory) { + this(threadFactory, 100, TimeUnit.MILLISECONDS, 512); // about 50 sec } public HashedWheelTimer( - Executor executor, + ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { - if (executor == null) { - throw new NullPointerException("executor"); + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); @@ -79,8 +78,6 @@ public class HashedWheelTimer implements Timer { "tickDuration must be greater than 0: " + tickDuration); } - this.executor = executor; - // Normalize ticksPerWheel to power of two and initialize the wheel. wheel = createWheel(ticksPerWheel); iterators = createIterators(wheel); @@ -98,6 +95,8 @@ public class HashedWheelTimer implements Timer { } roundDuration = tickDuration * wheel.length; + + workerThread = threadFactory.newThread(worker); } @SuppressWarnings("unchecked") @@ -114,7 +113,8 @@ public class HashedWheelTimer implements Timer { ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); Set[] wheel = new Set[ticksPerWheel]; for (int i = 0; i < wheel.length; i ++) { - wheel[i] = new MapBackedSet(new ConcurrentIdentityHashMap()); + wheel[i] = new MapBackedSet( + new ConcurrentIdentityHashMap(16, 0.95f, 4)); } return wheel; } @@ -136,8 +136,22 @@ public class HashedWheelTimer implements Timer { return normalizedTicksPerWheel; } - public void releaseExternalResources() { - ExecutorUtil.terminate(executor); + public void start() { + workerThread.start(); + } + + public void stop() { + if (!shutdown.compareAndSet(false, true)) { + return; + } + while (workerThread.isAlive()) { + workerThread.interrupt(); + try { + workerThread.join(100); + } catch (InterruptedException e) { + // Ignore + } + } } public Timeout newTimeout(TimerTask task, long initialDelay, TimeUnit unit) { @@ -151,6 +165,10 @@ public class HashedWheelTimer implements Timer { initialDelay = unit.toNanos(initialDelay); checkDelay(initialDelay); + if (!workerThread.isAlive()) { + start(); + } + // Add the timeout to the wheel. HashedWheelTimeout timeout; long currentTime = System.nanoTime(); @@ -160,7 +178,6 @@ public class HashedWheelTimer implements Timer { task, wheelCursor, currentTime, initialDelay); wheel[schedule(timeout)].add(timeout); - increaseActiveTimeouts(); } finally { lock.readLock().unlock(); } @@ -168,13 +185,6 @@ public class HashedWheelTimer implements Timer { return timeout; } - void increaseActiveTimeouts() { - // Start the worker if necessary. - if (activeTimeouts.getAndIncrement() == 0) { - executor.execute(worker); - } - } - private int schedule(HashedWheelTimeout timeout) { return schedule(timeout, timeout.initialDelay); } @@ -218,8 +228,6 @@ public class HashedWheelTimer implements Timer { private final class Worker implements Runnable { - private volatile long threadSafeStartTime; - private volatile long threadSafeTick; private long startTime; private long tick; @@ -227,31 +235,21 @@ public class HashedWheelTimer implements Timer { super(); } - public void run() { + public synchronized void run() { List expiredTimeouts = new ArrayList(); - startTime = threadSafeStartTime; - tick = threadSafeTick; - if (startTime == 0) { - startTime = System.nanoTime(); - tick = 1; - } + startTime = System.nanoTime(); + tick = 1; - try { - boolean continueTheLoop; - do { - startTime = waitForNextTick(); - continueTheLoop = fetchExpiredTimeouts(expiredTimeouts); - notifyExpiredTimeouts(expiredTimeouts); - } while (continueTheLoop && !ExecutorUtil.isShutdown(executor)); - } finally{ - threadSafeStartTime = startTime; - threadSafeTick = tick; + while (!shutdown.get()) { + waitForNextTick(); + fetchExpiredTimeouts(expiredTimeouts); + notifyExpiredTimeouts(expiredTimeouts); } } - private boolean fetchExpiredTimeouts( + private void fetchExpiredTimeouts( List expiredTimeouts) { // Find the expired timeouts and decrease the round counter @@ -266,23 +264,9 @@ public class HashedWheelTimer implements Timer { ReusableIterator i = iterators[oldBucketHead]; fetchExpiredTimeouts(expiredTimeouts, i); - - if (activeTimeouts.get() == 0) { - // Exit the loop - the worker will be executed again if - // there are more timeouts to expire. Please note that - // this block is protected by a write lock where all - // scheduling operations are protected by a read lock, - // which means they are mutually exclusive and there's - // no risk of race conditions (i.e. no stalled timeouts, - // no two running workers.) - return false; - } } finally { lock.writeLock().unlock(); } - - // Continue the loop. - return true; } private void fetchExpiredTimeouts( @@ -298,7 +282,6 @@ public class HashedWheelTimer implements Timer { if (timeout.deadline <= currentTime) { i.remove(); expiredTimeouts.add(timeout); - activeTimeouts.getAndDecrement(); } else { // A rare case where a timeout is put for the next // round: just wait for the next round. @@ -322,7 +305,7 @@ public class HashedWheelTimer implements Timer { expiredTimeouts.clear(); } - private long waitForNextTick() { + private void waitForNextTick() { for (;;) { final long currentTime = System.nanoTime(); final long sleepTime = tickDuration * tick - (currentTime - startTime); @@ -334,8 +317,8 @@ public class HashedWheelTimer implements Timer { try { Thread.sleep(sleepTime / 1000000, (int) (sleepTime % 1000000)); } catch (InterruptedException e) { - if (ExecutorUtil.isShutdown(executor) || isWheelEmpty()) { - return startTime; + if (shutdown.get()) { + return; } } } @@ -348,8 +331,6 @@ public class HashedWheelTimer implements Timer { // Increase the tick if overflow is not likely to happen. tick ++; } - - return startTime; } } @@ -388,13 +369,8 @@ public class HashedWheelTimer implements Timer { return; } - boolean removed; synchronized (this) { - removed = wheel[stopIndex].remove(this); - } - - if (removed) { - activeTimeouts.getAndDecrement(); + wheel[stopIndex].remove(this); } } @@ -419,10 +395,7 @@ public class HashedWheelTimer implements Timer { synchronized (this) { newStopIndex = stopIndex = schedule(this, additionalDelay); } - - if (wheel[newStopIndex].add(this)) { - increaseActiveTimeouts(); - } + wheel[newStopIndex].add(this); } finally { extensionCount ++; lock.readLock().unlock(); diff --git a/src/main/java/org/jboss/netty/handler/timeout/Timer.java b/src/main/java/org/jboss/netty/handler/timeout/Timer.java index 7023ccc29c..73f18107fc 100644 --- a/src/main/java/org/jboss/netty/handler/timeout/Timer.java +++ b/src/main/java/org/jboss/netty/handler/timeout/Timer.java @@ -24,13 +24,13 @@ package org.jboss.netty.handler.timeout; import java.util.concurrent.TimeUnit; -import org.jboss.netty.util.ExternalResourceReleasable; - /** * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) * @version $Rev$, $Date$ */ -public interface Timer extends ExternalResourceReleasable { +public interface Timer { Timeout newTimeout(TimerTask task, long timeout, TimeUnit unit); + // XXX Should we make stop() return the list of unfinished Timeouts? + void stop(); }