From a545157f4bb36178c871bea3a9215e83d1d4e907 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 29 Feb 2012 21:23:31 +0100 Subject: [PATCH] Respect canceled tasks. See #209 and #210 --- .../src/main/java/io/netty/channel/sctp/SctpWorker.java | 9 +++++---- .../io/netty/channel/socket/nio/AbstractNioWorker.java | 9 +++++---- .../io/netty/channel/socket/oio/AbstractOioWorker.java | 9 +++++---- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index 1912f1ae46..11352df6d3 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -74,7 +74,7 @@ class SctpWorker implements Worker { private final Object startStopLock = new Object(); private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + private final Queue eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation @@ -301,12 +301,13 @@ class SctpWorker implements Worker { private void processEventQueue() throws IOException { for (;;) { - final Runnable task = eventQueue.poll(); + final ChannelRunnableWrapper task = eventQueue.poll(); if (task == null) { break; } - - task.run(); + if (!task.isCancelled()) { + task.run(); + } cleanUpCancelledKeys(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 85bc12c61a..419a85c408 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -109,7 +109,7 @@ abstract class AbstractNioWorker implements Worker { */ protected final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + private final Queue eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation @@ -324,12 +324,13 @@ abstract class AbstractNioWorker implements Worker { private void processEventQueue() throws IOException { for (;;) { - final Runnable task = eventQueue.poll(); + final ChannelRunnableWrapper task = eventQueue.poll(); if (task == null) { break; } - - task.run(); + if (!task.isCancelled()) { + task.run(); + } cleanUpCancelledKeys(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index a2176f32c6..5f97e0bb1a 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -34,7 +34,7 @@ import java.util.concurrent.RejectedExecutionException; */ abstract class AbstractOioWorker implements Worker { - private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + private final Queue eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); protected final C channel; @@ -115,12 +115,13 @@ abstract class AbstractOioWorker implements Worker private void processEventQueue() throws IOException { for (;;) { - final Runnable task = eventQueue.poll(); + final ChannelRunnableWrapper task = eventQueue.poll(); if (task == null) { break; } - - task.run(); + if (!task.isCancelled()) { + task.run(); + } } }