Revert "[#1131] Codecs must not cache next buffer during processing"

This reverts commit b1775a3223.
This commit is contained in:
Trustin Lee 2013-03-13 15:17:06 +09:00
parent 5830875b42
commit 42e31a4445
6 changed files with 15 additions and 24 deletions

View File

@ -68,17 +68,17 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
@Override @Override
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception { public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
callDecode(ctx, in); callDecode(ctx, in, ctx.nextInboundByteBuffer());
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ByteBuf in = ctx.inboundByteBuffer(); ByteBuf in = ctx.inboundByteBuffer();
ByteBuf out = ctx.nextInboundByteBuffer();
if (!in.isReadable()) { if (!in.isReadable()) {
callDecode(ctx, in); callDecode(ctx, in, out);
} }
ByteBuf out = ctx.nextInboundByteBuffer();
int oldOutSize = out.readableBytes(); int oldOutSize = out.readableBytes();
try { try {
decodeLast(ctx, in, out); decodeLast(ctx, in, out);
@ -100,8 +100,7 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
/** /**
* Call the {@link #decode(ChannelHandlerContext, ByteBuf, ByteBuf)} method until it is done. * Call the {@link #decode(ChannelHandlerContext, ByteBuf, ByteBuf)} method until it is done.
*/ */
private void callDecode(ChannelHandlerContext ctx, ByteBuf in) { private void callDecode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) {
ByteBuf out = ctx.nextInboundByteBuffer();
int oldOutSize = out.readableBytes(); int oldOutSize = out.readableBytes();
while (in.isReadable()) { while (in.isReadable()) {
int oldInSize = in.readableBytes(); int oldInSize = in.readableBytes();
@ -117,16 +116,6 @@ public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter
if (oldInSize == in.readableBytes() || isSingleDecode()) { if (oldInSize == in.readableBytes() || isSingleDecode()) {
break; break;
} }
ByteBuf buf = ctx.nextInboundByteBuffer();
if (out != buf) {
// user changed handlers in the pipeline need to trigger fireInboundBufferUpdated maybe ?
if (out.readableBytes() > oldOutSize) {
ctx.fireInboundBufferUpdated();
}
out = ctx.nextInboundByteBuffer();
oldOutSize = out.readableBytes();
}
} }
if (out.readableBytes() > oldOutSize) { if (out.readableBytes() > oldOutSize) {

View File

@ -54,12 +54,13 @@ public abstract class ByteToByteEncoder extends ChannelOutboundByteHandlerAdapte
@Override @Override
protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception { protected void flush(ChannelHandlerContext ctx, ByteBuf in, ChannelPromise promise) throws Exception {
ByteBuf out = ctx.nextOutboundByteBuffer();
boolean encoded = false; boolean encoded = false;
while (in.isReadable()) { while (in.isReadable()) {
int oldInSize = in.readableBytes(); int oldInSize = in.readableBytes();
try { try {
encode(ctx, in, ctx.nextOutboundByteBuffer()); encode(ctx, in, out);
encoded = true; encoded = true;
} catch (Throwable t) { } catch (Throwable t) {
Throwable cause; Throwable cause;

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundByteHandler; import io.netty.channel.ChannelInboundByteHandler;
import io.netty.channel.ChannelInboundByteHandlerAdapter; import io.netty.channel.ChannelInboundByteHandlerAdapter;
@ -87,8 +88,7 @@ public abstract class ByteToMessageDecoder
} }
try { try {
Object msg = decodeLast(ctx, in); if (ctx.nextInboundMessageBuffer().unfoldAndAdd(decodeLast(ctx, in))) {
if (ctx.nextInboundMessageBuffer().unfoldAndAdd(msg)) {
ctx.fireInboundBufferUpdated(); ctx.fireInboundBufferUpdated();
} }
} catch (Throwable t) { } catch (Throwable t) {
@ -106,6 +106,7 @@ public abstract class ByteToMessageDecoder
boolean wasNull = false; boolean wasNull = false;
boolean decoded = false; boolean decoded = false;
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
while (in.isReadable()) { while (in.isReadable()) {
try { try {
int oldInputLength = in.readableBytes(); int oldInputLength = in.readableBytes();
@ -124,7 +125,7 @@ public abstract class ByteToMessageDecoder
"decode() did not read anything but decoded a message."); "decode() did not read anything but decoded a message.");
} }
if (ctx.nextInboundMessageBuffer().unfoldAndAdd(o)) { if (out.unfoldAndAdd(o)) {
decoded = true; decoded = true;
if (isSingleDecode()) { if (isSingleDecode()) {
break; break;

View File

@ -46,8 +46,7 @@ public abstract class MessageToMessageDecoder<I> extends ChannelInboundMessageHa
@Override @Override
public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception { public final void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception {
Object omsg = decode(ctx, msg); ctx.nextInboundMessageBuffer().unfoldAndAdd(decode(ctx, msg));
ctx.nextInboundMessageBuffer().unfoldAndAdd(omsg);
} }
/** /**

View File

@ -45,8 +45,7 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundMessageH
@Override @Override
public final void flush(ChannelHandlerContext ctx, I msg) throws Exception { public final void flush(ChannelHandlerContext ctx, I msg) throws Exception {
try { try {
Object omsg = encode(ctx, msg); ctx.nextOutboundMessageBuffer().unfoldAndAdd(encode(ctx, msg));
ctx.nextOutboundMessageBuffer().unfoldAndAdd(omsg);
} catch (CodecException e) { } catch (CodecException e) {
throw e; throw e;
} catch (Exception e) { } catch (Exception e) {

View File

@ -16,6 +16,7 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.MessageBuf;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
@ -392,6 +393,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
boolean wasNull = false; boolean wasNull = false;
ByteBuf in = cumulation; ByteBuf in = cumulation;
MessageBuf<Object> out = ctx.nextInboundMessageBuffer();
boolean decoded = false; boolean decoded = false;
while (in.isReadable()) { while (in.isReadable()) {
try { try {
@ -439,7 +441,7 @@ public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
} }
// A successful decode // A successful decode
if (ctx.nextInboundMessageBuffer().unfoldAndAdd(result)) { if (out.unfoldAndAdd(result)) {
decoded = true; decoded = true;
if (isSingleDecode()) { if (isSingleDecode()) {
break; break;