Also fix the exception handling if a ChannelHandler throws an Exception

based of if its a io thread or not. See #187 and #140
This commit is contained in:
Norman Maurer 2012-02-25 15:54:33 +01:00
parent ef64e8c332
commit f2d1f1e8ad
16 changed files with 57 additions and 78 deletions

View File

@ -1,30 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.socket.http;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline;
public abstract class AbstractHttpChannelSink extends AbstractChannelSink{
@Override
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
pipeline.sendUpstream(e);
}
}

View File

@ -18,6 +18,7 @@ package io.netty.channel.socket.http;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -32,7 +33,7 @@ import io.netty.channel.MessageEvent;
* from here to the ServerMessageSwitch, which queues the data awaiting a poll request from the * from here to the ServerMessageSwitch, which queues the data awaiting a poll request from the
* client end of the tunnel. * client end of the tunnel.
*/ */
class HttpTunnelAcceptedChannelSink extends AbstractHttpChannelSink { class HttpTunnelAcceptedChannelSink extends AbstractChannelSink {
final SaturationManager saturationManager; final SaturationManager saturationManager;

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.http;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateEvent; import io.netty.channel.ChannelStateEvent;
@ -26,7 +27,7 @@ import io.netty.channel.MessageEvent;
* Sink of a client channel, deals with sunk events and then makes appropriate calls * Sink of a client channel, deals with sunk events and then makes appropriate calls
* on the channel itself to push data. * on the channel itself to push data.
*/ */
class HttpTunnelClientChannelSink extends AbstractHttpChannelSink { class HttpTunnelClientChannelSink extends AbstractChannelSink {
@Override @Override
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) public void eventSunk(ChannelPipeline pipeline, ChannelEvent e)

View File

@ -17,6 +17,7 @@ package io.netty.channel.socket.http;
import java.net.SocketAddress; import java.net.SocketAddress;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -26,7 +27,7 @@ import io.netty.channel.socket.ServerSocketChannel;
/** /**
*/ */
class HttpTunnelServerChannelSink extends AbstractHttpChannelSink { class HttpTunnelServerChannelSink extends AbstractChannelSink {
private ChannelFutureListener closeHook; private ChannelFutureListener closeHook;

View File

@ -19,13 +19,14 @@ package io.netty.channel.socket.http;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import io.netty.channel.AbstractChannelSink;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
/** /**
* A fake channel sink for use in testing * A fake channel sink for use in testing
*/ */
public class FakeChannelSink extends AbstractHttpChannelSink { public class FakeChannelSink extends AbstractChannelSink {
public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>(); public Queue<ChannelEvent> events = new LinkedList<ChannelEvent>();

View File

@ -329,12 +329,4 @@ public class RxtxChannelSink extends AbstractChannelSink {
} }
} }
} }
/**
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}
} }

View File

@ -21,7 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
public abstract class AbstractScptChannelSink extends AbstractChannelSink{ public abstract class AbstractScptChannelSink extends AbstractChannelSink {
@Override @Override
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
@ -41,7 +41,7 @@ public abstract class AbstractScptChannelSink extends AbstractChannelSink{
pipeline.sendUpstream(e); pipeline.sendUpstream(e);
} }
} else { } else {
throw new UnsupportedOperationException(); super.fireUpstreamEventLater(pipeline, e);
} }
} }

View File

@ -43,7 +43,24 @@ public abstract class AbstractChannelSink implements ChannelSink {
if (actualCause == null) { if (actualCause == null) {
actualCause = cause; actualCause = cause;
} }
if (isFireExceptionCaughtLater(event, actualCause)) {
fireExceptionCaughtLater(event.getChannel(), actualCause);
} else {
fireExceptionCaught(event.getChannel(), actualCause); fireExceptionCaught(event.getChannel(), actualCause);
} }
}
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
return false;
}
/**
* This implementation just send the event now via {@link ChannelPipeline#sendUpstream(ChannelEvent)}. Sub-classes should override this if they can handle it
* in a better way
*/
@Override
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
pipeline.sendUpstream(e);
}
} }

View File

