package io.netty.channel; import io.netty.util.internal.QueueFactory; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public abstract class SingleThreadEventLoop extends AbstractExecutorService implements EventLoop { private static final long SCHEDULE_CHECK_INTERVAL = TimeUnit.MILLISECONDS.toNanos(10); private static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1); private static final long START_TIME = System.nanoTime(); private static final AtomicLong nextTaskId = new AtomicLong(); static final ThreadLocal CURRENT_EVENT_LOOP = new ThreadLocal(); private static long nanoTime() { return System.nanoTime() - START_TIME; } private static long deadlineNanos(long delay) { return nanoTime() + delay; } // Fields for event loop private final BlockingQueue taskQueue = QueueFactory.createQueue(Runnable.class); private final Thread thread; private final Object stateLock = new Object(); private final Semaphore threadLock = new Semaphore(0); private final Queue> scheduledTasks = new DelayQueue>(); /** 0 - not started, 1 - started, 2 - shut down, 3 - terminated */ private volatile int state; private long lastCheckTimeNanos; private long lastPurgeTimeNanos; protected SingleThreadEventLoop() { this(Executors.defaultThreadFactory()); } protected SingleThreadEventLoop(ThreadFactory threadFactory) { thread = threadFactory.newThread(new Runnable() { @Override public void run() { CURRENT_EVENT_LOOP.set(SingleThreadEventLoop.this); try { SingleThreadEventLoop.this.run(); } finally { synchronized (stateLock) { state = 3; } try { cancelScheduledTasks(); cleanup(); } finally { threadLock.release(); assert taskQueue.isEmpty(); } } } }); } @Override public ChannelFuture register(Channel channel) { if (channel == null) { throw new NullPointerException("channel"); } return register(channel, channel.newFuture()); } @Override public ChannelFuture register(final Channel channel, final ChannelFuture future) { if (inEventLoop()) { channel.unsafe().register(this, future); } else { execute(new Runnable() { @Override public void run() { channel.unsafe().register(SingleThreadEventLoop.this, future); } }); } return future; } protected void interruptThread() { thread.interrupt(); } protected Runnable pollTask() { assert inEventLoop(); Runnable task = taskQueue.poll(); if (task == null) { if (fetchScheduledTasks()) { task = taskQueue.poll(); } } return task; } protected Runnable takeTask() throws InterruptedException { assert inEventLoop(); for (;;) { Runnable task = taskQueue.poll(SCHEDULE_CHECK_INTERVAL * 2 / 3, TimeUnit.NANOSECONDS); if (task != null) { return task; } fetchScheduledTasks(); } } protected Runnable peekTask() { assert inEventLoop(); Runnable task = taskQueue.peek(); if (task == null) { if (fetchScheduledTasks()) { task = taskQueue.peek(); } } return task; } protected boolean hasTasks() { assert inEventLoop(); boolean empty = taskQueue.isEmpty(); if (empty) { if (fetchScheduledTasks()) { empty = taskQueue.isEmpty(); } } return !empty; } protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (isShutdown()) { reject(); } taskQueue.add(task); } protected boolean removeTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } return taskQueue.remove(task); } protected abstract void run(); protected void cleanup() { // Do nothing. Subclasses will override. } protected abstract void wakeup(boolean inEventLoop); @Override public boolean inEventLoop() { return Thread.currentThread() == thread; } @Override public void shutdown() { boolean inEventLoop = inEventLoop(); boolean wakeup = false; if (inEventLoop) { synchronized (stateLock) { assert state == 1; state = 2; wakeup = true; } } else { synchronized (stateLock) { switch (state) { case 0: state = 3; try { cleanup(); } finally { threadLock.release(); } break; case 1: state = 2; wakeup = true; break; } } } if (wakeup) { wakeup(inEventLoop); } } @Override public List shutdownNow() { shutdown(); return Collections.emptyList(); } @Override public boolean isShutdown() { return state >= 2; } @Override public boolean isTerminated() { return state == 3; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { if (unit == null) { throw new NullPointerException("unit"); } if (inEventLoop()) { throw new IllegalStateException("cannot await termination of the current thread"); } if (threadLock.tryAcquire(timeout, unit)) { threadLock.release(); } return isTerminated(); } @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (inEventLoop()) { addTask(task); wakeup(true); } else { synchronized (stateLock) { if (state == 0) { state = 1; thread.start(); } } addTask(task); if (isShutdown() && removeTask(task)) { reject(); } wakeup(false); } } private static void reject() { throw new RejectedExecutionException("event loop shut down"); } @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { if (command == null) { throw new NullPointerException("command"); } if (unit == null) { throw new NullPointerException("unit"); } if (delay < 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } return schedule(new ScheduledFutureTask(command, null, deadlineNanos(unit.toNanos(delay)))); } @Override public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { if (callable == null) { throw new NullPointerException("callable"); } if (unit == null) { throw new NullPointerException("unit"); } if (delay < 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } return schedule(new ScheduledFutureTask(callable, deadlineNanos(unit.toNanos(delay)))); } @Override public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null) { throw new NullPointerException("command"); } if (unit == null) { throw new NullPointerException("unit"); } if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (period <= 0) { throw new IllegalArgumentException( String.format("period: %d (expected: > 0)", period)); } return schedule(new ScheduledFutureTask( command, null, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); } @Override public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null) { throw new NullPointerException("command"); } if (unit == null) { throw new NullPointerException("unit"); } if (initialDelay < 0) { throw new IllegalArgumentException( String.format("initialDelay: %d (expected: >= 0)", initialDelay)); } if (delay <= 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: > 0)", delay)); } return schedule(new ScheduledFutureTask( command, null, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); } private ScheduledFuture schedule(ScheduledFutureTask task) { if (isShutdown()) { reject(); } scheduledTasks.add(task); if (isShutdown()) { task.cancel(false); } if (!inEventLoop()) { synchronized (stateLock) { if (state == 0) { state = 1; thread.start(); } } } else { fetchScheduledTasks(); } return task; } private boolean fetchScheduledTasks() { if (scheduledTasks.isEmpty()) { return false; } long nanoTime = nanoTime(); if (nanoTime - lastPurgeTimeNanos >= SCHEDULE_PURGE_INTERVAL) { for (Iterator> i = scheduledTasks.iterator(); i.hasNext();) { ScheduledFutureTask task = i.next(); if (task.isCancelled()) { i.remove(); } } } if (nanoTime - lastCheckTimeNanos >= SCHEDULE_CHECK_INTERVAL) { boolean added = false; for (;;) { ScheduledFutureTask task = scheduledTasks.poll(); if (task == null) { break; } if (!task.isCancelled()) { if (isShutdown()) { task.cancel(false); } else { taskQueue.add(task); added = true; } } } return added; } return false; } private void cancelScheduledTasks() { if (scheduledTasks.isEmpty()) { return; } for (ScheduledFutureTask task: scheduledTasks.toArray(new ScheduledFutureTask[scheduledTasks.size()])) { task.cancel(false); } scheduledTasks.clear(); } private class ScheduledFutureTask extends FutureTask implements ScheduledFuture { private final long id = nextTaskId.getAndIncrement(); private long deadlineNanos; /** 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */ private final long periodNanos; ScheduledFutureTask(Runnable runnable, V result, long nanoTime) { super(runnable, result); this.deadlineNanos = nanoTime; this.periodNanos = 0; } ScheduledFutureTask(Runnable runnable, V result, long nanoTime, long period) { super(runnable, result); if (period == 0) { throw new IllegalArgumentException( String.format("period: %d (expected: != 0)", period)); } this.deadlineNanos = nanoTime; this.periodNanos = period; } ScheduledFutureTask(Callable callable, long nanoTime) { super(callable); this.deadlineNanos = nanoTime; this.periodNanos = 0; } public long deadlineNanos() { return deadlineNanos; } public long delayNanos() { return Math.max(0, deadlineNanos() - nanoTime()); } @Override public long getDelay(TimeUnit unit) { return unit.convert(delayNanos(), TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { if (this == o) { return 0; } ScheduledFutureTask that = (ScheduledFutureTask) o; long d = deadlineNanos() - that.deadlineNanos(); if (d < 0) { return -1; } else if (d > 0) { return 1; } else if (id < that.id) { return -1; } else if (id == that.id) { throw new Error(); } else { return 1; } } @Override public void run() { if (periodNanos == 0) { super.run(); } else { boolean reset = runAndReset(); if (reset && !isShutdown()) { long p = periodNanos; if (p > 0) { deadlineNanos += p; } else { deadlineNanos = nanoTime() - p; } schedule(this); } } } } }