diff --git a/src/main/java/org/jboss/netty/util/HashedWheelTimer.java b/src/main/java/org/jboss/netty/util/HashedWheelTimer.java index 815fcf7f3a..399640a8a1 100644 --- a/src/main/java/org/jboss/netty/util/HashedWheelTimer.java +++ b/src/main/java/org/jboss/netty/util/HashedWheelTimer.java @@ -121,7 +121,14 @@ public class HashedWheelTimer implements Timer { ExecutorUtil.terminate(executor); } - public Timeout newTimeout(long initialDelay, TimeUnit unit) { + public Timeout newTimeout(TimerTask task, long initialDelay, TimeUnit unit) { + if (task == null) { + throw new NullPointerException("task"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + initialDelay = unit.toNanos(initialDelay); checkDelay(initialDelay); @@ -130,7 +137,7 @@ public class HashedWheelTimer implements Timer { lock.readLock().lock(); try { timeout = new HashedWheelTimeout( - wheelCursor, System.nanoTime(), initialDelay); + task, wheelCursor, System.nanoTime(), initialDelay); wheel[schedule(timeout)].add(timeout); } finally { @@ -168,7 +175,7 @@ public class HashedWheelTimer implements Timer { void checkDelay(long delay) { if (delay < tickDuration) { throw new IllegalArgumentException( - "delay must be greated than " + + "delay must be greater than " + tickDuration + " nanoseconds"); } } @@ -271,6 +278,8 @@ public class HashedWheelTimer implements Timer { private final class HashedWheelTimeout implements Timeout { + private final TimerTask task; + final int startIndex; int stopIndex; @@ -284,18 +293,29 @@ public class HashedWheelTimer implements Timer { long slippedRounds; private volatile int extensionCount; + private volatile boolean cancelled; - HashedWheelTimeout(int startIndex, long startTime, long initialDelay) { + HashedWheelTimeout(TimerTask task, int startIndex, long startTime, long initialDelay) { + this.task = task; this.startIndex = startIndex; this.startTime = startTime; this.initialDelay = initialDelay; } - public synchronized void cancel() { - if (!wheel[stopIndex].remove(this)) { + public TimerTask getTask() { + return task; + } + + public void cancel() { + if (cancelled) { return; } - // TODO Notify handlers + + synchronized (this) { + if (!wheel[stopIndex].remove(this)) { + return; + } + } } public void extend() { @@ -308,44 +328,52 @@ public class HashedWheelTimer implements Timer { private void extend(long additionalDelay) { checkDelay(additionalDelay); + if (cancelled) { + throw new IllegalStateException("cancelled"); + } + lock.readLock().lock(); try { int newStopIndex; synchronized (this) { newStopIndex = stopIndex = schedule(this, additionalDelay); - extensionCount ++; } wheel[newStopIndex].add(this); } finally { + extensionCount ++; lock.readLock().unlock(); } } - public synchronized int getExtensionCount() { + public int getExtensionCount() { return extensionCount; } public boolean isCancelled() { - return false; + return cancelled; } public boolean isExpired() { - return false; - } + if (cancelled) { + return false; + } - public boolean isExtended() { - return false; + long currentTime = System.nanoTime(); + synchronized (this) { + return currentTime > deadline; + } } public void expire() { - System.out.println("BOOM: " + (System.nanoTime() - startTime) / 1000000); - extend(); - } + if (cancelled) { + return; + } - public void addListener(TimeoutListener listener) { - } - - public void removeListener(TimeoutListener listener) { + try { + task.run(this); + } catch (Throwable t) { + // FIXME log the exception + } } @Override @@ -358,19 +386,41 @@ public class HashedWheelTimer implements Timer { StringBuilder buf = new StringBuilder(); buf.append("TimingWheelTimeout("); - buf.append("timeout: "); - buf.append(initialDelay); - buf.append("ns, "); + buf.append("initialDelay: "); + buf.append(initialDelay / 1000000); + buf.append(" ms, "); + + buf.append("cumulativeDelay: "); + buf.append(cumulativeDelay / 1000000); + buf.append(" ms, "); buf.append("deadline: "); if (remaining > 0) { - buf.append(remaining); - buf.append("ns later"); + buf.append(remaining / 1000000); + buf.append(" ms later, "); } else if (remaining < 0) { - buf.append(-remaining); - buf.append("ns ago"); + buf.append(-remaining / 1000000); + buf.append(" ms ago, "); } else { - buf.append("now"); + buf.append("now, "); + } + + buf.append("extended: "); + switch (getExtensionCount()) { + case 0: + buf.append("never"); + break; + case 1: + buf.append("once"); + break; + default: + buf.append(getExtensionCount()); + buf.append(" times"); + break; + } + + if (isCancelled()) { + buf.append (", cancelled"); } return buf.append(')').toString(); @@ -380,9 +430,14 @@ public class HashedWheelTimer implements Timer { public static void main(String[] args) throws Exception { Timer timer = new HashedWheelTimer( Executors.newCachedThreadPool(), - 1, TimeUnit.SECONDS, 4); + 100, TimeUnit.MILLISECONDS, 4); //Timeout timeout = timer.newTimeout(1200, TimeUnit.MILLISECONDS); - timer.newTimeout(1200, TimeUnit.MILLISECONDS); + timer.newTimeout(new TimerTask() { + public void run(Timeout timeout) throws Exception { + System.out.println(timeout.getExtensionCount() + ": " + timeout); + timeout.extend(); + } + }, 1200, TimeUnit.MILLISECONDS); } } diff --git a/src/main/java/org/jboss/netty/util/Timeout.java b/src/main/java/org/jboss/netty/util/Timeout.java index fbf994a427..3d10a667e9 100644 --- a/src/main/java/org/jboss/netty/util/Timeout.java +++ b/src/main/java/org/jboss/netty/util/Timeout.java @@ -30,15 +30,13 @@ import java.util.concurrent.TimeUnit; * @version $Rev$, $Date$ */ public interface Timeout { + TimerTask getTask(); + boolean isExpired(); boolean isCancelled(); - boolean isExtended(); int getExtensionCount(); void extend(); void extend(long extensionTime, TimeUnit unit); void cancel(); - - void addListener(TimeoutListener listener); - void removeListener(TimeoutListener listener); } diff --git a/src/main/java/org/jboss/netty/util/Timer.java b/src/main/java/org/jboss/netty/util/Timer.java index faf46bc946..d85badfc50 100644 --- a/src/main/java/org/jboss/netty/util/Timer.java +++ b/src/main/java/org/jboss/netty/util/Timer.java @@ -30,5 +30,5 @@ import java.util.concurrent.TimeUnit; * @version $Rev$, $Date$ */ public interface Timer extends ExternalResourceReleasable { - Timeout newTimeout(long timeout, TimeUnit unit); + Timeout newTimeout(TimerTask task, long timeout, TimeUnit unit); } diff --git a/src/main/java/org/jboss/netty/util/TimeoutListener.java b/src/main/java/org/jboss/netty/util/TimerTask.java similarity index 87% rename from src/main/java/org/jboss/netty/util/TimeoutListener.java rename to src/main/java/org/jboss/netty/util/TimerTask.java index 4e029e2f5f..e7fd1db529 100644 --- a/src/main/java/org/jboss/netty/util/TimeoutListener.java +++ b/src/main/java/org/jboss/netty/util/TimerTask.java @@ -27,8 +27,6 @@ package org.jboss.netty.util; * @author Trustin Lee (tlee@redhat.com) * @version $Rev$, $Date$ */ -public interface TimeoutListener { - void timeoutExpired(Timeout timeout); - void timeoutExtended(Timeout timeout); - void timeoutCancelled(Timeout timeout); +public interface TimerTask { + void run(Timeout timeout) throws Exception; }