From 288ed13b6b14375204316d68c28f446cb7b9d23d Mon Sep 17 00:00:00 2001 From: norman Date: Fri, 18 May 2012 08:49:45 +0200 Subject: [PATCH] Add a replace(..) method to FrameDecoder and also to ReplayDecoder as it now extend FrameDecoder. This also fix #332. --- .../handler/codec/frame/FrameDecoder.java | 83 +++++++++++++- .../WebSocketClientHandshaker00.java | 4 +- .../WebSocketClientHandshaker08.java | 6 +- .../WebSocketClientHandshaker13.java | 6 +- .../codec/replay/ReplayingDecoder.java | 106 ++++-------------- 5 files changed, 111 insertions(+), 94 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java index 28daf90ef7..e9a8648027 100644 --- a/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/frame/FrameDecoder.java @@ -28,9 +28,9 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.LifeCycleAwareChannelHandler; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.handler.codec.replay.ReplayingDecoder; /** * Decodes the received {@link ChannelBuffer}s into a meaningful frame object. @@ -175,10 +175,11 @@ import org.jboss.netty.handler.codec.replay.ReplayingDecoder; * * @apiviz.landmark */ -public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { +public abstract class FrameDecoder extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler { private final boolean unfold; - private ChannelBuffer cumulation; + protected ChannelBuffer cumulation; + private volatile ChannelHandlerContext ctx; protected FrameDecoder() { this(false); @@ -336,7 +337,7 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { } } - private void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { + protected final void unfoldAndFireMessageReceived(ChannelHandlerContext context, SocketAddress remoteAddress, Object result) { if (unfold) { if (result instanceof Object[]) { for (Object r: (Object[]) result) { @@ -354,7 +355,11 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { } } - private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) + /** + * Gets called on {@link #channelDisconnected(ChannelHandlerContext, ChannelStateEvent)} and {@link #channelClosed(ChannelHandlerContext, ChannelStateEvent)} + * + */ + protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { try { ChannelBuffer cumulation = this.cumulation; @@ -393,4 +398,72 @@ public abstract class FrameDecoder extends SimpleChannelUpstreamHandler { ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); return factory.getBuffer(Math.max(minimumCapacity, 256)); } + + /** + * Replace this {@link FrameDecoder} in the {@link ChannelPipeline} with the given {@link ChannelHandler}. All + * remaining bytes in the {@link ChannelBuffer} will get send to the new {@link ChannelHandler} that was used + * as replacement + * + */ + public void replace(String handlerName, ChannelHandler handler) { + if (ctx == null) { + throw new IllegalStateException("Replace cann only be called once the FrameDecoder is added to the ChannelPipeline"); + } + ChannelPipeline pipeline = ctx.getPipeline(); + pipeline.addAfter(ctx.getName(), handlerName, handler); + + try { + if (cumulation != null) { + Channels.fireMessageReceived(ctx, cumulation.readBytes(actualReadableBytes())); + } + } finally { + pipeline.remove(this); + } + + } + + /** + * Returns the actual number of readable bytes in the internal cumulative + * buffer of this decoder. You usually do not need to rely on this value + * to write a decoder. Use it only when you muse use it at your own risk. + * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. + */ + protected int actualReadableBytes() { + return internalBuffer().readableBytes(); + } + + + + /** + * Returns the internal cumulative buffer of this decoder. You usually + * do not need to access the internal buffer directly to write a decoder. + * Use it only when you must use it at your own risk. + */ + protected ChannelBuffer internalBuffer() { + ChannelBuffer buf = this.cumulation; + if (buf == null) { + return ChannelBuffers.EMPTY_BUFFER; + } + return buf; + } + + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + } + + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // Nothing to do.. + + } + + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // Nothing to do.. + + } + + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // Nothing to do.. + + } + } diff --git a/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker00.java b/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker00.java index 7a32eaf0e5..3a9775690b 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker00.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker00.java @@ -240,10 +240,12 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker { String subprotocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL); setActualSubprotocol(subprotocol); - channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder", + + channel.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder", new WebSocket00FrameDecoder(getMaxFramePayloadLength())); setHandshakeComplete(); + } private static String insertRandomCharacters(String key) { diff --git a/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker08.java b/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker08.java index 868dc37806..e6dbdd4840 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker08.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker08.java @@ -227,10 +227,12 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker { String subprotocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL); setActualSubprotocol(subprotocol); + - channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder", + channel.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder", new WebSocket08FrameDecoder(false, allowExtensions, this.getMaxFramePayloadLength())); - + setHandshakeComplete(); + } } diff --git a/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker13.java b/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker13.java index 93cb216658..c67c0f6e07 100644 --- a/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker13.java +++ b/src/main/java/org/jboss/netty/handler/codec/http/websocketx/WebSocketClientHandshaker13.java @@ -224,9 +224,11 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker { String subprotocol = response.getHeader(Names.SEC_WEBSOCKET_PROTOCOL); setActualSubprotocol(subprotocol); - channel.getPipeline().replace(HttpResponseDecoder.class, "ws-decoder", + + channel.getPipeline().get(HttpResponseDecoder.class).replace("ws-decoder", new WebSocket13FrameDecoder(false, allowExtensions, this.getMaxFramePayloadLength())); - + setHandshakeComplete(); + } } diff --git a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java index ead61c2d79..816100cc26 100644 --- a/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java +++ b/src/main/java/org/jboss/netty/handler/codec/replay/ReplayingDecoder.java @@ -18,17 +18,13 @@ package org.jboss.netty.handler.codec.replay; import java.net.SocketAddress; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferFactory; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandler; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.frame.FrameDecoder; /** @@ -286,11 +282,9 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder; * @apiviz.has org.jboss.netty.handler.codec.replay.UnreplayableOperationException oneway - - throws */ public abstract class ReplayingDecoder> - extends SimpleChannelUpstreamHandler { + extends FrameDecoder { - private ChannelBuffer cumulation; - private final boolean unfold; private ReplayingDecoderBuffer replayable; private T state; private int checkpoint; @@ -316,8 +310,8 @@ public abstract class ReplayingDecoder> } protected ReplayingDecoder(T initialState, boolean unfold) { + super(unfold); this.state = initialState; - this.unfold = unfold; } /** @@ -359,29 +353,6 @@ public abstract class ReplayingDecoder> return oldState; } - /** - * Returns the actual number of readable bytes in the internal cumulative - * buffer of this decoder. You usually do not need to rely on this value - * to write a decoder. Use it only when you muse use it at your own risk. - * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. - */ - protected int actualReadableBytes() { - return internalBuffer().readableBytes(); - } - - /** - * Returns the internal cumulative buffer of this decoder. You usually - * do not need to access the internal buffer directly to write a decoder. - * Use it only when you must use it at your own risk. - */ - protected ChannelBuffer internalBuffer() { - ChannelBuffer buf = this.cumulation; - if (buf == null) { - return ChannelBuffers.EMPTY_BUFFER; - } - return buf; - } - /** * Decodes the received packets so far into a frame. * @@ -417,6 +388,21 @@ public abstract class ReplayingDecoder> return decode(ctx, channel, buffer, state); } + /** + * Calls {@link #decode(ChannelHandlerContext, Channel, ChannelBuffer, Enum)}. This method should be never used by {@link ReplayingDecoder} itself. + * But to be safe we should handle it anyway + */ + @Override + protected final Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { + return decode(ctx, channel, buffer, state); + } + + @Override + protected final Object decodeLast( + ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { + return decodeLast(ctx, channel, buffer, state); + } + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { @@ -521,24 +507,6 @@ public abstract class ReplayingDecoder> } } - @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - cleanup(ctx, e); - } - - @Override - public void channelClosed(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - cleanup(ctx, e); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception { - ctx.sendUpstream(e); - } - private void callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception { while (input.readable()) { int oldReaderIndex = checkpoint = input.readerIndex(); @@ -581,30 +549,12 @@ public abstract class ReplayingDecoder> } // A successful decode - unfoldAndFireMessageReceived(context, result, remoteAddress); + unfoldAndFireMessageReceived(context, remoteAddress, result); } } - private void unfoldAndFireMessageReceived( - ChannelHandlerContext context, Object result, SocketAddress remoteAddress) { - if (unfold) { - if (result instanceof Object[]) { - for (Object r: (Object[]) result) { - Channels.fireMessageReceived(context, r, remoteAddress); - } - } else if (result instanceof Iterable) { - for (Object r: (Iterable) result) { - Channels.fireMessageReceived(context, r, remoteAddress); - } - } else { - Channels.fireMessageReceived(context, result, remoteAddress); - } - } else { - Channels.fireMessageReceived(context, result, remoteAddress); - } - } - - private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) + @Override + protected void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { try { ChannelBuffer cumulation = this.cumulation; @@ -627,7 +577,7 @@ public abstract class ReplayingDecoder> // notify a user that the connection was closed explicitly. Object partiallyDecoded = decodeLast(ctx, e.getChannel(), replayable, state); if (partiallyDecoded != null) { - unfoldAndFireMessageReceived(ctx, partiallyDecoded, null); + unfoldAndFireMessageReceived(ctx, null, partiallyDecoded); } } catch (ReplayError replay) { // Ignore @@ -636,17 +586,5 @@ public abstract class ReplayingDecoder> ctx.sendUpstream(e); } } - - /** - * Create a new {@link ChannelBuffer} which is used for the cumulation. - * Sub-classes may override this. - * - * @param ctx {@link ChannelHandlerContext} for this handler - * @return buffer the {@link ChannelBuffer} which is used for cumulation - */ - protected ChannelBuffer newCumulationBuffer( - ChannelHandlerContext ctx, int minimumCapacity) { - ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory(); - return factory.getBuffer(Math.max(minimumCapacity, 256)); - } + }