diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java index f01321c2ce..35403acb18 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java @@ -20,6 +20,7 @@ import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.ChannelRunnableWrapper; public abstract class AbstractSctpChannelSink extends AbstractChannelSink { @@ -28,7 +29,9 @@ public abstract class AbstractSctpChannelSink extends AbstractChannelSink { Channel ch = pipeline.getChannel(); if (ch instanceof SctpChannelImpl) { SctpChannelImpl channel = (SctpChannelImpl) ch; - return channel.worker.executeInIoThread(channel, task); + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(channel, task); + channel.worker.executeInIoThread(task); + return wrapper; } else { return super.execute(pipeline, task); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index 1d0127d047..602b8c8105 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -15,7 +15,29 @@ */ package io.netty.channel.sctp; -import static io.netty.channel.Channels.*; +import static io.netty.channel.Channels.fireChannelBound; +import static io.netty.channel.Channels.fireChannelClosed; +import static io.netty.channel.Channels.fireChannelConnected; +import static io.netty.channel.Channels.fireChannelDisconnected; +import static io.netty.channel.Channels.fireChannelInterestChanged; +import static io.netty.channel.Channels.fireChannelUnbound; +import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.fireMessageReceived; +import static io.netty.channel.Channels.fireWriteComplete; +import static io.netty.channel.Channels.succeededFuture; +import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ChannelBufferFactory; +import io.netty.channel.Channel; +import io.netty.channel.ChannelException; +import io.netty.channel.ChannelFuture; +import io.netty.channel.MessageEvent; +import io.netty.channel.ReceiveBufferSizePredictor; +import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; +import io.netty.channel.socket.Worker; +import io.netty.logging.InternalLogger; +import io.netty.logging.InternalLoggerFactory; +import io.netty.util.internal.DeadLockProofWorker; +import io.netty.util.internal.QueueFactory; import java.io.IOException; import java.net.SocketAddress; @@ -31,28 +53,12 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.sun.nio.sctp.MessageInfo; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBufferFactory; -import io.netty.channel.Channel; -import io.netty.channel.ChannelException; -import io.netty.channel.ChannelFuture; -import io.netty.channel.MessageEvent; -import io.netty.channel.ReceiveBufferSizePredictor; -import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; -import io.netty.channel.socket.ChannelRunnableWrapper; -import io.netty.channel.socket.Worker; -import io.netty.logging.InternalLogger; -import io.netty.logging.InternalLoggerFactory; -import io.netty.util.internal.DeadLockProofWorker; -import io.netty.util.internal.QueueFactory; - /** */ @SuppressWarnings("unchecked") @@ -248,25 +254,17 @@ class SctpWorker implements Worker { } @Override - public ChannelFuture executeInIoThread(Channel channel, Runnable task) { - if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) { - try { - task.run(); - return succeededFuture(channel); - } catch (Throwable t) { - return failedFuture(channel, t); - } + public void executeInIoThread(Runnable task) { + if (Thread.currentThread() == thread) { + task.run(); } else { - ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); - boolean added = eventQueue.offer(channelRunnable); - + boolean added = eventQueue.offer(task); + if (added) { // wake up the selector to speed things selector.wakeup(); - } else { - channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); } - return channelRunnable; + } } diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java index 271897881a..64dc433038 100644 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -16,9 +16,6 @@ package io.netty.channel.socket; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; - /** * A {@link Worker} is responsible to dispatch IO operations * @@ -30,5 +27,5 @@ public interface Worker extends Runnable { * * @param task the {@link Runnable} to execute */ - ChannelFuture executeInIoThread(Channel channel, Runnable task); + void executeInIoThread(Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 1a29f5ef19..3b389ce5a5 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.ChannelRunnableWrapper; public abstract class AbstractNioChannelSink extends AbstractChannelSink { @@ -29,8 +30,9 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink { Channel ch = pipeline.getChannel(); if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; - - return channel.worker.executeInIoThread(ch, task); + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); + channel.worker.executeInIoThread(wrapper); + return wrapper; } return super.execute(pipeline, task); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 956305427f..a877b15629 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -21,7 +21,6 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; -import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; @@ -42,7 +41,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -282,28 +280,20 @@ abstract class AbstractNioWorker implements Worker { } @Override - public ChannelFuture executeInIoThread(Channel channel, Runnable task) { - if (channel instanceof AbstractNioChannel && isIoThread((AbstractNioChannel) channel)) { - try { - task.run(); - return succeededFuture(channel); - } catch (Throwable t) { - return failedFuture(channel, t); - } + public void executeInIoThread(Runnable task) { + if (Thread.currentThread() == thread) { + task.run(); } else { - ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); - boolean added = eventQueue.offer(channelRunnable); + boolean added = eventQueue.offer(task); + assert added; if (added) { // wake up the selector to speed things Selector selector = this.selector; if (selector != null) { selector.wakeup(); } - } else { - channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); } - return channelRunnable; } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index d57c198534..e20d9ea5bb 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; public abstract class AbstractOioChannelSink extends AbstractChannelSink { @@ -32,7 +33,9 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; if (worker != null) { - return channel.worker.executeInIoThread(ch, task); + ChannelRunnableWrapper wrapper = new ChannelRunnableWrapper(pipeline.getChannel(), task); + channel.worker.executeInIoThread(wrapper); + return wrapper; } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 930abbce59..e0b12fef11 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -19,13 +19,11 @@ import static io.netty.channel.Channels.*; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.Channels; -import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.util.internal.QueueFactory; import java.io.IOException; import java.util.Queue; -import java.util.concurrent.RejectedExecutionException; /** * Abstract base class for Oio-Worker implementations @@ -34,10 +32,16 @@ import java.util.concurrent.RejectedExecutionException; */ abstract class AbstractOioWorker implements Worker { - private final Queue eventQueue = QueueFactory.createQueue(ChannelRunnableWrapper.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); protected final C channel; + /** + * If this worker has been started thread will be a reference to the thread + * used when starting. i.e. the current thread when the run method is executed. + */ + protected volatile Thread thread; + public AbstractOioWorker(C channel) { this.channel = channel; channel.worker = this; @@ -45,7 +49,7 @@ abstract class AbstractOioWorker implements Worker @Override public void run() { - channel.workerThread = Thread.currentThread(); + thread = channel.workerThread = Thread.currentThread(); while (channel.isOpen()) { synchronized (channel.interestOpsLock) { @@ -91,31 +95,21 @@ abstract class AbstractOioWorker implements Worker } @Override - public ChannelFuture executeInIoThread(Channel channel, Runnable task) { - if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) { - try { - task.run(); - return succeededFuture(channel); - } catch (Throwable t) { - return failedFuture(channel, t); - } + public void executeInIoThread(Runnable task) { + if (Thread.currentThread() == thread) { + task.run(); } else { - ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); - boolean added = eventQueue.offer(channelRunnable); + boolean added = eventQueue.offer(task); if (added) { // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest - - } else { - channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); - } - return channelRunnable; + } } } private void processEventQueue() throws IOException { for (;;) { - final ChannelRunnableWrapper task = eventQueue.poll(); + final Runnable task = eventQueue.poll(); if (task == null) { break; }