From 1589dadccef348293fa02bc9c47b64ba0daf4393 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 29 Feb 2012 21:37:26 +0100 Subject: [PATCH] Better handling of canceling. See #210 and #209 --- .../java/io/netty/channel/sctp/SctpWorker.java | 8 +++----- .../channel/socket/ChannelRunnableWrapper.java | 18 +++++++++++++++++- .../channel/socket/nio/AbstractNioWorker.java | 8 +++----- .../channel/socket/oio/AbstractOioWorker.java | 4 +--- 4 files changed, 24 insertions(+), 14 deletions(-) diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index 11352df6d3..d5726dae1d 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -74,7 +74,7 @@ class SctpWorker implements Worker { private final Object startStopLock = new Object(); private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - private final Queue eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation @@ -301,13 +301,11 @@ class SctpWorker implements Worker { private void processEventQueue() throws IOException { for (;;) { - final ChannelRunnableWrapper task = eventQueue.poll(); + final Runnable task = eventQueue.poll(); if (task == null) { break; } - if (!task.isCancelled()) { - task.run(); - } + task.run(); cleanUpCancelledKeys(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java index aece47526a..4b519a5cf4 100644 --- a/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java +++ b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java @@ -21,7 +21,8 @@ import io.netty.channel.DefaultChannelFuture; public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable { private final Runnable task; - + private boolean started = false; + public ChannelRunnableWrapper(Channel channel, Runnable task) { super(channel, true); this.task = task; @@ -29,6 +30,13 @@ public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runn @Override public void run() { + synchronized(this) { + if (!isCancelled()) { + started = true; + } else { + return; + } + } try { task.run(); setSuccess(); @@ -36,6 +44,14 @@ public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runn setFailure(t); } } + + @Override + public synchronized boolean cancel() { + if (started) { + return false; + } + return super.cancel(); + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 419a85c408..db13f68d04 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -109,7 +109,7 @@ abstract class AbstractNioWorker implements Worker { */ protected final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); - private final Queue eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation @@ -324,13 +324,11 @@ abstract class AbstractNioWorker implements Worker { private void processEventQueue() throws IOException { for (;;) { - final ChannelRunnableWrapper task = eventQueue.poll(); + final Runnable task = eventQueue.poll(); if (task == null) { break; } - if (!task.isCancelled()) { - task.run(); - } + task.run(); cleanUpCancelledKeys(); } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 5f97e0bb1a..930abbce59 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -119,9 +119,7 @@ abstract class AbstractOioWorker implements Worker if (task == null) { break; } - if (!task.isCancelled()) { - task.run(); - } + task.run(); } }