Add StreamToMessageDecoder.replace() (#332)
This commit is contained in:
parent
ca12e41406
commit
d4a26c3c52
@ -4,14 +4,19 @@ import static io.netty.handler.codec.MessageToMessageEncoder.*;
|
|||||||
import io.netty.buffer.ChannelBuffer;
|
import io.netty.buffer.ChannelBuffer;
|
||||||
import io.netty.channel.ChannelBufferHolder;
|
import io.netty.channel.ChannelBufferHolder;
|
||||||
import io.netty.channel.ChannelBufferHolders;
|
import io.netty.channel.ChannelBufferHolders;
|
||||||
|
import io.netty.channel.ChannelInboundHandler;
|
||||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||||
import io.netty.channel.ChannelInboundHandlerContext;
|
import io.netty.channel.ChannelInboundHandlerContext;
|
||||||
|
import io.netty.channel.ChannelPipeline;
|
||||||
|
|
||||||
public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAdapter<Byte> {
|
public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAdapter<Byte> {
|
||||||
|
|
||||||
|
private ChannelInboundHandlerContext<Byte> ctx;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelBufferHolder<Byte> newInboundBuffer(
|
public ChannelBufferHolder<Byte> newInboundBuffer(
|
||||||
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
|
ChannelInboundHandlerContext<Byte> ctx) throws Exception {
|
||||||
|
this.ctx = ctx;
|
||||||
return ChannelBufferHolders.byteBuffer();
|
return ChannelBufferHolders.byteBuffer();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,6 +89,31 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Replace this decoder in the {@link ChannelPipeline} with the given handler.
|
||||||
|
* All remaining bytes in the inbound buffer will be forwarded to the new handler's
|
||||||
|
* inbound buffer.
|
||||||
|
*/
|
||||||
|
public void replace(String newHandlerName, ChannelInboundHandler<Byte> newHandler) {
|
||||||
|
if (!ctx.channel().eventLoop().inEventLoop()) {
|
||||||
|
throw new IllegalStateException("not in event loop");
|
||||||
|
}
|
||||||
|
|
||||||
|
// We do not use ChannelPipeline.replace() here so that the current context points
|
||||||
|
// the new handler.
|
||||||
|
ctx.pipeline().addAfter(ctx.name(), newHandlerName, newHandler);
|
||||||
|
|
||||||
|
ChannelBuffer in = ctx.in().byteBuffer();
|
||||||
|
try {
|
||||||
|
if (in.readable()) {
|
||||||
|
ctx.nextIn().byteBuffer().writeBytes(ctx.in().byteBuffer());
|
||||||
|
ctx.fireInboundBufferUpdated();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
ctx.pipeline().remove(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public abstract O decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception;
|
public abstract O decode(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception;
|
||||||
|
|
||||||
public O decodeLast(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
public O decodeLast(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
|
||||||
|
Loading…
Reference in New Issue
Block a user