diff --git a/common/src/main/java/io/netty/util/HashedWheelTimer.java b/common/src/main/java/io/netty/util/HashedWheelTimer.java index eaec8bdc25..4b6236784a 100644 --- a/common/src/main/java/io/netty/util/HashedWheelTimer.java +++ b/common/src/main/java/io/netty/util/HashedWheelTimer.java @@ -30,6 +30,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * A {@link Timer} optimized for approximated I/O timeout scheduling. @@ -358,10 +360,16 @@ public class HashedWheelTimer implements Timer { do { final long deadline = waitForNextTick(); if (deadline > 0) { - transferTimeoutsToBuckets(); + int idx = (int) (tick & mask); HashedWheelBucket bucket = - wheel[(int) (tick & mask)]; - bucket.expireTimeouts(deadline); + wheel[idx]; + bucket.lock.lock(); + try { + transferTimeoutsToBuckets(); + bucket.expireTimeouts(deadline); + } finally { + bucket.lock.unlock(); + } tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); @@ -388,13 +396,11 @@ public class HashedWheelTimer implements Timer { // all processed break; } - if (timeout.state() == HashedWheelTimeout.ST_CANCELLED - || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) { - // Was cancelled in the meantime. So just remove it and continue with next HashedWheelTimeout - // in the queue - timeout.remove(); + if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { + // Was cancelled in the meantime. continue; } + long calculated = timeout.deadline / tickDuration; timeout.remainingRounds = (calculated - tick) / wheel.length; @@ -454,9 +460,8 @@ public class HashedWheelTimer implements Timer { implements Timeout { private static final int ST_INIT = 0; - private static final int ST_IN_BUCKET = 1; - private static final int ST_CANCELLED = 2; - private static final int ST_EXPIRED = 3; + private static final int ST_CANCELLED = 1; + private static final int ST_EXPIRED = 2; private static final AtomicIntegerFieldUpdater STATE_UPDATER; static { @@ -505,32 +510,23 @@ public class HashedWheelTimer implements Timer { @Override public boolean cancel() { - int state = state(); - if (state >= ST_CANCELLED) { - // fail fast if the task was cancelled or expired before. - return false; - } - if (state != ST_IN_BUCKET && compareAndSetState(ST_INIT, ST_CANCELLED)) { - // Was cancelled before the HashedWheelTimeout was added to its HashedWheelBucket. - // In this case we can just return here as it will be discarded by the WorkerThread when handling - // the adding of HashedWheelTimeout to the HashedWheelBuckets. - return true; - } // only update the state it will be removed from HashedWheelBucket on next tick. - if (!compareAndSetState(ST_IN_BUCKET, ST_CANCELLED)) { + if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { return false; } - // Add the HashedWheelTimeout back to the timeouts queue so it will be picked up on the next tick - // and remove this HashedTimeTask from the HashedWheelBucket. After this is done it is ready to get - // GC'ed once the user has no reference to it anymore. - timer.timeouts.add(this); - return true; - } - - public void remove() { + HashedWheelBucket bucket = this.bucket; if (bucket != null) { - bucket.remove(this); + // if tryLock fails it means that HashedWheelBucket is currently processed and so there is nothing for + // us to do as the remove itself will be done while processing. + if (bucket.lock.tryLock()) { + try { + bucket.remove(this); + } finally { + bucket.lock.unlock(); + } + } } + return true; } public boolean compareAndSetState(int expected, int state) { @@ -548,7 +544,7 @@ public class HashedWheelTimer implements Timer { @Override public boolean isExpired() { - return state() > ST_IN_BUCKET; + return state() == ST_EXPIRED; } @Override @@ -557,8 +553,7 @@ public class HashedWheelTimer implements Timer { } public void expire() { - if (!compareAndSetState(ST_IN_BUCKET, ST_EXPIRED)) { - assert state() != ST_INIT; + if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } @@ -609,6 +604,11 @@ public class HashedWheelTimer implements Timer { */ private static final class HashedWheelBucket { + // Lock used during processing of each HashedWheelBucket. The Lock will be acquired on each tick for the + // current HashedWheelBucket and also tried to acquired when a HashedWheelTimeout should be cancelled. + // This allows fast GC for cancelled HashedWheelTimeouts. + private final Lock lock = new ReentrantLock(); + // Used for the linked-list datastructure private HashedWheelTimeout head; private HashedWheelTimeout tail; diff --git a/common/src/test/java/io/netty/util/HashedWheelTimerTest.java b/common/src/test/java/io/netty/util/HashedWheelTimerTest.java index db1f4625d4..945921b695 100644 --- a/common/src/test/java/io/netty/util/HashedWheelTimerTest.java +++ b/common/src/test/java/io/netty/util/HashedWheelTimerTest.java @@ -17,8 +17,10 @@ package io.netty.util; import org.junit.Test; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -122,5 +124,34 @@ public class HashedWheelTimerTest { }, 1, TimeUnit.SECONDS); Thread.sleep(3500); assertEquals(3, counter.get()); + timer.stop(); + } + + @Test + public void testExecutionOnTime() throws InterruptedException { + int tickDuration = 200; + int timeout = 125; + int maxTimeout = tickDuration + timeout; + final HashedWheelTimer timer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS); + final BlockingQueue queue = new LinkedBlockingQueue(); + + int scheduledTasks = 100000; + for (int i = 0; i < scheduledTasks; i++) { + final long start = System.nanoTime(); + timer.newTimeout(new TimerTask() { + @Override + public void run(final Timeout timeout) throws Exception { + queue.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + } + }, timeout, TimeUnit.MILLISECONDS); + } + + for (int i = 0; i < scheduledTasks; i++) { + long delay = queue.take(); + assertTrue("Timeout + " + scheduledTasks + " delay " + delay + " must be " + timeout + " <= " + maxTimeout, + delay >= timeout && delay <= maxTimeout); + } + + timer.stop(); } }