[#744] Port fixes from Akka to HashedWheelTimer

port fix from Akka with following commits:
*
https://github.com/akka/akka/commit/cb4e3536b0ed3483bd3636d7789c0ddcadaf
a2da
*
https://github.com/akka/akka/commit/7e590f3071bdf89a4aa9d7d262bac8923d85
e754

And also use constants for worker state for time instead of numeric.
This commit is contained in:
Prajwal Tuladhar 2013-03-15 23:27:33 -04:00 committed by Norman Maurer
parent 4bd9c0195f
commit 915cb8b55c
2 changed files with 154 additions and 39 deletions

View File

@ -85,9 +85,12 @@ public class HashedWheelTimer implements Timer {
private final ResourceLeak leak = leakDetector.open(this); private final ResourceLeak leak = leakDetector.open(this);
private final Worker worker = new Worker(); private final Worker worker = new Worker();
final Thread workerThread; final Thread workerThread;
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down
private final long roundDuration;
final long tickDuration; final long tickDuration;
final Set<HashedWheelTimeout>[] wheel; final Set<HashedWheelTimeout>[] wheel;
final int mask; final int mask;
@ -201,8 +204,6 @@ public class HashedWheelTimer implements Timer {
throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + unit); throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + unit);
} }
roundDuration = tickDuration * wheel.length;
workerThread = threadFactory.newThread(worker); workerThread = threadFactory.newThread(worker);
} }
@ -243,17 +244,17 @@ public class HashedWheelTimer implements Timer {
*/ */
public void start() { public void start() {
switch (workerState.get()) { switch (workerState.get()) {
case 0: case WORKER_STATE_INIT:
if (workerState.compareAndSet(0, 1)) { if (workerState.compareAndSet(WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start(); workerThread.start();
} }
break; break;
case 1: case WORKER_STATE_STARTED:
break; break;
case 2: case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped"); throw new IllegalStateException("cannot be started once stopped");
default: default:
throw new Error(); throw new Error("Invalid WorkerState");
} }
} }
@ -266,9 +267,9 @@ public class HashedWheelTimer implements Timer {
TimerTask.class.getSimpleName()); TimerTask.class.getSimpleName());
} }
if (!workerState.compareAndSet(1, 2)) { if (!workerState.compareAndSet(WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2. // workerState can be 0 or 2 at this moment - let it always be 2.
workerState.set(2); workerState.set(WORKER_STATE_SHUTDOWN);
return Collections.emptySet(); return Collections.emptySet();
} }
@ -317,28 +318,30 @@ public class HashedWheelTimer implements Timer {
} }
void scheduleTimeout(HashedWheelTimeout timeout, long delay) { void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
// delay must be equal to or greater than tickDuration so that the
// worker thread never misses the timeout.
if (delay < tickDuration) {
delay = tickDuration;
}
// Prepare the required parameters to schedule the timeout object. // Prepare the required parameters to schedule the timeout object.
final long lastRoundDelay = delay % roundDuration; long relativeIndex = (delay + tickDuration - 1) / tickDuration;
final long lastTickDelay = delay % tickDuration; // if the previous line had an overflow going on, then well just schedule this timeout
final long relativeIndex = // one tick early; that shouldnt matter since were talking 270 years here
lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0); if (relativeIndex < 0) {
relativeIndex = delay / tickDuration;
final long remainingRounds = }
delay / roundDuration - (delay % roundDuration == 0? 1 : 0); if (relativeIndex == 0) {
relativeIndex = 1;
}
if ((relativeIndex & mask) == 0) {
relativeIndex--;
}
final long remainingRounds = relativeIndex / wheel.length;
// Add the timeout to the wheel. // Add the timeout to the wheel.
lock.readLock().lock(); lock.readLock().lock();
try { try {
int stopIndex = (int) (wheelCursor + relativeIndex & mask); if (workerState.get() == WORKER_STATE_SHUTDOWN) {
throw new IllegalStateException("Cannot enqueue after shutdown");
}
final int stopIndex = (int) ((wheelCursor + relativeIndex) & mask);
timeout.stopIndex = stopIndex; timeout.stopIndex = stopIndex;
timeout.remainingRounds = remainingRounds; timeout.remainingRounds = remainingRounds;
wheel[stopIndex].add(timeout); wheel[stopIndex].add(timeout);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
@ -361,7 +364,7 @@ public class HashedWheelTimer implements Timer {
startTime = System.currentTimeMillis(); startTime = System.currentTimeMillis();
tick = 1; tick = 1;
while (workerState.get() == 1) { while (workerState.get() == WORKER_STATE_STARTED) {
final long deadline = waitForNextTick(); final long deadline = waitForNextTick();
if (deadline > 0) { if (deadline > 0) {
fetchExpiredTimeouts(expiredTimeouts, deadline); fetchExpiredTimeouts(expiredTimeouts, deadline);
@ -431,12 +434,27 @@ public class HashedWheelTimer implements Timer {
expiredTimeouts.clear(); expiredTimeouts.clear();
} }
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request,
* current time otherwise (with Long.MIN_VALUE changed by +1)
*/
private long waitForNextTick() { private long waitForNextTick() {
long deadline = startTime + tickDuration * tick; long deadline = startTime + tickDuration * tick;
for (;;) { for (;;) {
final long currentTime = System.currentTimeMillis(); final long currentTime = System.currentTimeMillis();
long sleepTime = tickDuration * tick - (currentTime - startTime); long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if (sleepTimeMs <= 0) {
tick += 1;
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need // Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect // to round the sleepTime as workaround for a bug that only affect
@ -444,25 +462,17 @@ public class HashedWheelTimer implements Timer {
// //
// See https://github.com/netty/netty/issues/356 // See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) { if (PlatformDependent.isWindows()) {
sleepTime = sleepTime / 10 * 10; sleepTimeMs = (sleepTimeMs / 10) * 10;
}
if (sleepTime <= 0) {
break;
} }
try { try {
Thread.sleep(sleepTime); Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (workerState.get() != 1) { if (workerState.get() == WORKER_STATE_SHUTDOWN) {
return -1; return Long.MIN_VALUE;
} }
} }
} }
// Increase the tick.
tick ++;
return deadline;
} }
} }

View File

@ -0,0 +1,105 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class HashedWheelTimerTest {
@Test
public void testScheduleTimeoutShouldNotRunBeforeDelay() throws InterruptedException {
final Timer timer = new HashedWheelTimer();
final CountDownLatch barrier = new CountDownLatch(1);
final Timeout timeout = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
fail("This should not have run");
barrier.countDown();
}
}, 10, TimeUnit.SECONDS);
assertFalse(barrier.await(3, TimeUnit.SECONDS));
assertFalse("timer should not expire", timeout.isExpired());
timer.stop();
}
@Test
public void testScheduleTimeoutShouldRunAfterDelay() throws InterruptedException {
final Timer timer = new HashedWheelTimer();
final CountDownLatch barrier = new CountDownLatch(1);
final Timeout timeout = timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
barrier.countDown();
}
}, 2, TimeUnit.SECONDS);
assertTrue(barrier.await(3, TimeUnit.SECONDS));
assertTrue("timer should expire", timeout.isExpired());
timer.stop();
}
@Test
public void testStopTimer() throws InterruptedException {
final Timer timerProcessed = new HashedWheelTimer();
for (int i = 0; i < 3; i ++) {
timerProcessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
}
}, 1, TimeUnit.MILLISECONDS);
}
Thread.sleep(1000L); // sleep for a second
assertEquals("Number of unprocessed timeouts should be 0", 0, timerProcessed.stop().size());
final Timer timerUnprocessed = new HashedWheelTimer();
for (int i = 0; i < 5; i ++) {
timerUnprocessed.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
}
}, 5, TimeUnit.SECONDS);
}
Thread.sleep(1000L); // sleep for a second
assertFalse("Number of unprocessed timeouts should be greater than 0", timerUnprocessed.stop().isEmpty());
}
@Test(expected = IllegalStateException.class)
public void testTimerShouldThrowExceptionAfterShutdownForNewTimeouts() throws InterruptedException {
final Timer timer = new HashedWheelTimer();
for (int i = 0; i < 3; i ++) {
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
}
}, 1, TimeUnit.MILLISECONDS);
}
timer.stop();
Thread.sleep(1000L); // sleep for a second
timer.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
fail("This should not run");
}
}, 1, TimeUnit.SECONDS);
}
}