Fix the problem where HashedWheelTimer puts a timeout into an incorrect place

- the stopIndex of a timeout is calculated based on the start time of the worker thread and the current tick count for greater accuracy
This commit is contained in:
Trustin Lee 2013-10-07 17:09:00 +09:00
parent 20a16ae8dc
commit e307979a0d

View File

@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -95,7 +96,9 @@ public class HashedWheelTimer implements Timer {
final Set<HashedWheelTimeout>[] wheel; final Set<HashedWheelTimeout>[] wheel;
final int mask; final int mask;
final ReadWriteLock lock = new ReentrantReadWriteLock(); final ReadWriteLock lock = new ReentrantReadWriteLock();
volatile int wheelCursor; final CountDownLatch startTimeInitialized = new CountDownLatch(1);
volatile long startTime;
volatile long tick;
/** /**
* Creates a new timer with the default thread factory * Creates a new timer with the default thread factory
@ -259,6 +262,15 @@ public class HashedWheelTimer implements Timer {
default: default:
throw new Error("Invalid WorkerState"); throw new Error("Invalid WorkerState");
} }
// Wait until the startTime is initialized by the worker.
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
} }
@Override @Override
@ -310,7 +322,7 @@ public class HashedWheelTimer implements Timer {
@Override @Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
final long currentTime = System.nanoTime(); start();
if (task == null) { if (task == null) {
throw new NullPointerException("task"); throw new NullPointerException("task");
@ -319,68 +331,50 @@ public class HashedWheelTimer implements Timer {
throw new NullPointerException("unit"); throw new NullPointerException("unit");
} }
start(); long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
long delayInNanos = unit.toNanos(delay);
HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delayInNanos);
scheduleTimeout(timeout, delayInNanos);
return timeout;
}
void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
// Prepare the required parameters to schedule the timeout object.
long relativeIndex = (delay + tickDuration - 1) / tickDuration;
// if the previous line had an overflow going on, then well just schedule this timeout
// one tick early; that shouldnt matter since were talking 270 years here
if (relativeIndex < 0) {
relativeIndex = delay / tickDuration;
}
if (relativeIndex == 0) {
relativeIndex = 1;
}
if ((relativeIndex & mask) == 0) {
relativeIndex--;
}
final long remainingRounds = relativeIndex / wheel.length;
// Add the timeout to the wheel. // Add the timeout to the wheel.
HashedWheelTimeout timeout;
lock.readLock().lock(); lock.readLock().lock();
try { try {
timeout = new HashedWheelTimeout(task, deadline);
if (workerState.get() == WORKER_STATE_SHUTDOWN) { if (workerState.get() == WORKER_STATE_SHUTDOWN) {
throw new IllegalStateException("Cannot enqueue after shutdown"); throw new IllegalStateException("Cannot enqueue after shutdown");
} }
final int stopIndex = (int) (wheelCursor + relativeIndex & mask); wheel[timeout.stopIndex].add(timeout);
timeout.stopIndex = stopIndex;
timeout.remainingRounds = remainingRounds;
wheel[stopIndex].add(timeout);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
} }
return timeout;
} }
private final class Worker implements Runnable { private final class Worker implements Runnable {
private long startTime;
private long tick;
Worker() { Worker() {
} }
@Override @Override
public void run() { public void run() {
List<HashedWheelTimeout> expiredTimeouts = // Initialize the startTime.
new ArrayList<HashedWheelTimeout>();
startTime = System.nanoTime(); startTime = System.nanoTime();
tick = 1; if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
while (workerState.get() == WORKER_STATE_STARTED) { // Notify the other threads waiting for the initialization at start().
startTimeInitialized.countDown();
List<HashedWheelTimeout> expiredTimeouts = new ArrayList<HashedWheelTimeout>();
do {
final long deadline = waitForNextTick(); final long deadline = waitForNextTick();
if (deadline > 0) { if (deadline > 0) {
fetchExpiredTimeouts(expiredTimeouts, deadline); fetchExpiredTimeouts(expiredTimeouts, deadline);
notifyExpiredTimeouts(expiredTimeouts); notifyExpiredTimeouts(expiredTimeouts);
} }
} } while (workerState.get() == WORKER_STATE_STARTED);
} }
private void fetchExpiredTimeouts( private void fetchExpiredTimeouts(
@ -392,9 +386,12 @@ public class HashedWheelTimer implements Timer {
// an exclusive lock. // an exclusive lock.
lock.writeLock().lock(); lock.writeLock().lock();
try { try {
int newWheelCursor = wheelCursor = wheelCursor + 1 & mask; fetchExpiredTimeouts(expiredTimeouts, wheel[(int) (tick & mask)].iterator(), deadline);
fetchExpiredTimeouts(expiredTimeouts, wheel[newWheelCursor].iterator(), deadline);
} finally { } finally {
// Note that the tick is updated only while the writer lock is held,
// so that newTimeout() and consequently new HashedWheelTimeout() never see an old value
// while the reader lock is held.
tick ++;
lock.writeLock().unlock(); lock.writeLock().unlock();
} }
} }
@ -403,7 +400,6 @@ public class HashedWheelTimer implements Timer {
List<HashedWheelTimeout> expiredTimeouts, List<HashedWheelTimeout> expiredTimeouts,
Iterator<HashedWheelTimeout> i, long deadline) { Iterator<HashedWheelTimeout> i, long deadline) {
List<HashedWheelTimeout> slipped = null;
while (i.hasNext()) { while (i.hasNext()) {
HashedWheelTimeout timeout = i.next(); HashedWheelTimeout timeout = i.next();
if (timeout.remainingRounds <= 0) { if (timeout.remainingRounds <= 0) {
@ -411,26 +407,14 @@ public class HashedWheelTimer implements Timer {
if (timeout.deadline <= deadline) { if (timeout.deadline <= deadline) {
expiredTimeouts.add(timeout); expiredTimeouts.add(timeout);
} else { } else {
// Handle the case where the timeout is put into a wrong // The timeout was placed into a wrong slot. This should never happen.
// place, usually one tick earlier. For now, just add throw new Error(String.format(
// it to a temporary list - we will reschedule it in a "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
// separate loop.
if (slipped == null) {
slipped = new ArrayList<HashedWheelTimeout>();
}
slipped.add(timeout);
} }
} else { } else {
timeout.remainingRounds --; timeout.remainingRounds --;
} }
} }
// Reschedule the slipped timeouts.
if (slipped != null) {
for (HashedWheelTimeout timeout: slipped) {
scheduleTimeout(timeout, timeout.deadline - deadline);
}
}
} }
private void notifyExpiredTimeouts( private void notifyExpiredTimeouts(
@ -451,14 +435,13 @@ public class HashedWheelTimer implements Timer {
* current time otherwise (with Long.MIN_VALUE changed by +1) * current time otherwise (with Long.MIN_VALUE changed by +1)
*/ */
private long waitForNextTick() { private long waitForNextTick() {
long deadline = startTime + tickDuration * tick; long deadline = tickDuration * (tick + 1);
for (;;) { for (;;) {
final long currentTime = System.nanoTime(); final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) { if (sleepTimeMs <= 0) {
tick += 1;
if (currentTime == Long.MIN_VALUE) { if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE; return -Long.MAX_VALUE;
} else { } else {
@ -494,13 +477,17 @@ public class HashedWheelTimer implements Timer {
private final TimerTask task; private final TimerTask task;
final long deadline; final long deadline;
volatile int stopIndex; final int stopIndex;
volatile long remainingRounds; volatile long remainingRounds;
private final AtomicInteger state = new AtomicInteger(ST_INIT); private final AtomicInteger state = new AtomicInteger(ST_INIT);
HashedWheelTimeout(TimerTask task, long deadline) { HashedWheelTimeout(TimerTask task, long deadline) {
this.task = task; this.task = task;
this.deadline = deadline; this.deadline = deadline;
final long ticks = Math.max(deadline / tickDuration, tick); // Ensure we don't schedule for past.
stopIndex = (int) (ticks & mask);
remainingRounds = ticks / wheel.length;
} }
@Override @Override
@ -550,7 +537,7 @@ public class HashedWheelTimer implements Timer {
@Override @Override
public String toString() { public String toString() {
final long currentTime = System.nanoTime(); final long currentTime = System.nanoTime();
long remaining = deadline - currentTime; long remaining = deadline - currentTime + startTime;
StringBuilder buf = new StringBuilder(192); StringBuilder buf = new StringBuilder(192);
buf.append(getClass().getSimpleName()); buf.append(getClass().getSimpleName());
@ -559,18 +546,21 @@ public class HashedWheelTimer implements Timer {
buf.append("deadline: "); buf.append("deadline: ");
if (remaining > 0) { if (remaining > 0) {
buf.append(remaining); buf.append(remaining);
buf.append(" ms later, "); buf.append(" ns later");
} else if (remaining < 0) { } else if (remaining < 0) {
buf.append(-remaining); buf.append(-remaining);
buf.append(" ms ago, "); buf.append(" ns ago");
} else { } else {
buf.append("now, "); buf.append("now");
} }
if (isCancelled()) { if (isCancelled()) {
buf.append(", cancelled"); buf.append(", cancelled");
} }
buf.append(", task: ");
buf.append(task());
return buf.append(')').toString(); return buf.append(')').toString();
} }
} }