Fixed a bug where decoder embedder doesn't work

This commit is contained in:
Trustin Lee 2008-12-03 08:24:31 +00:00
parent f04d8ae99a
commit 2bf7467726

View File

@ -30,9 +30,12 @@ import java.util.Queue;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineException; import org.jboss.netty.channel.ChannelPipelineException;
import org.jboss.netty.channel.ChannelSink; import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
@ -48,6 +51,8 @@ abstract class AbstractCodecEmbedder<T> implements CodecEmbedder<T> {
private final Channel channel; private final Channel channel;
private final ChannelPipeline pipeline; private final ChannelPipeline pipeline;
private final EmbeddedChannelSink sink = new EmbeddedChannelSink();
final Queue<Object> productQueue = new LinkedList<Object>(); final Queue<Object> productQueue = new LinkedList<Object>();
AbstractCodecEmbedder(ChannelHandler handler) { AbstractCodecEmbedder(ChannelHandler handler) {
@ -57,7 +62,8 @@ abstract class AbstractCodecEmbedder<T> implements CodecEmbedder<T> {
pipeline = Channels.pipeline(); pipeline = Channels.pipeline();
pipeline.addLast(NAME, handler); pipeline.addLast(NAME, handler);
channel = new EmbeddedChannel(pipeline, new EmbeddedChannelSink()); pipeline.addLast("LAST", sink);
channel = new EmbeddedChannel(pipeline, sink);
// Fire the typical initial events. // Fire the typical initial events.
fireChannelOpen(channel); fireChannelOpen(channel);
@ -90,12 +96,21 @@ abstract class AbstractCodecEmbedder<T> implements CodecEmbedder<T> {
return (T) productQueue.peek(); return (T) productQueue.peek();
} }
private final class EmbeddedChannelSink implements ChannelSink { @ChannelPipelineCoverage("all")
private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler {
EmbeddedChannelSink() { EmbeddedChannelSink() {
super(); super();
} }
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) {
handleEvent(e);
}
public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) { public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
handleEvent(e);
}
private void handleEvent(ChannelEvent e) {
if (e instanceof MessageEvent) { if (e instanceof MessageEvent) {
productQueue.offer(((MessageEvent) e).getMessage()); productQueue.offer(((MessageEvent) e).getMessage());
} else if (e instanceof ExceptionEvent) { } else if (e instanceof ExceptionEvent) {