diff --git a/buffer/src/main/java/io/netty/array/ObjectArray.java b/buffer/src/main/java/io/netty/array/ObjectArray.java new file mode 100644 index 0000000000..5d1805f618 --- /dev/null +++ b/buffer/src/main/java/io/netty/array/ObjectArray.java @@ -0,0 +1,14 @@ +package io.netty.array; + + +public class ObjectArray extends AbstractArray { + + public ObjectArray(E[] array, int offset, int length) { + super(array, offset, length); + } + + @Override + public E[] array() { + return (E[]) super.array(); + } +} diff --git a/common/src/main/java/io/netty/util/MapBackedSet.java b/common/src/main/java/io/netty/util/MapBackedSet.java deleted file mode 100644 index 68a32c9db0..0000000000 --- a/common/src/main/java/io/netty/util/MapBackedSet.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util; - -import java.io.Serializable; -import java.util.AbstractSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -/** - * A {@link Map}-backed {@link Set}. - */ -final class MapBackedSet extends AbstractSet implements Serializable { - - private static final long serialVersionUID = -6761513279741915432L; - - private final Map map; - - /** - * Creates a new instance which wraps the specified {@code map}. - */ - MapBackedSet(Map map) { - this.map = map; - } - - @Override - public int size() { - return map.size(); - } - - @Override - public boolean contains(Object o) { - return map.containsKey(o); - } - - @Override - public boolean add(E o) { - return map.put(o, Boolean.TRUE) == null; - } - - @Override - public boolean remove(Object o) { - return map.remove(o) != null; - } - - @Override - public void clear() { - map.clear(); - } - - @Override - public Iterator iterator() { - return map.keySet().iterator(); - } -} diff --git a/common/src/test/java/io/netty/util/MapBackedSetTest.java b/common/src/test/java/io/netty/util/MapBackedSetTest.java deleted file mode 100644 index 5500063e9f..0000000000 --- a/common/src/test/java/io/netty/util/MapBackedSetTest.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util; - -import static org.easymock.EasyMock.*; -import static org.junit.Assert.*; - -import java.util.Iterator; -import java.util.Map; -import java.util.Set; - -import org.junit.Test; - -public class MapBackedSetTest { - - @Test - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void testSize() { - Map map = createStrictMock(Map.class); - expect(map.size()).andReturn(0); - replay(map); - - assertEquals(0, new MapBackedSet(map).size()); - verify(map); - } - - @Test - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void testContains() { - Map map = createStrictMock(Map.class); - expect(map.containsKey("key")).andReturn(true); - replay(map); - - assertTrue(new MapBackedSet(map).contains("key")); - verify(map); - } - - - @Test - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void testRemove() { - Map map = createStrictMock(Map.class); - expect(map.remove("key")).andReturn(true); - expect(map.remove("key")).andReturn(null); - replay(map); - - assertTrue(new MapBackedSet(map).remove("key")); - assertFalse(new MapBackedSet(map).remove("key")); - verify(map); - } - - @Test - @SuppressWarnings({"unchecked", "rawtypes"}) - public void testAdd() { - Map map = createStrictMock(Map.class); - expect(map.put("key", true)).andReturn(null); - expect(map.put("key", true)).andReturn(true); - replay(map); - - assertTrue(new MapBackedSet(map).add("key")); - assertFalse(new MapBackedSet(map).add("key")); - verify(map); - } - - @Test - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void testClear() { - Map map = createStrictMock(Map.class); - map.clear(); - replay(map); - - new MapBackedSet(map).clear(); - verify(map); - } - - @Test - @SuppressWarnings({ "unchecked", "rawtypes" }) - public void testIterator() { - Map map = createStrictMock(Map.class); - Set keySet = createStrictMock(Set.class); - Iterator keySetIterator = createStrictMock(Iterator.class); - - expect(map.keySet()).andReturn(keySet); - expect(keySet.iterator()).andReturn(keySetIterator); - replay(map); - replay(keySet); - replay(keySetIterator); - - assertSame(keySetIterator, new MapBackedSet(map).iterator()); - - verify(map); - verify(keySet); - verify(keySetIterator); - } -} diff --git a/transport/src/main/java/io/netty/channel/EventLoop.java b/transport/src/main/java/io/netty/channel/EventLoop.java index e9bf14f77a..bd2a702c90 100644 --- a/transport/src/main/java/io/netty/channel/EventLoop.java +++ b/transport/src/main/java/io/netty/channel/EventLoop.java @@ -1,8 +1,8 @@ package io.netty.channel; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; -public interface EventLoop extends ExecutorService { +public interface EventLoop extends ScheduledExecutorService { ChannelFuture register(Channel channel); ChannelFuture register(Channel channel, ChannelFuture future); boolean inEventLoop(); diff --git a/transport/src/main/java/io/netty/channel/GlobalScheduledExecutorService.java b/transport/src/main/java/io/netty/channel/GlobalScheduledExecutorService.java new file mode 100644 index 0000000000..b3f2cf3cb4 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/GlobalScheduledExecutorService.java @@ -0,0 +1,200 @@ +package io.netty.channel; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A global single-threaded {@link ScheduledExecutorService} which is purposed + * to trigger scheduled events in {@link SingleThreadEventLoop}. + */ +public final class GlobalScheduledExecutorService implements ScheduledExecutorService { + + private static final GlobalScheduledExecutorService INSTANCE = new GlobalScheduledExecutorService(); + + public static final GlobalScheduledExecutorService instance() { + return INSTANCE; + } + + private final ThreadFactory threadFactory = Executors.defaultThreadFactory(); + private final ScheduledThreadPoolExecutor timer; + + private GlobalScheduledExecutorService() { + timer = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + Thread t = threadFactory.newThread(r); + t.setDaemon(true); + t.setName(String.format("EventLoopTimer-%08x", GlobalScheduledExecutorService.this.hashCode())); + return t; + } + }); + + // Avoid unnecessary memory consumption on a burst of cancellation. + timer.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + timer.purge(); + } + }, 1, 1, TimeUnit.SECONDS); + } + + @Override + protected void finalize() throws Throwable { + shutdownNow(); + super.finalize(); + } + + @Override + public void shutdown() { + timer.shutdown(); + } + + @Override + public List shutdownNow() { + return timer.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return timer.isShutdown(); + } + + @Override + public boolean isTerminated() { + return timer.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (unit == null) { + throw new NullPointerException("unit"); + } + return timer.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + if (task == null) { + throw new NullPointerException("task"); + } + return timer.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + if (task == null) { + throw new NullPointerException("task"); + } + return timer.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + if (task == null) { + throw new NullPointerException("task"); + } + + return timer.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + if (tasks == null) { + throw new NullPointerException("tasks"); + } + + return timer.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + if (tasks == null) { + throw new NullPointerException("tasks"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + return timer.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + if (tasks == null) { + throw new NullPointerException("tasks"); + } + return timer.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (tasks == null) { + throw new NullPointerException("tasks"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + return timer.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + if (command == null) { + throw new NullPointerException("command"); + } + timer.execute(command); + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + return timer.schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + if (callable == null) { + throw new NullPointerException("callable"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + return timer.schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + return timer.scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + return timer.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } +} diff --git a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java index 60b4d8a648..de580b963f 100644 --- a/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/MultithreadEventLoop.java @@ -7,6 +7,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -153,6 +154,27 @@ public class MultithreadEventLoop implements EventLoop { currentEventLoop().execute(command); } + @Override + public ScheduledFuture schedule(Runnable command, long delay, + TimeUnit unit) { + return currentEventLoop().schedule(command, delay, unit); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return currentEventLoop().schedule(callable, delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + @Override public ChannelFuture register(Channel channel) { return nextEventLoop().register(channel); diff --git a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java index c474b58aaa..fc418c25fe 100644 --- a/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java +++ b/transport/src/main/java/io/netty/channel/SingleThreadEventLoop.java @@ -3,25 +3,50 @@ package io.netty.channel; import io.netty.util.internal.QueueFactory; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.Queue; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop { + private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10); + private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); + private static final long START_TIME = System.nanoTime(); + private static final AtomicLong nextTaskId = new AtomicLong(); + static final ThreadLocal CURRENT_EVENT_LOOP = new ThreadLocal(); + private static long nanoTime() { + return System.nanoTime() - START_TIME; + } + + private static long deadlineNanos(long delay) { + return nanoTime() + delay; + } + + // Fields for event loop private final BlockingQueue taskQueue = QueueFactory.createQueue(Runnable.class); private final Thread thread; private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); + private final Queue> scheduledTasks = new DelayQueue>(); /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ private volatile int state; + private long lastCheckTimeNanos; + private long lastPurgeTimeNanos; protected SingleThreadEventLoop() { this(Executors.defaultThreadFactory()); @@ -39,6 +64,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl state = 3; } try { + cancelScheduledTasks(); cleanup(); } finally { threadLock.release(); @@ -78,12 +104,23 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl protected Runnable pollTask() { assert inEventLoop(); - return taskQueue.poll(); + Runnable task = taskQueue.poll(); + if (task == null) { + fetchScheduledTasks(); + task = taskQueue.poll(); + } + return task; } protected Runnable takeTask() throws InterruptedException { assert inEventLoop(); - return taskQueue.take(); + for (;;) { + Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS); + if (task != null) { + return task; + } + fetchScheduledTasks(); + } } protected Runnable peekTask() { @@ -200,9 +237,6 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl } if (inEventLoop()) { - if (isShutdown()) { - reject(); - } addTask(task); wakeup(true); } else { @@ -223,4 +257,226 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl private static void reject() { throw new RejectedExecutionException("event loop shut down"); } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask(command, null, deadlineNanos(unit.toNanos(delay)))); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + if (callable == null) { + throw new NullPointerException("callable"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (delay < 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: >= 0)", delay)); + } + return schedule(new ScheduledFutureTask(callable, deadlineNanos(unit.toNanos(delay)))); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (period <= 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: > 0)", period)); + } + + return schedule(new ScheduledFutureTask( + command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + if (command == null) { + throw new NullPointerException("command"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (initialDelay < 0) { + throw new IllegalArgumentException( + String.format("initialDelay: %d (expected: >= 0)", initialDelay)); + } + if (delay <= 0) { + throw new IllegalArgumentException( + String.format("delay: %d (expected: > 0)", delay)); + } + + return schedule(new ScheduledFutureTask( + command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + } + + private ScheduledFuture schedule(ScheduledFutureTask task) { + if (isShutdown()) { + reject(); + } + scheduledTasks.add(task); + if (isShutdown()) { + task.cancel(false); + } + + if (!inEventLoop()) { + synchronized (stateLock) { + if (state == 0) { + state = 1; + thread.start(); + } + } + } else { + fetchScheduledTasks(); + } + + return task; + } + + private void fetchScheduledTasks() { + if (scheduledTasks.isEmpty()) { + return; + } + + long nanoTime = nanoTime(); + if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) { + for (Iterator> i = scheduledTasks.iterator(); i.hasNext();) { + ScheduledFutureTask task = i.next(); + if (task.isCancelled()) { + i.remove(); + } + } + } + + if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) { + for (;;) { + ScheduledFutureTask task = scheduledTasks.poll(); + if (task == null) { + break; + } + + if (!task.isCancelled()) { + if (isShutdown()) { + task.cancel(false); + } else { + taskQueue.add(task); + } + } + } + } + } + + private void cancelScheduledTasks() { + if (scheduledTasks.isEmpty()) { + return; + } + + for (ScheduledFutureTask task: scheduledTasks.toArray(new ScheduledFutureTask[scheduledTasks.size()])) { + task.cancel(false); + } + scheduledTasks.clear(); + } + + private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { + + private final long id = nextTaskId.getAndIncrement(); + private long deadlineNanos; + /** 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ + private final long periodNanos; + + ScheduledFutureTask(Runnable runnable, V result, long nanoTime) { + super(runnable, result); + this.deadlineNanos = nanoTime; + this.periodNanos = 0; + } + + ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) { + super(runnable, result); + if (period == 0) { + throw new IllegalArgumentException( + String.format("period: %d (expected: != 0)", period)); + } + this.deadlineNanos = nanoTime; + this.periodNanos = period; + } + + ScheduledFutureTask(Callable callable, long nanoTime) { + super(callable); + this.deadlineNanos = nanoTime; + this.periodNanos = 0; + } + + public long deadlineNanos() { + return deadlineNanos; + } + + public long delayNanos() { + return Math.max(0, deadlineNanos() - nanoTime()); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (this == o) { + return 0; + } + + ScheduledFutureTask that = (ScheduledFutureTask) o; + long d = deadlineNanos() - that.deadlineNanos(); + if (d < 0) { + return -1; + } else if (d > 0) { + return 1; + } else if (id < that.id) { + return -1; + } else if (id == that.id) { + throw new Error(); + } else { + return 1; + } + } + + @Override + public void run() { + if (periodNanos == 0) { + super.run(); + } else { + boolean reset = runAndReset(); + if (reset && !isShutdown()) { + long p = periodNanos; + if (p > 0) { + deadlineNanos += p; + } else { + deadlineNanos = nanoTime() - p; + } + + schedule(this); + } + } + } + } } diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 218e134644..a8a27b2bbb 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -1,16 +1,20 @@ package io.netty.channel; +import static org.junit.Assert.*; + +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.*; - public class SingleThreadEventLoopTest { private SingleThreadEventLoopImpl loop; @@ -78,6 +82,122 @@ public class SingleThreadEventLoopTest { assertTrue(interrupted.get()); } + @Test + public void scheduleTask() throws Exception { + long startTime = System.nanoTime(); + final AtomicLong endTime = new AtomicLong(); + loop.schedule(new Runnable() { + @Override + public void run() { + endTime.set(System.nanoTime()); + } + }, 500, TimeUnit.MILLISECONDS).get(); + assertTrue(endTime.get() - startTime >= TimeUnit.MILLISECONDS.toNanos(500)); + } + + @Test + public void scheduleTaskAtFixedRate() throws Exception { + final Queue timestamps = new LinkedBlockingQueue(); + ScheduledFuture f = loop.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + timestamps.add(System.nanoTime()); + try { + Thread.sleep(50); + } catch (InterruptedException e) { + // Ignore + } + } + }, 100, 100, TimeUnit.MILLISECONDS); + Thread.sleep(550); + assertTrue(f.cancel(true)); + assertEquals(5, timestamps.size()); + + // Check if the task was run without a lag. + Long previousTimestamp = null; + for (Long t: timestamps) { + if (previousTimestamp == null) { + previousTimestamp = t; + continue; + } + + assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(90)); + previousTimestamp = t; + } + } + + @Test + public void scheduleLaggyTaskAtFixedRate() throws Exception { + final Queue timestamps = new LinkedBlockingQueue(); + ScheduledFuture f = loop.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + boolean empty = timestamps.isEmpty(); + timestamps.add(System.nanoTime()); + if (empty) { + try { + Thread.sleep(400); + } catch (InterruptedException e) { + // Ignore + } + } + } + }, 100, 100, TimeUnit.MILLISECONDS); + Thread.sleep(550); + assertTrue(f.cancel(true)); + assertEquals(5, timestamps.size()); + + // Check if the task was run with lag. + int i = 0; + Long previousTimestamp = null; + for (Long t: timestamps) { + if (previousTimestamp == null) { + previousTimestamp = t; + continue; + } + + long diff = t.longValue() - previousTimestamp.longValue(); + if (i == 0) { + assertTrue(diff >= TimeUnit.MILLISECONDS.toNanos(400)); + } else { + assertTrue(diff <= TimeUnit.MILLISECONDS.toNanos(10)); + } + previousTimestamp = t; + i ++; + } + } + + @Test + public void scheduleTaskWithFixedDelay() throws Exception { + final Queue timestamps = new LinkedBlockingQueue(); + ScheduledFuture f = loop.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + timestamps.add(System.nanoTime()); + try { + Thread.sleep(50); + } catch (InterruptedException e) { + // Ignore + } + } + }, 100, 100, TimeUnit.MILLISECONDS); + Thread.sleep(500); + assertTrue(f.cancel(true)); + assertEquals(3, timestamps.size()); + + // Check if the task was run without a lag. + Long previousTimestamp = null; + for (Long t: timestamps) { + if (previousTimestamp == null) { + previousTimestamp = t; + continue; + } + + assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(150)); + previousTimestamp = t; + } + } + @Test public void shutdownWithPendingTasks() throws Exception { final int NUM_TASKS = 3; @@ -143,6 +263,7 @@ public class SingleThreadEventLoopTest { } } + @Override protected void cleanup() { cleanedUp.incrementAndGet(); } @@ -155,8 +276,9 @@ public class SingleThreadEventLoopTest { } @Override - public void register(Channel channel, ChannelFuture future) { + public ChannelFuture register(Channel channel, ChannelFuture future) { // Untested + return future; } } }