Revert e66fc219ffe6621e71a409b078ace9cab87eac16

This commit is contained in:
Trustin Lee 2013-03-12 16:43:46 +09:00
parent fe66f33f42
commit cef81f1bff

View File

@ -76,7 +76,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class HashedWheelTimer implements Timer { public class HashedWheelTimer implements Timer {
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(HashedWheelTimer.class); InternalLoggerFactory.getInstance(HashedWheelTimer.class);
private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = private static final ResourceLeakDetector<HashedWheelTimer> leakDetector =
new ResourceLeakDetector<HashedWheelTimer>( new ResourceLeakDetector<HashedWheelTimer>(
@ -87,6 +87,7 @@ public class HashedWheelTimer implements Timer {
final Thread workerThread; final Thread workerThread;
final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down final AtomicInteger workerState = new AtomicInteger(); // 0 - init, 1 - started, 2 - shut down
private final long roundDuration;
final long tickDuration; final long tickDuration;
final Set<HashedWheelTimeout>[] wheel; final Set<HashedWheelTimeout>[] wheel;
final int mask; final int mask;
@ -200,6 +201,8 @@ public class HashedWheelTimer implements Timer {
throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + unit); throw new IllegalArgumentException("tickDuration is too long: " + tickDuration + ' ' + unit);
} }
roundDuration = tickDuration * wheel.length;
workerThread = threadFactory.newThread(worker); workerThread = threadFactory.newThread(worker);
} }
@ -240,17 +243,17 @@ public class HashedWheelTimer implements Timer {
*/ */
public void start() { public void start() {
switch (workerState.get()) { switch (workerState.get()) {
case 0: case 0:
if (workerState.compareAndSet(0, 1)) { if (workerState.compareAndSet(0, 1)) {
workerThread.start(); workerThread.start();
} }
break; break;
case 1: case 1:
break; break;
case 2: case 2:
throw new IllegalStateException("cannot be started once stopped"); throw new IllegalStateException("cannot be started once stopped");
default: default:
throw new Error(); throw new Error();
} }
} }
@ -259,8 +262,8 @@ public class HashedWheelTimer implements Timer {
if (Thread.currentThread() == workerThread) { if (Thread.currentThread() == workerThread) {
throw new IllegalStateException( throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() + HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " + ".stop() cannot be called from " +
TimerTask.class.getSimpleName()); TimerTask.class.getSimpleName());
} }
if (!workerState.compareAndSet(1, 2)) { if (!workerState.compareAndSet(1, 2)) {
@ -314,17 +317,20 @@ public class HashedWheelTimer implements Timer {
} }
void scheduleTimeout(HashedWheelTimeout timeout, long delay) { void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
long relativeIndex = (delay + tickDuration - 1) / tickDuration; // delay must be equal to or greater than tickDuration so that the
// worker thread never misses the timeout.
// if the previous line had an overflow going on, then well just schedule this timeout if (delay < tickDuration) {
// one tick early; that shouldnt matter since were talking 270 years here delay = tickDuration;
if (relativeIndex < 0) {
relativeIndex = delay / tickDuration;
} else if (relativeIndex == 0) {
relativeIndex = 1;
} }
final long remainingRounds = relativeIndex / wheel.length; // Prepare the required parameters to schedule the timeout object.
final long lastRoundDelay = delay % roundDuration;
final long lastTickDelay = delay % tickDuration;
final long relativeIndex =
lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
final long remainingRounds =
delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
// Add the timeout to the wheel. // Add the timeout to the wheel.
lock.readLock().lock(); lock.readLock().lock();
@ -332,6 +338,7 @@ public class HashedWheelTimer implements Timer {
int stopIndex = (int) (wheelCursor + relativeIndex & mask); int stopIndex = (int) (wheelCursor + relativeIndex & mask);
timeout.stopIndex = stopIndex; timeout.stopIndex = stopIndex;
timeout.remainingRounds = remainingRounds; timeout.remainingRounds = remainingRounds;
wheel[stopIndex].add(timeout); wheel[stopIndex].add(timeout);
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
@ -349,14 +356,14 @@ public class HashedWheelTimer implements Timer {
@Override @Override
public void run() { public void run() {
List<HashedWheelTimeout> expiredTimeouts = List<HashedWheelTimeout> expiredTimeouts =
new ArrayList<HashedWheelTimeout>(); new ArrayList<HashedWheelTimeout>();
startTime = System.currentTimeMillis(); startTime = System.currentTimeMillis();
tick = 1; tick = 1;
while (workerState.get() == 1) { while (workerState.get() == 1) {
final long deadline = waitForNextTick(); final long deadline = waitForNextTick();
if (deadline > Long.MIN_VALUE) { if (deadline > 0) {
fetchExpiredTimeouts(expiredTimeouts, deadline); fetchExpiredTimeouts(expiredTimeouts, deadline);
notifyExpiredTimeouts(expiredTimeouts); notifyExpiredTimeouts(expiredTimeouts);
} }
@ -388,7 +395,7 @@ public class HashedWheelTimer implements Timer {
HashedWheelTimeout timeout = i.next(); HashedWheelTimeout timeout = i.next();
if (timeout.remainingRounds <= 0) { if (timeout.remainingRounds <= 0) {
i.remove(); i.remove();
if (timeout.deadline - deadline <= 0) { if (timeout.deadline <= deadline) {
expiredTimeouts.add(timeout); expiredTimeouts.add(timeout);
} else { } else {
// Handle the case where the timeout is put into a wrong // Handle the case where the timeout is put into a wrong
@ -424,26 +431,12 @@ public class HashedWheelTimer implements Timer {
expiredTimeouts.clear(); expiredTimeouts.clear();
} }
/**
* calculate goal nanoTime from startTime and current tick number,
* then wait until that goal has been reached.
* @return Long.MIN_VALUE if received a shutdown request, current time otherwise
*/
private long waitForNextTick() { private long waitForNextTick() {
final long deadline = startTime + tickDuration * tick; long deadline = startTime + tickDuration * tick;
for (;;) { for (;;) {
final long currentTime = System.nanoTime(); final long currentTime = System.currentTimeMillis();
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; long sleepTime = tickDuration * tick - (currentTime - startTime);
if (sleepTimeMs <= 0) {
tick++;
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need // Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect // to round the sleepTime as workaround for a bug that only affect
@ -451,17 +444,25 @@ public class HashedWheelTimer implements Timer {
// //
// See https://github.com/netty/netty/issues/356 // See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) { if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10; sleepTime = sleepTime / 10 * 10;
}
if (sleepTime <= 0) {
break;
} }
try { try {
Thread.sleep(sleepTimeMs); Thread.sleep(sleepTime);
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (workerState.get() != 1) { if (workerState.get() != 1) {
return Long.MIN_VALUE; return -1;
} }
} }
} }
// Increase the tick.
tick ++;
return deadline;
} }
} }
@ -521,9 +522,7 @@ public class HashedWheelTimer implements Timer {
task.run(this); task.run(this);
} catch (Throwable t) { } catch (Throwable t) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn( logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
"An exception was thrown by " +
TimerTask.class.getSimpleName() + '.', t);
} }
} }
} }