[#4087] Correctly forward bytes when remove codec and handle channelInactive / channelReadComplete(...)

Motivation:

We missed to correctly implement the handlerRemoved(...) / channelInactive(...) and channelReadComplete(...) method, this leaded to multiple problems:

 - Missed to forward bytes when the codec is removed from the pipeline
 - Missed to call decodeLast(...) once the Channel goes in active
 - No correct handling of channelReadComplete that could lead to grow of cumulation buffer.

Modifications:

- Correctly implement methods and forward to the internal ByteToMessageDecoder
- Add unit test.

Result:

Correct behaviour
This commit is contained in:
Norman Maurer 2015-08-13 21:35:14 +02:00
parent bd5091a14f
commit 45067078d4
2 changed files with 65 additions and 0 deletions

View File

@ -108,6 +108,34 @@ public abstract class ByteToMessageCodec<I> extends ChannelHandlerAdapter {
encoder.write(ctx, msg, promise); encoder.write(ctx, msg, promise);
} }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
decoder.channelReadComplete(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
decoder.channelInactive(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
try {
decoder.handlerAdded(ctx);
} finally {
encoder.handlerAdded(ctx);
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
try {
decoder.handlerRemoved(ctx);
} finally {
encoder.handlerRemoved(ctx);
}
}
/** /**
* @see MessageToByteEncoder#encode(ChannelHandlerContext, Object, ByteBuf) * @see MessageToByteEncoder#encode(ChannelHandlerContext, Object, ByteBuf)
*/ */

View File

@ -16,12 +16,16 @@
package io.netty.handler.codec; package io.netty.handler.codec;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test; import org.junit.Test;
import java.util.List; import java.util.List;
import static org.junit.Assert.*;
public class ByteToMessageCodecTest { public class ByteToMessageCodecTest {
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
@ -34,6 +38,39 @@ public class ByteToMessageCodecTest {
new InvalidByteToMessageCodec2(); new InvalidByteToMessageCodec2();
} }
@Test
public void testForwardPendingData() {
ByteToMessageCodec<Integer> codec = new ByteToMessageCodec<Integer>() {
@Override
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
out.writeInt(msg);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() >= 4) {
out.add(in.readInt());
}
}
};
ByteBuf buffer = Unpooled.buffer();
buffer.writeInt(1);
buffer.writeByte('0');
EmbeddedChannel ch = new EmbeddedChannel(codec);
assertTrue(ch.writeInbound(buffer));
ch.pipeline().remove(codec);
assertTrue(ch.finish());
assertEquals(1, ch.readInbound());
ByteBuf buf = (ByteBuf) ch.readInbound();
assertEquals(Unpooled.wrappedBuffer(new byte[]{'0'}), buf);
buf.release();
assertNull(ch.readInbound());
assertNull(ch.readOutbound());
}
@ChannelHandler.Sharable @ChannelHandler.Sharable
private static final class InvalidByteToMessageCodec extends ByteToMessageCodec<Integer> { private static final class InvalidByteToMessageCodec extends ByteToMessageCodec<Integer> {
InvalidByteToMessageCodec() { InvalidByteToMessageCodec() {