BlockingReadHandler does not need to implement LifeCycleAwareChannelHandler

This commit is contained in:
Trustin Lee 2009-12-16 06:39:53 +00:00
parent bab9af2b52
commit d87408936f

View File

@ -24,8 +24,8 @@ import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.IoWorkerRunnable;
@ -34,15 +34,16 @@ import org.jboss.netty.util.internal.LinkedTransferQueue;
/** /**
* Emulates blocking read operation. This handler stores all received messages * Emulates blocking read operation. This handler stores all received messages
* into a {@link BlockingQueue} and returns the received messages when * into a {@link BlockingQueue} and returns the received messages when
* {@link #read()} or {@link #read(long, TimeUnit)} method is called. * {@link #read()}, {@link #read(long, TimeUnit)}, {@link #readEvent()}, or
* {@link #readEvent(long, TimeUnit)} method is called.
* <p> * <p>
* Please note that this handler is only useful for the cases where there are * Please note that this handler is only useful for the cases where there are
* very small number of connections, such as testing and simple client-side * very small number of connections, such as testing and simple client-side
* application development. * application development.
* <p> * <p>
* Also, any handler placed after this handler will never receive a * Also, any handler placed after this handler will never receive
* {@code messageReceived} event and an {@code exceptionCaught} event, hence it * {@code messageReceived}, {@code exceptionCaught}, and {@code channelClosed}
* should be placed in the last place in a pipeline. * events, hence it should be placed in the last place in a pipeline.
* <p> * <p>
* Here is an example that demonstrates the usage: * Here is an example that demonstrates the usage:
* <pre> * <pre>
@ -75,11 +76,10 @@ import org.jboss.netty.util.internal.LinkedTransferQueue;
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
@ChannelPipelineCoverage("one") @ChannelPipelineCoverage("one")
public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
implements LifeCycleAwareChannelHandler {
private final BlockingQueue<ChannelEvent> queue; private final BlockingQueue<ChannelEvent> queue;
private volatile Channel channel; private volatile boolean closed;
/** /**
* Creates a new instance with the default unbounded {@link BlockingQueue} * Creates a new instance with the default unbounded {@link BlockingQueue}
@ -115,12 +115,7 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
* if this handler was not added to a {@link ChannelPipeline} yet * if this handler was not added to a {@link ChannelPipeline} yet
*/ */
public boolean isClosed() { public boolean isClosed() {
Channel ch = channel; return closed;
if (ch == null) {
throw new IllegalStateException("not added to the pipeline yet");
}
return ch.getCloseFuture().isDone();
} }
/** /**
@ -201,7 +196,14 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
} }
} }
return getQueue().take(); ChannelEvent e = getQueue().take();
if (e instanceof ChannelStateEvent) {
// channelClosed has been triggered.
assert closed;
return null;
} else {
return e;
}
} }
/** /**
@ -233,6 +235,10 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
ChannelEvent e = getQueue().poll(timeout, unit); ChannelEvent e = getQueue().poll(timeout, unit);
if (e == null) { if (e == null) {
throw new BlockingReadTimeoutException(); throw new BlockingReadTimeoutException();
} else if (e instanceof ChannelStateEvent) {
// channelClosed has been triggered.
assert closed;
return null;
} else { } else {
return e; return e;
} }
@ -241,9 +247,9 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
private void detectDeadLock() { private void detectDeadLock() {
if (IoWorkerRunnable.IN_IO_THREAD.get()) { if (IoWorkerRunnable.IN_IO_THREAD.get()) {
throw new IllegalStateException( throw new IllegalStateException(
"read(...) in I/O thread causes a dead lock or " + "read*(...) in I/O thread causes a dead lock or " +
"sudden performance drop. Implement a state machine or " + "sudden performance drop. Implement a state machine or " +
"call await*() from a different thread."); "call read*() from a different thread.");
} }
} }
@ -259,29 +265,15 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
getQueue().put(e); getQueue().put(e);
} }
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
closed = true;
getQueue().put(e);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private E getMessage(MessageEvent e) { private E getMessage(MessageEvent e) {
return (E) e.getMessage(); return (E) e.getMessage();
} }
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (channel != null) {
throw new IllegalStateException(
"cannot be added to multiple pipelines");
}
channel = ctx.getChannel();
}
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
} }