From 915cb8b55c6088ae566d178612e939b3b29752e8 Mon Sep 17 00:00:00 2001 From: Prajwal Tuladhar Date: Fri, 15 Mar 2013 23:27:33 -0400 Subject: [PATCH] [#744] Port fixes from Akka to HashedWheelTimer port fix from Akka with following commits: * https://github.com/akka/akka/commit/cb4e3536b0ed3483bd3636d7789c0ddcadaf a2da * https://github.com/akka/akka/commit/7e590f3071bdf89a4aa9d7d262bac8923d85 e754 And also use constants for worker state for time instead of numeric. --- .../java/io/netty/util/HashedWheelTimer.java | 88 ++++++++------- .../io/netty/util/HashedWheelTimerTest.java | 105 ++++++++++++++++++ 2 files changed, 154 insertions(+), 39 deletions(-) create mode 100644 common/src/test/java/io/netty/util/HashedWheelTimerTest.java diff --git a/common/src/main/java/io/netty/util/HashedWheelTimer.java b/common/src/main/java/io/netty/util/HashedWheelTimer.java index d8d6a94bd2..5ad882a266 100644 --- a/common/src/main/java/io/netty/util/HashedWheelTimer.java +++ b/common/src/main/java/io/netty/util/HashedWheelTimer.java @@ -85,9 +85,12 @@ public class HashedWheelTimer implements Timer { private final ResourceLeak leak = leakDetector.open(this); private final Worker worker = new Worker(); final Thread workerThread; + + public static final int WORKER_STATE_INIT = 0; + public static final int WORKER_STATE_STARTED = 1; + public static final int WORKER_STATE_SHUTDOWN = 2; final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down - private final long roundDuration; final long tickDuration; final Set[] wheel; final int mask; @@ -201,8 +204,6 @@ public class HashedWheelTimer implements Timer { throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + unit); } - roundDuration = tickDuration * wheel.length; - workerThread = threadFactory.newThread(worker); } @@ -243,17 +244,17 @@ public class HashedWheelTimer implements Timer { */ public void start() { switch (workerState.get()) { - case 0: - if (workerState.compareAndSet(0, 1)) { + case WORKER_STATE_INIT: + if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; - case 1: + case WORKER_STATE_STARTED: break; - case 2: + case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: - throw new Error(); + throw new Error("Invalid WorkerState"); } } @@ -266,9 +267,9 @@ public class HashedWheelTimer implements Timer { TimerTask.class.getSimpleName()); } - if (!workerState.compareAndSet(1, 2)) { + if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // workerState can be 0 or 2 at this moment - let it always be 2. - workerState.set(2); + workerState.set(WORKER_STATE_SHUTDOWN); return Collections.emptySet(); } @@ -317,28 +318,30 @@ public class HashedWheelTimer implements Timer { } void scheduleTimeout(HashedWheelTimeout timeout, long delay) { - // delay must be equal to or greater than tickDuration so that the - // worker thread never misses the timeout. - if (delay < tickDuration) { - delay = tickDuration; - } - // Prepare the required parameters to schedule the timeout object. - final long lastRoundDelay = delay % roundDuration; - final long lastTickDelay = delay % tickDuration; - final long relativeIndex = - lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0); - - final long remainingRounds = - delay / roundDuration - (delay % roundDuration == 0? 1 : 0); + long relativeIndex = (delay + tickDuration - 1) / tickDuration; + // if the previous line had an overflow going on, then we’ll just schedule this timeout + // one tick early; that shouldn’t matter since we’re talking 270 years here + if (relativeIndex < 0) { + relativeIndex = delay / tickDuration; + } + if (relativeIndex == 0) { + relativeIndex = 1; + } + if ((relativeIndex & mask) == 0) { + relativeIndex--; + } + final long remainingRounds = relativeIndex / wheel.length; // Add the timeout to the wheel. lock.readLock().lock(); try { - int stopIndex = (int) (wheelCursor + relativeIndex & mask); + if (workerState.get() == WORKER_STATE_SHUTDOWN) { + throw new IllegalStateException("Cannot enqueue after shutdown"); + } + final int stopIndex = (int) ((wheelCursor + relativeIndex) & mask); timeout.stopIndex = stopIndex; timeout.remainingRounds = remainingRounds; - wheel[stopIndex].add(timeout); } finally { lock.readLock().unlock(); @@ -361,7 +364,7 @@ public class HashedWheelTimer implements Timer { startTime = System.currentTimeMillis(); tick = 1; - while (workerState.get() == 1) { + while (workerState.get() == WORKER_STATE_STARTED) { final long deadline = waitForNextTick(); if (deadline > 0) { fetchExpiredTimeouts(expiredTimeouts, deadline); @@ -431,12 +434,27 @@ public class HashedWheelTimer implements Timer { expiredTimeouts.clear(); } + /** + * calculate goal nanoTime from startTime and current tick number, + * then wait until that goal has been reached. + * @return Long.MIN_VALUE if received a shutdown request, + * current time otherwise (with Long.MIN_VALUE changed by +1) + */ private long waitForNextTick() { long deadline = startTime + tickDuration * tick; for (;;) { final long currentTime = System.currentTimeMillis(); - long sleepTime = tickDuration * tick - (currentTime - startTime); + long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; + + if (sleepTimeMs <= 0) { + tick += 1; + if (currentTime == Long.MIN_VALUE) { + return -Long.MAX_VALUE; + } else { + return currentTime; + } + } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect @@ -444,25 +462,17 @@ public class HashedWheelTimer implements Timer { // // See https://github.com/netty/netty/issues/356 if (PlatformDependent.isWindows()) { - sleepTime = sleepTime / 10 * 10; - } - - if (sleepTime <= 0) { - break; + sleepTimeMs = (sleepTimeMs / 10) * 10; } try { - Thread.sleep(sleepTime); + Thread.sleep(sleepTimeMs); } catch (InterruptedException e) { - if (workerState.get() != 1) { - return -1; + if (workerState.get() == WORKER_STATE_SHUTDOWN) { + return Long.MIN_VALUE; } } } - - // Increase the tick. - tick ++; - return deadline; } } diff --git a/common/src/test/java/io/netty/util/HashedWheelTimerTest.java b/common/src/test/java/io/netty/util/HashedWheelTimerTest.java new file mode 100644 index 0000000000..b9b4160f8d --- /dev/null +++ b/common/src/test/java/io/netty/util/HashedWheelTimerTest.java @@ -0,0 +1,105 @@ +/* + * Copyright 2013 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util; + +import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class HashedWheelTimerTest { + + @Test + public void testScheduleTimeoutShouldNotRunBeforeDelay() throws InterruptedException { + final Timer timer = new HashedWheelTimer(); + final CountDownLatch barrier = new CountDownLatch(1); + final Timeout timeout = timer.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + fail("This should not have run"); + barrier.countDown(); + } + }, 10, TimeUnit.SECONDS); + assertFalse(barrier.await(3, TimeUnit.SECONDS)); + assertFalse("timer should not expire", timeout.isExpired()); + timer.stop(); + } + + @Test + public void testScheduleTimeoutShouldRunAfterDelay() throws InterruptedException { + final Timer timer = new HashedWheelTimer(); + final CountDownLatch barrier = new CountDownLatch(1); + final Timeout timeout = timer.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + barrier.countDown(); + } + }, 2, TimeUnit.SECONDS); + assertTrue(barrier.await(3, TimeUnit.SECONDS)); + assertTrue("timer should expire", timeout.isExpired()); + timer.stop(); + } + + @Test + public void testStopTimer() throws InterruptedException { + final Timer timerProcessed = new HashedWheelTimer(); + for (int i = 0; i < 3; i ++) { + timerProcessed.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + } + }, 1, TimeUnit.MILLISECONDS); + } + Thread.sleep(1000L); // sleep for a second + assertEquals("Number of unprocessed timeouts should be 0", 0, timerProcessed.stop().size()); + + final Timer timerUnprocessed = new HashedWheelTimer(); + for (int i = 0; i < 5; i ++) { + timerUnprocessed.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + } + }, 5, TimeUnit.SECONDS); + } + Thread.sleep(1000L); // sleep for a second + assertFalse("Number of unprocessed timeouts should be greater than 0", timerUnprocessed.stop().isEmpty()); + } + + @Test(expected = IllegalStateException.class) + public void testTimerShouldThrowExceptionAfterShutdownForNewTimeouts() throws InterruptedException { + final Timer timer = new HashedWheelTimer(); + for (int i = 0; i < 3; i ++) { + timer.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + } + }, 1, TimeUnit.MILLISECONDS); + } + + timer.stop(); + Thread.sleep(1000L); // sleep for a second + + timer.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + fail("This should not run"); + } + }, 1, TimeUnit.SECONDS); + } +}