Fixed NETTY-365 HashedWheelTimer.stop() enters an infinite loop when
called from TimerTask Fixed NETTY-379 Intermittent slippery task timeout in HashedWheelTimer * Throw an IllegalStateException if HashedWheelTimer.stop() is called from TimerTask * Reschedule the slipped task accurately instead of delaying it by one round
This commit is contained in:
parent
23f33629ca
commit
ad320f6cae
@ -274,6 +274,13 @@ public class HashedWheelTimer implements Timer {
|
||||
|
||||
@Override
|
||||
public synchronized Set<Timeout> stop() {
|
||||
if (Thread.currentThread() == workerThread) {
|
||||
throw new IllegalStateException(
|
||||
HashedWheelTimer.class.getSimpleName() +
|
||||
".stop() cannot be called from " +
|
||||
TimerTask.class.getSimpleName());
|
||||
}
|
||||
|
||||
if (!shutdown.compareAndSet(false, true)) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
@ -314,22 +321,28 @@ public class HashedWheelTimer implements Timer {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
|
||||
delay = unit.toMillis(delay);
|
||||
if (delay < tickDuration) {
|
||||
delay = tickDuration;
|
||||
}
|
||||
|
||||
if (!workerThread.isAlive()) {
|
||||
start();
|
||||
}
|
||||
|
||||
// Prepare the required parameters to create the timeout object.
|
||||
HashedWheelTimeout timeout;
|
||||
delay = unit.toMillis(delay);
|
||||
HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
|
||||
scheduleTimeout(timeout, delay);
|
||||
return timeout;
|
||||
}
|
||||
|
||||
void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
|
||||
// delay must be equal to or greater than tickDuration so that the
|
||||
// worker thread never misses the timeout.
|
||||
if (delay < tickDuration) {
|
||||
delay = tickDuration;
|
||||
}
|
||||
|
||||
// Prepare the required parameters to schedule the timeout object.
|
||||
final long lastRoundDelay = delay % roundDuration;
|
||||
final long lastTickDelay = delay % tickDuration;
|
||||
final long relativeIndex =
|
||||
lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
|
||||
final long deadline = currentTime + delay;
|
||||
|
||||
final long remainingRounds =
|
||||
delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
|
||||
@ -337,18 +350,14 @@ public class HashedWheelTimer implements Timer {
|
||||
// Add the timeout to the wheel.
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
timeout =
|
||||
new HashedWheelTimeout(
|
||||
task, deadline,
|
||||
(int) (wheelCursor + relativeIndex & mask),
|
||||
remainingRounds);
|
||||
int stopIndex = (int) (wheelCursor + relativeIndex & mask);
|
||||
timeout.stopIndex = stopIndex;
|
||||
timeout.remainingRounds = remainingRounds;
|
||||
|
||||
wheel[timeout.stopIndex].add(timeout);
|
||||
wheel[stopIndex].add(timeout);
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
return timeout;
|
||||
}
|
||||
|
||||
private final class Worker implements Runnable {
|
||||
@ -369,14 +378,16 @@ public class HashedWheelTimer implements Timer {
|
||||
tick = 1;
|
||||
|
||||
while (!shutdown.get()) {
|
||||
waitForNextTick();
|
||||
fetchExpiredTimeouts(expiredTimeouts);
|
||||
final long deadline = waitForNextTick();
|
||||
if (deadline > 0) {
|
||||
fetchExpiredTimeouts(expiredTimeouts, deadline);
|
||||
notifyExpiredTimeouts(expiredTimeouts);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void fetchExpiredTimeouts(
|
||||
List<HashedWheelTimeout> expiredTimeouts) {
|
||||
List<HashedWheelTimeout> expiredTimeouts, long deadline) {
|
||||
|
||||
// Find the expired timeouts and decrease the round counter
|
||||
// if necessary. Note that we don't send the notification
|
||||
@ -384,12 +395,9 @@ public class HashedWheelTimer implements Timer {
|
||||
// an exclusive lock.
|
||||
lock.writeLock().lock();
|
||||
try {
|
||||
int oldBucketHead = wheelCursor;
|
||||
int newBucketHead = oldBucketHead + 1 & mask;
|
||||
wheelCursor = newBucketHead;
|
||||
|
||||
ReusableIterator<HashedWheelTimeout> i = iterators[oldBucketHead];
|
||||
fetchExpiredTimeouts(expiredTimeouts, i);
|
||||
int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
|
||||
ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
|
||||
fetchExpiredTimeouts(expiredTimeouts, i, deadline);
|
||||
} finally {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
@ -397,24 +405,37 @@ public class HashedWheelTimer implements Timer {
|
||||
|
||||
private void fetchExpiredTimeouts(
|
||||
List<HashedWheelTimeout> expiredTimeouts,
|
||||
ReusableIterator<HashedWheelTimeout> i) {
|
||||
ReusableIterator<HashedWheelTimeout> i, long deadline) {
|
||||
|
||||
long currentDeadline = System.currentTimeMillis() + tickDuration;
|
||||
List<HashedWheelTimeout> slipped = null;
|
||||
i.rewind();
|
||||
while (i.hasNext()) {
|
||||
HashedWheelTimeout timeout = i.next();
|
||||
if (timeout.remainingRounds <= 0) {
|
||||
if (timeout.deadline < currentDeadline) {
|
||||
i.remove();
|
||||
if (timeout.deadline <= deadline) {
|
||||
expiredTimeouts.add(timeout);
|
||||
} else {
|
||||
// A rare case where a timeout is put for the next
|
||||
// round: just wait for the next round.
|
||||
// Handle the case where the timeout is put into a wrong
|
||||
// place, usually one tick earlier. For now, just add
|
||||
// it to a temporary list - we will reschedule it in a
|
||||
// separate loop.
|
||||
if (slipped == null) {
|
||||
slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
|
||||
}
|
||||
slipped.add(timeout);
|
||||
}
|
||||
} else {
|
||||
timeout.remainingRounds --;
|
||||
}
|
||||
}
|
||||
|
||||
// Reschedule the slipped timeouts.
|
||||
if (slipped != null) {
|
||||
for (HashedWheelTimeout timeout: slipped) {
|
||||
scheduleTimeout(timeout, timeout.deadline - deadline);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyExpiredTimeouts(
|
||||
@ -428,7 +449,9 @@ public class HashedWheelTimer implements Timer {
|
||||
expiredTimeouts.clear();
|
||||
}
|
||||
|
||||
private void waitForNextTick() {
|
||||
private long waitForNextTick() {
|
||||
long deadline = startTime + tickDuration * tick;
|
||||
|
||||
for (;;) {
|
||||
final long currentTime = System.currentTimeMillis();
|
||||
final long sleepTime = tickDuration * tick - (currentTime - startTime);
|
||||
@ -441,36 +464,28 @@ public class HashedWheelTimer implements Timer {
|
||||
Thread.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
if (shutdown.get()) {
|
||||
return;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reset the tick if overflow is expected.
|
||||
if (tickDuration * tick > Long.MAX_VALUE - tickDuration) {
|
||||
startTime = System.currentTimeMillis();
|
||||
tick = 1;
|
||||
} else {
|
||||
// Increase the tick if overflow is not likely to happen.
|
||||
// Increase the tick.
|
||||
tick ++;
|
||||
}
|
||||
return deadline;
|
||||
}
|
||||
}
|
||||
|
||||
private final class HashedWheelTimeout implements Timeout {
|
||||
|
||||
private final TimerTask task;
|
||||
final int stopIndex;
|
||||
final long deadline;
|
||||
volatile int stopIndex;
|
||||
volatile long remainingRounds;
|
||||
private volatile boolean cancelled;
|
||||
|
||||
HashedWheelTimeout(
|
||||
TimerTask task, long deadline, int stopIndex, long remainingRounds) {
|
||||
HashedWheelTimeout(TimerTask task, long deadline) {
|
||||
this.task = task;
|
||||
this.deadline = deadline;
|
||||
this.stopIndex = stopIndex;
|
||||
this.remainingRounds = remainingRounds;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Loading…
Reference in New Issue
Block a user