From 5fdd2dea12e4eb4f3254886335509449796e087b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 24 Feb 2012 20:26:50 +0100 Subject: [PATCH 01/13] Make it possible to schedule upstream events to get fired later in the io-thread. This is the first part of #140 and #187 --- .../codec/embedder/AbstractCodecEmbedder.java | 5 ++ .../socket/http/AbstractHttpChannelSink.java | 30 +++++++++++ .../http/HttpTunnelAcceptedChannelSink.java | 3 +- .../http/HttpTunnelClientChannelSink.java | 3 +- .../http/HttpTunnelServerChannelSink.java | 3 +- .../channel/socket/http/FakeChannelSink.java | 3 +- .../netty/channel/rxtx/RxtxChannelSink.java | 8 +++ .../channel/sctp/AbstractScptChannelSink.java | 43 +++++++++++++++ .../channel/sctp/SctpClientPipelineSink.java | 3 +- .../channel/sctp/SctpServerPipelineSink.java | 3 +- .../io/netty/channel/sctp/SctpWorker.java | 27 +++++++++- .../io/netty/channel/ChannelPipeline.java | 10 ++++ .../java/io/netty/channel/ChannelSink.java | 2 + .../netty/channel/DefaultChannelPipeline.java | 16 ++++++ .../channel/iostream/IoStreamChannelSink.java | 8 +++ .../channel/local/LocalClientChannelSink.java | 8 +++ .../channel/local/LocalServerChannelSink.java | 8 +++ .../java/io/netty/channel/socket/Worker.java | 22 ++++++++ .../socket/nio/AbstractNioChannelSink.java | 44 ++++++++++++++++ .../channel/socket/nio/AbstractNioWorker.java | 27 +++++++++- .../nio/NioClientSocketPipelineSink.java | 3 +- .../socket/nio/NioDatagramPipelineSink.java | 3 +- .../nio/NioServerSocketPipelineSink.java | 3 +- .../socket/oio/AbstractOioChannel.java | 3 ++ .../socket/oio/AbstractOioChannelSink.java | 52 +++++++++++++++++++ .../channel/socket/oio/AbstractOioWorker.java | 36 +++++++++++-- .../oio/OioClientSocketPipelineSink.java | 3 +- .../socket/oio/OioDatagramPipelineSink.java | 3 +- .../oio/OioServerSocketPipelineSink.java | 3 +- 29 files changed, 353 insertions(+), 32 deletions(-) create mode 100644 transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java create mode 100644 transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java create mode 100644 transport/src/main/java/io/netty/channel/socket/Worker.java create mode 100644 transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java create mode 100644 transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java index 4e85a1d01c..206c8278f7 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -224,6 +224,11 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { throw new CodecEmbedderException(actualCause); } + + @Override + public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + handleEvent(e); + } } private static final class EmbeddedChannelPipeline extends DefaultChannelPipeline { diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java new file mode 100644 index 0000000000..a89a8aa837 --- /dev/null +++ b/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java @@ -0,0 +1,30 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.http; + +import io.netty.channel.AbstractChannelSink; +import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelPipeline; + +public abstract class AbstractHttpChannelSink extends AbstractChannelSink{ + + @Override + public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + pipeline.sendUpstream(e); + } + +} diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java index 591d20ac38..ff0be867c6 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java @@ -18,7 +18,6 @@ package io.netty.channel.socket.http; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ChannelBuffer; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -33,7 +32,7 @@ import io.netty.channel.MessageEvent; * from here to the ServerMessageSwitch, which queues the data awaiting a poll request from the * client end of the tunnel. */ -class HttpTunnelAcceptedChannelSink extends AbstractChannelSink { +class HttpTunnelAcceptedChannelSink extends AbstractHttpChannelSink { final SaturationManager saturationManager; diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java index 5f1ecce1c5..6a55acf7c1 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java @@ -17,7 +17,6 @@ package io.netty.channel.socket.http; import java.net.InetSocketAddress; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelStateEvent; @@ -27,7 +26,7 @@ import io.netty.channel.MessageEvent; * Sink of a client channel, deals with sunk events and then makes appropriate calls * on the channel itself to push data. */ -class HttpTunnelClientChannelSink extends AbstractChannelSink { +class HttpTunnelClientChannelSink extends AbstractHttpChannelSink { @Override public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java index e755229674..a7ed9ee1bb 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java @@ -17,7 +17,6 @@ package io.netty.channel.socket.http; import java.net.SocketAddress; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -27,7 +26,7 @@ import io.netty.channel.socket.ServerSocketChannel; /** */ -class HttpTunnelServerChannelSink extends AbstractChannelSink { +class HttpTunnelServerChannelSink extends AbstractHttpChannelSink { private ChannelFutureListener closeHook; diff --git a/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java b/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java index 0798eb94ad..0906755de0 100644 --- a/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java +++ b/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java @@ -19,14 +19,13 @@ package io.netty.channel.socket.http; import java.util.LinkedList; import java.util.Queue; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; /** * A fake channel sink for use in testing */ -public class FakeChannelSink extends AbstractChannelSink { +public class FakeChannelSink extends AbstractHttpChannelSink { public Queue events = new LinkedList(); diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java index fd1c34c0a0..a1692c2718 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java @@ -329,4 +329,12 @@ public class RxtxChannelSink extends AbstractChannelSink { } } } + + /** + * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise + */ + @Override + public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + pipeline.sendUpstream(event); + } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java new file mode 100644 index 0000000000..ebb42a2a93 --- /dev/null +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java @@ -0,0 +1,43 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.sctp; + +import io.netty.channel.AbstractChannelSink; +import io.netty.channel.Channel; +import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelPipeline; + +public abstract class AbstractScptChannelSink extends AbstractChannelSink{ + + @Override + public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + Channel ch = e.getChannel(); + if (ch instanceof SctpChannelImpl) { + SctpChannelImpl channel = (SctpChannelImpl) ch; + channel.worker.fireEventLater(new Runnable() { + + @Override + public void run() { + pipeline.sendUpstream(e); + } + }); + } else { + throw new UnsupportedOperationException(); + } + + } +} diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java index 363407f7d5..bd65b89e57 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java @@ -32,7 +32,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -48,7 +47,7 @@ import io.netty.util.internal.QueueFactory; /** */ -class SctpClientPipelineSink extends AbstractChannelSink { +class SctpClientPipelineSink extends AbstractScptChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpClientPipelineSink.class); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java index 3a0f86bb16..ce34643315 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java @@ -31,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger; import com.sun.nio.sctp.SctpChannel; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -45,7 +44,7 @@ import io.netty.util.internal.DeadLockProofWorker; /** */ -class SctpServerPipelineSink extends AbstractChannelSink { +class SctpServerPipelineSink extends AbstractScptChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpServerPipelineSink.class); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index a7878d77ba..ea6f9710a7 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -45,6 +45,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; import io.netty.channel.ReceiveBufferSizePredictor; import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; +import io.netty.channel.socket.Worker; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; @@ -53,7 +54,7 @@ import io.netty.util.internal.QueueFactory; /** */ @SuppressWarnings("unchecked") -class SctpWorker implements Runnable { +class SctpWorker implements Worker { private static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpWorker.class); @@ -71,6 +72,8 @@ class SctpWorker implements Runnable { private final Object startStopLock = new Object(); private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); private final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private final SctpReceiveBufferPool recvBufferPool = new SctpReceiveBufferPool(); @@ -188,6 +191,7 @@ class SctpWorker implements Runnable { cancelledKeys = 0; processRegisterTaskQueue(); + processEventQueue(); processWriteTaskQueue(); processSelectedKeys(selector.selectedKeys()); @@ -240,7 +244,14 @@ class SctpWorker implements Runnable { } } } - + + public void fireEventLater(Runnable eventRunnable) { + assert eventQueue.offer(eventRunnable); + + // wake up the selector to speed things + selector.wakeup(); + } + private void processRegisterTaskQueue() throws IOException { for (; ;) { final Runnable task = registerTaskQueue.poll(); @@ -264,7 +275,19 @@ class SctpWorker implements Runnable { cleanUpCancelledKeys(); } } + + private void processEventQueue() throws IOException { + for (;;) { + final Runnable task = eventQueue.poll(); + if (task == null) { + break; + } + task.run(); + cleanUpCancelledKeys(); + } + } + private void processSelectedKeys(final Set selectedKeys) throws IOException { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index cbdeeab097..6a74e5033c 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -442,6 +442,16 @@ public interface ChannelPipeline { */ void sendUpstream(ChannelEvent e); + + /** + * Sends the specified {@link ChannelEvent} to the first + * {@link ChannelUpstreamHandler} in this pipeline when the next IO-Worker operation is performed. + * + * @throws NullPointerException + * if the specified event is {@code null} + */ + void sendUpstreamLater(ChannelEvent e); + /** * Sends the specified {@link ChannelEvent} to the last * {@link ChannelDownstreamHandler} in this pipeline. diff --git a/transport/src/main/java/io/netty/channel/ChannelSink.java b/transport/src/main/java/io/netty/channel/ChannelSink.java index 86d042e56f..16ebb8c52e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelSink.java +++ b/transport/src/main/java/io/netty/channel/ChannelSink.java @@ -37,4 +37,6 @@ public interface ChannelSink { * one of its {@link ChannelHandler}s process a {@link ChannelEvent}. */ void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; + + void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 50487ae36e..b8379b1b98 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -583,6 +583,15 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } + @Override + public void sendUpstreamLater(ChannelEvent e) { + try { + getSink().fireEventLater(this, e); + } catch (Throwable t) { + notifyHandlerException(e, t); + } + } + @Override public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); @@ -832,5 +841,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { ChannelEvent e, ChannelPipelineException cause) throws Exception { throw cause; } + + @Override + public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + if (logger.isWarnEnabled()) { + logger.warn("Not attached yet; discarding: " + e); + } + } } } diff --git a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java index a0d75ceef7..5596540da2 100755 --- a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java +++ b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java @@ -177,4 +177,12 @@ public class IoStreamChannelSink extends AbstractChannelSink { } } } + + /** + * This just calls {@link ChannelPipeline#sendUpstream(ChannelEvent)} as the transport does not support it + */ + @Override + public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + pipeline.sendUpstream(e); + } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java index 9b6bd455de..066bf27cc4 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java @@ -85,6 +85,14 @@ final class LocalClientChannelSink extends AbstractChannelSink { } } + /** + * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise + */ + @Override + public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + pipeline.sendUpstream(event); + } + private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) { try { if (!LocalChannelRegistry.register(localAddress, channel)) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java index 6bfa780f28..29f5b9c2d0 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java @@ -42,6 +42,14 @@ final class LocalServerChannelSink extends AbstractChannelSink { } } + /** + * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise + */ + @Override + public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + pipeline.sendUpstream(event); + } + private void handleServerChannel(ChannelEvent e) { if (!(e instanceof ChannelStateEvent)) { return; diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java new file mode 100644 index 0000000000..8a2bf10424 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -0,0 +1,22 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket; + +public interface Worker extends Runnable{ + + void fireEventLater(Runnable eventRunnable); +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java new file mode 100644 index 0000000000..879a5d433a --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -0,0 +1,44 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.nio; + +import io.netty.channel.AbstractChannelSink; +import io.netty.channel.Channel; +import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelPipeline; + +public abstract class AbstractNioChannelSink extends AbstractChannelSink{ + + @Override + public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + Channel ch = e.getChannel(); + if (ch instanceof AbstractNioChannel) { + AbstractNioChannel channel = (AbstractNioChannel) ch; + channel.worker.fireEventLater(new Runnable() { + + @Override + public void run() { + pipeline.sendUpstream(e); + } + }); + } else { + throw new UnsupportedOperationException(); + } + + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 2b96e04f40..708b0ee343 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; +import io.netty.channel.socket.Worker; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -44,7 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -abstract class AbstractNioWorker implements Runnable { +abstract class AbstractNioWorker implements Worker { /** * Internal Netty logger. */ @@ -106,6 +107,9 @@ abstract class AbstractNioWorker implements Runnable { */ protected final Queue writeTaskQueue = QueueFactory.createQueue(Runnable.class); + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + + private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); @@ -216,6 +220,7 @@ abstract class AbstractNioWorker implements Runnable { cancelledKeys = 0; processRegisterTaskQueue(); + processEventQueue(); processWriteTaskQueue(); processSelectedKeys(selector.selectedKeys()); @@ -266,7 +271,13 @@ abstract class AbstractNioWorker implements Runnable { } } - + public void fireEventLater(Runnable eventRunnable) { + assert eventQueue.offer(eventRunnable); + + // wake up the selector to speed things + selector.wakeup(); + } + private void processRegisterTaskQueue() throws IOException { for (;;) { final Runnable task = registerTaskQueue.poll(); @@ -291,6 +302,18 @@ abstract class AbstractNioWorker implements Runnable { } } + private void processEventQueue() throws IOException { + for (;;) { + final Runnable task = eventQueue.poll(); + if (task == null) { + break; + } + + task.run(); + cleanUpCancelledKeys(); + } + } + private void processSelectedKeys(Set selectedKeys) throws IOException { for (Iterator i = selectedKeys.iterator(); i.hasNext();) { SelectionKey k = i.next(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java index ca2333659f..0cd8f1b124 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -31,7 +31,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; @@ -45,7 +44,7 @@ import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; import io.netty.util.internal.QueueFactory; -class NioClientSocketPipelineSink extends AbstractChannelSink { +class NioClientSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java index 60b6943725..401d6dbf8b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramPipelineSink.java @@ -22,7 +22,6 @@ import java.net.SocketAddress; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -35,7 +34,7 @@ import io.netty.channel.MessageEvent; * Receives downstream events from a {@link ChannelPipeline}. It contains * an array of I/O workers. */ -class NioDatagramPipelineSink extends AbstractChannelSink { +class NioDatagramPipelineSink extends AbstractNioChannelSink { private final NioDatagramWorker[] workers; private final AtomicInteger workerIndex = new AtomicInteger(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 5de57a6f01..965c585827 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -29,7 +29,6 @@ import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -41,7 +40,7 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; -class NioServerSocketPipelineSink extends AbstractChannelSink { +class NioServerSocketPipelineSink extends AbstractNioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index 7447254cde..2c7009050a 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -25,11 +25,14 @@ import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelSink; +import io.netty.channel.socket.Worker; abstract class AbstractOioChannel extends AbstractChannel { private volatile InetSocketAddress localAddress; volatile InetSocketAddress remoteAddress; volatile Thread workerThread; + volatile Worker worker; + final Object interestOpsLock = new Object(); AbstractOioChannel( diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java new file mode 100644 index 0000000000..00e480a446 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -0,0 +1,52 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.channel.socket.oio; + +import io.netty.channel.AbstractChannelSink; +import io.netty.channel.Channel; +import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.Worker; + +public abstract class AbstractOioChannelSink extends AbstractChannelSink{ + + @Override + public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + Channel ch = e.getChannel(); + if (ch instanceof AbstractOioChannel) { + AbstractOioChannel channel = (AbstractOioChannel) ch; + Worker worker = channel.worker; + if (worker != null) { + channel.worker.fireEventLater(new Runnable() { + + @Override + public void run() { + pipeline.sendUpstream(e); + } + }); + } else { + // no worker thread yet so just fire the event now + pipeline.sendUpstream(e); + } + + } else { + throw new UnsupportedOperationException(); + } + + } + +} diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 898ef994c3..b20b24a926 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -24,20 +24,26 @@ import static io.netty.channel.Channels.succeededFuture; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.Channels; +import io.netty.channel.socket.Worker; +import io.netty.util.internal.QueueFactory; import java.io.IOException; +import java.util.Queue; /** * Abstract base class for Oio-Worker implementations * * @param {@link AbstractOioChannel} */ -abstract class AbstractOioWorker implements Runnable { +abstract class AbstractOioWorker implements Worker{ + private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); + protected final C channel; public AbstractOioWorker(C channel) { this.channel = channel; + channel.worker = this; } @Override @@ -60,9 +66,13 @@ abstract class AbstractOioWorker implements Runnab } try { - if (!process()) { - break; - } + boolean cont = process(); + + processEventQueue(); + + if (!cont) { + break; + } } catch (Throwable t) { if (!channel.isSocketClosed()) { fireExceptionCaught(channel, t); @@ -79,6 +89,24 @@ abstract class AbstractOioWorker implements Runnab close(channel, succeededFuture(channel)); } + + @Override + public void fireEventLater(Runnable eventRunnable) { + assert eventQueue.offer(eventRunnable); + } + + private void processEventQueue() throws IOException { + for (;;) { + final Runnable task = eventQueue.poll(); + if (task == null) { + break; + } + + task.run(); + } + } + + /** * Process the incoming messages and also is responsible for call {@link Channels#fireMessageReceived(Channel, Object)} once a message * was processed without errors. diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioClientSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/oio/OioClientSocketPipelineSink.java index e5cf415a02..e607d3282e 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioClientSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioClientSocketPipelineSink.java @@ -21,7 +21,6 @@ import java.io.PushbackInputStream; import java.net.SocketAddress; import java.util.concurrent.Executor; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -31,7 +30,7 @@ import io.netty.channel.ChannelStateEvent; import io.netty.channel.MessageEvent; import io.netty.util.internal.DeadLockProofWorker; -class OioClientSocketPipelineSink extends AbstractChannelSink { +class OioClientSocketPipelineSink extends AbstractOioChannelSink { private final Executor workerExecutor; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramPipelineSink.java index 3cf3e6baf6..2b198080b5 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramPipelineSink.java @@ -20,7 +20,6 @@ import static io.netty.channel.Channels.*; import java.net.SocketAddress; import java.util.concurrent.Executor; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -30,7 +29,7 @@ import io.netty.channel.ChannelStateEvent; import io.netty.channel.MessageEvent; import io.netty.util.internal.DeadLockProofWorker; -class OioDatagramPipelineSink extends AbstractChannelSink { +class OioDatagramPipelineSink extends AbstractOioChannelSink { private final Executor workerExecutor; diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketPipelineSink.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketPipelineSink.java index e8c4c3278f..5daad24afc 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketPipelineSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketPipelineSink.java @@ -24,7 +24,6 @@ import java.net.SocketTimeoutException; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; -import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -36,7 +35,7 @@ import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; import io.netty.util.internal.DeadLockProofWorker; -class OioServerSocketPipelineSink extends AbstractChannelSink { +class OioServerSocketPipelineSink extends AbstractOioChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class); From c2bc463d611ee2f15ce6205cf7fecd0c4c69cb2b Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 24 Feb 2012 22:03:32 +0100 Subject: [PATCH 02/13] Optimize the handling of fireEventLater if the current thread is the worker thread. See #187 and #140 --- .../channel/sctp/AbstractScptChannelSink.java | 17 +++++++++++------ .../java/io/netty/channel/sctp/SctpWorker.java | 2 +- .../socket/nio/AbstractNioChannelSink.java | 17 +++++++++++------ .../socket/oio/AbstractOioChannelSink.java | 4 ++-- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java index ebb42a2a93..642cd891b1 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java @@ -28,13 +28,18 @@ public abstract class AbstractScptChannelSink extends AbstractChannelSink{ Channel ch = e.getChannel(); if (ch instanceof SctpChannelImpl) { SctpChannelImpl channel = (SctpChannelImpl) ch; - channel.worker.fireEventLater(new Runnable() { + // check if the current thread is a worker thread, and only fire the event later if thats not the case + if (channel.worker.thread != Thread.currentThread()) { + channel.worker.fireEventLater(new Runnable() { - @Override - public void run() { - pipeline.sendUpstream(e); - } - }); + @Override + public void run() { + pipeline.sendUpstream(e); + } + }); + } else { + pipeline.sendUpstream(e); + } } else { throw new UnsupportedOperationException(); } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index ea6f9710a7..e546019da3 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -65,7 +65,7 @@ class SctpWorker implements Worker { private final Executor executor; private boolean started; - private volatile Thread thread; + volatile Thread thread; volatile Selector selector; private final AtomicBoolean wakenUp = new AtomicBoolean(); private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock(); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 879a5d433a..6812b59a58 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -28,13 +28,18 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink{ Channel ch = e.getChannel(); if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; - channel.worker.fireEventLater(new Runnable() { + // check if the current thread is a worker thread if so we can send the event now + if (channel.worker.thread != Thread.currentThread()) { + channel.worker.fireEventLater(new Runnable() { - @Override - public void run() { - pipeline.sendUpstream(e); - } - }); + @Override + public void run() { + pipeline.sendUpstream(e); + } + }); + } else { + pipeline.sendUpstream(e); + } } else { throw new UnsupportedOperationException(); } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index 00e480a446..4a7a1a998c 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -30,7 +30,7 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink{ if (ch instanceof AbstractOioChannel) { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; - if (worker != null) { + if (worker != null && channel.workerThread != Thread.currentThread()) { channel.worker.fireEventLater(new Runnable() { @Override @@ -39,7 +39,7 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink{ } }); } else { - // no worker thread yet so just fire the event now + // no worker thread yet or the current thread is a worker thread so just fire the event now pipeline.sendUpstream(e); } From 301a17c029dd9f0a6539342ac8494781c4efd028 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 25 Feb 2012 14:19:11 +0100 Subject: [PATCH 03/13] Rename method to better reflect its usage and update some javadocs. See #187 and #140 --- .../handler/codec/embedder/AbstractCodecEmbedder.java | 2 +- .../channel/socket/http/AbstractHttpChannelSink.java | 2 +- .../java/io/netty/channel/rxtx/RxtxChannelSink.java | 2 +- .../netty/channel/sctp/AbstractScptChannelSink.java | 4 ++-- .../main/java/io/netty/channel/sctp/SctpWorker.java | 3 ++- .../src/main/java/io/netty/channel/ChannelSink.java | 5 ++++- .../java/io/netty/channel/DefaultChannelPipeline.java | 4 ++-- .../netty/channel/iostream/IoStreamChannelSink.java | 2 +- .../netty/channel/local/LocalClientChannelSink.java | 2 +- .../netty/channel/local/LocalServerChannelSink.java | 2 +- .../src/main/java/io/netty/channel/socket/Worker.java | 11 ++++++++++- .../channel/socket/nio/AbstractNioChannelSink.java | 4 ++-- .../netty/channel/socket/nio/AbstractNioWorker.java | 3 ++- .../channel/socket/oio/AbstractOioChannelSink.java | 4 ++-- .../netty/channel/socket/oio/AbstractOioWorker.java | 4 +++- 15 files changed, 35 insertions(+), 19 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java index 206c8278f7..b0796cd994 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -226,7 +226,7 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { } @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { handleEvent(e); } } diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java index a89a8aa837..e578a2dcfe 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java @@ -23,7 +23,7 @@ import io.netty.channel.ChannelPipeline; public abstract class AbstractHttpChannelSink extends AbstractChannelSink{ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { pipeline.sendUpstream(e); } diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java index a1692c2718..5009dc1427 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java @@ -334,7 +334,7 @@ public class RxtxChannelSink extends AbstractChannelSink { * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise */ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { pipeline.sendUpstream(event); } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java index 642cd891b1..5daba78839 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java @@ -24,13 +24,13 @@ import io.netty.channel.ChannelPipeline; public abstract class AbstractScptChannelSink extends AbstractChannelSink{ @Override - public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { Channel ch = e.getChannel(); if (ch instanceof SctpChannelImpl) { SctpChannelImpl channel = (SctpChannelImpl) ch; // check if the current thread is a worker thread, and only fire the event later if thats not the case if (channel.worker.thread != Thread.currentThread()) { - channel.worker.fireEventLater(new Runnable() { + channel.worker.executeInIoThread(new Runnable() { @Override public void run() { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index e546019da3..1cc3a764ac 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -245,7 +245,8 @@ class SctpWorker implements Worker { } } - public void fireEventLater(Runnable eventRunnable) { + @Override + public void executeInIoThread(Runnable eventRunnable) { assert eventQueue.offer(eventRunnable); // wake up the selector to speed things diff --git a/transport/src/main/java/io/netty/channel/ChannelSink.java b/transport/src/main/java/io/netty/channel/ChannelSink.java index 16ebb8c52e..1a086ef982 100644 --- a/transport/src/main/java/io/netty/channel/ChannelSink.java +++ b/transport/src/main/java/io/netty/channel/ChannelSink.java @@ -38,5 +38,8 @@ public interface ChannelSink { */ void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; - void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception; + /** + * Schedule the given {@link ChannelEvent} for later execution (in the io-thread). Some implementation may not support his and just fire it directly + */ + void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index b8379b1b98..4e4499e814 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -586,7 +586,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public void sendUpstreamLater(ChannelEvent e) { try { - getSink().fireEventLater(this, e); + getSink().fireUpstreamEventLater(this, e); } catch (Throwable t) { notifyHandlerException(e, t); } @@ -843,7 +843,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { if (logger.isWarnEnabled()) { logger.warn("Not attached yet; discarding: " + e); } diff --git a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java index 5596540da2..51de5d3311 100755 --- a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java +++ b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java @@ -182,7 +182,7 @@ public class IoStreamChannelSink extends AbstractChannelSink { * This just calls {@link ChannelPipeline#sendUpstream(ChannelEvent)} as the transport does not support it */ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { pipeline.sendUpstream(e); } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java index 066bf27cc4..d882725de0 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java @@ -89,7 +89,7 @@ final class LocalClientChannelSink extends AbstractChannelSink { * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise */ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { pipeline.sendUpstream(event); } diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java index 29f5b9c2d0..6ead0ad3a2 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java @@ -46,7 +46,7 @@ final class LocalServerChannelSink extends AbstractChannelSink { * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise */ @Override - public void fireEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { pipeline.sendUpstream(event); } diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java index 8a2bf10424..eebfc74d40 100644 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -16,7 +16,16 @@ package io.netty.channel.socket; +/** + * A {@link Worker} is responsible to dispatch IO operations + * + */ public interface Worker extends Runnable{ - void fireEventLater(Runnable eventRunnable); + /** + * Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work. + * + * @param task the {@link Runnable} to execute + */ + void executeInIoThread(Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 6812b59a58..85a8d03d3a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -24,13 +24,13 @@ import io.netty.channel.ChannelPipeline; public abstract class AbstractNioChannelSink extends AbstractChannelSink{ @Override - public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { Channel ch = e.getChannel(); if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; // check if the current thread is a worker thread if so we can send the event now if (channel.worker.thread != Thread.currentThread()) { - channel.worker.fireEventLater(new Runnable() { + channel.worker.executeInIoThread(new Runnable() { @Override public void run() { diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 708b0ee343..43ad21ad9c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -271,7 +271,8 @@ abstract class AbstractNioWorker implements Worker { } } - public void fireEventLater(Runnable eventRunnable) { + @Override + public void executeInIoThread(Runnable eventRunnable) { assert eventQueue.offer(eventRunnable); // wake up the selector to speed things diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index 4a7a1a998c..889a08aa26 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -25,13 +25,13 @@ import io.netty.channel.socket.Worker; public abstract class AbstractOioChannelSink extends AbstractChannelSink{ @Override - public void fireEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { + public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { Channel ch = e.getChannel(); if (ch instanceof AbstractOioChannel) { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; if (worker != null && channel.workerThread != Thread.currentThread()) { - channel.worker.fireEventLater(new Runnable() { + channel.worker.executeInIoThread(new Runnable() { @Override public void run() { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index b20b24a926..5dd3e8b14d 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -91,8 +91,10 @@ abstract class AbstractOioWorker implements Worker @Override - public void fireEventLater(Runnable eventRunnable) { + public void executeInIoThread(Runnable eventRunnable) { assert eventQueue.offer(eventRunnable); + + // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest } private void processEventQueue() throws IOException { From 04a6ff92afe2c590a465065135798c438d3f5511 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 25 Feb 2012 14:28:43 +0100 Subject: [PATCH 04/13] Add static helper methods to fire upstream events later. See #187 and #140 --- .../main/java/io/netty/channel/Channels.java | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/transport/src/main/java/io/netty/channel/Channels.java b/transport/src/main/java/io/netty/channel/Channels.java index e5852d2ced..879758c347 100644 --- a/transport/src/main/java/io/netty/channel/Channels.java +++ b/transport/src/main/java/io/netty/channel/Channels.java @@ -321,6 +321,20 @@ public final class Channels { public static void fireWriteComplete(ChannelHandlerContext ctx, long amount) { ctx.sendUpstream(new DefaultWriteCompletionEvent(ctx.getChannel(), amount)); } + + + + /** + * Sends a {@code "channelInterestChanged"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static void fireChannelInterestChangedLater(Channel channel) { + channel.getPipeline().sendUpstreamLater( + new UpstreamChannelStateEvent( + channel, ChannelState.INTEREST_OPS, Channel.OP_READ)); + } + /** * Sends a {@code "channelInterestChanged"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -346,6 +360,17 @@ public final class Channels { ctx.getChannel(), ChannelState.INTEREST_OPS, Channel.OP_READ)); } + /** + * Sends a {@code "channelDisconnected"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static void fireChannelDisconnectedLater(Channel channel) { + channel.getPipeline().sendUpstreamLater( + new UpstreamChannelStateEvent( + channel, ChannelState.CONNECTED, null)); + } + /** * Sends a {@code "channelDisconnected"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -368,6 +393,18 @@ public final class Channels { ctx.getChannel(), ChannelState.CONNECTED, null)); } + + + /** + * Sends a {@code "channelUnbound"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static void fireChannelUnboundLater(Channel channel) { + channel.getPipeline().sendUpstreamLater(new UpstreamChannelStateEvent( + channel, ChannelState.BOUND, null)); + } + /** * Sends a {@code "channelUnbound"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -390,6 +427,24 @@ public final class Channels { ctx.getChannel(), ChannelState.BOUND, null)); } + + + /** + * Sends a {@code "channelClosed"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static void fireChannelClosedLater(Channel channel) { + channel.getPipeline().sendUpstream( + new UpstreamChannelStateEvent( + channel, ChannelState.OPEN, Boolean.FALSE)); + + // Notify the parent handler. + if (channel.getParent() != null) { + fireChildChannelStateChangedLater(channel.getParent(), channel); + } + } + /** * Sends a {@code "channelClosed"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -418,6 +473,19 @@ public final class Channels { ctx.getChannel(), ChannelState.OPEN, Boolean.FALSE)); } + + + /** + * Sends a {@code "exceptionCaught"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} once the io-thread runs again. + */ + public static void fireExceptionCaughtLater(Channel channel, Throwable cause) { + channel.getPipeline().sendUpstream( + new DefaultExceptionEvent(channel, cause)); + } + + /** * Sends a {@code "exceptionCaught"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of @@ -444,6 +512,13 @@ public final class Channels { new DefaultChildChannelStateEvent(channel, childChannel)); } + private static void fireChildChannelStateChangedLater( + Channel channel, Channel childChannel) { + channel.getPipeline().sendUpstreamLater( + new DefaultChildChannelStateEvent(channel, childChannel)); + } + + /** * Sends a {@code "bind"} request to the last * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of From c4a437e16b42e88eb9947c65d3dd2413bd9ef5ac Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 25 Feb 2012 14:30:10 +0100 Subject: [PATCH 05/13] Fix later sending of exceptionCaught events. See #187 and #140 --- transport/src/main/java/io/netty/channel/Channels.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/src/main/java/io/netty/channel/Channels.java b/transport/src/main/java/io/netty/channel/Channels.java index 879758c347..c70978987d 100644 --- a/transport/src/main/java/io/netty/channel/Channels.java +++ b/transport/src/main/java/io/netty/channel/Channels.java @@ -481,7 +481,7 @@ public final class Channels { * the specified {@link Channel} once the io-thread runs again. */ public static void fireExceptionCaughtLater(Channel channel, Throwable cause) { - channel.getPipeline().sendUpstream( + channel.getPipeline().sendUpstreamLater( new DefaultExceptionEvent(channel, cause)); } From ef64e8c332717257642583d48ccca6be82aa575e Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 25 Feb 2012 15:12:58 +0100 Subject: [PATCH 06/13] oio and nio transport now make sure that a upstream event get only executed from an io thread. See #140 and #187 --- .../main/java/io/netty/channel/Channels.java | 15 ++++ .../channel/socket/nio/AbstractNioWorker.java | 69 ++++++++++++++++--- .../channel/socket/nio/NioDatagramWorker.java | 15 +++- .../channel/socket/oio/AbstractOioWorker.java | 52 ++++++++++---- .../channel/socket/oio/OioDatagramWorker.java | 28 ++++++-- .../netty/channel/socket/oio/OioWorker.java | 19 ++++- 6 files changed, 164 insertions(+), 34 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/Channels.java b/transport/src/main/java/io/netty/channel/Channels.java index c70978987d..c24aa19399 100644 --- a/transport/src/main/java/io/netty/channel/Channels.java +++ b/transport/src/main/java/io/netty/channel/Channels.java @@ -298,6 +298,21 @@ public final class Channels { ctx.getChannel(), message, remoteAddress)); } + /** + * Sends a {@code "writeComplete"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} in the next io-thread. + */ + public static void fireWriteCompleteLater(Channel channel, long amount) { + if (amount == 0) { + return; + } + + channel.getPipeline().sendUpstreamLater( + new DefaultWriteCompletionEvent(channel, amount)); + } + + /** * Sends a {@code "writeComplete"} event to the first * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 43ad21ad9c..14ffdc6d99 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -398,6 +398,7 @@ abstract class AbstractNioWorker implements Worker { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; + boolean iothread = isIoThread(channel); long writtenBytes = 0; @@ -468,7 +469,11 @@ abstract class AbstractNioWorker implements Worker { buf = null; evt = null; future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } if (t instanceof IOException) { open = false; close(channel, succeededFuture(channel)); @@ -491,10 +496,17 @@ abstract class AbstractNioWorker implements Worker { } } } - - fireWriteComplete(channel, writtenBytes); + if (iothread) { + fireWriteComplete(channel, writtenBytes); + } else { + fireWriteCompleteLater(channel, writtenBytes); + } } + static boolean isIoThread(AbstractNioChannel channel) { + return Thread.currentThread() == channel.worker.thread; + } + private void setOpWrite(AbstractNioChannel channel) { Selector selector = this.selector; SelectionKey key = channel.channel.keyFor(selector); @@ -545,6 +557,8 @@ abstract class AbstractNioWorker implements Worker { void close(AbstractNioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); + boolean iothread = isIoThread(channel); + try { channel.channel.close(); cancelledKeys ++; @@ -552,20 +566,36 @@ abstract class AbstractNioWorker implements Worker { if (channel.setClosed()) { future.setSuccess(); if (connected) { - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } if (bound) { - fireChannelUnbound(channel); + if (iothread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } } cleanUpWriteBuffer(channel); - fireChannelClosed(channel); + if (iothread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); + } } else { future.setSuccess(); } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } @@ -618,12 +648,17 @@ abstract class AbstractNioWorker implements Worker { } if (fireExceptionCaught) { - fireExceptionCaught(channel, cause); + if (isIoThread(channel)) { + fireExceptionCaught(channel, cause); + } else { + fireExceptionCaughtLater(channel, cause); + } } } void setInterestOps(AbstractNioChannel channel, ChannelFuture future, int interestOps) { boolean changed = false; + boolean iothread = isIoThread(channel); try { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. @@ -684,16 +719,28 @@ abstract class AbstractNioWorker implements Worker { future.setSuccess(); if (changed) { - fireChannelInterestChanged(channel); + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } } } catch (CancelledKeyException e) { // setInterestOps() was called on a closed channel. ClosedChannelException cce = new ClosedChannelException(); future.setFailure(cce); - fireExceptionCaught(channel, cce); + if (iothread) { + fireExceptionCaught(channel, cce); + } else { + fireExceptionCaughtLater(channel, cce); + } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java index fb7ddde380..6191df2e08 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioDatagramWorker.java @@ -16,7 +16,9 @@ package io.netty.channel.socket.nio; import static io.netty.channel.Channels.fireChannelDisconnected; +import static io.netty.channel.Channels.fireChannelDisconnectedLater; import static io.netty.channel.Channels.fireExceptionCaught; +import static io.netty.channel.Channels.fireExceptionCaughtLater; import static io.netty.channel.Channels.fireMessageReceived; import static io.netty.channel.Channels.succeededFuture; import io.netty.buffer.ChannelBufferFactory; @@ -126,15 +128,24 @@ class NioDatagramWorker extends AbstractNioWorker { static void disconnect(NioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); + boolean iothread = isIoThread(channel); try { channel.getDatagramChannel().disconnect(); future.setSuccess(); if (connected) { - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 5dd3e8b14d..97b20149a9 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -15,12 +15,7 @@ */ package io.netty.channel.socket.oio; -import static io.netty.channel.Channels.fireChannelClosed; -import static io.netty.channel.Channels.fireChannelDisconnected; -import static io.netty.channel.Channels.fireChannelInterestChanged; -import static io.netty.channel.Channels.fireChannelUnbound; -import static io.netty.channel.Channels.fireExceptionCaught; -import static io.netty.channel.Channels.succeededFuture; +import static io.netty.channel.Channels.*; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.Channels; @@ -89,6 +84,9 @@ abstract class AbstractOioWorker implements Worker close(channel, succeededFuture(channel)); } + static boolean isIoThead(AbstractOioChannel channel) { + return Thread.currentThread() == channel.workerThread; + } @Override public void executeInIoThread(Runnable eventRunnable) { @@ -120,7 +118,8 @@ abstract class AbstractOioWorker implements Worker static void setInterestOps( AbstractOioChannel channel, ChannelFuture future, int interestOps) { - + boolean iothread = isIoThead(channel); + // Override OP_WRITE flag - a user cannot change this flag. interestOps &= ~Channel.OP_WRITE; interestOps |= channel.getInterestOps() & Channel.OP_WRITE; @@ -148,18 +147,27 @@ abstract class AbstractOioWorker implements Worker workerThread.interrupt(); } } - - fireChannelInterestChanged(channel); + if (iothread) { + fireChannelInterestChanged(channel); + } else { + fireChannelInterestChangedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } static void close(AbstractOioChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); + boolean iothread = isIoThead(channel); + try { channel.closeSocket(); if (channel.setClosed()) { @@ -171,18 +179,34 @@ abstract class AbstractOioWorker implements Worker if (workerThread != null && currentThread != workerThread) { workerThread.interrupt(); } - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } if (bound) { - fireChannelUnbound(channel); + if (iothread) { + fireChannelUnbound(channel); + } else { + fireChannelUnboundLater(channel); + } + } + if (iothread) { + fireChannelClosed(channel); + } else { + fireChannelClosedLater(channel); } - fireChannelClosed(channel); } else { future.setSuccess(); } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java index f1b42b42f9..8ce169b90a 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java @@ -63,6 +63,8 @@ class OioDatagramWorker extends AbstractOioWorker { static void write( OioDatagramChannel channel, ChannelFuture future, Object message, SocketAddress remoteAddress) { + boolean iothread = isIoThead(channel); + try { ChannelBuffer buf = (ChannelBuffer) message; int offset = buf.readerIndex(); @@ -84,27 +86,45 @@ class OioDatagramWorker extends AbstractOioWorker { packet.setSocketAddress(remoteAddress); } channel.socket.send(packet); - fireWriteComplete(channel, length); + if (iothread) { + fireWriteComplete(channel, length); + } else { + fireWriteCompleteLater(channel, length); + } future.setSuccess(); } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } static void disconnect(OioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); + boolean iothread = isIoThead(channel); + try { channel.socket.disconnect(); future.setSuccess(); if (connected) { // Notify. - fireChannelDisconnected(channel); + if (iothread) { + fireChannelDisconnected(channel); + } else { + fireChannelDisconnectedLater(channel); + } } } catch (Throwable t) { future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java index bb0f7148d9..180d756d36 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java @@ -65,11 +65,16 @@ class OioWorker extends AbstractOioWorker { OioSocketChannel channel, ChannelFuture future, Object message) { + boolean iothread = isIoThead(channel); OutputStream out = channel.getOutputStream(); if (out == null) { Exception e = new ClosedChannelException(); future.setFailure(e); - fireExceptionCaught(channel, e); + if (iothread) { + fireExceptionCaught(channel, e); + } else { + fireExceptionCaughtLater(channel, e); + } return; } @@ -106,7 +111,11 @@ class OioWorker extends AbstractOioWorker { } } - fireWriteComplete(channel, length); + if (iothread) { + fireWriteComplete(channel, length); + } else { + fireWriteCompleteLater(channel, length); + } future.setSuccess(); } catch (Throwable t) { @@ -118,7 +127,11 @@ class OioWorker extends AbstractOioWorker { t = new ClosedChannelException(); } future.setFailure(t); - fireExceptionCaught(channel, t); + if (iothread) { + fireExceptionCaught(channel, t); + } else { + fireExceptionCaughtLater(channel, t); + } } } From f2d1f1e8ad00ad2c6f81b8c63ec1d37a0d9bdbb2 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 25 Feb 2012 15:54:33 +0100 Subject: [PATCH 07/13] Also fix the exception handling if a ChannelHandler throws an Exception based of if its a io thread or not. See #187 and #140 --- .../socket/http/AbstractHttpChannelSink.java | 30 ------------------- .../http/HttpTunnelAcceptedChannelSink.java | 3 +- .../http/HttpTunnelClientChannelSink.java | 3 +- .../http/HttpTunnelServerChannelSink.java | 3 +- .../channel/socket/http/FakeChannelSink.java | 3 +- .../netty/channel/rxtx/RxtxChannelSink.java | 8 ----- .../channel/sctp/AbstractScptChannelSink.java | 4 +-- .../io/netty/channel/AbstractChannelSink.java | 21 +++++++++++-- .../channel/iostream/IoStreamChannelSink.java | 8 ----- .../channel/local/LocalClientChannelSink.java | 8 ----- .../channel/local/LocalServerChannelSink.java | 8 ----- .../java/io/netty/channel/socket/Worker.java | 2 +- .../socket/nio/AbstractNioChannelSink.java | 14 +++++++-- .../channel/socket/nio/AbstractNioWorker.java | 2 +- .../socket/oio/AbstractOioChannelSink.java | 14 +++++++-- .../channel/socket/oio/AbstractOioWorker.java | 4 +-- 16 files changed, 57 insertions(+), 78 deletions(-) delete mode 100644 transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java deleted file mode 100644 index e578a2dcfe..0000000000 --- a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel.socket.http; - -import io.netty.channel.AbstractChannelSink; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelPipeline; - -public abstract class AbstractHttpChannelSink extends AbstractChannelSink{ - - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - pipeline.sendUpstream(e); - } - -} diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java index ff0be867c6..591d20ac38 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java @@ -18,6 +18,7 @@ package io.netty.channel.socket.http; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ChannelBuffer; +import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -32,7 +33,7 @@ import io.netty.channel.MessageEvent; * from here to the ServerMessageSwitch, which queues the data awaiting a poll request from the * client end of the tunnel. */ -class HttpTunnelAcceptedChannelSink extends AbstractHttpChannelSink { +class HttpTunnelAcceptedChannelSink extends AbstractChannelSink { final SaturationManager saturationManager; diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java index 6a55acf7c1..5f1ecce1c5 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.http; import java.net.InetSocketAddress; +import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelStateEvent; @@ -26,7 +27,7 @@ import io.netty.channel.MessageEvent; * Sink of a client channel, deals with sunk events and then makes appropriate calls * on the channel itself to push data. */ -class HttpTunnelClientChannelSink extends AbstractHttpChannelSink { +class HttpTunnelClientChannelSink extends AbstractChannelSink { @Override public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java index a7ed9ee1bb..e755229674 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.http; import java.net.SocketAddress; +import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -26,7 +27,7 @@ import io.netty.channel.socket.ServerSocketChannel; /** */ -class HttpTunnelServerChannelSink extends AbstractHttpChannelSink { +class HttpTunnelServerChannelSink extends AbstractChannelSink { private ChannelFutureListener closeHook; diff --git a/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java b/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java index 0906755de0..0798eb94ad 100644 --- a/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java +++ b/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java @@ -19,13 +19,14 @@ package io.netty.channel.socket.http; import java.util.LinkedList; import java.util.Queue; +import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; /** * A fake channel sink for use in testing */ -public class FakeChannelSink extends AbstractHttpChannelSink { +public class FakeChannelSink extends AbstractChannelSink { public Queue events = new LinkedList(); diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java index 5009dc1427..fd1c34c0a0 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java @@ -329,12 +329,4 @@ public class RxtxChannelSink extends AbstractChannelSink { } } } - - /** - * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise - */ - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { - pipeline.sendUpstream(event); - } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java index 5daba78839..f7f458639c 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java @@ -21,7 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; -public abstract class AbstractScptChannelSink extends AbstractChannelSink{ +public abstract class AbstractScptChannelSink extends AbstractChannelSink { @Override public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { @@ -41,7 +41,7 @@ public abstract class AbstractScptChannelSink extends AbstractChannelSink{ pipeline.sendUpstream(e); } } else { - throw new UnsupportedOperationException(); + super.fireUpstreamEventLater(pipeline, e); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java index a1c27839ff..69b199aeb7 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java @@ -43,7 +43,24 @@ public abstract class AbstractChannelSink implements ChannelSink { if (actualCause == null) { actualCause = cause; } - - fireExceptionCaught(event.getChannel(), actualCause); + if (isFireExceptionCaughtLater(event, actualCause)) { + fireExceptionCaughtLater(event.getChannel(), actualCause); + } else { + fireExceptionCaught(event.getChannel(), actualCause); + } } + + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + return false; + } + + /** + * This implementation just send the event now via {@link ChannelPipeline#sendUpstream(ChannelEvent)}. Sub-classes should override this if they can handle it + * in a better way + */ + @Override + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + pipeline.sendUpstream(e); + } + } diff --git a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java index 51de5d3311..a0d75ceef7 100755 --- a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java +++ b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java @@ -177,12 +177,4 @@ public class IoStreamChannelSink extends AbstractChannelSink { } } } - - /** - * This just calls {@link ChannelPipeline#sendUpstream(ChannelEvent)} as the transport does not support it - */ - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - pipeline.sendUpstream(e); - } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java index d882725de0..9b6bd455de 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java @@ -85,14 +85,6 @@ final class LocalClientChannelSink extends AbstractChannelSink { } } - /** - * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise - */ - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { - pipeline.sendUpstream(event); - } - private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) { try { if (!LocalChannelRegistry.register(localAddress, channel)) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java index 6ead0ad3a2..6bfa780f28 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java @@ -42,14 +42,6 @@ final class LocalServerChannelSink extends AbstractChannelSink { } } - /** - * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise - */ - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { - pipeline.sendUpstream(event); - } - private void handleServerChannel(ChannelEvent e) { if (!(e instanceof ChannelStateEvent)) { return; diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java index eebfc74d40..64dc433038 100644 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -20,7 +20,7 @@ package io.netty.channel.socket; * A {@link Worker} is responsible to dispatch IO operations * */ -public interface Worker extends Runnable{ +public interface Worker extends Runnable { /** * Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work. diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 85a8d03d3a..9ecdcb0699 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -21,7 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; -public abstract class AbstractNioChannelSink extends AbstractChannelSink{ +public abstract class AbstractNioChannelSink extends AbstractChannelSink { @Override public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { @@ -41,9 +41,19 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink{ pipeline.sendUpstream(e); } } else { - throw new UnsupportedOperationException(); + super.fireUpstreamEventLater(pipeline, e); } } + @Override + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + Channel channel = event.getChannel(); + boolean fireLater = false; + if (channel instanceof AbstractNioChannel) { + fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel) channel); + } + return fireLater; + } + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 14ffdc6d99..8f6dafd468 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -504,7 +504,7 @@ abstract class AbstractNioWorker implements Worker { } static boolean isIoThread(AbstractNioChannel channel) { - return Thread.currentThread() == channel.worker.thread; + return channel.worker.thread == null || Thread.currentThread() == channel.worker.thread; } private void setOpWrite(AbstractNioChannel channel) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index 889a08aa26..633818ec01 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -22,7 +22,7 @@ import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.Worker; -public abstract class AbstractOioChannelSink extends AbstractChannelSink{ +public abstract class AbstractOioChannelSink extends AbstractChannelSink { @Override public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { @@ -44,9 +44,19 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink{ } } else { - throw new UnsupportedOperationException(); + super.fireUpstreamEventLater(pipeline, e); } } + @Override + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + Channel channel = event.getChannel(); + boolean fireLater = false; + if (channel instanceof AbstractOioChannel) { + fireLater = !AbstractOioWorker.isIoThead((AbstractOioChannel) channel); + } + return fireLater; + } + } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 97b20149a9..167b58309d 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -30,7 +30,7 @@ import java.util.Queue; * * @param {@link AbstractOioChannel} */ -abstract class AbstractOioWorker implements Worker{ +abstract class AbstractOioWorker implements Worker { private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); @@ -85,7 +85,7 @@ abstract class AbstractOioWorker implements Worker } static boolean isIoThead(AbstractOioChannel channel) { - return Thread.currentThread() == channel.workerThread; + return channel.workerThread == null || Thread.currentThread() == channel.workerThread; } @Override From 68066c5e4b53df40a35d439a3f0b49d51a449738 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 25 Feb 2012 16:05:41 +0100 Subject: [PATCH 08/13] Make sure that ChannelDownstreamHandler impl fire exception caughts later via the io-worker. See #140 and #187 --- .../handler/stream/ChunkedWriteHandler.java | 30 ++++++++++++------- .../handler/timeout/WriteTimeoutHandler.java | 2 +- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index c5387a33e9..6fc29eaebd 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -90,7 +90,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } try { - flush(ctx); + flush(ctx, false); } catch (Exception e) { if (logger.isWarnEnabled()) { logger.warn("Unexpected exception while sending chunks.", e); @@ -112,10 +112,10 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns final Channel channel = ctx.getChannel(); if (channel.isWritable()) { this.ctx = ctx; - flush(ctx); + flush(ctx, false); } else if (!channel.isConnected()) { this.ctx = ctx; - discard(ctx); + discard(ctx, false); } } @@ -127,12 +127,12 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns switch (cse.getState()) { case INTEREST_OPS: // Continue writing when the channel becomes writable. - flush(ctx); + flush(ctx, true); break; case OPEN: if (!Boolean.TRUE.equals(cse.getValue())) { // Fail all pending writes - discard(ctx); + discard(ctx, true); } break; } @@ -140,7 +140,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns ctx.sendUpstream(e); } - private void discard(ChannelHandlerContext ctx) { + private void discard(ChannelHandlerContext ctx, boolean fireNow) { ClosedChannelException cause = null; boolean fireExceptionCaught = false; @@ -175,14 +175,18 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns if (fireExceptionCaught) { - Channels.fireExceptionCaught(ctx.getChannel(), cause); + if (fireNow) { + fireExceptionCaught(ctx.getChannel(), cause); + } else { + fireExceptionCaughtLater(ctx.getChannel(), cause); + } } } - private synchronized void flush(ChannelHandlerContext ctx) throws Exception { + private synchronized void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception { final Channel channel = ctx.getChannel(); if (!channel.isConnected()) { - discard(ctx); + discard(ctx, fireNow); } while (channel.isWritable()) { @@ -220,7 +224,11 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns this.currentEvent = null; currentEvent.getFuture().setFailure(t); - fireExceptionCaught(ctx, t); + if (fireNow) { + fireExceptionCaught(ctx, t); + } else { + fireExceptionCaughtLater(ctx.getChannel(), t); + } closeInput(chunks); break; @@ -262,7 +270,7 @@ public class ChunkedWriteHandler implements ChannelUpstreamHandler, ChannelDowns } if (!channel.isConnected()) { - discard(ctx); + discard(ctx, fireNow); break; } } diff --git a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java index b5af55d670..5dd2b4e19c 100644 --- a/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/WriteTimeoutHandler.java @@ -154,7 +154,7 @@ public class WriteTimeoutHandler extends SimpleChannelDownstreamHandler } protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception { - Channels.fireExceptionCaught(ctx, EXCEPTION); + Channels.fireExceptionCaughtLater(ctx.getChannel(), EXCEPTION); } private final class WriteTimeoutTask implements TimerTask { From cfe7b4959475f8690a5a0b3357230a6a539a916d Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Sat, 25 Feb 2012 17:11:14 +0100 Subject: [PATCH 09/13] Cleaner impl of AbstractNioChannelSink and AbstractOioChannelSink. See #140 and #187 --- .../netty/channel/socket/nio/AbstractNioChannelSink.java | 2 +- .../io/netty/channel/socket/nio/AbstractNioWorker.java | 2 +- .../netty/channel/socket/oio/AbstractOioChannelSink.java | 2 +- .../io/netty/channel/socket/oio/AbstractOioWorker.java | 9 ++++++--- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 9ecdcb0699..bf034eca3a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -29,7 +29,7 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink { if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; // check if the current thread is a worker thread if so we can send the event now - if (channel.worker.thread != Thread.currentThread()) { + if (!AbstractNioWorker.isIoThread(channel)) { channel.worker.executeInIoThread(new Runnable() { @Override diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 8f6dafd468..14ffdc6d99 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -504,7 +504,7 @@ abstract class AbstractNioWorker implements Worker { } static boolean isIoThread(AbstractNioChannel channel) { - return channel.worker.thread == null || Thread.currentThread() == channel.worker.thread; + return Thread.currentThread() == channel.worker.thread; } private void setOpWrite(AbstractNioChannel channel) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index 633818ec01..485e5cb452 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -30,7 +30,7 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink { if (ch instanceof AbstractOioChannel) { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; - if (worker != null && channel.workerThread != Thread.currentThread()) { + if (worker != null && !AbstractOioWorker.isIoThead(channel)) { channel.worker.executeInIoThread(new Runnable() { @Override diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 167b58309d..ea773bc442 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -81,11 +81,11 @@ abstract class AbstractOioWorker implements Worker channel.workerThread = null; // Clean up. - close(channel, succeededFuture(channel)); + close(channel, succeededFuture(channel), true); } static boolean isIoThead(AbstractOioChannel channel) { - return channel.workerThread == null || Thread.currentThread() == channel.workerThread; + return Thread.currentThread() == channel.workerThread; } @Override @@ -164,9 +164,12 @@ abstract class AbstractOioWorker implements Worker } static void close(AbstractOioChannel channel, ChannelFuture future) { + close(channel, future, isIoThead(channel)); + } + + private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) { boolean connected = channel.isConnected(); boolean bound = channel.isBound(); - boolean iothread = isIoThead(channel); try { channel.closeSocket(); From 0beaa107b4be5e68928349c4131d01a655003641 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 27 Feb 2012 20:45:46 +0100 Subject: [PATCH 10/13] Fix assert usage. Thanks Trustin for review --- .../java/io/netty/channel/socket/nio/AbstractNioWorker.java | 4 +++- .../java/io/netty/channel/socket/oio/AbstractOioWorker.java | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 14ffdc6d99..a5a109d9c1 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -273,7 +273,9 @@ abstract class AbstractNioWorker implements Worker { @Override public void executeInIoThread(Runnable eventRunnable) { - assert eventQueue.offer(eventRunnable); + boolean added = eventQueue.offer(eventRunnable); + + assert added; // wake up the selector to speed things selector.wakeup(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index ea773bc442..fb1b403894 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -90,8 +90,9 @@ abstract class AbstractOioWorker implements Worker @Override public void executeInIoThread(Runnable eventRunnable) { - assert eventQueue.offer(eventRunnable); + boolean added = eventQueue.offer(eventRunnable); + assert added; // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest } From b6700fbe58894c59f1caae1fe97cf962ac066016 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 27 Feb 2012 20:46:40 +0100 Subject: [PATCH 11/13] Fix naming of class. Thanks Trustin for review --- ...bstractScptChannelSink.java => AbstractSctpChannelSink.java} | 2 +- .../main/java/io/netty/channel/sctp/SctpClientPipelineSink.java | 2 +- .../main/java/io/netty/channel/sctp/SctpServerPipelineSink.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename transport-sctp/src/main/java/io/netty/channel/sctp/{AbstractScptChannelSink.java => AbstractSctpChannelSink.java} (93%) diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java similarity index 93% rename from transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java rename to transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java index f7f458639c..fcc0c9f1aa 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java @@ -21,7 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; -public abstract class AbstractScptChannelSink extends AbstractChannelSink { +public abstract class AbstractSctpChannelSink extends AbstractChannelSink { @Override public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java index bd65b89e57..7005422e80 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpClientPipelineSink.java @@ -47,7 +47,7 @@ import io.netty.util.internal.QueueFactory; /** */ -class SctpClientPipelineSink extends AbstractScptChannelSink { +class SctpClientPipelineSink extends AbstractSctpChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpClientPipelineSink.class); diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java index ce34643315..c4001af990 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpServerPipelineSink.java @@ -44,7 +44,7 @@ import io.netty.util.internal.DeadLockProofWorker; /** */ -class SctpServerPipelineSink extends AbstractScptChannelSink { +class SctpServerPipelineSink extends AbstractSctpChannelSink { static final InternalLogger logger = InternalLoggerFactory.getInstance(SctpServerPipelineSink.class); From 4df3c612339bf717cfe902cb213abe3f59eda558 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 29 Feb 2012 21:07:02 +0100 Subject: [PATCH 12/13] Allow to submit a Runnable that get execute in the io-thread. This is also used to workout flaws in the thread-model. See #209 #140 #187 --- .../codec/embedder/AbstractCodecEmbedder.java | 11 ++- .../channel/sctp/AbstractSctpChannelSink.java | 22 ++--- .../io/netty/channel/sctp/SctpWorker.java | 32 +++++-- .../io/netty/channel/AbstractChannelSink.java | 11 ++- .../io/netty/channel/ChannelPipeline.java | 9 +- .../java/io/netty/channel/ChannelSink.java | 4 +- .../main/java/io/netty/channel/Channels.java | 86 +++++++++++-------- .../netty/channel/DefaultChannelPipeline.java | 15 ++-- .../socket/ChannelRunnableWrapper.java | 42 +++++++++ .../java/io/netty/channel/socket/Worker.java | 5 +- .../socket/nio/AbstractNioChannelSink.java | 23 ++--- .../channel/socket/nio/AbstractNioWorker.java | 29 +++++-- .../socket/oio/AbstractOioChannelSink.java | 28 +++--- .../channel/socket/oio/AbstractOioWorker.java | 33 +++++-- .../channel/socket/oio/OioDatagramWorker.java | 4 +- .../netty/channel/socket/oio/OioWorker.java | 2 +- 16 files changed, 224 insertions(+), 132 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java diff --git a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java index b0796cd994..2521682ca7 100644 --- a/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/codec/src/main/java/io/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -22,9 +22,11 @@ import java.util.ConcurrentModificationException; import java.util.LinkedList; import java.util.Queue; +import io.netty.channel.Channels; import io.netty.buffer.ChannelBufferFactory; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -226,8 +228,13 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { } @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - handleEvent(e); + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { + try { + task.run(); + return Channels.succeededFuture(pipeline.getChannel()); + } catch (Throwable t) { + return Channels.failedFuture(pipeline.getChannel(), t); + } } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java index fcc0c9f1aa..f01321c2ce 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractSctpChannelSink.java @@ -18,30 +18,20 @@ package io.netty.channel.sctp; import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; public abstract class AbstractSctpChannelSink extends AbstractChannelSink { @Override - public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { - Channel ch = e.getChannel(); + public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); if (ch instanceof SctpChannelImpl) { SctpChannelImpl channel = (SctpChannelImpl) ch; - // check if the current thread is a worker thread, and only fire the event later if thats not the case - if (channel.worker.thread != Thread.currentThread()) { - channel.worker.executeInIoThread(new Runnable() { - - @Override - public void run() { - pipeline.sendUpstream(e); - } - }); - } else { - pipeline.sendUpstream(e); - } + return channel.worker.executeInIoThread(channel, task); + } else { - super.fireUpstreamEventLater(pipeline, e); + return super.execute(pipeline, task); } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java index 1cc3a764ac..1912f1ae46 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/SctpWorker.java @@ -31,6 +31,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -45,6 +46,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; import io.netty.channel.ReceiveBufferSizePredictor; import io.netty.channel.sctp.SctpSendBufferPool.SendBuffer; +import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -246,11 +248,31 @@ class SctpWorker implements Worker { } @Override - public void executeInIoThread(Runnable eventRunnable) { - assert eventQueue.offer(eventRunnable); - - // wake up the selector to speed things - selector.wakeup(); + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof SctpChannelImpl && isIoThread((SctpChannelImpl) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // wake up the selector to speed things + selector.wakeup(); + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } + + } + + static boolean isIoThread(SctpChannelImpl channel) { + return Thread.currentThread() == channel.worker.thread; } private void processRegisterTaskQueue() throws IOException { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java index 69b199aeb7..805995ac72 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java @@ -55,12 +55,17 @@ public abstract class AbstractChannelSink implements ChannelSink { } /** - * This implementation just send the event now via {@link ChannelPipeline#sendUpstream(ChannelEvent)}. Sub-classes should override this if they can handle it + * This implementation just directly call {@link Runnable#run()}. Sub-classes should override this if they can handle it * in a better way */ @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - pipeline.sendUpstream(e); + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { + try { + task.run(); + return Channels.succeededFuture(pipeline.getChannel()); + } catch (Throwable t) { + return Channels.failedFuture(pipeline.getChannel(), t); + } } } diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 6a74e5033c..4620e83fbd 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -443,14 +443,7 @@ public interface ChannelPipeline { void sendUpstream(ChannelEvent e); - /** - * Sends the specified {@link ChannelEvent} to the first - * {@link ChannelUpstreamHandler} in this pipeline when the next IO-Worker operation is performed. - * - * @throws NullPointerException - * if the specified event is {@code null} - */ - void sendUpstreamLater(ChannelEvent e); + ChannelFuture execute(Runnable task); /** * Sends the specified {@link ChannelEvent} to the last diff --git a/transport/src/main/java/io/netty/channel/ChannelSink.java b/transport/src/main/java/io/netty/channel/ChannelSink.java index 1a086ef982..e0e2650211 100644 --- a/transport/src/main/java/io/netty/channel/ChannelSink.java +++ b/transport/src/main/java/io/netty/channel/ChannelSink.java @@ -39,7 +39,7 @@ public interface ChannelSink { void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; /** - * Schedule the given {@link ChannelEvent} for later execution (in the io-thread). Some implementation may not support his and just fire it directly + * Execute the given {@link Runnable} later in the io-thread. Some implementation may not support his and just execute it directly */ - void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception; + ChannelFuture execute(ChannelPipeline pipeline, Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/Channels.java b/transport/src/main/java/io/netty/channel/Channels.java index c24aa19399..c5aa9ce93e 100644 --- a/transport/src/main/java/io/netty/channel/Channels.java +++ b/transport/src/main/java/io/netty/channel/Channels.java @@ -303,13 +303,14 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} in the next io-thread. */ - public static void fireWriteCompleteLater(Channel channel, long amount) { - if (amount == 0) { - return; - } - - channel.getPipeline().sendUpstreamLater( - new DefaultWriteCompletionEvent(channel, amount)); + public static ChannelFuture fireWriteCompleteLater(final Channel channel, final long amount) { + return channel.getPipeline().execute(new Runnable() { + @Override + public void run() { + fireWriteComplete(channel, amount); + } + }); + } @@ -344,10 +345,15 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireChannelInterestChangedLater(Channel channel) { - channel.getPipeline().sendUpstreamLater( - new UpstreamChannelStateEvent( - channel, ChannelState.INTEREST_OPS, Channel.OP_READ)); + public static ChannelFuture fireChannelInterestChangedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelInterestChanged(channel); + + } + }); } /** @@ -380,10 +386,14 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireChannelDisconnectedLater(Channel channel) { - channel.getPipeline().sendUpstreamLater( - new UpstreamChannelStateEvent( - channel, ChannelState.CONNECTED, null)); + public static ChannelFuture fireChannelDisconnectedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelDisconnected(channel); + } + }); } /** @@ -415,9 +425,14 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireChannelUnboundLater(Channel channel) { - channel.getPipeline().sendUpstreamLater(new UpstreamChannelStateEvent( - channel, ChannelState.BOUND, null)); + public static ChannelFuture fireChannelUnboundLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelUnbound(channel); + } + }); } /** @@ -449,15 +464,15 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireChannelClosedLater(Channel channel) { - channel.getPipeline().sendUpstream( - new UpstreamChannelStateEvent( - channel, ChannelState.OPEN, Boolean.FALSE)); - - // Notify the parent handler. - if (channel.getParent() != null) { - fireChildChannelStateChangedLater(channel.getParent(), channel); - } + public static ChannelFuture fireChannelClosedLater(final Channel channel) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireChannelClosed(channel); + } + }); + } /** @@ -495,9 +510,14 @@ public final class Channels { * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of * the specified {@link Channel} once the io-thread runs again. */ - public static void fireExceptionCaughtLater(Channel channel, Throwable cause) { - channel.getPipeline().sendUpstreamLater( - new DefaultExceptionEvent(channel, cause)); + public static ChannelFuture fireExceptionCaughtLater(final Channel channel, final Throwable cause) { + return channel.getPipeline().execute(new Runnable() { + + @Override + public void run() { + fireExceptionCaught(channel, cause); + } + }); } @@ -527,13 +547,7 @@ public final class Channels { new DefaultChildChannelStateEvent(channel, childChannel)); } - private static void fireChildChannelStateChangedLater( - Channel channel, Channel childChannel) { - channel.getPipeline().sendUpstreamLater( - new DefaultChildChannelStateEvent(channel, childChannel)); - } - /** * Sends a {@code "bind"} request to the last * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 4e4499e814..de75705212 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -21,6 +21,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.RejectedExecutionException; import io.netty.logging.InternalLogger; import io.netty.logging.InternalLoggerFactory; @@ -584,12 +585,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void sendUpstreamLater(ChannelEvent e) { - try { - getSink().fireUpstreamEventLater(this, e); - } catch (Throwable t) { - notifyHandlerException(e, t); - } + public ChannelFuture execute(Runnable task) { + return getSink().execute(this, task); } @Override @@ -843,10 +840,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { } @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) { if (logger.isWarnEnabled()) { - logger.warn("Not attached yet; discarding: " + e); + logger.warn("Not attached yet; rejecting: " + task); } + return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet")); } + } } diff --git a/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java new file mode 100644 index 0000000000..a8110e7db8 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java @@ -0,0 +1,42 @@ +/* + * Copyright 2011 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.socket; + +import io.netty.channel.Channel; +import io.netty.channel.DefaultChannelFuture; + +public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable { + + private Runnable task; + + public ChannelRunnableWrapper(Channel channel, Runnable task) { + super(channel, true); + this.task = task; + } + + @Override + public void run() { + try { + task.run(); + setSuccess(); + } catch (Throwable t) { + setFailure(t); + } + } + + + +} diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java index 64dc433038..271897881a 100644 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -16,6 +16,9 @@ package io.netty.channel.socket; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; + /** * A {@link Worker} is responsible to dispatch IO operations * @@ -27,5 +30,5 @@ public interface Worker extends Runnable { * * @param task the {@link Runnable} to execute */ - void executeInIoThread(Runnable task); + ChannelFuture executeInIoThread(Channel channel, Runnable task); } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index bf034eca3a..1a29f5ef19 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -19,30 +19,21 @@ package io.netty.channel.socket.nio; import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; public abstract class AbstractNioChannelSink extends AbstractChannelSink { @Override - public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { - Channel ch = e.getChannel(); + public ChannelFuture execute(ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); if (ch instanceof AbstractNioChannel) { AbstractNioChannel channel = (AbstractNioChannel) ch; - // check if the current thread is a worker thread if so we can send the event now - if (!AbstractNioWorker.isIoThread(channel)) { - channel.worker.executeInIoThread(new Runnable() { - - @Override - public void run() { - pipeline.sendUpstream(e); - } - }); - } else { - pipeline.sendUpstream(e); - } - } else { - super.fireUpstreamEventLater(pipeline, e); + + return channel.worker.executeInIoThread(ch, task); } + return super.execute(pipeline, task); + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index a5a109d9c1..85bc12c61a 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -21,6 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.MessageEvent; +import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import io.netty.logging.InternalLogger; @@ -41,6 +42,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -272,13 +274,28 @@ abstract class AbstractNioWorker implements Worker { } @Override - public void executeInIoThread(Runnable eventRunnable) { - boolean added = eventQueue.offer(eventRunnable); - - assert added; + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof AbstractNioChannel && isIoThread((AbstractNioChannel) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // wake up the selector to speed things + selector.wakeup(); + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } + - // wake up the selector to speed things - selector.wakeup(); } private void processRegisterTaskQueue() throws IOException { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index 485e5cb452..d57c198534 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -19,33 +19,25 @@ package io.netty.channel.socket.oio; import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.Worker; public abstract class AbstractOioChannelSink extends AbstractChannelSink { @Override - public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { - Channel ch = e.getChannel(); + public ChannelFuture execute(final ChannelPipeline pipeline, final Runnable task) { + Channel ch = pipeline.getChannel(); if (ch instanceof AbstractOioChannel) { AbstractOioChannel channel = (AbstractOioChannel) ch; Worker worker = channel.worker; - if (worker != null && !AbstractOioWorker.isIoThead(channel)) { - channel.worker.executeInIoThread(new Runnable() { - - @Override - public void run() { - pipeline.sendUpstream(e); - } - }); - } else { - // no worker thread yet or the current thread is a worker thread so just fire the event now - pipeline.sendUpstream(e); + if (worker != null) { + return channel.worker.executeInIoThread(ch, task); } - - } else { - super.fireUpstreamEventLater(pipeline, e); - } + } + + return super.execute(pipeline, task); + } @@ -54,7 +46,7 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink { Channel channel = event.getChannel(); boolean fireLater = false; if (channel instanceof AbstractOioChannel) { - fireLater = !AbstractOioWorker.isIoThead((AbstractOioChannel) channel); + fireLater = !AbstractOioWorker.isIoThread((AbstractOioChannel) channel); } return fireLater; } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index fb1b403894..a2176f32c6 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -19,11 +19,13 @@ import static io.netty.channel.Channels.*; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.Channels; +import io.netty.channel.socket.ChannelRunnableWrapper; import io.netty.channel.socket.Worker; import io.netty.util.internal.QueueFactory; import java.io.IOException; import java.util.Queue; +import java.util.concurrent.RejectedExecutionException; /** * Abstract base class for Oio-Worker implementations @@ -84,16 +86,31 @@ abstract class AbstractOioWorker implements Worker close(channel, succeededFuture(channel), true); } - static boolean isIoThead(AbstractOioChannel channel) { + static boolean isIoThread(AbstractOioChannel channel) { return Thread.currentThread() == channel.workerThread; } @Override - public void executeInIoThread(Runnable eventRunnable) { - boolean added = eventQueue.offer(eventRunnable); - - assert added; - // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest + public ChannelFuture executeInIoThread(Channel channel, Runnable task) { + if (channel instanceof AbstractOioChannel && isIoThread((AbstractOioChannel) channel)) { + try { + task.run(); + return succeededFuture(channel); + } catch (Throwable t) { + return failedFuture(channel, t); + } + } else { + ChannelRunnableWrapper channelRunnable = new ChannelRunnableWrapper(channel, task); + boolean added = eventQueue.offer(channelRunnable); + + if (added) { + // as we set the SO_TIMEOUT to 1 second this task will get picked up in 1 second at latest + + } else { + channelRunnable.setFailure(new RejectedExecutionException("Unable to queue task " + task)); + } + return channelRunnable; + } } private void processEventQueue() throws IOException { @@ -119,7 +136,7 @@ abstract class AbstractOioWorker implements Worker static void setInterestOps( AbstractOioChannel channel, ChannelFuture future, int interestOps) { - boolean iothread = isIoThead(channel); + boolean iothread = isIoThread(channel); // Override OP_WRITE flag - a user cannot change this flag. interestOps &= ~Channel.OP_WRITE; @@ -165,7 +182,7 @@ abstract class AbstractOioWorker implements Worker } static void close(AbstractOioChannel channel, ChannelFuture future) { - close(channel, future, isIoThead(channel)); + close(channel, future, isIoThread(channel)); } private static void close(AbstractOioChannel channel, ChannelFuture future, boolean iothread) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java index 8ce169b90a..2581425616 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramWorker.java @@ -63,7 +63,7 @@ class OioDatagramWorker extends AbstractOioWorker { static void write( OioDatagramChannel channel, ChannelFuture future, Object message, SocketAddress remoteAddress) { - boolean iothread = isIoThead(channel); + boolean iothread = isIoThread(channel); try { ChannelBuffer buf = (ChannelBuffer) message; @@ -105,7 +105,7 @@ class OioDatagramWorker extends AbstractOioWorker { static void disconnect(OioDatagramChannel channel, ChannelFuture future) { boolean connected = channel.isConnected(); - boolean iothread = isIoThead(channel); + boolean iothread = isIoThread(channel); try { channel.socket.disconnect(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java index 180d756d36..5d4eb6aa3b 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioWorker.java @@ -65,7 +65,7 @@ class OioWorker extends AbstractOioWorker { OioSocketChannel channel, ChannelFuture future, Object message) { - boolean iothread = isIoThead(channel); + boolean iothread = isIoThread(channel); OutputStream out = channel.getOutputStream(); if (out == null) { Exception e = new ClosedChannelException(); From 5f465da38ded7ef73cb30bb20fd600e8af3266fe Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 29 Feb 2012 21:08:18 +0100 Subject: [PATCH 13/13] Add final keyword --- .../java/io/netty/channel/socket/ChannelRunnableWrapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java index a8110e7db8..aece47526a 100644 --- a/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java +++ b/transport/src/main/java/io/netty/channel/socket/ChannelRunnableWrapper.java @@ -20,7 +20,7 @@ import io.netty.channel.DefaultChannelFuture; public class ChannelRunnableWrapper extends DefaultChannelFuture implements Runnable { - private Runnable task; + private final Runnable task; public ChannelRunnableWrapper(Channel channel, Runnable task) { super(channel, true);