port fix from Akka for HashedWheelTimer

Ported from commits:

* cb4e3536b0
* 7e590f3071
This commit is contained in:
Prajwal Tuladhar 2013-03-07 02:29:48 -05:00 committed by Trustin Lee
parent 61d6c48365
commit e66fc219ff

View File

@ -317,28 +317,24 @@ public class HashedWheelTimer implements Timer {
} }
void scheduleTimeout(HashedWheelTimeout timeout, long delay) { void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
// delay must be equal to or greater than tickDuration so that the long relativeIndex = (delay + tickDuration - 1) / tickDuration;
// worker thread never misses the timeout.
if (delay < tickDuration) { // if the previous line had an overflow going on, then well just schedule this timeout
delay = tickDuration; // one tick early; that shouldnt matter since were talking 270 years here
if (relativeIndex < 0) {
relativeIndex = delay / tickDuration;
} else if (relativeIndex == 0) {
relativeIndex = 1;
} }
// Prepare the required parameters to schedule the timeout object. final long remainingRounds = relativeIndex / wheel.length;
final long lastRoundDelay = delay % roundDuration;
final long lastTickDelay = delay % tickDuration;
final long relativeIndex =
lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
final long remainingRounds =
delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
// Add the timeout to the wheel. // Add the timeout to the wheel.
lock.readLock().lock(); lock.readLock().lock();
try { try {
int stopIndex = (int) (wheelCursor + relativeIndex & mask); int stopIndex = (int) ((wheelCursor + relativeIndex) & mask);
timeout.stopIndex = stopIndex; timeout.stopIndex = stopIndex;
timeout.remainingRounds = remainingRounds; timeout.remainingRounds = remainingRounds;
wheel[stopIndex].add(timeout); wheel[stopIndex].add(timeout);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
@ -363,7 +359,7 @@ public class HashedWheelTimer implements Timer {
while (workerState.get() == 1) { while (workerState.get() == 1) {
final long deadline = waitForNextTick(); final long deadline = waitForNextTick();
if (deadline > 0) { if (deadline > Long.MIN_VALUE) {
fetchExpiredTimeouts(expiredTimeouts, deadline); fetchExpiredTimeouts(expiredTimeouts, deadline);
notifyExpiredTimeouts(expiredTimeouts); notifyExpiredTimeouts(expiredTimeouts);
} }
@ -395,7 +391,7 @@ public class HashedWheelTimer implements Timer {
HashedWheelTimeout timeout = i.next(); HashedWheelTimeout timeout = i.next();
if (timeout.remainingRounds <= 0) { if (timeout.remainingRounds <= 0) {
i.remove(); i.remove();
if (timeout.deadline <= deadline) { if ((timeout.deadline - deadline) <= 0) {
expiredTimeouts.add(timeout); expiredTimeouts.add(timeout);
} else { } else {
// Handle the case where the timeout is put into a wrong // Handle the case where the timeout is put into a wrong
@ -431,12 +427,26 @@ public class HashedWheelTimer implements Timer {
expiredTimeouts.clear(); expiredTimeouts.clear();
} }
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request, current time otherwise
*/
private long waitForNextTick() { private long waitForNextTick() {
long deadline = startTime + tickDuration * tick; final long deadline = startTime + tickDuration * tick;
for (;;) { for (;;) {
final long currentTime = System.currentTimeMillis(); final long currentTime = System.nanoTime();
long sleepTime = tickDuration * tick - (currentTime - startTime); long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
tick++;
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need // Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect // to round the sleepTime as workaround for a bug that only affect
@ -444,25 +454,17 @@ public class HashedWheelTimer implements Timer {
// //
// See https://github.com/netty/netty/issues/356 // See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) { if (PlatformDependent.isWindows()) {
sleepTime = sleepTime / 10 * 10; sleepTimeMs = (sleepTimeMs / 10) * 10;
}
if (sleepTime <= 0) {
break;
} }
try { try {
Thread.sleep(sleepTime); Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (workerState.get() != 1) { if (workerState.get() != 1) {
return -1; return Long.MIN_VALUE;
} }
} }
} }
// Increase the tick.
tick ++;
return deadline;
} }
} }