diff --git a/src/main/java/org/jboss/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/src/main/java/org/jboss/netty/handler/codec/embedder/AbstractCodecEmbedder.java index 8b150a19c2..22466fd2d8 100644 --- a/src/main/java/org/jboss/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/src/main/java/org/jboss/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -30,9 +30,12 @@ import java.util.Queue; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelPipelineException; import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; @@ -48,6 +51,8 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { private final Channel channel; private final ChannelPipeline pipeline; + private final EmbeddedChannelSink sink = new EmbeddedChannelSink(); + final Queue productQueue = new LinkedList(); AbstractCodecEmbedder(ChannelHandler handler) { @@ -57,7 +62,8 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { pipeline = Channels.pipeline(); pipeline.addLast(NAME, handler); - channel = new EmbeddedChannel(pipeline, new EmbeddedChannelSink()); + pipeline.addLast("LAST", sink); + channel = new EmbeddedChannel(pipeline, sink); // Fire the typical initial events. fireChannelOpen(channel); @@ -90,12 +96,21 @@ abstract class AbstractCodecEmbedder implements CodecEmbedder { return (T) productQueue.peek(); } - private final class EmbeddedChannelSink implements ChannelSink { + @ChannelPipelineCoverage("all") + private final class EmbeddedChannelSink implements ChannelSink, ChannelUpstreamHandler { EmbeddedChannelSink() { super(); } + public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) { + handleEvent(e); + } + public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) { + handleEvent(e); + } + + private void handleEvent(ChannelEvent e) { if (e instanceof MessageEvent) { productQueue.offer(((MessageEvent) e).getMessage()); } else if (e instanceof ExceptionEvent) {