Respect canceled tasks. See #209 and #210

This commit is contained in:
Norman Maurer 2012-02-29 21:23:31 +01:00
parent 8579f09c59
commit a545157f4b
3 changed files with 15 additions and 12 deletions

View File

@ -74,7 +74,7 @@ class SctpWorker implements Worker {
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class); private final Queue<ChannelRunnableWrapper> eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
@ -301,12 +301,13 @@ class SctpWorker implements Worker {
private void processEventQueue() throws IOException { private void processEventQueue() throws IOException {
for (;;) { for (;;) {
final Runnable task = eventQueue.poll(); final ChannelRunnableWrapper task = eventQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
if (!task.isCancelled()) {
task.run(); task.run();
}
cleanUpCancelledKeys(); cleanUpCancelledKeys();
} }
} }

View File

@ -109,7 +109,7 @@ abstract class AbstractNioWorker implements Worker {
*/ */
protected final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class); protected final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class); private final Queue<ChannelRunnableWrapper> eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
@ -324,12 +324,13 @@ abstract class AbstractNioWorker implements Worker {
private void processEventQueue() throws IOException { private void processEventQueue() throws IOException {
for (;;) { for (;;) {
final Runnable task = eventQueue.poll(); final ChannelRunnableWrapper task = eventQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
if (!task.isCancelled()) {
task.run(); task.run();
}
cleanUpCancelledKeys(); cleanUpCancelledKeys();
} }
} }

View File

@ -34,7 +34,7 @@ import java.util.concurrent.RejectedExecutionException;
*/ */
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker { abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class); private final Queue<ChannelRunnableWrapper> eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class);
protected final C channel; protected final C channel;
@ -115,12 +115,13 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
private void processEventQueue() throws IOException { private void processEventQueue() throws IOException {
for (;;) { for (;;) {
final Runnable task = eventQueue.poll(); final ChannelRunnableWrapper task = eventQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
if (!task.isCancelled()) {
task.run(); task.run();
}
} }
} }