From d7bfd44e1093740c573f855972b73ea19eccd8bc Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 24 Jan 2013 18:58:05 +0100 Subject: [PATCH] [#982] [#977] [#858] Allow to transfer the content a ChannelHandlers inbound/outbound buffer on removal/replacement This changes the behavior of the ChannelPipeline.remove(..) and ChannelPipeline.replace(..) methods in that way that after invocation it is not possible anymore to access any data in the inbound or outbound buffer. This is because it empty it now to prevent side-effects. If a user want to preserve the content and forward it to the next handler in the pipeline it is adviced to use one of the new methods which where introduced. - ChannelPipeline.removeAndForward(..) - ChannelPipeline.replaceAndForward(..) --- .../WebSocketClientHandshaker00.java | 2 +- .../WebSocketClientHandshaker07.java | 2 +- .../WebSocketClientHandshaker08.java | 2 +- .../WebSocketClientHandshaker13.java | 2 +- .../WebSocketServerHandshaker00.java | 2 +- .../WebSocketServerHandshaker07.java | 2 +- .../WebSocketServerHandshaker08.java | 2 +- .../WebSocketServerHandshaker13.java | 2 +- .../handler/codec/ByteToByteDecoder.java | 9 +- .../handler/codec/ByteToByteEncoder.java | 9 +- .../handler/codec/ByteToMessageDecoder.java | 43 +---- .../handler/codec/MessageToByteEncoder.java | 9 +- .../codec/MessageToMessageDecoder.java | 9 +- .../codec/MessageToMessageEncoder.java | 9 +- .../PortUnificationServerHandler.java | 13 +- .../ChannelInboundMessageHandlerAdapter.java | 9 +- .../io/netty/channel/ChannelPipeline.java | 156 +++++++++++++++++- .../channel/DefaultChannelHandlerContext.java | 92 ++++++++++- .../netty/channel/DefaultChannelPipeline.java | 111 ++++++++----- 19 files changed, 328 insertions(+), 157 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker00.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker00.java index 6cb8768e10..bae00a3af9 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker00.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker00.java @@ -251,7 +251,7 @@ public class WebSocketClientHandshaker00 extends WebSocketClientHandshaker { ChannelPipeline p = channel.pipeline(); p.remove(HttpRequestEncoder.class); - p.get(HttpResponseDecoder.class).replace( + p.replaceAndForward(HttpResponseDecoder.class, "ws-decoder", new WebSocket00FrameDecoder(maxFramePayloadLength())); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker07.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker07.java index 118a281ef4..4ae6a60374 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker07.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker07.java @@ -224,7 +224,7 @@ public class WebSocketClientHandshaker07 extends WebSocketClientHandshaker { setHandshakeComplete(); ChannelPipeline p = channel.pipeline(); - p.get(HttpResponseDecoder.class).replace( + p.replaceAndForward(HttpResponseDecoder.class, "ws-decoder", new WebSocket07FrameDecoder(false, allowExtensions, maxFramePayloadLength())); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker08.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker08.java index cb227a25b0..e35cbdc1cc 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker08.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker08.java @@ -225,7 +225,7 @@ public class WebSocketClientHandshaker08 extends WebSocketClientHandshaker { ChannelPipeline p = channel.pipeline(); p.remove(HttpRequestEncoder.class); - p.get(HttpResponseDecoder.class).replace( + p.replaceAndForward(HttpResponseDecoder.class, "ws-decoder", new WebSocket08FrameDecoder(false, allowExtensions, maxFramePayloadLength())); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker13.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker13.java index b5adbe6eb2..610b04f463 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker13.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketClientHandshaker13.java @@ -224,7 +224,7 @@ public class WebSocketClientHandshaker13 extends WebSocketClientHandshaker { ChannelPipeline p = channel.pipeline(); p.remove(HttpRequestEncoder.class); - p.get(HttpResponseDecoder.class).replace( + p.replaceAndForward(HttpResponseDecoder.class, "ws-decoder", new WebSocket13FrameDecoder(false, allowExtensions, maxFramePayloadLength())); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java index c4c7dea25b..04a45ac54b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker00.java @@ -188,7 +188,7 @@ public class WebSocketServerHandshaker00 extends WebSocketServerHandshaker { if (p.get(HttpObjectAggregator.class) != null) { p.remove(HttpObjectAggregator.class); } - p.get(HttpRequestDecoder.class).replace("wsdecoder", + p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder", new WebSocket00FrameDecoder(maxFramePayloadLength())); p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket00FrameEncoder()); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker07.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker07.java index 62bc3d38b2..0c61758f40 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker07.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker07.java @@ -155,7 +155,7 @@ public class WebSocketServerHandshaker07 extends WebSocketServerHandshaker { p.remove(HttpObjectAggregator.class); } - p.get(HttpRequestDecoder.class).replace("wsdecoder", + p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder", new WebSocket07FrameDecoder(true, allowExtensions, maxFramePayloadLength())); p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket07FrameEncoder(false)); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08.java index b6889a3f4c..d87204307e 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker08.java @@ -155,7 +155,7 @@ public class WebSocketServerHandshaker08 extends WebSocketServerHandshaker { p.remove(HttpObjectAggregator.class); } - p.get(HttpRequestDecoder.class).replace("wsdecoder", + p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder", new WebSocket08FrameDecoder(true, allowExtensions, maxFramePayloadLength())); p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket08FrameEncoder(false)); } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13.java b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13.java index 28f963e8fd..44bd5c6481 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/websocketx/WebSocketServerHandshaker13.java @@ -155,7 +155,7 @@ public class WebSocketServerHandshaker13 extends WebSocketServerHandshaker { p.remove(HttpObjectAggregator.class); } - p.get(HttpRequestDecoder.class).replace("wsdecoder", + p.replaceAndForward(HttpRequestDecoder.class, "wsdecoder", new WebSocket13FrameDecoder(true, allowExtensions, maxFramePayloadLength())); p.replace(HttpResponseEncoder.class, "wsencoder", new WebSocket13FrameEncoder(false)); } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java index b405bc447e..31a3cf7e3e 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java @@ -45,7 +45,6 @@ import io.netty.channel.ChannelInboundByteHandlerAdapter; public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter { private volatile boolean singleDecode; - private boolean removed; /** * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. @@ -103,7 +102,7 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter */ private void callDecode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) { int oldOutSize = out.readableBytes(); - while (!removed && in.readable()) { + while (in.readable()) { int oldInSize = in.readableBytes(); try { decode(ctx, in, out); @@ -145,10 +144,4 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception { decode(ctx, in, out); } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - super.afterRemove(ctx); - removed = true; - } } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java index f979629061..111417d4b4 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java @@ -45,7 +45,6 @@ import io.netty.channel.PartialFlushException; * */ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapter { - private boolean removed; @Override public void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { @@ -53,7 +52,7 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte ByteBuf out = ctx.nextOutboundByteBuffer(); boolean encoded = false; - while (!removed && in.readable()) { + while (in.readable()) { int oldInSize = in.readableBytes(); try { encode(ctx, in, out); @@ -90,10 +89,4 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte * @throws Exception is thrown if an error accour */ protected abstract void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception; - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - super.afterRemove(ctx); - removed = true; - } } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 8a421c8b31..6f0df5f382 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -20,7 +20,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerUtil; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelPipeline; /** * {@link ChannelInboundByteHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an other @@ -42,10 +41,7 @@ import io.netty.channel.ChannelPipeline; public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter implements ChannelInboundByteHandler { - private ChannelHandlerContext ctx; - private volatile boolean singleDecode; - private boolean removed; /** * If set then only one message is decoded on each {@link #inboundBufferUpdated(ChannelHandlerContext)} call. @@ -67,12 +63,6 @@ public abstract class ByteToMessageDecoder return singleDecode; } - @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - this.ctx = ctx; - super.beforeAdd(ctx); - } - @Override public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) throws Exception { return ctx.alloc().buffer(); @@ -114,7 +104,7 @@ public abstract class ByteToMessageDecoder ByteBuf in = ctx.inboundByteBuffer(); boolean decoded = false; - while (!removed && in.readable()) { + while (in.readable()) { try { int oldInputLength = in.readableBytes(); Object o = decode(ctx, in); @@ -157,31 +147,6 @@ public abstract class ByteToMessageDecoder } } - /** - * Replace this decoder in the {@link ChannelPipeline} with the given handler. - * All remaining bytes in the inbound buffer will be forwarded to the new handler's - * inbound buffer. - */ - public void replace(String newHandlerName, ChannelInboundByteHandler newHandler) { - if (!ctx.executor().inEventLoop()) { - throw new IllegalStateException("not in event loop"); - } - - // We do not use ChannelPipeline.replace() here so that the current context points - // the new handler. - ctx.pipeline().addAfter(ctx.name(), newHandlerName, newHandler); - - ByteBuf in = ctx.inboundByteBuffer(); - try { - if (in.readable()) { - ctx.nextInboundByteBuffer().writeBytes(in); - ctx.fireInboundBufferUpdated(); - } - } finally { - ctx.pipeline().remove(this); - } - } - /** * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input * {@link ByteBuf} has nothing to read anymore, till nothing was read from the input {@link ByteBuf} or till @@ -205,10 +170,4 @@ public abstract class ByteToMessageDecoder protected Object decodeLast(ChannelHandlerContext ctx, ByteBuf in) throws Exception { return decode(ctx, in); } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - super.afterRemove(ctx); - removed = true; - } } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java index eccadea719..fc1b545bd2 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToByteEncoder.java @@ -45,7 +45,6 @@ import io.netty.channel.ChannelPromise; public abstract class MessageToByteEncoder extends ChannelOutboundMessageHandlerAdapter { private final Class[] acceptedMsgTypes; - private boolean removed; /** * The types which will be accepted by the encoder. If a received message is an other type it will be just forwared @@ -60,7 +59,7 @@ public abstract class MessageToByteEncoder extends ChannelOutboundMessageHand MessageBuf in = ctx.outboundMessageBuffer(); ByteBuf out = ctx.nextOutboundByteBuffer(); - while (!removed) { + for (;;) { Object msg = in.poll(); if (msg == null) { break; @@ -106,10 +105,4 @@ public abstract class MessageToByteEncoder extends ChannelOutboundMessageHand * @throws Exception is thrown if an error accour */ protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception; - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - super.afterRemove(ctx); - removed = true; - } } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 7fd7af7a5a..c70eb8c167 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -49,7 +49,6 @@ public abstract class MessageToMessageDecoder extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler { private final Class[] acceptedMsgTypes; - private boolean removed; /** * The types which will be accepted by the decoder. If a received message is an other type it will be just forwarded @@ -69,7 +68,7 @@ public abstract class MessageToMessageDecoder throws Exception { MessageBuf in = ctx.inboundMessageBuffer(); boolean notify = false; - while (!removed) { + for (;;) { try { Object msg = in.poll(); if (msg == null) { @@ -142,10 +141,4 @@ public abstract class MessageToMessageDecoder protected void freeInboundMessage(I msg) throws Exception { ChannelHandlerUtil.freeMessage(msg); } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - super.afterRemove(ctx); - removed = true; - } } diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 2531988592..932aa0d62f 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -48,7 +48,6 @@ import io.netty.channel.PartialFlushException; public abstract class MessageToMessageEncoder extends ChannelOutboundMessageHandlerAdapter { private final Class[] acceptedMsgTypes; - private boolean removed; /** * The types which will be accepted by the decoder. If a received message is an other type it will be just forwared @@ -63,7 +62,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH MessageBuf in = ctx.outboundMessageBuffer(); boolean encoded = false; - while (!removed) { + for (;;) { try { Object msg = in.poll(); if (msg == null) { @@ -141,10 +140,4 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH protected void freeOutboundMessage(I msg) throws Exception { ChannelHandlerUtil.freeMessage(msg); } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - super.afterRemove(ctx); - removed = true; - } } diff --git a/example/src/main/java/io/netty/example/portunification/PortUnificationServerHandler.java b/example/src/main/java/io/netty/example/portunification/PortUnificationServerHandler.java index 556a78d88c..3cb2e0d56d 100644 --- a/example/src/main/java/io/netty/example/portunification/PortUnificationServerHandler.java +++ b/example/src/main/java/io/netty/example/portunification/PortUnificationServerHandler.java @@ -73,13 +73,8 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt // Unknown protocol; discard everything and close the connection. in.clear(); ctx.close(); - return; } } - - // Forward the current read buffer as is to the new handlers. - ctx.nextInboundByteBuffer().writeBytes(in); - ctx.fireInboundBufferUpdated(); } private boolean isSsl(ByteBuf buf) { @@ -122,7 +117,7 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt p.addLast("ssl", new SslHandler(engine)); p.addLast("unificationA", new PortUnificationServerHandler(false, detectGzip)); - p.remove(this); + p.removeAndForward(this); } private void enableGzip(ChannelHandlerContext ctx) { @@ -130,7 +125,7 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt p.addLast("gzipdeflater", ZlibCodecFactory.newZlibEncoder(ZlibWrapper.GZIP)); p.addLast("gzipinflater", ZlibCodecFactory.newZlibDecoder(ZlibWrapper.GZIP)); p.addLast("unificationB", new PortUnificationServerHandler(detectSsl, false)); - p.remove(this); + p.removeAndForward(this); } private void switchToHttp(ChannelHandlerContext ctx) { @@ -139,7 +134,7 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt p.addLast("encoder", new HttpResponseEncoder()); p.addLast("deflater", new HttpContentCompressor()); p.addLast("handler", new HttpSnoopServerHandler()); - p.remove(this); + p.removeAndForward(this); } private void switchToFactorial(ChannelHandlerContext ctx) { @@ -147,6 +142,6 @@ public class PortUnificationServerHandler extends ChannelInboundByteHandlerAdapt p.addLast("decoder", new BigIntegerDecoder()); p.addLast("encoder", new NumberEncoder()); p.addLast("handler", new FactorialServerHandler()); - p.remove(this); + p.removeAndForward(this); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java index 097f094d11..b354dbdd40 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundMessageHandlerAdapter.java @@ -44,7 +44,6 @@ public abstract class ChannelInboundMessageHandlerAdapter extends ChannelInboundHandlerAdapter implements ChannelInboundMessageHandler { private final Class[] acceptedMsgTypes; - private boolean removed; /** * The types which will be accepted by the message handler. If a received message is an other type it will be just @@ -74,7 +73,7 @@ public abstract class ChannelInboundMessageHandlerAdapter try { MessageBuf in = ctx.inboundMessageBuffer(); - while (!removed) { + for (;;) { Object msg = in.poll(); if (msg == null) { break; @@ -165,10 +164,4 @@ public abstract class ChannelInboundMessageHandlerAdapter protected void freeInboundMessage(I msg) throws Exception { ChannelHandlerUtil.freeMessage(msg); } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - super.afterRemove(ctx); - removed = true; - } } diff --git a/transport/src/main/java/io/netty/channel/ChannelPipeline.java b/transport/src/main/java/io/netty/channel/ChannelPipeline.java index 6a9d48feb7..c98ef2a73c 100644 --- a/transport/src/main/java/io/netty/channel/ChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/ChannelPipeline.java @@ -15,6 +15,7 @@ */ package io.netty.channel; +import io.netty.buffer.Buf; import io.netty.buffer.ByteBuf; import io.netty.buffer.MessageBuf; @@ -349,6 +350,8 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI /** * Removes the specified {@link ChannelHandler} from this pipeline. + * All the remaining content in the {@link Buf) (if any) of the + * {@link ChannelHandler} will be discarded. * * @throws NoSuchElementException * if there's no such handler in this pipeline @@ -357,9 +360,26 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ ChannelPipeline remove(ChannelHandler handler); + /** + * Removes the specified {@link ChannelHandler} from this pipeline + * and transfer the content of its {@link Buf} to the next + * {@link ChannelHandler} in the {@link ChannelPipeline}. + * + * @param handler the {@link ChannelHandler} to remove + * + * @throws NoSuchElementException + * if there's no such handler in this pipeline + * @throws NullPointerException + * if the specified handler is {@code null} + */ + ChannelPipeline removeAndForward(ChannelHandler handler); + /** * Removes the {@link ChannelHandler} with the specified name from this - * pipeline. + * pipeline. All the remaining content in the {@link Buf) (if any) of the + * {@link ChannelHandler} will be discarded. + * + * @param name the name under which the {@link ChannelHandler} was stored. * * @return the removed handler * @@ -370,9 +390,27 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ ChannelHandler remove(String name); + /** + * Removes the {@link ChannelHandler} with the specified name from this + * pipeline and transfer the content of its {@link Buf} to the next + * {@link ChannelHandler} in the {@link ChannelPipeline}. + * + * @param name the name under which the {@link ChannelHandler} was stored. + * + * @return the removed handler + * + * @throws NoSuchElementException + * if there's no such handler with the specified name in this pipeline + * @throws NullPointerException + * if the specified name is {@code null} + */ + ChannelHandler removeAndForward(String name); + /** * Removes the {@link ChannelHandler} of the specified type from this - * pipeline + * pipeline. All the remaining content in the {@link Buf) (if any) of the {@link ChannelHandler} + * will be discarded. + * * * @param the type of the handler * @param handlerType the type of the handler @@ -386,9 +424,29 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ T remove(Class handlerType); + /** + * Removes the {@link ChannelHandler} of the specified type from this + * pipeline and transfer the content of its {@link Buf} to the next + * {@link ChannelHandler} in the {@link ChannelPipeline}. + * + * @param the type of the handler + * @param handlerType the type of the handler + * + * @return the removed handler + * + * @throws NoSuchElementException + * if there's no such handler of the specified type in this pipeline + * @throws NullPointerException + * if the specified handler type is {@code null} + */ + T removeAndForward(Class handlerType); + /** * Removes the first {@link ChannelHandler} in this pipeline. * + * All the remaining content in the {@link Buf) (if any) of the {@link ChannelHandler} + * will be discarded. + * * @return the removed handler * * @throws NoSuchElementException @@ -399,6 +457,9 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI /** * Removes the last {@link ChannelHandler} in this pipeline. * + * All the remaining content in the {@link Buf) (if any) of the {@link ChannelHandler} + * will be discarded. + * * @return the removed handler * * @throws NoSuchElementException @@ -410,6 +471,15 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI * Replaces the specified {@link ChannelHandler} with a new handler in * this pipeline. * + * All the remaining content in the {@link Buf) (if any) of the {@link ChannelHandler} + * will be discarded. + * + * @param oldHandler the {@link ChannelHandler} to be replaced + * @param newName the name under which the replacement should be added + * @param newHandler the {@link ChannelHandler} which is used as replacement + * + * @return itself + * * @throws NoSuchElementException * if the specified old handler does not exist in this pipeline * @throws IllegalArgumentException @@ -421,10 +491,39 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler); + /** + * Replaces the specified {@link ChannelHandler} with a new handler in + * this pipeline and transfer the content of its {@link Buf} to the next + * {@link ChannelHandler} in the {@link ChannelPipeline}. + * + * @param oldHandler the {@link ChannelHandler} to be replaced + * @param newName the name under which the replacement should be added + * @param newHandler the {@link ChannelHandler} which is used as replacement + * + * @return itself + + * @throws NoSuchElementException + * if the specified old handler does not exist in this pipeline + * @throws IllegalArgumentException + * if a handler with the specified new name already exists in this + * pipeline, except for the handler to be replaced + * @throws NullPointerException + * if the specified old handler, new name, or new handler is + * {@code null} + */ + ChannelPipeline replaceAndForward(ChannelHandler oldHandler, String newName, ChannelHandler newHandler); + /** * Replaces the {@link ChannelHandler} of the specified name with a new * handler in this pipeline. * + * All the remaining content of the {@link Buf) (if any) of the to be replaced + * {@link ChannelHandler} will be discarded. + * + * @param oldHandler the {@link ChannelHandler} to be replaced + * @param newName the name under which the replacement should be added + * @param newHandler the {@link ChannelHandler} which is used as replacement + * * @return the removed handler * * @throws NoSuchElementException @@ -438,10 +537,39 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler); + /** + * Replaces the {@link ChannelHandler} of the specified name with a new + * handler in this pipeline and transfer the content of its {@link Buf} to the next + * {@link ChannelHandler} in the {@link ChannelPipeline}. + * + * @param oldName the name of the {@link ChannelHandler} to be replaced + * @param newName the name under which the replacement should be added + * @param newHandler the {@link ChannelHandler} which is used as replacement + * + * @return the removed handler + * + * @throws NoSuchElementException + * if the handler with the specified old name does not exist in this pipeline + * @throws IllegalArgumentException + * if a handler with the specified new name already exists in this + * pipeline, except for the handler to be replaced + * @throws NullPointerException + * if the specified old handler, new name, or new handler is + * {@code null} + */ + ChannelHandler replaceAndForward(String oldName, String newName, ChannelHandler newHandler); + /** * Replaces the {@link ChannelHandler} of the specified type with a new * handler in this pipeline. * + * All the remaining content of the {@link Buf) (if any) of the to be replaced + * {@link ChannelHandler} will be discarded. + * + * @param oldHandlerType the type of the handler to be removed + * @param newName the name under which the replacement should be added + * @param newHandler the {@link ChannelHandler} which is used as replacement + * * @return the removed handler * * @throws NoSuchElementException @@ -456,6 +584,30 @@ public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundI */ T replace(Class oldHandlerType, String newName, ChannelHandler newHandler); + /** + * Replaces the {@link ChannelHandler} of the specified type with a new + * handler in this pipeline and transfer the content of its {@link Buf} to the next + * {@link ChannelHandler} in the {@link ChannelPipeline}. + * + * @param oldHandlerType the type of the handler to be removed + * @param newName the name under which the replacement should be added + * @param newHandler the {@link ChannelHandler} which is used as replacement + * + * @return the removed handler + * + * @throws NoSuchElementException + * if the handler of the specified old handler type does not exist + * in this pipeline + * @throws IllegalArgumentException + * if a handler with the specified new name already exists in this + * pipeline, except for the handler to be replaced + * @throws NullPointerException + * if the specified old handler, new name, or new handler is + * {@code null} + */ + T replaceAndForward(Class oldHandlerType, String newName, + ChannelHandler newHandler); + /** * Returns the first {@link ChannelHandler} in this pipeline. * diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 883825412a..ab7e5ce300 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -80,7 +80,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements private Runnable invokeFreeInboundBuffer0Task; private Runnable invokeFreeOutboundBuffer0Task; private Runnable invokeRead0Task; - boolean removed; + volatile boolean removed; DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutorGroup group, @@ -179,6 +179,42 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements this.needsLazyBufInit = needsLazyBufInit; } + void forwardBufferContent() { + if (hasOutboundByteBuffer() && outboundByteBuffer().readable()) { + nextOutboundByteBuffer().writeBytes(outboundByteBuffer()); + flush(); + } + if (hasOutboundMessageBuffer() && !outboundMessageBuffer().isEmpty()) { + if (outboundMessageBuffer().drainTo(nextOutboundMessageBuffer()) > 0) { + flush(); + } + } + if (hasInboundByteBuffer() && inboundByteBuffer().readable()) { + nextInboundByteBuffer().writeBytes(inboundByteBuffer()); + fireInboundBufferUpdated(); + } + if (hasInboundMessageBuffer() && !inboundMessageBuffer().isEmpty()) { + if (inboundMessageBuffer().drainTo(nextInboundMessageBuffer()) > 0) { + fireInboundBufferUpdated(); + } + } + } + + void clearBuffer() { + if (hasOutboundByteBuffer()) { + outboundByteBuffer().clear(); + } + if (hasOutboundMessageBuffer()) { + outboundMessageBuffer().clear(); + } + if (hasInboundByteBuffer()) { + inboundByteBuffer().clear(); + } + if (hasInboundMessageBuffer()) { + inboundMessageBuffer().clear(); + } + } + private void lazyInitOutboundBuffer() { if (needsLazyBufInit) { if (outByteBuf == null && outMsgBuf == null) { @@ -283,6 +319,28 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } } + void freeHandlerBuffersAfterRemoval() { + if (!removed) { + return; + } + final ChannelHandler handler = handler(); + + if (handler instanceof ChannelInboundHandler) { + try { + ((ChannelInboundHandler) handler).freeInboundBuffer(this); + } catch (Exception e) { + pipeline.notifyHandlerException(e); + } + } + if (handler instanceof ChannelOutboundHandler) { + try { + ((ChannelOutboundHandler) handler).freeOutboundBuffer(this); + } catch (Exception e) { + pipeline.notifyHandlerException(e); + } + } + } + @Override public Channel channel() { return channel; @@ -865,6 +923,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelStateHandler) handler()).channelRegistered(this); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -925,6 +985,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelStateHandler) handler()).channelActive(this); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -955,6 +1017,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelStateHandler) handler()).channelInactive(this); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1001,6 +1065,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements "An exception was thrown by a user handler's " + "exceptionCaught() method while handling the following exception:", cause); } + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1031,6 +1097,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements handler().userEventTriggered(this, event); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1085,13 +1153,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } catch (Throwable t) { pipeline.notifyHandlerException(t); } finally { - if (!removed && handler instanceof ChannelInboundByteHandler && !isInboundBufferFreed()) { + if (handler instanceof ChannelInboundByteHandler && !isInboundBufferFreed()) { try { ((ChannelInboundByteHandler) handler).discardInboundReadBytes(this); } catch (Throwable t) { pipeline.notifyHandlerException(t); } } + freeHandlerBuffersAfterRemoval(); } } @@ -1122,6 +1191,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelStateHandler) handler()).channelReadSuspended(this); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1194,6 +1265,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelOperationHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1233,6 +1306,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelOperationHandler) handler()).connect(this, remoteAddress, localAddress, promise); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1270,6 +1345,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelOperationHandler) handler()).disconnect(this, promise); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1300,6 +1377,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelOperationHandler) handler()).close(this, promise); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1330,6 +1409,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelOperationHandler) handler()).deregister(this, promise); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1361,6 +1442,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelOperationHandler) handler()).read(this); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } @@ -1425,13 +1508,14 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements } catch (Throwable t) { pipeline.notifyHandlerException(t); } finally { - if (!removed && handler instanceof ChannelOutboundByteHandler && !isOutboundBufferFreed()) { + if (handler instanceof ChannelOutboundByteHandler && !isOutboundBufferFreed()) { try { ((ChannelOutboundByteHandler) handler).discardOutboundReadBytes(this); } catch (Throwable t) { pipeline.notifyHandlerException(t); } } + freeHandlerBuffersAfterRemoval(); } } @@ -1471,6 +1555,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ((ChannelOperationHandler) handler()).sendFile(this, region, promise); } catch (Throwable t) { pipeline.notifyHandlerException(t); + } finally { + freeHandlerBuffersAfterRemoval(); } } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index eb27e88399..465fa6fe34 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -362,22 +362,33 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public ChannelPipeline remove(ChannelHandler handler) { - remove(getContextOrDie(handler)); + remove(getContextOrDie(handler), false); + return this; + } + + @Override + public ChannelPipeline removeAndForward(ChannelHandler handler) { + remove(getContextOrDie(handler), true); return this; } @Override public ChannelHandler remove(String name) { - return remove(getContextOrDie(name)).handler(); + return remove(getContextOrDie(name), false).handler(); + } + + @Override + public ChannelHandler removeAndForward(String name) { + return remove(getContextOrDie(name), true).handler(); } @SuppressWarnings("unchecked") @Override public T remove(Class handlerType) { - return (T) remove(getContextOrDie(handlerType)).handler(); + return (T) remove(getContextOrDie(handlerType), false).handler(); } - private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx) { + private DefaultChannelHandlerContext remove(final DefaultChannelHandlerContext ctx, final boolean forward) { assert ctx != head && ctx != tail; DefaultChannelHandlerContext context; @@ -385,14 +396,14 @@ final class DefaultChannelPipeline implements ChannelPipeline { synchronized (this) { if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) { - remove0(ctx); + remove0(ctx, forward); return ctx; } else { future = ctx.executor().submit(new Runnable() { @Override public void run() { synchronized (DefaultChannelPipeline.this) { - remove0(ctx); + remove0(ctx, forward); } } }); @@ -408,7 +419,13 @@ final class DefaultChannelPipeline implements ChannelPipeline { return context; } - private void remove0(DefaultChannelHandlerContext ctx) { + @SuppressWarnings("unchecked") + @Override + public T removeAndForward(Class handlerType) { + return (T) remove(getContextOrDie(handlerType), true).handler(); + } + + private void remove0(DefaultChannelHandlerContext ctx, boolean forward) { callBeforeRemove(ctx); DefaultChannelHandlerContext prev = ctx.prev; @@ -417,7 +434,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { next.prev = prev; name2ctx.remove(ctx.name()); - callAfterRemove(ctx); + callAfterRemove(ctx, forward); } @Override @@ -425,7 +442,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { if (head.next == tail) { throw new NoSuchElementException(); } - return remove(head.next).handler(); + return remove(head.next, false).handler(); } @Override @@ -433,29 +450,41 @@ final class DefaultChannelPipeline implements ChannelPipeline { if (head.next == tail) { throw new NoSuchElementException(); } - return remove(tail.prev).handler(); + return remove(tail.prev, false).handler(); } @Override public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { - replace(getContextOrDie(oldHandler), newName, newHandler); + replace(getContextOrDie(oldHandler), newName, newHandler, false); + return this; + } + + @Override + public ChannelPipeline replaceAndForward(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { + replace(getContextOrDie(oldHandler), newName, newHandler, true); return this; } @Override public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) { - return replace(getContextOrDie(oldName), newName, newHandler); + return replace(getContextOrDie(oldName), newName, newHandler, false); + } + + @Override + public ChannelHandler replaceAndForward(String oldName, String newName, ChannelHandler newHandler) { + return replace(getContextOrDie(oldName), newName, newHandler, true); } @Override @SuppressWarnings("unchecked") public T replace( Class oldHandlerType, String newName, ChannelHandler newHandler) { - return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler); + return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler, false); } private ChannelHandler replace( - final DefaultChannelHandlerContext ctx, final String newName, ChannelHandler newHandler) { + final DefaultChannelHandlerContext ctx, final String newName, + ChannelHandler newHandler, final boolean forward) { assert ctx != head && ctx != tail; @@ -470,14 +499,15 @@ final class DefaultChannelPipeline implements ChannelPipeline { new DefaultChannelHandlerContext(this, ctx.executor, newName, newHandler); if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) { - replace0(ctx, newName, newCtx); + replace0(ctx, newName, newCtx, forward); + return ctx.handler(); } else { future = newCtx.executor().submit(new Runnable() { @Override public void run() { synchronized (DefaultChannelPipeline.this) { - replace0(ctx, newName, newCtx); + replace0(ctx, newName, newCtx, forward); } } }); @@ -492,7 +522,15 @@ final class DefaultChannelPipeline implements ChannelPipeline { return ctx.handler(); } - private void replace0(DefaultChannelHandlerContext ctx, String newName, DefaultChannelHandlerContext newCtx) { + @SuppressWarnings("unchecked") + @Override + public T replaceAndForward( + Class oldHandlerType, String newName, ChannelHandler newHandler) { + return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler, true); + } + + private void replace0(DefaultChannelHandlerContext ctx, String newName, + DefaultChannelHandlerContext newCtx, boolean forward) { boolean sameName = ctx.name().equals(newName); DefaultChannelHandlerContext prev = ctx.prev; @@ -515,7 +553,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { ChannelPipelineException addException = null; boolean removed = false; try { - callAfterRemove(ctx); + callAfterRemove(ctx, forward); removed = true; } catch (ChannelPipelineException e) { removeException = e; @@ -569,7 +607,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } catch (Throwable t) { boolean removed = false; try { - remove((DefaultChannelHandlerContext) ctx); + remove((DefaultChannelHandlerContext) ctx, false); removed = true; } catch (Throwable t2) { if (logger.isWarnEnabled()) { @@ -599,7 +637,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { } } - private void callAfterRemove(final DefaultChannelHandlerContext ctx) { + private void callAfterRemove(final DefaultChannelHandlerContext ctx, boolean forward) { final ChannelHandler handler = ctx.handler(); // Notify the complete removal. @@ -611,34 +649,17 @@ final class DefaultChannelPipeline implements ChannelPipeline { ".afterRemove() has thrown an exception.", t); } - // Free all buffers before completing removal. - if (channel.isRegistered()) { - ctx.executor().execute(new Runnable() { - @Override - public void run() { - freeHandlerBuffers(handler, ctx); - } - }); + if (forward) { + ctx.forwardBufferContent(); } else { - freeHandlerBuffers(handler, ctx); + ctx.clearBuffer(); } - ctx.removed = true; - } - private void freeHandlerBuffers(ChannelHandler handler, ChannelHandlerContext ctx) { - if (handler instanceof ChannelInboundHandler) { - try { - ((ChannelInboundHandler) handler).freeInboundBuffer(ctx); - } catch (Exception e) { - notifyHandlerException(e); - } - } - if (handler instanceof ChannelOutboundHandler) { - try { - ((ChannelOutboundHandler) handler).freeOutboundBuffer(ctx); - } catch (Exception e) { - notifyHandlerException(e); - } + ctx.removed = true; + + // Free all buffers before completing removal. + if (!channel.isRegistered()) { + ctx.freeHandlerBuffersAfterRemoval(); } }