[#2662] Fix race in cancellation of TimerTasks which could lead to NPE

Motivation:

Due to a race condition while handling cancellation of TimerTasks, it was possible to corrupt the linked-list structure that is represented by HashedWheelBucket and so produce an NPE.

Modification:

Fix the problem by adding another MpscLinkedQueue which holds the cancellation tasks and processing them on each tick. This allows us to use no synchronization / locking at all, while introducing a latency of max. 1 tick before the TimerTask can be GC'ed.

Result:

No more NPE
Norman Maurer 2014-07-21 14:17:35 +02:00
parent f5faada77c
commit d989b24351
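
As a rough sketch of the pattern the commit introduces (not the actual Netty source; the class and method names here are illustrative, and java.util.concurrent.ConcurrentLinkedQueue stands in for Netty's internal MpscLinkedQueue), cancellations become lock-free enqueues that only the single worker thread ever applies to the bucket structure:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class CancellationQueueSketch {
    // Many threads may cancel concurrently; only the worker thread polls.
    private final Queue<Runnable> cancelledTimeouts = new ConcurrentLinkedQueue<Runnable>();

    // Called from any thread: just enqueue the removal work, no bucket locking.
    void cancel(Runnable removeFromBucket) {
        cancelledTimeouts.add(removeFromBucket);
    }

    // Called only by the worker thread, once per tick, before expiring timeouts,
    // so the bucket's linked list is never mutated by two threads at once.
    void processCancelledTasks() {
        for (;;) {
            Runnable task = cancelledTimeouts.poll();
            if (task == null) {
                break; // drained
            }
            try {
                task.run(); // unlinks the cancelled timeout from its bucket
            } catch (Throwable t) {
                // a failing cancellation task must not kill the worker thread
            }
        }
    }
}

Because the queue is drained only once per tick, a cancelled task stays reachable for at most one tick before it can be GC'ed, which is the latency the commit message refers to.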


@@ -30,8 +30,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * A {@link Timer} optimized for approximated I/O timeout scheduling.
@@ -108,6 +106,7 @@ public class HashedWheelTimer implements Timer {
     private final int mask;
     private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
     private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
+    private final Queue<Runnable> cancelledTimeouts = PlatformDependent.newMpscQueue();
 
     private volatile long startTime;
@@ -361,15 +360,11 @@ public class HashedWheelTimer implements Timer {
                 final long deadline = waitForNextTick();
                 if (deadline > 0) {
                     int idx = (int) (tick & mask);
+                    processCancelledTasks();
                     HashedWheelBucket bucket =
                             wheel[idx];
-                    bucket.lock.lock();
-                    try {
-                        transferTimeoutsToBuckets();
-                        bucket.expireTimeouts(deadline);
-                    } finally {
-                        bucket.lock.unlock();
-                    }
+                    transferTimeoutsToBuckets();
+                    bucket.expireTimeouts(deadline);
                     tick++;
                 }
             } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
@@ -383,9 +378,12 @@ public class HashedWheelTimer implements Timer {
                 if (timeout == null) {
                     break;
                 }
-                unprocessedTimeouts.add(timeout);
+                if (!timeout.isCancelled()) {
+                    unprocessedTimeouts.add(timeout);
+                }
             }
+            processCancelledTasks();
         }
 
         private void transferTimeoutsToBuckets() {
             // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
@@ -411,6 +409,24 @@ public class HashedWheelTimer implements Timer {
                 bucket.addTimeout(timeout);
             }
         }
+
+        private void processCancelledTasks() {
+            for (;;) {
+                Runnable task = cancelledTimeouts.poll();
+                if (task == null) {
+                    // all processed
+                    break;
+                }
+                try {
+                    task.run();
+                } catch (Throwable t) {
+                    if (logger.isWarnEnabled()) {
+                        logger.warn("An exception was thrown while processing a cancellation task", t);
+                    }
+                }
+            }
+        }
+
         /**
          * calculate goal nanoTime from startTime and current tick number,
          * then wait until that goal has been reached.
@@ -514,18 +530,22 @@ public class HashedWheelTimer implements Timer {
             if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                 return false;
             }
-            HashedWheelBucket bucket = this.bucket;
-            if (bucket != null) {
-                // if tryLock fails it means that HashedWheelBucket is currently processed and so there is nothing for
-                // us to do as the remove itself will be done while processing.
-                if (bucket.lock.tryLock()) {
-                    try {
-                        bucket.remove(this);
-                    } finally {
-                        bucket.lock.unlock();
-                    }
-                }
-            }
+            // If a task should be cancelled we create a new Runnable for it and add it to another queue which
+            // will be processed on each tick. This means we will have a GC latency of max. 1 tick duration,
+            // which is good enough. This way we can again make use of our MpscLinkedQueue and so minimize
+            // locking / overhead as much as possible.
+            //
+            // It is important that we not just add the HashedWheelTimeout itself again as it extends
+            // MpscLinkedQueueNode and so may still be used as a tombstone.
+            timer.cancelledTimeouts.add(new Runnable() {
+                @Override
+                public void run() {
+                    HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
+                    if (bucket != null) {
+                        bucket.remove(HashedWheelTimeout.this);
+                    }
+                }
+            });
             return true;
         }
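
For callers, the public API is unchanged; only the cancellation path behind Timeout.cancel() is different. A minimal usage sketch of the path this hunk rewrites, assuming the io.netty.util API of Netty 4.x (the surrounding class is illustrative):

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;

public final class CancelExample {
    public static void main(String[] args) {
        HashedWheelTimer timer = new HashedWheelTimer(); // 100 ms ticks by default
        Timeout timeout = timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout t) {
                System.out.println("fires unless cancelled first");
            }
        }, 5, TimeUnit.SECONDS);

        // With this commit, cancel() only enqueues a removal task; the worker
        // thread unlinks the timeout from its bucket on the next tick.
        timeout.cancel();
        timer.stop();
    }
}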
@@ -603,12 +623,6 @@ public class HashedWheelTimer implements Timer {
      * extra object creation is needed.
      */
     private static final class HashedWheelBucket {
-        // Lock used during processing of each HashedWheelBucket. The Lock will be acquired on each tick for the
-        // current HashedWheelBucket and also tried to acquired when a HashedWheelTimeout should be cancelled.
-        // This allows fast GC for cancelled HashedWheelTimeouts.
-        private final Lock lock = new ReentrantLock();
-
         // Used for the linked-list datastructure
         private HashedWheelTimeout head;
         private HashedWheelTimeout tail;