From d4a26c3c52f11b92f2f321f909777b0ae21b8054 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Fri, 18 May 2012 17:37:41 +0900 Subject: [PATCH] Add StreamToMessageDecoder.replace() (#332) --- .../handler/codec/StreamToMessageDecoder.java | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index afbf9a8f73..06bfc926b1 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -4,14 +4,19 @@ import static io.netty.handler.codec.MessageToMessageEncoder.*; import io.netty.buffer.ChannelBuffer; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelPipeline; public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAdapter { + private ChannelInboundHandlerContext ctx; + @Override public ChannelBufferHolder newInboundBuffer( ChannelInboundHandlerContext ctx) throws Exception { + this.ctx = ctx; return ChannelBufferHolders.byteBuffer(); } @@ -84,6 +89,31 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda } } + /** + * Replace this decoder in the {@link ChannelPipeline} with the given handler. + * All remaining bytes in the inbound buffer will be forwarded to the new handler's + * inbound buffer. + */ + public void replace(String newHandlerName, ChannelInboundHandler newHandler) { + if (!ctx.channel().eventLoop().inEventLoop()) { + throw new IllegalStateException("not in event loop"); + } + + // We do not use ChannelPipeline.replace() here so that the current context points + // the new handler. + ctx.pipeline().addAfter(ctx.name(), newHandlerName, newHandler); + + ChannelBuffer in = ctx.in().byteBuffer(); + try { + if (in.readable()) { + ctx.nextIn().byteBuffer().writeBytes(ctx.in().byteBuffer()); + ctx.fireInboundBufferUpdated(); + } + } finally { + ctx.pipeline().remove(this); + } + } + public abstract O decode(ChannelInboundHandlerContext ctx, ChannelBuffer in) throws Exception; public O decodeLast(ChannelInboundHandlerContext ctx, ChannelBuffer in) throws Exception {