Better handling of canceling. See #210 and #209

This commit is contained in:
Norman Maurer 2012-02-29 21:37:26 +01:00
parent a545157f4b
commit 1589dadcce
4 changed files with 24 additions and 14 deletions

View File

@ -74,7 +74,7 @@ class SctpWorker implements Worker {
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<ChannelRunnableWrapper> eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
@ -301,13 +301,11 @@ class SctpWorker implements Worker {
private void processEventQueue() throws IOException { private void processEventQueue() throws IOException {
for (;;) { for (;;) {
final ChannelRunnableWrapper task = eventQueue.poll(); final Runnable task = eventQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
if (!task.isCancelled()) {
task.run(); task.run();
}
cleanUpCancelledKeys(); cleanUpCancelledKeys();
} }
} }

View File

@ -21,6 +21,7 @@ import io.netty.channel.DefaultChannelFuture;
public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable { public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable {
private final Runnable task; private final Runnable task;
private boolean started = false;
public ChannelRunnableWrapper(Channel channel, Runnable task) { public ChannelRunnableWrapper(Channel channel, Runnable task) {
super(channel, true); super(channel, true);
@ -29,6 +30,13 @@ public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runn
@Override @Override
public void run() { public void run() {
synchronized(this) {
if (!isCancelled()) {
started = true;
} else {
return;
}
}
try { try {
task.run(); task.run();
setSuccess(); setSuccess();
@ -37,6 +45,14 @@ public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runn
} }
} }
@Override
public synchronized boolean cancel() {
if (started) {
return false;
}
return super.cancel();
}
} }

View File

@ -109,7 +109,7 @@ abstract class AbstractNioWorker implements Worker {
*/ */
protected final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class); protected final Queue<Runnable> writeTaskQueue = QueueFactory.createQueue(Runnable.class);
private final Queue<ChannelRunnableWrapper> eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
@ -324,13 +324,11 @@ abstract class AbstractNioWorker implements Worker {
private void processEventQueue() throws IOException { private void processEventQueue() throws IOException {
for (;;) { for (;;) {
final ChannelRunnableWrapper task = eventQueue.poll(); final Runnable task = eventQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
if (!task.isCancelled()) {
task.run(); task.run();
}
cleanUpCancelledKeys(); cleanUpCancelledKeys();
} }
} }

View File

@ -119,11 +119,9 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
if (task == null) { if (task == null) {
break; break;
} }
if (!task.isCancelled()) {
task.run(); task.run();
} }
} }
}
/** /**