Make EventLoop a ScheduledExecutorService
- SingleThreadEventLoop now implements ScheduledExecutorService - Scheduled tasks are automatically fetched into taskQueue by pollTask() and takeTask() - Removed MapBackedSet because Java 6 provides it
This commit is contained in:
parent
a4678a6030
commit
83026f29a4
14
buffer/src/main/java/io/netty/array/ObjectArray.java
Normal file
14
buffer/src/main/java/io/netty/array/ObjectArray.java
Normal file
@ -0,0 +1,14 @@
|
||||
package io.netty.array;
|
||||
|
||||
|
||||
public class ObjectArray<E> extends AbstractArray<E> {
|
||||
|
||||
public ObjectArray(E[] array, int offset, int length) {
|
||||
super(array, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public E[] array() {
|
||||
return (E[]) super.array();
|
||||
}
|
||||
}
|
@ -1,69 +0,0 @@
|
||||
/*
|
||||
* Copyright 2011 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.AbstractSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A {@link Map}-backed {@link Set}.
|
||||
*/
|
||||
final class MapBackedSet<E> extends AbstractSet<E> implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = -6761513279741915432L;
|
||||
|
||||
private final Map<E, Boolean> map;
|
||||
|
||||
/**
|
||||
* Creates a new instance which wraps the specified {@code map}.
|
||||
*/
|
||||
MapBackedSet(Map<E, Boolean> map) {
|
||||
this.map = map;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return map.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean contains(Object o) {
|
||||
return map.containsKey(o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean add(E o) {
|
||||
return map.put(o, Boolean.TRUE) == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Object o) {
|
||||
return map.remove(o) != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
map.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<E> iterator() {
|
||||
return map.keySet().iterator();
|
||||
}
|
||||
}
|
@ -1,108 +0,0 @@
|
||||
/*
|
||||
* Copyright 2011 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.util;
|
||||
|
||||
import static org.easymock.EasyMock.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class MapBackedSetTest {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void testSize() {
|
||||
Map map = createStrictMock(Map.class);
|
||||
expect(map.size()).andReturn(0);
|
||||
replay(map);
|
||||
|
||||
assertEquals(0, new MapBackedSet(map).size());
|
||||
verify(map);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void testContains() {
|
||||
Map map = createStrictMock(Map.class);
|
||||
expect(map.containsKey("key")).andReturn(true);
|
||||
replay(map);
|
||||
|
||||
assertTrue(new MapBackedSet(map).contains("key"));
|
||||
verify(map);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void testRemove() {
|
||||
Map map = createStrictMock(Map.class);
|
||||
expect(map.remove("key")).andReturn(true);
|
||||
expect(map.remove("key")).andReturn(null);
|
||||
replay(map);
|
||||
|
||||
assertTrue(new MapBackedSet(map).remove("key"));
|
||||
assertFalse(new MapBackedSet(map).remove("key"));
|
||||
verify(map);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public void testAdd() {
|
||||
Map map = createStrictMock(Map.class);
|
||||
expect(map.put("key", true)).andReturn(null);
|
||||
expect(map.put("key", true)).andReturn(true);
|
||||
replay(map);
|
||||
|
||||
assertTrue(new MapBackedSet(map).add("key"));
|
||||
assertFalse(new MapBackedSet(map).add("key"));
|
||||
verify(map);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void testClear() {
|
||||
Map map = createStrictMock(Map.class);
|
||||
map.clear();
|
||||
replay(map);
|
||||
|
||||
new MapBackedSet(map).clear();
|
||||
verify(map);
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void testIterator() {
|
||||
Map map = createStrictMock(Map.class);
|
||||
Set keySet = createStrictMock(Set.class);
|
||||
Iterator keySetIterator = createStrictMock(Iterator.class);
|
||||
|
||||
expect(map.keySet()).andReturn(keySet);
|
||||
expect(keySet.iterator()).andReturn(keySetIterator);
|
||||
replay(map);
|
||||
replay(keySet);
|
||||
replay(keySetIterator);
|
||||
|
||||
assertSame(keySetIterator, new MapBackedSet(map).iterator());
|
||||
|
||||
verify(map);
|
||||
verify(keySet);
|
||||
verify(keySetIterator);
|
||||
}
|
||||
}
|
@ -1,8 +1,8 @@
|
||||
package io.netty.channel;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
||||
public interface EventLoop extends ExecutorService {
|
||||
public interface EventLoop extends ScheduledExecutorService {
|
||||
ChannelFuture register(Channel channel);
|
||||
ChannelFuture register(Channel channel, ChannelFuture future);
|
||||
boolean inEventLoop();
|
||||
|
@ -0,0 +1,200 @@
|
||||
package io.netty.channel;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/**
|
||||
* A global single-threaded {@link ScheduledExecutorService} which is purposed
|
||||
* to trigger scheduled events in {@link SingleThreadEventLoop}.
|
||||
*/
|
||||
public final class GlobalScheduledExecutorService implements ScheduledExecutorService {
|
||||
|
||||
private static final GlobalScheduledExecutorService INSTANCE = new GlobalScheduledExecutorService();
|
||||
|
||||
public static final GlobalScheduledExecutorService instance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
private final ThreadFactory threadFactory = Executors.defaultThreadFactory();
|
||||
private final ScheduledThreadPoolExecutor timer;
|
||||
|
||||
private GlobalScheduledExecutorService() {
|
||||
timer = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = threadFactory.newThread(r);
|
||||
t.setDaemon(true);
|
||||
t.setName(String.format("EventLoopTimer-%08x", GlobalScheduledExecutorService.this.hashCode()));
|
||||
return t;
|
||||
}
|
||||
});
|
||||
|
||||
// Avoid unnecessary memory consumption on a burst of cancellation.
|
||||
timer.scheduleWithFixedDelay(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
timer.purge();
|
||||
}
|
||||
}, 1, 1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void finalize() throws Throwable {
|
||||
shutdownNow();
|
||||
super.finalize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
timer.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Runnable> shutdownNow() {
|
||||
return timer.shutdownNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShutdown() {
|
||||
return timer.isShutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTerminated() {
|
||||
return timer.isTerminated();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
return timer.awaitTermination(timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
return timer.submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Runnable task, T result) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
return timer.submit(task, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> submit(Runnable task) {
|
||||
if (task == null) {
|
||||
throw new NullPointerException("task");
|
||||
}
|
||||
|
||||
return timer.submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
|
||||
return timer.invokeAll(tasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
return timer.invokeAll(tasks, timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
return timer.invokeAny(tasks);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
|
||||
if (tasks == null) {
|
||||
throw new NullPointerException("tasks");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
return timer.invokeAny(tasks, timeout, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
timer.execute(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
return timer.schedule(command, delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
if (callable == null) {
|
||||
throw new NullPointerException("callable");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
return timer.schedule(callable, delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
return timer.scheduleAtFixedRate(command, initialDelay, period, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
return timer.scheduleWithFixedDelay(command, initialDelay, delay, unit);
|
||||
}
|
||||
}
|
@ -7,6 +7,7 @@ import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
@ -153,6 +154,27 @@ public class MultithreadEventLoop implements EventLoop {
|
||||
currentEventLoop().execute(command);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay,
|
||||
TimeUnit unit) {
|
||||
return currentEventLoop().schedule(command, delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
return currentEventLoop().schedule(callable, delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
return currentEventLoop().scheduleAtFixedRate(command, initialDelay, period, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
return currentEventLoop().scheduleWithFixedDelay(command, initialDelay, delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture register(Channel channel) {
|
||||
return nextEventLoop().register(channel);
|
||||
|
@ -3,25 +3,50 @@ package io.netty.channel;
|
||||
import io.netty.util.internal.QueueFactory;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.AbstractExecutorService;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.DelayQueue;
|
||||
import java.util.concurrent.Delayed;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop {
|
||||
|
||||
private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10);
|
||||
private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);
|
||||
private static final long START_TIME = System.nanoTime();
|
||||
private static final AtomicLong nextTaskId = new AtomicLong();
|
||||
|
||||
static final ThreadLocal<SingleThreadEventLoop> CURRENT_EVENT_LOOP = new ThreadLocal<SingleThreadEventLoop>();
|
||||
|
||||
private static long nanoTime() {
|
||||
return System.nanoTime() - START_TIME;
|
||||
}
|
||||
|
||||
private static long deadlineNanos(long delay) {
|
||||
return nanoTime() + delay;
|
||||
}
|
||||
|
||||
// Fields for event loop
|
||||
private final BlockingQueue<Runnable> taskQueue = QueueFactory.createQueue(Runnable.class);
|
||||
private final Thread thread;
|
||||
private final Object stateLock = new Object();
|
||||
private final Semaphore threadLock = new Semaphore(0);
|
||||
private final Queue<ScheduledFutureTask<?>> scheduledTasks = new DelayQueue<ScheduledFutureTask<?>>();
|
||||
/** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */
|
||||
private volatile int state;
|
||||
private long lastCheckTimeNanos;
|
||||
private long lastPurgeTimeNanos;
|
||||
|
||||
protected SingleThreadEventLoop() {
|
||||
this(Executors.defaultThreadFactory());
|
||||
@ -39,6 +64,7 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
|
||||
state = 3;
|
||||
}
|
||||
try {
|
||||
cancelScheduledTasks();
|
||||
cleanup();
|
||||
} finally {
|
||||
threadLock.release();
|
||||
@ -78,12 +104,23 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
|
||||
|
||||
protected Runnable pollTask() {
|
||||
assert inEventLoop();
|
||||
return taskQueue.poll();
|
||||
Runnable task = taskQueue.poll();
|
||||
if (task == null) {
|
||||
fetchScheduledTasks();
|
||||
task = taskQueue.poll();
|
||||
}
|
||||
return task;
|
||||
}
|
||||
|
||||
protected Runnable takeTask() throws InterruptedException {
|
||||
assert inEventLoop();
|
||||
return taskQueue.take();
|
||||
for (;;) {
|
||||
Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS);
|
||||
if (task != null) {
|
||||
return task;
|
||||
}
|
||||
fetchScheduledTasks();
|
||||
}
|
||||
}
|
||||
|
||||
protected Runnable peekTask() {
|
||||
@ -200,9 +237,6 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
|
||||
}
|
||||
|
||||
if (inEventLoop()) {
|
||||
if (isShutdown()) {
|
||||
reject();
|
||||
}
|
||||
addTask(task);
|
||||
wakeup(true);
|
||||
} else {
|
||||
@ -223,4 +257,226 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
|
||||
private static void reject() {
|
||||
throw new RejectedExecutionException("event loop shut down");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (delay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: >= 0)", delay));
|
||||
}
|
||||
return schedule(new ScheduledFutureTask<Void>(command, null, deadlineNanos(unit.toNanos(delay))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
|
||||
if (callable == null) {
|
||||
throw new NullPointerException("callable");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (delay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: >= 0)", delay));
|
||||
}
|
||||
return schedule(new ScheduledFutureTask<V>(callable, deadlineNanos(unit.toNanos(delay))));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (initialDelay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||
}
|
||||
if (period <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("period: %d (expected: > 0)", period));
|
||||
}
|
||||
|
||||
return schedule(new ScheduledFutureTask<Void>(
|
||||
command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
if (command == null) {
|
||||
throw new NullPointerException("command");
|
||||
}
|
||||
if (unit == null) {
|
||||
throw new NullPointerException("unit");
|
||||
}
|
||||
if (initialDelay < 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("initialDelay: %d (expected: >= 0)", initialDelay));
|
||||
}
|
||||
if (delay <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("delay: %d (expected: > 0)", delay));
|
||||
}
|
||||
|
||||
return schedule(new ScheduledFutureTask<Void>(
|
||||
command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
|
||||
}
|
||||
|
||||
private <V> ScheduledFuture<V> schedule(ScheduledFutureTask<V> task) {
|
||||
if (isShutdown()) {
|
||||
reject();
|
||||
}
|
||||
scheduledTasks.add(task);
|
||||
if (isShutdown()) {
|
||||
task.cancel(false);
|
||||
}
|
||||
|
||||
if (!inEventLoop()) {
|
||||
synchronized (stateLock) {
|
||||
if (state == 0) {
|
||||
state = 1;
|
||||
thread.start();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fetchScheduledTasks();
|
||||
}
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
private void fetchScheduledTasks() {
|
||||
if (scheduledTasks.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
long nanoTime = nanoTime();
|
||||
if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) {
|
||||
for (Iterator<ScheduledFutureTask<?>> i = scheduledTasks.iterator(); i.hasNext();) {
|
||||
ScheduledFutureTask<?> task = i.next();
|
||||
if (task.isCancelled()) {
|
||||
i.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) {
|
||||
for (;;) {
|
||||
ScheduledFutureTask<?> task = scheduledTasks.poll();
|
||||
if (task == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (!task.isCancelled()) {
|
||||
if (isShutdown()) {
|
||||
task.cancel(false);
|
||||
} else {
|
||||
taskQueue.add(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cancelScheduledTasks() {
|
||||
if (scheduledTasks.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (ScheduledFutureTask<?> task: scheduledTasks.toArray(new ScheduledFutureTask<?>[scheduledTasks.size()])) {
|
||||
task.cancel(false);
|
||||
}
|
||||
scheduledTasks.clear();
|
||||
}
|
||||
|
||||
private class ScheduledFutureTask<V> extends FutureTask<V> implements ScheduledFuture<V> {
|
||||
|
||||
private final long id = nextTaskId.getAndIncrement();
|
||||
private long deadlineNanos;
|
||||
/** 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
|
||||
private final long periodNanos;
|
||||
|
||||
ScheduledFutureTask(Runnable runnable, V result, long nanoTime) {
|
||||
super(runnable, result);
|
||||
this.deadlineNanos = nanoTime;
|
||||
this.periodNanos = 0;
|
||||
}
|
||||
|
||||
ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) {
|
||||
super(runnable, result);
|
||||
if (period == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("period: %d (expected: != 0)", period));
|
||||
}
|
||||
this.deadlineNanos = nanoTime;
|
||||
this.periodNanos = period;
|
||||
}
|
||||
|
||||
ScheduledFutureTask(Callable<V> callable, long nanoTime) {
|
||||
super(callable);
|
||||
this.deadlineNanos = nanoTime;
|
||||
this.periodNanos = 0;
|
||||
}
|
||||
|
||||
public long deadlineNanos() {
|
||||
return deadlineNanos;
|
||||
}
|
||||
|
||||
public long delayNanos() {
|
||||
return Math.max(0, deadlineNanos() - nanoTime());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
return unit.convert(delayNanos(), TimeUnit.NANOSECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(Delayed o) {
|
||||
if (this == o) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
|
||||
long d = deadlineNanos() - that.deadlineNanos();
|
||||
if (d < 0) {
|
||||
return -1;
|
||||
} else if (d > 0) {
|
||||
return 1;
|
||||
} else if (id < that.id) {
|
||||
return -1;
|
||||
} else if (id == that.id) {
|
||||
throw new Error();
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (periodNanos == 0) {
|
||||
super.run();
|
||||
} else {
|
||||
boolean reset = runAndReset();
|
||||
if (reset && !isShutdown()) {
|
||||
long p = periodNanos;
|
||||
if (p > 0) {
|
||||
deadlineNanos += p;
|
||||
} else {
|
||||
deadlineNanos = nanoTime() - p;
|
||||
}
|
||||
|
||||
schedule(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,16 +1,20 @@
|
||||
package io.netty.channel;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class SingleThreadEventLoopTest {
|
||||
|
||||
private SingleThreadEventLoopImpl loop;
|
||||
@ -78,6 +82,122 @@ public class SingleThreadEventLoopTest {
|
||||
assertTrue(interrupted.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleTask() throws Exception {
|
||||
long startTime = System.nanoTime();
|
||||
final AtomicLong endTime = new AtomicLong();
|
||||
loop.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
endTime.set(System.nanoTime());
|
||||
}
|
||||
}, 500, TimeUnit.MILLISECONDS).get();
|
||||
assertTrue(endTime.get() - startTime >= TimeUnit.MILLISECONDS.toNanos(500));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleTaskAtFixedRate() throws Exception {
|
||||
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
|
||||
ScheduledFuture<?> f = loop.scheduleAtFixedRate(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
timestamps.add(System.nanoTime());
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}, 100, 100, TimeUnit.MILLISECONDS);
|
||||
Thread.sleep(550);
|
||||
assertTrue(f.cancel(true));
|
||||
assertEquals(5, timestamps.size());
|
||||
|
||||
// Check if the task was run without a lag.
|
||||
Long previousTimestamp = null;
|
||||
for (Long t: timestamps) {
|
||||
if (previousTimestamp == null) {
|
||||
previousTimestamp = t;
|
||||
continue;
|
||||
}
|
||||
|
||||
assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(90));
|
||||
previousTimestamp = t;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleLaggyTaskAtFixedRate() throws Exception {
|
||||
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
|
||||
ScheduledFuture<?> f = loop.scheduleAtFixedRate(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
boolean empty = timestamps.isEmpty();
|
||||
timestamps.add(System.nanoTime());
|
||||
if (empty) {
|
||||
try {
|
||||
Thread.sleep(400);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 100, 100, TimeUnit.MILLISECONDS);
|
||||
Thread.sleep(550);
|
||||
assertTrue(f.cancel(true));
|
||||
assertEquals(5, timestamps.size());
|
||||
|
||||
// Check if the task was run with lag.
|
||||
int i = 0;
|
||||
Long previousTimestamp = null;
|
||||
for (Long t: timestamps) {
|
||||
if (previousTimestamp == null) {
|
||||
previousTimestamp = t;
|
||||
continue;
|
||||
}
|
||||
|
||||
long diff = t.longValue() - previousTimestamp.longValue();
|
||||
if (i == 0) {
|
||||
assertTrue(diff >= TimeUnit.MILLISECONDS.toNanos(400));
|
||||
} else {
|
||||
assertTrue(diff <= TimeUnit.MILLISECONDS.toNanos(10));
|
||||
}
|
||||
previousTimestamp = t;
|
||||
i ++;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void scheduleTaskWithFixedDelay() throws Exception {
|
||||
final Queue<Long> timestamps = new LinkedBlockingQueue<Long>();
|
||||
ScheduledFuture<?> f = loop.scheduleWithFixedDelay(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
timestamps.add(System.nanoTime());
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}, 100, 100, TimeUnit.MILLISECONDS);
|
||||
Thread.sleep(500);
|
||||
assertTrue(f.cancel(true));
|
||||
assertEquals(3, timestamps.size());
|
||||
|
||||
// Check if the task was run without a lag.
|
||||
Long previousTimestamp = null;
|
||||
for (Long t: timestamps) {
|
||||
if (previousTimestamp == null) {
|
||||
previousTimestamp = t;
|
||||
continue;
|
||||
}
|
||||
|
||||
assertTrue(t.longValue() - previousTimestamp.longValue() >= TimeUnit.MILLISECONDS.toNanos(150));
|
||||
previousTimestamp = t;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shutdownWithPendingTasks() throws Exception {
|
||||
final int NUM_TASKS = 3;
|
||||
@ -143,6 +263,7 @@ public class SingleThreadEventLoopTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanup() {
|
||||
cleanedUp.incrementAndGet();
|
||||
}
|
||||
@ -155,8 +276,9 @@ public class SingleThreadEventLoopTest {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(Channel channel, ChannelFuture future) {
|
||||
public ChannelFuture register(Channel channel, ChannelFuture future) {
|
||||
// Untested
|
||||
return future;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user