[#2651] Fix possible infinite-loop when cancel tasks

Motivations:
In our new version of HWT we used some kind of lazy cancelation of timeouts by put them back in the queue and let them pick up on the next tick. This  multiple problems:
 - we may corrupt the MpscLinkedQueue if the task is used as tombstone
 - this sometimes lead to an uncessary delay especially when someone did executed some "heavy" logic in the TimeTask

Modifications:
Use a Lock per HashedWheelBucket for save and fast removal.

Modifications:
Cancellation of tasks can be done fast and so stuff can be GC'ed and no more infinite-loop possible
This commit is contained in:
Norman Maurer 2014-07-11 13:41:03 +00:00
parent edc7abb461
commit 1cdbbe0f9b
2 changed files with 66 additions and 35 deletions

View File

@ -30,6 +30,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* A {@link Timer} optimized for approximated I/O timeout scheduling.
@ -358,10 +360,16 @@ public class HashedWheelTimer implements Timer {
do {
final long deadline = waitForNextTick();
if (deadline > 0) {
transferTimeoutsToBuckets();
int idx = (int) (tick & mask);
HashedWheelBucket bucket =
wheel[(int) (tick & mask)];
bucket.expireTimeouts(deadline);
wheel[idx];
bucket.lock.lock();
try {
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
} finally {
bucket.lock.unlock();
}
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
@ -388,13 +396,11 @@ public class HashedWheelTimer implements Timer {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
|| !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
// Was cancelled in the meantime. So just remove it and continue with next HashedWheelTimeout
// in the queue
timeout.remove();
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
long calculated = timeout.deadline / tickDuration;
timeout.remainingRounds = (calculated - tick) / wheel.length;
@ -454,9 +460,8 @@ public class HashedWheelTimer implements Timer {
implements Timeout {
private static final int ST_INIT = 0;
private static final int ST_IN_BUCKET = 1;
private static final int ST_CANCELLED = 2;
private static final int ST_EXPIRED = 3;
private static final int ST_CANCELLED = 1;
private static final int ST_EXPIRED = 2;
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;
static {
@ -505,32 +510,23 @@ public class HashedWheelTimer implements Timer {
@Override
public boolean cancel() {
int state = state();
if (state >= ST_CANCELLED) {
// fail fast if the task was cancelled or expired before.
return false;
}
if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) {
// Was cancelled before the HashedWheelTimeout was added to its HashedWheelBucket.
// In this case we can just return here as it will be discarded by the WorkerThread when handling
// the adding of HashedWheelTimeout to the HashedWheelBuckets.
return true;
}
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) {
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
// Add the HashedWheelTimeout back to the timeouts queue so it will be picked up on the next tick
// and remove this HashedTimeTask from the HashedWheelBucket. After this is done it is ready to get
// GC'ed once the user has no reference to it anymore.
timer.timeouts.add(this);
return true;
}
public void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
// if tryLock fails it means that HashedWheelBucket is currently processed and so there is nothing for
// us to do as the remove itself will be done while processing.
if (bucket.lock.tryLock()) {
try {
bucket.remove(this);
} finally {
bucket.lock.unlock();
}
}
}
return true;
}
public boolean compareAndSetState(int expected, int state) {
@ -548,7 +544,7 @@ public class HashedWheelTimer implements Timer {
@Override
public boolean isExpired() {
return state() > ST_IN_BUCKET;
return state() == ST_EXPIRED;
}
@Override
@ -557,8 +553,7 @@ public class HashedWheelTimer implements Timer {
}
public void expire() {
if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) {
assert state() != ST_INIT;
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
@ -609,6 +604,11 @@ public class HashedWheelTimer implements Timer {
*/
private static final class HashedWheelBucket {
// Lock used during processing of each HashedWheelBucket. The Lock will be acquired on each tick for the
// current HashedWheelBucket and also tried to acquired when a HashedWheelTimeout should be cancelled.
// This allows fast GC for cancelled HashedWheelTimeouts.
private final Lock lock = new ReentrantLock();
// Used for the linked-list datastructure
private HashedWheelTimeout head;
private HashedWheelTimeout tail;

View File

@ -17,8 +17,10 @@ package io.netty.util;
import org.junit.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -122,5 +124,34 @@ public class HashedWheelTimerTest {
}, 1, TimeUnit.SECONDS);
Thread.sleep(3500);
assertEquals(3, counter.get());
timer.stop();
}
@Test
public void testExecutionOnTime() throws InterruptedException {
int tickDuration = 200;
int timeout = 125;
int maxTimeout = tickDuration + timeout;
final HashedWheelTimer timer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS);
final BlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();
int scheduledTasks = 100000;
for (int i = 0; i < scheduledTasks; i++) {
final long start = System.nanoTime();
timer.newTimeout(new TimerTask() {
@Override
public void run(final Timeout timeout) throws Exception {
queue.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
}, timeout, TimeUnit.MILLISECONDS);
}
for (int i = 0; i < scheduledTasks; i++) {
long delay = queue.take();
assertTrue("Timeout + " + scheduledTasks + " delay " + delay + " must be " + timeout + " <= " + maxTimeout,
delay >= timeout && delay <= maxTimeout);
}
timer.stop();
}
}