From cb13e78342b5a9aea7ba70373930dfd6c8b44253 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 27 Mar 2013 11:51:43 +0100 Subject: [PATCH] [#744] Port fixes from Akka to HashedWheelTimer port fix from Akka with following commits: * https://github.com/akka/akka/commit/cb4e3536b0ed3483bd3636d7789c0ddcadafa2da * https://github.com/akka/akka/commit/7e590f3071bdf89a4aa9d7d262bac8923d85e754 And also use constants for worker state for time instead of numeric. --- .../jboss/netty/util/HashedWheelTimer.java | 86 +++++++++++-------- 1 file changed, 50 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/jboss/netty/util/HashedWheelTimer.java b/src/main/java/org/jboss/netty/util/HashedWheelTimer.java index ecd2fd6da8..b8f8eeef64 100644 --- a/src/main/java/org/jboss/netty/util/HashedWheelTimer.java +++ b/src/main/java/org/jboss/netty/util/HashedWheelTimer.java @@ -89,9 +89,12 @@ public class HashedWheelTimer implements Timer { private final Worker worker = new Worker(); final Thread workerThread; + + public static final int WORKER_STATE_INIT = 0; + public static final int WORKER_STATE_STARTED = 1; + public static final int WORKER_STATE_SHUTDOWN = 2; final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down - private final long roundDuration; final long tickDuration; final Set[] wheel; final ReusableIterator[] iterators; @@ -221,8 +224,6 @@ public class HashedWheelTimer implements Timer { tickDuration + ' ' + unit); } - roundDuration = tickDuration * wheel.length; - workerThread = threadFactory.newThread(new ThreadRenamingRunnable( worker, "Hashed wheel timer #" + id.incrementAndGet(), determiner)); @@ -277,17 +278,17 @@ public class HashedWheelTimer implements Timer { */ public void start() { switch (workerState.get()) { - case 0: - if (workerState.compareAndSet(0, 1)) { + case WORKER_STATE_INIT: + if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; - case 1: + case WORKER_STATE_STARTED: break; - case 2: + case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: - throw new Error(); + throw new Error("Invalid WorkerState"); } } @@ -299,8 +300,9 @@ public class HashedWheelTimer implements Timer { TimerTask.class.getSimpleName()); } - if (workerState.getAndSet(2) != 1) { - // workerState wasn't 1, so return an empty set + if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { + // workerState can be 0 or 2 at this moment - let it always be 2. + workerState.set(WORKER_STATE_SHUTDOWN); return Collections.emptySet(); } @@ -348,25 +350,30 @@ public class HashedWheelTimer implements Timer { } void scheduleTimeout(HashedWheelTimeout timeout, long delay) { - // delay must be equal to or greater than tickDuration so that the - // worker thread never misses the timeout. - if (delay < tickDuration) { - delay = tickDuration; - } // Prepare the required parameters to schedule the timeout object. - final long lastRoundDelay = delay % roundDuration; - final long lastTickDelay = delay % tickDuration; - final long relativeIndex = - lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0); + long relativeIndex = (delay + tickDuration - 1) / tickDuration; - final long remainingRounds = - delay / roundDuration - (delay % roundDuration == 0? 1 : 0); + // if the previous line had an overflow going on, then we’ll just schedule this timeout + // one tick early; that shouldn’t matter since we’re talking 270 years here + if (relativeIndex < 0) { + relativeIndex = delay / tickDuration; + } + if (relativeIndex == 0) { + relativeIndex = 1; + } + if ((relativeIndex & mask) == 0) { + relativeIndex--; + } + final long remainingRounds = relativeIndex / wheel.length; // Add the timeout to the wheel. lock.readLock().lock(); try { - int stopIndex = (int) (wheelCursor + relativeIndex & mask); + if (workerState.get() == WORKER_STATE_SHUTDOWN) { + throw new IllegalStateException("Cannot enqueue after shutdown"); + } + final int stopIndex = (int) ((wheelCursor + relativeIndex) & mask); timeout.stopIndex = stopIndex; timeout.remainingRounds = remainingRounds; @@ -391,7 +398,7 @@ public class HashedWheelTimer implements Timer { startTime = System.currentTimeMillis(); tick = 1; - while (workerState.get() == 1) { + while (workerState.get() == WORKER_STATE_STARTED) { final long deadline = waitForNextTick(); if (deadline > 0) { fetchExpiredTimeouts(expiredTimeouts, deadline); @@ -463,12 +470,27 @@ public class HashedWheelTimer implements Timer { expiredTimeouts.clear(); } + /** + * calculate goal nanoTime from startTime and current tick number, + * then wait until that goal has been reached. + * @return Long.MIN_VALUE if received a shutdown request, + * current time otherwise (with Long.MIN_VALUE changed by +1) + */ private long waitForNextTick() { long deadline = startTime + tickDuration * tick; for (;;) { final long currentTime = System.currentTimeMillis(); - long sleepTime = tickDuration * tick - (currentTime - startTime); + long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; + + if (sleepTimeMs <= 0) { + tick += 1; + if (currentTime == Long.MIN_VALUE) { + return -Long.MAX_VALUE; + } else { + return currentTime; + } + } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect @@ -476,24 +498,16 @@ public class HashedWheelTimer implements Timer { // // See https://github.com/netty/netty/issues/356 if (DetectionUtil.isWindows()) { - sleepTime = sleepTime / 10 * 10; - } - - if (sleepTime <= 0) { - break; + sleepTimeMs = sleepTimeMs / 10 * 10; } try { - Thread.sleep(sleepTime); + Thread.sleep(sleepTimeMs); } catch (InterruptedException e) { - if (workerState.get() != 1) { - return -1; + if (workerState.get() == WORKER_STATE_SHUTDOWN) { + return Long.MIN_VALUE; } } } - - // Increase the tick. - tick ++; - return deadline; } }