Fixed NETTY-365 HashedWheelTimer.stop() enters an infinite loop when
called from TimerTask Fixed NETTY-379 Intermittent slippery task timeout in HashedWheelTimer * Throw an IllegalStateException if HashedWheelTimer.stop() is called from TimerTask * Reschedule the slipped task accurately instead of delaying it by one round
This commit is contained in:
parent
d1f05ea4e7
commit
31df7fa9b1
@ -271,6 +271,13 @@ public class HashedWheelTimer implements Timer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public synchronized Set<Timeout> stop() {
|
public synchronized Set<Timeout> stop() {
|
||||||
|
if (Thread.currentThread() == workerThread) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
HashedWheelTimer.class.getSimpleName() +
|
||||||
|
".stop() cannot be called from " +
|
||||||
|
TimerTask.class.getSimpleName());
|
||||||
|
}
|
||||||
|
|
||||||
if (!shutdown.compareAndSet(false, true)) {
|
if (!shutdown.compareAndSet(false, true)) {
|
||||||
return Collections.emptySet();
|
return Collections.emptySet();
|
||||||
}
|
}
|
||||||
@ -310,22 +317,28 @@ public class HashedWheelTimer implements Timer {
|
|||||||
throw new NullPointerException("unit");
|
throw new NullPointerException("unit");
|
||||||
}
|
}
|
||||||
|
|
||||||
delay = unit.toMillis(delay);
|
|
||||||
if (delay < tickDuration) {
|
|
||||||
delay = tickDuration;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!workerThread.isAlive()) {
|
if (!workerThread.isAlive()) {
|
||||||
start();
|
start();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare the required parameters to create the timeout object.
|
delay = unit.toMillis(delay);
|
||||||
HashedWheelTimeout timeout;
|
HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay);
|
||||||
|
scheduleTimeout(timeout, delay);
|
||||||
|
return timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
|
||||||
|
// delay must be equal to or greater than tickDuration so that the
|
||||||
|
// worker thread never misses the timeout.
|
||||||
|
if (delay < tickDuration) {
|
||||||
|
delay = tickDuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare the required parameters to schedule the timeout object.
|
||||||
final long lastRoundDelay = delay % roundDuration;
|
final long lastRoundDelay = delay % roundDuration;
|
||||||
final long lastTickDelay = delay % tickDuration;
|
final long lastTickDelay = delay % tickDuration;
|
||||||
final long relativeIndex =
|
final long relativeIndex =
|
||||||
lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
|
lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
|
||||||
final long deadline = currentTime + delay;
|
|
||||||
|
|
||||||
final long remainingRounds =
|
final long remainingRounds =
|
||||||
delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
|
delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
|
||||||
@ -333,18 +346,14 @@ public class HashedWheelTimer implements Timer {
|
|||||||
// Add the timeout to the wheel.
|
// Add the timeout to the wheel.
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
timeout =
|
int stopIndex = (int) (wheelCursor + relativeIndex & mask);
|
||||||
new HashedWheelTimeout(
|
timeout.stopIndex = stopIndex;
|
||||||
task, deadline,
|
timeout.remainingRounds = remainingRounds;
|
||||||
(int) (wheelCursor + relativeIndex & mask),
|
|
||||||
remainingRounds);
|
|
||||||
|
|
||||||
wheel[timeout.stopIndex].add(timeout);
|
wheel[stopIndex].add(timeout);
|
||||||
} finally {
|
} finally {
|
||||||
lock.readLock().unlock();
|
lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
return timeout;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class Worker implements Runnable {
|
private final class Worker implements Runnable {
|
||||||
@ -364,14 +373,16 @@ public class HashedWheelTimer implements Timer {
|
|||||||
tick = 1;
|
tick = 1;
|
||||||
|
|
||||||
while (!shutdown.get()) {
|
while (!shutdown.get()) {
|
||||||
waitForNextTick();
|
final long deadline = waitForNextTick();
|
||||||
fetchExpiredTimeouts(expiredTimeouts);
|
if (deadline > 0) {
|
||||||
notifyExpiredTimeouts(expiredTimeouts);
|
fetchExpiredTimeouts(expiredTimeouts, deadline);
|
||||||
|
notifyExpiredTimeouts(expiredTimeouts);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fetchExpiredTimeouts(
|
private void fetchExpiredTimeouts(
|
||||||
List<HashedWheelTimeout> expiredTimeouts) {
|
List<HashedWheelTimeout> expiredTimeouts, long deadline) {
|
||||||
|
|
||||||
// Find the expired timeouts and decrease the round counter
|
// Find the expired timeouts and decrease the round counter
|
||||||
// if necessary. Note that we don't send the notification
|
// if necessary. Note that we don't send the notification
|
||||||
@ -379,12 +390,9 @@ public class HashedWheelTimer implements Timer {
|
|||||||
// an exclusive lock.
|
// an exclusive lock.
|
||||||
lock.writeLock().lock();
|
lock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
int oldBucketHead = wheelCursor;
|
int newWheelCursor = wheelCursor = wheelCursor + 1 & mask;
|
||||||
int newBucketHead = oldBucketHead + 1 & mask;
|
ReusableIterator<HashedWheelTimeout> i = iterators[newWheelCursor];
|
||||||
wheelCursor = newBucketHead;
|
fetchExpiredTimeouts(expiredTimeouts, i, deadline);
|
||||||
|
|
||||||
ReusableIterator<HashedWheelTimeout> i = iterators[oldBucketHead];
|
|
||||||
fetchExpiredTimeouts(expiredTimeouts, i);
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.writeLock().unlock();
|
lock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
@ -392,24 +400,37 @@ public class HashedWheelTimer implements Timer {
|
|||||||
|
|
||||||
private void fetchExpiredTimeouts(
|
private void fetchExpiredTimeouts(
|
||||||
List<HashedWheelTimeout> expiredTimeouts,
|
List<HashedWheelTimeout> expiredTimeouts,
|
||||||
ReusableIterator<HashedWheelTimeout> i) {
|
ReusableIterator<HashedWheelTimeout> i, long deadline) {
|
||||||
|
|
||||||
long currentDeadline = System.currentTimeMillis() + tickDuration;
|
List<HashedWheelTimeout> slipped = null;
|
||||||
i.rewind();
|
i.rewind();
|
||||||
while (i.hasNext()) {
|
while (i.hasNext()) {
|
||||||
HashedWheelTimeout timeout = i.next();
|
HashedWheelTimeout timeout = i.next();
|
||||||
if (timeout.remainingRounds <= 0) {
|
if (timeout.remainingRounds <= 0) {
|
||||||
if (timeout.deadline < currentDeadline) {
|
i.remove();
|
||||||
i.remove();
|
if (timeout.deadline <= deadline) {
|
||||||
expiredTimeouts.add(timeout);
|
expiredTimeouts.add(timeout);
|
||||||
} else {
|
} else {
|
||||||
// A rare case where a timeout is put for the next
|
// Handle the case where the timeout is put into a wrong
|
||||||
// round: just wait for the next round.
|
// place, usually one tick earlier. For now, just add
|
||||||
|
// it to a temporary list - we will reschedule it in a
|
||||||
|
// separate loop.
|
||||||
|
if (slipped == null) {
|
||||||
|
slipped = new ArrayList<HashedWheelTimer.HashedWheelTimeout>();
|
||||||
|
}
|
||||||
|
slipped.add(timeout);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
timeout.remainingRounds --;
|
timeout.remainingRounds --;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reschedule the slipped timeouts.
|
||||||
|
if (slipped != null) {
|
||||||
|
for (HashedWheelTimeout timeout: slipped) {
|
||||||
|
scheduleTimeout(timeout, timeout.deadline - deadline);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void notifyExpiredTimeouts(
|
private void notifyExpiredTimeouts(
|
||||||
@ -423,7 +444,9 @@ public class HashedWheelTimer implements Timer {
|
|||||||
expiredTimeouts.clear();
|
expiredTimeouts.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForNextTick() {
|
private long waitForNextTick() {
|
||||||
|
long deadline = startTime + tickDuration * tick;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
final long currentTime = System.currentTimeMillis();
|
final long currentTime = System.currentTimeMillis();
|
||||||
final long sleepTime = tickDuration * tick - (currentTime - startTime);
|
final long sleepTime = tickDuration * tick - (currentTime - startTime);
|
||||||
@ -436,36 +459,28 @@ public class HashedWheelTimer implements Timer {
|
|||||||
Thread.sleep(sleepTime);
|
Thread.sleep(sleepTime);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
if (shutdown.get()) {
|
if (shutdown.get()) {
|
||||||
return;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset the tick if overflow is expected.
|
// Increase the tick.
|
||||||
if (tickDuration * tick > Long.MAX_VALUE - tickDuration) {
|
tick ++;
|
||||||
startTime = System.currentTimeMillis();
|
return deadline;
|
||||||
tick = 1;
|
|
||||||
} else {
|
|
||||||
// Increase the tick if overflow is not likely to happen.
|
|
||||||
tick ++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final class HashedWheelTimeout implements Timeout {
|
private final class HashedWheelTimeout implements Timeout {
|
||||||
|
|
||||||
private final TimerTask task;
|
private final TimerTask task;
|
||||||
final int stopIndex;
|
|
||||||
final long deadline;
|
final long deadline;
|
||||||
|
volatile int stopIndex;
|
||||||
volatile long remainingRounds;
|
volatile long remainingRounds;
|
||||||
private volatile boolean cancelled;
|
private volatile boolean cancelled;
|
||||||
|
|
||||||
HashedWheelTimeout(
|
HashedWheelTimeout(TimerTask task, long deadline) {
|
||||||
TimerTask task, long deadline, int stopIndex, long remainingRounds) {
|
|
||||||
this.task = task;
|
this.task = task;
|
||||||
this.deadline = deadline;
|
this.deadline = deadline;
|
||||||
this.stopIndex = stopIndex;
|
|
||||||
this.remainingRounds = remainingRounds;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Timer getTimer() {
|
public Timer getTimer() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user