Add SingleThreadEventLoop.runAllTasks()

- Removed duplicated processTaskQueue() in child event loops
- Simplified the cleanup of cancelled keys in NIO transport
This commit is contained in:
Trustin Lee 2012-05-27 04:43:48 -07:00
parent 3b8de9f133
commit 7327bb3522
4 changed files with 26 additions and 35 deletions

View File

@ -168,6 +168,17 @@ public abstract class SingleThreadEventLoop extends AbstractExecutorService impl
return taskQueue.remove(task); return taskQueue.remove(task);
} }
protected void runAllTasks() {
for (;;) {
final Runnable task = pollTask();
if (task == null) {
break;
}
task.run();
}
}
protected abstract void run(); protected abstract void run();
protected void cleanup() { protected void cleanup() {

View File

@ -127,8 +127,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override @Override
protected void doDeregister() throws Exception { protected void doDeregister() throws Exception {
selectionKey().cancel(); ((NioChildEventLoop) eventLoop()).cancel(selectionKey());
((NioChildEventLoop) eventLoop()).cancelledKeys ++;
} }
protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception; protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;

View File

@ -57,7 +57,8 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
*/ */
protected final AtomicBoolean wakenUp = new AtomicBoolean(); protected final AtomicBoolean wakenUp = new AtomicBoolean();
int cancelledKeys; private int cancelledKeys;
private boolean cleanedCancelledKeys;
NioChildEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) { NioChildEventLoop(ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(threadFactory); super(threadFactory);
@ -118,7 +119,7 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
} }
cancelledKeys = 0; cancelledKeys = 0;
processTaskQueue(); runAllTasks();
processSelectedKeys(); processSelectedKeys();
if (isShutdown()) { if (isShutdown()) {
@ -150,15 +151,13 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
} }
} }
private void processTaskQueue() { void cancel(SelectionKey key) {
for (;;) { key.cancel();
final Runnable task = pollTask(); cancelledKeys ++;
if (task == null) { if (cancelledKeys >= CLEANUP_INTERVAL) {
break; cancelledKeys = 0;
} cleanedCancelledKeys = true;
SelectorUtil.cleanupKeys(selector);
task.run();
cleanUpCancelledKeys();
} }
} }
@ -168,6 +167,7 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
return; return;
} }
Iterator<SelectionKey> i; Iterator<SelectionKey> i;
cleanedCancelledKeys = false;
for (i = selectedKeys.iterator(); i.hasNext();) { for (i = selectedKeys.iterator(); i.hasNext();) {
final SelectionKey k = i.next(); final SelectionKey k = i.next();
i.remove(); i.remove();
@ -192,7 +192,7 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
unsafe.close(unsafe.voidFuture()); unsafe.close(unsafe.voidFuture());
} }
if (cleanUpCancelledKeys()) { if (cleanedCancelledKeys) {
// Create the iterator again to avoid ConcurrentModificationException // Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) { if (selectedKeys.isEmpty()) {
break; break;
@ -203,15 +203,6 @@ final class NioChildEventLoop extends SingleThreadEventLoop {
} }
} }
private boolean cleanUpCancelledKeys() {
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
SelectorUtil.cleanupKeys(selector);
return true;
}
return false;
}
private void closeAll() { private void closeAll() {
SelectorUtil.cleanupKeys(selector); SelectorUtil.cleanupKeys(selector);
Set<SelectionKey> keys = selector.keys(); Set<SelectionKey> keys = selector.keys();

View File

@ -43,12 +43,12 @@ class OioChildEventLoop extends SingleThreadEventLoop {
// Waken up by interruptThread() // Waken up by interruptThread()
} }
} else { } else {
processTaskQueue(); runAllTasks();
ch.unsafe().read(); ch.unsafe().read();
// Handle deregistration // Handle deregistration
if (!ch.isRegistered()) { if (!ch.isRegistered()) {
processTaskQueue(); runAllTasks();
deregister(); deregister();
} }
} }
@ -59,16 +59,6 @@ class OioChildEventLoop extends SingleThreadEventLoop {
} }
} }
private void processTaskQueue() {
for (;;) {
Runnable task = pollTask();
if (task == null) {
break;
}
task.run();
}
}
@Override @Override
protected void wakeup(boolean inEventLoop) { protected void wakeup(boolean inEventLoop) {
interruptThread(); interruptThread();