From 42e31a444579b24e96dae11c594827311c1217d0 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 13 Mar 2013 15:17:06 +0900 Subject: [PATCH] Revert "[#1131] Codecs must not cache next buffer during processing" This reverts commit b1775a3223e54e076f0abba85360a19ded368498. --- .../handler/codec/ByteToByteDecoder.java | 19 ++++--------------- .../handler/codec/ByteToByteEncoder.java | 3 ++- .../handler/codec/ByteToMessageDecoder.java | 7 ++++--- .../codec/MessageToMessageDecoder.java | 3 +-- .../codec/MessageToMessageEncoder.java | 3 +-- .../netty/handler/codec/ReplayingDecoder.java | 4 +++- 6 files changed, 15 insertions(+), 24 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java index 45aee8c2d6..018a68e743 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteDecoder.java @@ -68,17 +68,17 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter @Override public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { - callDecode(ctx, in); + callDecode(ctx, in, ctx.nextInboundByteBuffer()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ByteBuf in = ctx.inboundByteBuffer(); + ByteBuf out = ctx.nextInboundByteBuffer(); if (!in.isReadable()) { - callDecode(ctx, in); + callDecode(ctx, in, out); } - ByteBuf out = ctx.nextInboundByteBuffer(); int oldOutSize = out.readableBytes(); try { decodeLast(ctx, in, out); @@ -100,8 +100,7 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter /** * Call the {@link #decode(ChannelHandlerContext, ByteBuf, ByteBuf)} method until it is done. */ - private void callDecode(ChannelHandlerContext ctx, ByteBuf in) { - ByteBuf out = ctx.nextInboundByteBuffer(); + private void callDecode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) { int oldOutSize = out.readableBytes(); while (in.isReadable()) { int oldInSize = in.readableBytes(); @@ -117,16 +116,6 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter if (oldInSize == in.readableBytes() || isSingleDecode()) { break; } - - ByteBuf buf = ctx.nextInboundByteBuffer(); - if (out != buf) { - // user changed handlers in the pipeline need to trigger fireInboundBufferUpdated maybe ? - if (out.readableBytes() > oldOutSize) { - ctx.fireInboundBufferUpdated(); - } - out = ctx.nextInboundByteBuffer(); - oldOutSize = out.readableBytes(); - } } if (out.readableBytes() > oldOutSize) { diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java index 490aa505e6..78a3018bdc 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToByteEncoder.java @@ -54,12 +54,13 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte @Override protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception { + ByteBuf out = ctx.nextOutboundByteBuffer(); boolean encoded = false; while (in.isReadable()) { int oldInSize = in.readableBytes(); try { - encode(ctx, in, ctx.nextOutboundByteBuffer()); + encode(ctx, in, out); encoded = true; } catch (Throwable t) { Throwable cause; diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 056c3234c2..cf53bf02ad 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandlerAdapter; @@ -87,8 +88,7 @@ public abstract class ByteToMessageDecoder } try { - Object msg = decodeLast(ctx, in); - if (ctx.nextInboundMessageBuffer().unfoldAndAdd(msg)) { + if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, in))) { ctx.fireInboundBufferUpdated(); } } catch (Throwable t) { @@ -106,6 +106,7 @@ public abstract class ByteToMessageDecoder boolean wasNull = false; boolean decoded = false; + MessageBuf out = ctx.nextInboundMessageBuffer(); while (in.isReadable()) { try { int oldInputLength = in.readableBytes(); @@ -124,7 +125,7 @@ public abstract class ByteToMessageDecoder "decode() did not read anything but decoded a message."); } - if (ctx.nextInboundMessageBuffer().unfoldAndAdd(o)) { + if (out.unfoldAndAdd(o)) { decoded = true; if (isSingleDecode()) { break; diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java index 49b889b32b..a9b7311a84 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageDecoder.java @@ -46,8 +46,7 @@ public abstract class MessageToMessageDecoder extends ChannelInboundMessageHa @Override public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { - Object omsg = decode(ctx, msg); - ctx.nextInboundMessageBuffer().unfoldAndAdd(omsg); + ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg)); } /** diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index db9e8138c4..3263b8fc0d 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -45,8 +45,7 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundMessageH @Override public final void flush(ChannelHandlerContext ctx, I msg) throws Exception { try { - Object omsg = encode(ctx, msg); - ctx.nextOutboundMessageBuffer().unfoldAndAdd(omsg); + ctx.nextOutboundMessageBuffer().unfoldAndAdd(encode(ctx, msg)); } catch (CodecException e) { throw e; } catch (Exception e) { diff --git a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java index 5079fe0870..9fa7603d33 100644 --- a/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ReplayingDecoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec; import io.netty.buffer.ByteBuf; +import io.netty.buffer.MessageBuf; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -392,6 +393,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { boolean wasNull = false; ByteBuf in = cumulation; + MessageBuf out = ctx.nextInboundMessageBuffer(); boolean decoded = false; while (in.isReadable()) { try { @@ -439,7 +441,7 @@ public abstract class ReplayingDecoder extends ByteToMessageDecoder { } // A successful decode - if (ctx.nextInboundMessageBuffer().unfoldAndAdd(result)) { + if (out.unfoldAndAdd(result)) { decoded = true; if (isSingleDecode()) { break;