[#2662] Fix race in cancellation of TimerTasks which could lead to NPE

Motivation:

Due to a race condition while handling cancellation of TimerTasks, it was possible to corrupt the linked-list structure that is represented by HashedWheelBucket and so produce an NPE.

Modification:

Fix the problem by adding another MpscLinkedQueue which holds the cancellation tasks and processing them on each tick. This allows us to use no synchronization / locking at all, while introducing a latency of at most 1 tick before the TimerTask can be GC'ed.

Result:

No more NPE
This commit is contained in:
Norman Maurer 2014-07-21 14:17:35 +02:00
parent e1cc1fbabc
commit 5b2bdd844d

View File

@ -30,8 +30,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* A {@link Timer} optimized for approximated I/O timeout scheduling.
@ -108,6 +106,7 @@ public class HashedWheelTimer implements Timer {
private final int mask;
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
private final Queue<Runnable> cancelledTimeouts = PlatformDependent.newMpscQueue();
private volatile long startTime;
@ -361,15 +360,11 @@ public class HashedWheelTimer implements Timer {
final long deadline = waitForNextTick();
if (deadline > 0) {
int idx = (int) (tick & mask);
processCancelledTasks();
HashedWheelBucket bucket =
wheel[idx];
bucket.lock.lock();
try {
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
} finally {
bucket.lock.unlock();
}
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
@ -383,8 +378,11 @@ public class HashedWheelTimer implements Timer {
if (timeout == null) {
break;
}
unprocessedTimeouts.add(timeout);
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
private void transferTimeoutsToBuckets() {
@ -411,6 +409,24 @@ public class HashedWheelTimer implements Timer {
bucket.addTimeout(timeout);
}
}
/**
 * Drains the queue of pending cancellation tasks, running each one in turn until the
 * queue reports no more entries. A task that throws is logged at warn level (when
 * enabled) and does not stop the remaining tasks from being processed.
 */
private void processCancelledTasks() {
    Runnable cancellation;
    // poll() returning null means every queued cancellation has been handled.
    while ((cancellation = cancelledTimeouts.poll()) != null) {
        try {
            cancellation.run();
        } catch (Throwable cause) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown while process a cancellation task", cause);
            }
        }
    }
}
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
@ -514,18 +530,22 @@ public class HashedWheelTimer implements Timer {
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
// if tryLock fails it means that HashedWheelBucket is currently processed and so there is nothing for
// us to do as the remove itself will be done while processing.
if (bucket.lock.tryLock()) {
try {
bucket.remove(this);
} finally {
bucket.lock.unlock();
// If a task should be cancelled we create a new Runnable for it and add that to another queue which
// will be processed on each tick. This means that we will have a GC latency of max. 1 tick duration,
// which is good enough. This way we can again make use of our MpscLinkedQueue and so minimize the
// locking / overhead as much as possible.
//
// It is important that we do not just add the HashedWheelTimeout itself again, as it extends
// MpscLinkedQueueNode and so may still be used as a tombstone.
timer.cancelledTimeouts.add(new Runnable() {
@Override
public void run() {
HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
if (bucket != null) {
bucket.remove(HashedWheelTimeout.this);
}
}
}
});
return true;
}
@ -603,12 +623,6 @@ public class HashedWheelTimer implements Timer {
* extra object creation is needed.
*/
private static final class HashedWheelBucket {
// Lock used during processing of each HashedWheelBucket. The Lock will be acquired on each tick for the
// current HashedWheelBucket and also tried to acquired when a HashedWheelTimeout should be cancelled.
// This allows fast GC for cancelled HashedWheelTimeouts.
private final Lock lock = new ReentrantLock();
// Used for the linked-list datastructure
private HashedWheelTimeout head;
private HashedWheelTimeout tail;