diff --git a/common/src/main/java/io/netty/util/HashedWheelTimer.java b/common/src/main/java/io/netty/util/HashedWheelTimer.java index 1c98980b22..cc1d71695e 100644 --- a/common/src/main/java/io/netty/util/HashedWheelTimer.java +++ b/common/src/main/java/io/netty/util/HashedWheelTimer.java @@ -106,6 +106,7 @@ public class HashedWheelTimer implements Timer { private final int mask; private final CountDownLatch startTimeInitialized = new CountDownLatch(1); private final Queue timeouts = PlatformDependent.newMpscQueue(); + private volatile long startTime; /** @@ -388,6 +389,13 @@ public class HashedWheelTimer implements Timer { // all processed break; } + if (timeout.state() == HashedWheelTimeout.ST_CANCELLED + || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) { + // Was cancelled in the meantime. So just remove it and continue with next HashedWheelTimeout + // in the queue + timeout.remove(); + continue; + } long calculated = timeout.deadline / tickDuration; long remainingRounds = (calculated - tick) / wheel.length; timeout.remainingRounds = remainingRounds; @@ -448,8 +456,9 @@ public class HashedWheelTimer implements Timer { implements Timeout { private static final int ST_INIT = 0; - private static final int ST_CANCELLED = 1; - private static final int ST_EXPIRED = 2; + private static final int ST_IN_BUCKET = 1; + private static final int ST_CANCELLED = 2; + private static final int ST_EXPIRED = 3; private static final AtomicIntegerFieldUpdater STATE_UPDATER; static { @@ -477,6 +486,9 @@ public class HashedWheelTimer implements Timer { HashedWheelTimeout next; HashedWheelTimeout prev; + // The bucket to which the timeout was added + HashedWheelBucket bucket; + HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { this.timer = timer; this.task = task; @@ -495,21 +507,50 @@ public class HashedWheelTimer implements Timer { @Override public boolean cancel() { - // only update the state it will be removed from HashedWheelBucket on next tick. - if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_CANCELLED)) { + int state = state(); + if (state >= ST_CANCELLED) { + // fail fast if the task was cancelled or expired before. return false; } + if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) { + // Was cancelled before the HashedWheelTimeout was added to its HashedWheelBucket. + // In this case we can just return here as it will be discarded by the WorkerThread when handling + // the adding of HashedWheelTimeout to the HashedWheelBuckets. + return true; + } + // only update the state it will be removed from HashedWheelBucket on next tick. + if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) { + return false; + } + // Add the HashedWheelTimeout back to the timeouts queue so it will be picked up on the next tick + // and remove this HashedTimeTask from the HashedWheelBucket. After this is done it is ready to get + // GC'ed once the user has no reference to it anymore. + timer.timeouts.add(this); return true; } + public void remove() { + if (bucket != null) { + bucket.remove(this); + } + } + + public boolean compareAndSetState(int expected, int state) { + return STATE_UPDATER.compareAndSet(this, expected, state); + } + + public int state() { + return state; + } + @Override public boolean isCancelled() { - return STATE_UPDATER.get(this) == ST_CANCELLED; + return state() == ST_CANCELLED; } @Override public boolean isExpired() { - return STATE_UPDATER.get(this) != ST_INIT; + return state() > ST_IN_BUCKET; } @Override @@ -518,7 +559,8 @@ public class HashedWheelTimer implements Timer { } public void expire() { - if (!STATE_UPDATER.compareAndSet(this, ST_INIT, ST_EXPIRED)) { + if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) { + assert state() != ST_INIT; return; } @@ -577,6 +619,8 @@ public class HashedWheelTimer implements Timer { * Add {@link HashedWheelTimeout} to this bucket. */ public void addTimeout(HashedWheelTimeout timeout) { + assert timeout.bucket == null; + timeout.bucket = this; if (head == null) { head = tail = timeout; } else { @@ -612,33 +656,40 @@ public class HashedWheelTimer implements Timer { // store reference to next as we may null out timeout.next in the remove block. HashedWheelTimeout next = timeout.next; if (remove) { - // remove timeout that was either processed or cancelled by updating the linked-list - if (timeout.prev != null) { - timeout.prev.next = timeout.next; - } - if (timeout.next != null) { - timeout.next.prev = timeout.prev; - } - - if (timeout == head) { - // if timeout is head we need to replace the head with the next entry - head = next; - if (timeout == tail) { - // if timeout is also the tail we need to adjust the entry too - tail = timeout.next; - } - } else if (timeout == tail) { - // if the timeout is the tail modify the tail to be the prev node. - tail = timeout.prev; - } - // null out prev and next to allow for GC. - timeout.prev = null; - timeout.next = null; + remove(timeout); } timeout = next; } } + public void remove(HashedWheelTimeout timeout) { + HashedWheelTimeout next = timeout.next; + // remove timeout that was either processed or cancelled by updating the linked-list + if (timeout.prev != null) { + timeout.prev.next = next; + } + if (timeout.next != null) { + timeout.next.prev = timeout.prev; + } + + if (timeout == head) { + // if timeout is also the tail we need to adjust the entry too + if (timeout == tail) { + tail = null; + head = null; + } else { + head = next; + } + } else if (timeout == tail) { + // if the timeout is the tail modify the tail to be the prev node. + tail = timeout.prev; + } + // null out prev, next and bucket to allow for GC. + timeout.prev = null; + timeout.next = null; + timeout.bucket = null; + } + /** * Clear this bucket and return all not expired / cancelled {@link Timeout}s. */ @@ -671,6 +722,7 @@ public class HashedWheelTimer implements Timer { // null out prev and next to allow for GC. head.next = null; head.prev = null; + head.bucket = null; return head; } }