diff --git a/src/main/java/org/jboss/netty/handler/codec/embedder/AbstractCodecEmbedder.java b/src/main/java/org/jboss/netty/handler/codec/embedder/AbstractCodecEmbedder.java index 1c98b193b6..8b150a19c2 100644 --- a/src/main/java/org/jboss/netty/handler/codec/embedder/AbstractCodecEmbedder.java +++ b/src/main/java/org/jboss/netty/handler/codec/embedder/AbstractCodecEmbedder.java @@ -24,8 +24,18 @@ package org.jboss.netty.handler.codec.embedder; import static org.jboss.netty.channel.Channels.*; +import java.util.LinkedList; +import java.util.Queue; + import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineException; +import org.jboss.netty.channel.ChannelSink; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; /** * @author The Netty Project (netty-dev@lists.jboss.org) @@ -34,37 +44,76 @@ import org.jboss.netty.channel.ChannelHandler; */ abstract class AbstractCodecEmbedder implements CodecEmbedder { - final EmbeddedChannelHandlerContext context; + private static final String NAME = "__embedded__"; + + private final Channel channel; + private final ChannelPipeline pipeline; + final Queue productQueue = new LinkedList(); AbstractCodecEmbedder(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } - this.context = new EmbeddedChannelHandlerContext(handler); + pipeline = Channels.pipeline(); + pipeline.addLast(NAME, handler); + channel = new EmbeddedChannel(pipeline, new EmbeddedChannelSink()); // Fire the typical initial events. - Channel channel = context.getChannel(); - fireChannelOpen(context, channel); - fireChannelBound(context, channel, channel.getLocalAddress()); - fireChannelConnected(context, channel, channel.getRemoteAddress()); + fireChannelOpen(channel); + fireChannelBound(channel, channel.getLocalAddress()); + fireChannelConnected(channel, channel.getRemoteAddress()); } public boolean finish() { - Channel channel = context.getChannel(); - fireChannelDisconnected(context, channel); - fireChannelUnbound(context, channel); - fireChannelClosed(context, channel); - return !context.productQueue.isEmpty(); + fireChannelDisconnected(channel); + fireChannelUnbound(channel); + fireChannelClosed(channel); + return !productQueue.isEmpty(); + } + + protected final Channel getChannel() { + return channel; + } + + protected final boolean isEmpty() { + return productQueue.isEmpty(); } @SuppressWarnings("unchecked") - public T poll() { - return (T) context.productQueue.poll(); + public final T poll() { + return (T) productQueue.poll(); } @SuppressWarnings("unchecked") - public T peek() { - return (T) context.productQueue.peek(); + public final T peek() { + return (T) productQueue.peek(); + } + + private final class EmbeddedChannelSink implements ChannelSink { + EmbeddedChannelSink() { + super(); + } + + public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) { + if (e instanceof MessageEvent) { + productQueue.offer(((MessageEvent) e).getMessage()); + } else if (e instanceof ExceptionEvent) { + throw new CodecEmbedderException(((ExceptionEvent) e).getCause()); + } + + // Swallow otherwise. + } + + public void exceptionCaught( + ChannelPipeline pipeline, ChannelEvent e, + ChannelPipelineException cause) throws Exception { + Throwable actualCause = cause.getCause(); + if (actualCause == null) { + actualCause = cause; + } + + throw new CodecEmbedderException(actualCause); + } } } diff --git a/src/main/java/org/jboss/netty/handler/codec/embedder/DecoderEmbedder.java b/src/main/java/org/jboss/netty/handler/codec/embedder/DecoderEmbedder.java index 066fc96bd7..bea6e1fab1 100644 --- a/src/main/java/org/jboss/netty/handler/codec/embedder/DecoderEmbedder.java +++ b/src/main/java/org/jboss/netty/handler/codec/embedder/DecoderEmbedder.java @@ -38,7 +38,7 @@ public class DecoderEmbedder extends AbstractCodecEmbedder { } public boolean offer(Object input) { - fireMessageReceived(context, context.getChannel(), input); - return !context.productQueue.isEmpty(); + fireMessageReceived(getChannel(), input); + return !super.isEmpty(); } } diff --git a/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannel.java b/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannel.java index d4d0d1d6b6..45a3aedc3a 100644 --- a/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannel.java +++ b/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannel.java @@ -27,6 +27,7 @@ import java.net.SocketAddress; import org.jboss.netty.channel.AbstractChannel; import org.jboss.netty.channel.ChannelConfig; import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelSink; /** * @author The Netty Project (netty-dev@lists.jboss.org) @@ -38,9 +39,8 @@ class EmbeddedChannel extends AbstractChannel { private final SocketAddress localAddress = new EmbeddedSocketAddress(); private final SocketAddress remoteAddress = new EmbeddedSocketAddress(); - EmbeddedChannel( - EmbeddedChannelHandlerContext context, ChannelPipeline pipeline) { - super(null, EmbeddedChannelFactory.INSTANCE, pipeline, context); + EmbeddedChannel(ChannelPipeline pipeline, ChannelSink sink) { + super(null, EmbeddedChannelFactory.INSTANCE, pipeline, sink); } public ChannelConfig getConfig() { diff --git a/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannelHandlerContext.java b/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannelHandlerContext.java deleted file mode 100644 index 81d1c68cd2..0000000000 --- a/src/main/java/org/jboss/netty/handler/codec/embedder/EmbeddedChannelHandlerContext.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * JBoss, Home of Professional Open Source - * - * Copyright 2008, Red Hat Middleware LLC, and individual contributors - * by the @author tags. See the COPYRIGHT.txt in the distribution for a - * full listing of individual contributors. - * - * This is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License as - * published by the Free Software Foundation; either version 2.1 of - * the License, or (at your option) any later version. - * - * This software is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this software; if not, write to the Free - * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA - * 02110-1301 USA, or see the FSF site: http://www.fsf.org. - */ -package org.jboss.netty.handler.codec.embedder; - -import java.util.LinkedList; -import java.util.Queue; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelDownstreamHandler; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelHandler; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineException; -import org.jboss.netty.channel.ChannelSink; -import org.jboss.netty.channel.ChannelUpstreamHandler; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; - -/** - * @author The Netty Project (netty-dev@lists.jboss.org) - * @author Trustin Lee (tlee@redhat.com) - * @version $Rev$, $Date$ - */ -class EmbeddedChannelHandlerContext implements ChannelHandlerContext, ChannelSink { - - private static final String NAME = "__embedded__"; - - private final Channel channel; - private final ChannelHandler handler; - private final ChannelPipeline pipeline; - final Queue productQueue = new LinkedList(); - - EmbeddedChannelHandlerContext(ChannelHandler handler) { - this.handler = handler; - pipeline = Channels.pipeline(); - pipeline.addLast(NAME, handler); - channel = new EmbeddedChannel(this, pipeline); - } - - public boolean canHandleDownstream() { - return handler instanceof ChannelDownstreamHandler; - } - - public boolean canHandleUpstream() { - return handler instanceof ChannelUpstreamHandler; - } - - public ChannelHandler getHandler() { - return handler; - } - - public String getName() { - return NAME; - } - - public Channel getChannel() { - return channel; - } - - public ChannelPipeline getPipeline() { - return pipeline; - } - - public void sendDownstream(ChannelEvent e) { - handleEvent(e); - } - - public void sendUpstream(ChannelEvent e) { - handleEvent(e); - } - - public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) { - handleEvent(e); - } - - private void handleEvent(ChannelEvent e) { - if (e instanceof MessageEvent) { - productQueue.offer(((MessageEvent) e).getMessage()); - } else if (e instanceof ExceptionEvent) { - throw new CodecEmbedderException(((ExceptionEvent) e).getCause()); - } - - // Swallow otherwise. - } - - public void exceptionCaught( - ChannelPipeline pipeline, ChannelEvent e, - ChannelPipelineException cause) throws Exception { - Throwable actualCause = cause.getCause(); - if (actualCause == null) { - actualCause = cause; - } - - throw new CodecEmbedderException(actualCause); - } -} diff --git a/src/main/java/org/jboss/netty/handler/codec/embedder/EncoderEmbedder.java b/src/main/java/org/jboss/netty/handler/codec/embedder/EncoderEmbedder.java index 54725d1fe2..f90d03bd23 100644 --- a/src/main/java/org/jboss/netty/handler/codec/embedder/EncoderEmbedder.java +++ b/src/main/java/org/jboss/netty/handler/codec/embedder/EncoderEmbedder.java @@ -24,7 +24,6 @@ package org.jboss.netty.handler.codec.embedder; import static org.jboss.netty.channel.Channels.*; -import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelDownstreamHandler; /** @@ -39,8 +38,7 @@ public class EncoderEmbedder extends AbstractCodecEmbedder { } public boolean offer(Object input) { - Channel channel = context.getChannel(); - write(context, channel, succeededFuture(channel), input); - return !context.productQueue.isEmpty(); + write(getChannel(), input).setSuccess(); + return !isEmpty(); } }