@ -177,12 +177,4 @@ public class IoStreamChannelSink extends AbstractChannelSink {
} }
} }
} }
/**
* This just calls {@link ChannelPipeline#sendUpstream(ChannelEvent)} as the transport does not support it
*/
@Override
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
pipeline.sendUpstream(e);
}
} }

View File

@ -85,14 +85,6 @@ final class LocalClientChannelSink extends AbstractChannelSink {
} }
} }
/**
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}
private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) { private void bind(DefaultLocalChannel channel, ChannelFuture future, LocalAddress localAddress) {
try { try {
if (!LocalChannelRegistry.register(localAddress, channel)) { if (!LocalChannelRegistry.register(localAddress, channel)) {

View File

@ -42,14 +42,6 @@ final class LocalServerChannelSink extends AbstractChannelSink {
} }
} }
/**
* Just fire the event now by calling {@link ChannelPipeline#sendUpstream(ChannelEvent)} as this implementation does not support it otherwise
*/
@Override
public void fireUpstreamEventLater(ChannelPipeline pipeline, ChannelEvent event) throws Exception {
pipeline.sendUpstream(event);
}
private void handleServerChannel(ChannelEvent e) { private void handleServerChannel(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) { if (!(e instanceof ChannelStateEvent)) {
return; return;

View File

@ -20,7 +20,7 @@ package io.netty.channel.socket;
* A {@link Worker} is responsible to dispatch IO operations * A {@link Worker} is responsible to dispatch IO operations
* *
*/ */
public interface Worker extends Runnable{ public interface Worker extends Runnable {
/** /**
* Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work. * Execute the given {@link Runnable} in the IO-Thread. This may be now or later once the IO-Thread do some other work.

View File

@ -21,7 +21,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
public abstract class AbstractNioChannelSink extends AbstractChannelSink{ public abstract class AbstractNioChannelSink extends AbstractChannelSink {
@Override @Override
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
@ -41,9 +41,19 @@ public abstract class AbstractNioChannelSink extends AbstractChannelSink{
pipeline.sendUpstream(e); pipeline.sendUpstream(e);
} }
} else { } else {
throw new UnsupportedOperationException(); super.fireUpstreamEventLater(pipeline, e);
} }
} }
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractNioChannel<?>) {
fireLater = !AbstractNioWorker.isIoThread((AbstractNioChannel<?>) channel);
}
return fireLater;
}
} }

View File

@ -504,7 +504,7 @@ abstract class AbstractNioWorker implements Worker {
} }
static boolean isIoThread(AbstractNioChannel<?> channel) { static boolean isIoThread(AbstractNioChannel<?> channel) {
return Thread.currentThread() == channel.worker.thread; return channel.worker.thread == null || Thread.currentThread() == channel.worker.thread;
} }
private void setOpWrite(AbstractNioChannel<?> channel) { private void setOpWrite(AbstractNioChannel<?> channel) {

View File

@ -22,7 +22,7 @@ import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.Worker; import io.netty.channel.socket.Worker;
public abstract class AbstractOioChannelSink extends AbstractChannelSink{ public abstract class AbstractOioChannelSink extends AbstractChannelSink {
@Override @Override
public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception { public void fireUpstreamEventLater(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
@ -44,9 +44,19 @@ public abstract class AbstractOioChannelSink extends AbstractChannelSink{
} }
} else { } else {
throw new UnsupportedOperationException(); super.fireUpstreamEventLater(pipeline, e);
} }
} }
@Override
protected boolean isFireExceptionCaughtLater(ChannelEvent event, Throwable actualCause) {
Channel channel = event.getChannel();
boolean fireLater = false;
if (channel instanceof AbstractOioChannel) {
fireLater = !AbstractOioWorker.isIoThead((AbstractOioChannel) channel);
}
return fireLater;
}
} }

View File

@ -30,7 +30,7 @@ import java.util.Queue;
* *
* @param <C> {@link AbstractOioChannel} * @param <C> {@link AbstractOioChannel}
*/ */
abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker{ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker {
private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class); private final Queue<Runnable> eventQueue = QueueFactory.createQueue(Runnable.class);
@ -85,7 +85,7 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
} }
static boolean isIoThead(AbstractOioChannel channel) { static boolean isIoThead(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread; return channel.workerThread == null || Thread.currentThread() == channel.workerThread;
} }
@Override @Override