From e307979a0d7318c2d3d0e6723f4beb458a4dd7a0 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 7 Oct 2013 17:09:00 +0900 Subject: [PATCH] Fix the problem where HashedWheelTimer puts a timeout into an incorrect place - the stopIndex of a timeout is calculated based on the start time of the worker thread and the current tick count for greater accuracy --- .../java/io/netty/util/HashedWheelTimer.java | 118 ++++++++---------- 1 file changed, 54 insertions(+), 64 deletions(-) diff --git a/common/src/main/java/io/netty/util/HashedWheelTimer.java b/common/src/main/java/io/netty/util/HashedWheelTimer.java index 0c0de97092..4a39914df8 100644 --- a/common/src/main/java/io/netty/util/HashedWheelTimer.java +++ b/common/src/main/java/io/netty/util/HashedWheelTimer.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @@ -95,7 +96,9 @@ public class HashedWheelTimer implements Timer { final Set[] wheel; final int mask; final ReadWriteLock lock = new ReentrantReadWriteLock(); - volatile int wheelCursor; + final CountDownLatch startTimeInitialized = new CountDownLatch(1); + volatile long startTime; + volatile long tick; /** * Creates a new timer with the default thread factory @@ -259,6 +262,15 @@ public class HashedWheelTimer implements Timer { default: throw new Error("Invalid WorkerState"); } + + // Wait until the startTime is initialized by the worker. + while (startTime == 0) { + try { + startTimeInitialized.await(); + } catch (InterruptedException ignore) { + // Ignore - it will be ready very soon. + } + } } @Override @@ -310,7 +322,7 @@ public class HashedWheelTimer implements Timer { @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { - final long currentTime = System.nanoTime(); + start(); if (task == null) { throw new NullPointerException("task"); @@ -319,68 +331,50 @@ public class HashedWheelTimer implements Timer { throw new NullPointerException("unit"); } - start(); - - long delayInNanos = unit.toNanos(delay); - HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delayInNanos); - scheduleTimeout(timeout, delayInNanos); - return timeout; - } - - void scheduleTimeout(HashedWheelTimeout timeout, long delay) { - // Prepare the required parameters to schedule the timeout object. - long relativeIndex = (delay + tickDuration - 1) / tickDuration; - // if the previous line had an overflow going on, then we’ll just schedule this timeout - // one tick early; that shouldn’t matter since we’re talking 270 years here - if (relativeIndex < 0) { - relativeIndex = delay / tickDuration; - } - if (relativeIndex == 0) { - relativeIndex = 1; - } - if ((relativeIndex & mask) == 0) { - relativeIndex--; - } - final long remainingRounds = relativeIndex / wheel.length; + long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Add the timeout to the wheel. + HashedWheelTimeout timeout; lock.readLock().lock(); try { + timeout = new HashedWheelTimeout(task, deadline); if (workerState.get() == WORKER_STATE_SHUTDOWN) { throw new IllegalStateException("Cannot enqueue after shutdown"); } - final int stopIndex = (int) (wheelCursor + relativeIndex & mask); - timeout.stopIndex = stopIndex; - timeout.remainingRounds = remainingRounds; - wheel[stopIndex].add(timeout); + wheel[timeout.stopIndex].add(timeout); } finally { lock.readLock().unlock(); } + + return timeout; } private final class Worker implements Runnable { - private long startTime; - private long tick; - Worker() { } @Override public void run() { - List expiredTimeouts = - new ArrayList(); - + // Initialize the startTime. startTime = System.nanoTime(); - tick = 1; + if (startTime == 0) { + // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. + startTime = 1; + } - while (workerState.get() == WORKER_STATE_STARTED) { + // Notify the other threads waiting for the initialization at start(). + startTimeInitialized.countDown(); + + List expiredTimeouts = new ArrayList(); + + do { final long deadline = waitForNextTick(); if (deadline > 0) { fetchExpiredTimeouts(expiredTimeouts, deadline); notifyExpiredTimeouts(expiredTimeouts); } - } + } while (workerState.get() == WORKER_STATE_STARTED); } private void fetchExpiredTimeouts( @@ -392,9 +386,12 @@ public class HashedWheelTimer implements Timer { // an exclusive lock. lock.writeLock().lock(); try { - int newWheelCursor = wheelCursor = wheelCursor + 1 & mask; - fetchExpiredTimeouts(expiredTimeouts, wheel[newWheelCursor].iterator(), deadline); + fetchExpiredTimeouts(expiredTimeouts, wheel[(int) (tick & mask)].iterator(), deadline); } finally { + // Note that the tick is updated only while the writer lock is held, + // so that newTimeout() and consequently new HashedWheelTimeout() never see an old value + // while the reader lock is held. + tick ++; lock.writeLock().unlock(); } } @@ -403,7 +400,6 @@ public class HashedWheelTimer implements Timer { List expiredTimeouts, Iterator i, long deadline) { - List slipped = null; while (i.hasNext()) { HashedWheelTimeout timeout = i.next(); if (timeout.remainingRounds <= 0) { @@ -411,26 +407,14 @@ public class HashedWheelTimer implements Timer { if (timeout.deadline <= deadline) { expiredTimeouts.add(timeout); } else { - // Handle the case where the timeout is put into a wrong - // place, usually one tick earlier. For now, just add - // it to a temporary list - we will reschedule it in a - // separate loop. - if (slipped == null) { - slipped = new ArrayList(); - } - slipped.add(timeout); + // The timeout was placed into a wrong slot. This should never happen. + throw new Error(String.format( + "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else { timeout.remainingRounds --; } } - - // Reschedule the slipped timeouts. - if (slipped != null) { - for (HashedWheelTimeout timeout: slipped) { - scheduleTimeout(timeout, timeout.deadline - deadline); - } - } } private void notifyExpiredTimeouts( @@ -451,14 +435,13 @@ public class HashedWheelTimer implements Timer { * current time otherwise (with Long.MIN_VALUE changed by +1) */ private long waitForNextTick() { - long deadline = startTime + tickDuration * tick; + long deadline = tickDuration * (tick + 1); for (;;) { - final long currentTime = System.nanoTime(); + final long currentTime = System.nanoTime() - startTime; long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; if (sleepTimeMs <= 0) { - tick += 1; if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { @@ -494,13 +477,17 @@ public class HashedWheelTimer implements Timer { private final TimerTask task; final long deadline; - volatile int stopIndex; + final int stopIndex; volatile long remainingRounds; private final AtomicInteger state = new AtomicInteger(ST_INIT); HashedWheelTimeout(TimerTask task, long deadline) { this.task = task; this.deadline = deadline; + + final long ticks = Math.max(deadline / tickDuration, tick); // Ensure we don't schedule for past. + stopIndex = (int) (ticks & mask); + remainingRounds = ticks / wheel.length; } @Override @@ -550,7 +537,7 @@ public class HashedWheelTimer implements Timer { @Override public String toString() { final long currentTime = System.nanoTime(); - long remaining = deadline - currentTime; + long remaining = deadline - currentTime + startTime; StringBuilder buf = new StringBuilder(192); buf.append(getClass().getSimpleName()); @@ -559,18 +546,21 @@ public class HashedWheelTimer implements Timer { buf.append("deadline: "); if (remaining > 0) { buf.append(remaining); - buf.append(" ms later, "); + buf.append(" ns later"); } else if (remaining < 0) { buf.append(-remaining); - buf.append(" ms ago, "); + buf.append(" ns ago"); } else { - buf.append("now, "); + buf.append("now"); } if (isCancelled()) { buf.append(", cancelled"); } + buf.append(", task: "); + buf.append(task()); + return buf.append(')').toString(); } }