diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java deleted file mode 100644 index e578a2dcfe..0000000000 --- a/transport-http/src/main/java/io/netty/channel/socket/http/AbstractHttpChannelSink.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package io.netty.channel.socket.http; - -import io.netty.channel.AbstractChannelSink; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelPipeline; - -public abstract class AbstractHttpChannelSink extends AbstractChannelSink{ - - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - pipeline.sendUpstream(e); - } - -} diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java index ff0be867c6..591d20ac38 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelAcceptedChannelSink.java @@ -18,6 +18,7 @@ package io.netty.channel.socket.http; import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ChannelBuffer; +import io.netty.channel.AbstractChannelSink; import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; @@ -32,7 +33,7 @@ import io.netty.channel.MessageEvent; * from here to the ServerMessageSwitch, which queues the data awaiting a poll request from the * client end of the tunnel. */ -class HttpTunnelAcceptedChannelSink extends AbstractHttpChannelSink { +class HttpTunnelAcceptedChannelSink extends AbstractChannelSink { final SaturationManager saturationManager; diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java index 6a55acf7c1..5f1ecce1c5 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelClientChannelSink.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.http; import java.net.InetSocketAddress; +import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelStateEvent; @@ -26,7 +27,7 @@ import io.netty.channel.MessageEvent; * Sink of a client channel, deals with sunk events and then makes appropriate calls * on the channel itself to push data. */ -class HttpTunnelClientChannelSink extends AbstractHttpChannelSink { +class HttpTunnelClientChannelSink extends AbstractChannelSink { @Override public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) diff --git a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java index a7ed9ee1bb..e755229674 100644 --- a/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java +++ b/transport-http/src/main/java/io/netty/channel/socket/http/HttpTunnelServerChannelSink.java @@ -17,6 +17,7 @@ package io.netty.channel.socket.http; import java.net.SocketAddress; +import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -26,7 +27,7 @@ import io.netty.channel.socket.ServerSocketChannel; /** */ -class HttpTunnelServerChannelSink extends AbstractHttpChannelSink { +class HttpTunnelServerChannelSink extends AbstractChannelSink { private ChannelFutureListener closeHook; diff --git a/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java b/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java index 0906755de0..0798eb94ad 100644 --- a/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java +++ b/transport-http/src/test/java/io/netty/channel/socket/http/FakeChannelSink.java @@ -19,13 +19,14 @@ package io.netty.channel.socket.http; import java.util.LinkedList; import java.util.Queue; +import io.netty.channel.AbstractChannelSink; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; /** * A fake channel sink for use in testing */ -public class FakeChannelSink extends AbstractHttpChannelSink { +public class FakeChannelSink extends AbstractChannelSink { public Queue events = new LinkedList(); diff --git a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java index 5009dc1427..fd1c34c0a0 100644 --- a/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java +++ b/transport-rxtx/src/main/java/io/netty/channel/rxtx/RxtxChannelSink.java @@ -329,12 +329,4 @@ public class RxtxChannelSink extends AbstractChannelSink { } } } - - /** - * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise - */ - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { - pipeline.sendUpstream(event); - } } diff --git a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java index 5daba78839..f7f458639c 100644 --- a/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java +++ b/transport-sctp/src/main/java/io/netty/channel/sctp/AbstractScptChannelSink.java @@ -21,7 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; -public abstract class AbstractScptChannelSink extends AbstractChannelSink{ +public abstract class AbstractScptChannelSink extends AbstractChannelSink { @Override public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { @@ -41,7 +41,7 @@ public abstract class AbstractScptChannelSink extends AbstractChannelSink{ pipeline.sendUpstream(e); } } else { - throw new UnsupportedOperationException(); + super.fireUpstreamEventLater(pipeline, e); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java index a1c27839ff..69b199aeb7 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelSink.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelSink.java @@ -43,7 +43,24 @@ public abstract class AbstractChannelSink implements ChannelSink { if (actualCause == null) { actualCause = cause; } - - fireExceptionCaught(event.getChannel(), actualCause); + if (isFireExceptionCaughtLater(event, actualCause)) { + fireExceptionCaughtLater(event.getChannel(), actualCause); + } else { + fireExceptionCaught(event.getChannel(), actualCause); + } } + + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + return false; + } + + /** + * This implementation just send the event now via {@link ChannelPipeline#sendUpstream(ChannelEvent)}. Sub-classes should override this if they can handle it + * in a better way + */ + @Override + public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { + pipeline.sendUpstream(e); + } + } diff --git a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java index 51de5d3311..a0d75ceef7 100755 --- a/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java +++ b/transport/src/main/java/io/netty/channel/iostream/IoStreamChannelSink.java @@ -177,12 +177,4 @@ public class IoStreamChannelSink extends AbstractChannelSink { } } } - - /** - * This just calls {@link ChannelPipeline#sendUpstream(ChannelEvent)} as the transport does not support it - */ - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception { - pipeline.sendUpstream(e); - } } diff --git a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java index d882725de0..9b6bd455de 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalClientChannelSink.java @@ -85,14 +85,6 @@ final class LocalClientChannelSink extends AbstractChannelSink { } } - /** - * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise - */ - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { - pipeline.sendUpstream(event); - } - private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) { try { if (!LocalChannelRegistry.register(localAddress, channel)) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java index 6ead0ad3a2..6bfa780f28 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannelSink.java @@ -42,14 +42,6 @@ final class LocalServerChannelSink extends AbstractChannelSink { } } - /** - * Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise - */ - @Override - public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception { - pipeline.sendUpstream(event); - } - private void handleServerChannel(ChannelEvent e) { if (!(e instanceof ChannelStateEvent)) { return; diff --git a/transport/src/main/java/io/netty/channel/socket/Worker.java b/transport/src/main/java/io/netty/channel/socket/Worker.java index eebfc74d40..64dc433038 100644 --- a/transport/src/main/java/io/netty/channel/socket/Worker.java +++ b/transport/src/main/java/io/netty/channel/socket/Worker.java @@ -20,7 +20,7 @@ package io.netty.channel.socket; * A {@link Worker} is responsible to dispatch IO operations * */ -public interface Worker extends Runnable{ +public interface Worker extends Runnable { /** * Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work. diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java index 85a8d03d3a..9ecdcb0699 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioChannelSink.java @@ -21,7 +21,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; -public abstract class AbstractNioChannelSink extends AbstractChannelSink{ +public abstract class AbstractNioChannelSink extends AbstractChannelSink { @Override public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { @@ -41,9 +41,19 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink{ pipeline.sendUpstream(e); } } else { - throw new UnsupportedOperationException(); + super.fireUpstreamEventLater(pipeline, e); } } + @Override + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + Channel channel = event.getChannel(); + boolean fireLater = false; + if (channel instanceof AbstractNioChannel) { + fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel) channel); + } + return fireLater; + } + } diff --git a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java index 14ffdc6d99..8f6dafd468 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/AbstractNioWorker.java @@ -504,7 +504,7 @@ abstract class AbstractNioWorker implements Worker { } static boolean isIoThread(AbstractNioChannel channel) { - return Thread.currentThread() == channel.worker.thread; + return channel.worker.thread == null || Thread.currentThread() == channel.worker.thread; } private void setOpWrite(AbstractNioChannel channel) { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java index 889a08aa26..633818ec01 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannelSink.java @@ -22,7 +22,7 @@ import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.Worker; -public abstract class AbstractOioChannelSink extends AbstractChannelSink{ +public abstract class AbstractOioChannelSink extends AbstractChannelSink { @Override public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { @@ -44,9 +44,19 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink{ } } else { - throw new UnsupportedOperationException(); + super.fireUpstreamEventLater(pipeline, e); } } + @Override + protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) { + Channel channel = event.getChannel(); + boolean fireLater = false; + if (channel instanceof AbstractOioChannel) { + fireLater = !AbstractOioWorker.isIoThead((AbstractOioChannel) channel); + } + return fireLater; + } + } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java index 97b20149a9..167b58309d 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioWorker.java @@ -30,7 +30,7 @@ import java.util.Queue; * * @param {@link AbstractOioChannel} */ -abstract class AbstractOioWorker implements Worker{ +abstract class AbstractOioWorker implements Worker { private final Queue eventQueue = QueueFactory.createQueue(Runnable.class); @@ -85,7 +85,7 @@ abstract class AbstractOioWorker implements Worker } static boolean isIoThead(AbstractOioChannel channel) { - return Thread.currentThread() == channel.workerThread; + return channel.workerThread == null || Thread.currentThread() == channel.workerThread; } @Override