[#744] Port fixes from Akka to HashedWheelTimer
port fix from Akka with following commits: *cb4e3536b0
*7e590f3071
And also use constants for worker state for time instead of numeric.
This commit is contained in:
parent
982c3ee9ee
commit
cb13e78342
@ -89,9 +89,12 @@ public class HashedWheelTimer implements Timer {
|
||||
|
||||
private final Worker worker = new Worker();
|
||||
final Thread workerThread;
|
||||
|
||||
public static final int WORKER_STATE_INIT = 0;
|
||||
public static final int WORKER_STATE_STARTED = 1;
|
||||
public static final int WORKER_STATE_SHUTDOWN = 2;
|
||||
final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down
|
||||
|
||||
private final long roundDuration;
|
||||
final long tickDuration;
|
||||
final Set<HashedWheelTimeout>[] wheel;
|
||||
final ReusableIterator<HashedWheelTimeout>[] iterators;
|
||||
@ -221,8 +224,6 @@ public class HashedWheelTimer implements Timer {
|
||||
tickDuration + ' ' + unit);
|
||||
}
|
||||
|
||||
roundDuration = tickDuration * wheel.length;
|
||||
|
||||
workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
|
||||
worker, "Hashed wheel timer #" + id.incrementAndGet(),
|
||||
determiner));
|
||||
@ -277,17 +278,17 @@ public class HashedWheelTimer implements Timer {
|
||||
*/
|
||||
public void start() {
|
||||
switch (workerState.get()) {
|
||||
case 0:
|
||||
if (workerState.compareAndSet(0, 1)) {
|
||||
case WORKER_STATE_INIT:
|
||||
if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
|
||||
workerThread.start();
|
||||
}
|
||||
break;
|
||||
case 1:
|
||||
case WORKER_STATE_STARTED:
|
||||
break;
|
||||
case 2:
|
||||
case WORKER_STATE_SHUTDOWN:
|
||||
throw new IllegalStateException("cannot be started once stopped");
|
||||
default:
|
||||
throw new Error();
|
||||
throw new Error("Invalid WorkerState");
|
||||
}
|
||||
}
|
||||
|
||||
@ -299,8 +300,9 @@ public class HashedWheelTimer implements Timer {
|
||||
TimerTask.class.getSimpleName());
|
||||
}
|
||||
|
||||
if (workerState.getAndSet(2) != 1) {
|
||||
// workerState wasn't 1, so return an empty set
|
||||
if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
|
||||
// workerState can be 0 or 2 at this moment - let it always be 2.
|
||||
workerState.set(WORKER_STATE_SHUTDOWN);
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
@ -348,25 +350,30 @@ public class HashedWheelTimer implements Timer {
|
||||
}
|
||||
|
||||
void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
|
||||
// delay must be equal to or greater than tickDuration so that the
|
||||
// worker thread never misses the timeout.
|
||||
if (delay < tickDuration) {
|
||||
delay = tickDuration;
|
||||
}
|
||||
|
||||
// Prepare the required parameters to schedule the timeout object.
|
||||
final long lastRoundDelay = delay % roundDuration;
|
||||
final long lastTickDelay = delay % tickDuration;
|
||||
final long relativeIndex =
|
||||
lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
|
||||
long relativeIndex = (delay + tickDuration - 1) / tickDuration;
|
||||
|
||||
final long remainingRounds =
|
||||
delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
|
||||
// if the previous line had an overflow going on, then we’ll just schedule this timeout
|
||||
// one tick early; that shouldn’t matter since we’re talking 270 years here
|
||||
if (relativeIndex < 0) {
|
||||
relativeIndex = delay / tickDuration;
|
||||
}
|
||||
if (relativeIndex == 0) {
|
||||
relativeIndex = 1;
|
||||
}
|
||||
if ((relativeIndex & mask) == 0) {
|
||||
relativeIndex--;
|
||||
}
|
||||
final long remainingRounds = relativeIndex / wheel.length;
|
||||
|
||||
// Add the timeout to the wheel.
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
int stopIndex = (int) (wheelCursor + relativeIndex & mask);
|
||||
if (workerState.get() == WORKER_STATE_SHUTDOWN) {
|
||||
throw new IllegalStateException("Cannot enqueue after shutdown");
|
||||
}
|
||||
final int stopIndex = (int) ((wheelCursor + relativeIndex) & mask);
|
||||
timeout.stopIndex = stopIndex;
|
||||
timeout.remainingRounds = remainingRounds;
|
||||
|
||||
@ -391,7 +398,7 @@ public class HashedWheelTimer implements Timer {
|
||||
startTime = System.currentTimeMillis();
|
||||
tick = 1;
|
||||
|
||||
while (workerState.get() == 1) {
|
||||
while (workerState.get() == WORKER_STATE_STARTED) {
|
||||
final long deadline = waitForNextTick();
|
||||
if (deadline > 0) {
|
||||
fetchExpiredTimeouts(expiredTimeouts, deadline);
|
||||
@ -463,12 +470,27 @@ public class HashedWheelTimer implements Timer {
|
||||
expiredTimeouts.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* calculate goal nanoTime from startTime and current tick number,
|
||||
* then wait until that goal has been reached.
|
||||
* @return Long.MIN_VALUE if received a shutdown request,
|
||||
* current time otherwise (with Long.MIN_VALUE changed by +1)
|
||||
*/
|
||||
private long waitForNextTick() {
|
||||
long deadline = startTime + tickDuration * tick;
|
||||
|
||||
for (;;) {
|
||||
final long currentTime = System.currentTimeMillis();
|
||||
long sleepTime = tickDuration * tick - (currentTime - startTime);
|
||||
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
|
||||
|
||||
if (sleepTimeMs <= 0) {
|
||||
tick += 1;
|
||||
if (currentTime == Long.MIN_VALUE) {
|
||||
return -Long.MAX_VALUE;
|
||||
} else {
|
||||
return currentTime;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we run on windows, as if thats the case we will need
|
||||
// to round the sleepTime as workaround for a bug that only affect
|
||||
@ -476,24 +498,16 @@ public class HashedWheelTimer implements Timer {
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/356
|
||||
if (DetectionUtil.isWindows()) {
|
||||
sleepTime = sleepTime / 10 * 10;
|
||||
}
|
||||
|
||||
if (sleepTime <= 0) {
|
||||
break;
|
||||
sleepTimeMs = sleepTimeMs / 10 * 10;
|
||||
}
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
Thread.sleep(sleepTimeMs);
|
||||
} catch (InterruptedException e) {
|
||||
if (workerState.get() != 1) {
|
||||
return -1;
|
||||
if (workerState.get() == WORKER_STATE_SHUTDOWN) {
|
||||
return Long.MIN_VALUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Increase the tick.
|
||||
tick ++;
|
||||
return deadline;
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user