[#4087] Correctly forward bytes when remove codec and handle channelInactive / channelReadComplete(...)
Motivation: We missed to correctly implement the handlerRemoved(...) / channelInactive(...) and channelReadComplete(...) method, this leaded to multiple problems: - Missed to forward bytes when the codec is removed from the pipeline - Missed to call decodeLast(...) once the Channel goes in active - No correct handling of channelReadComplete that could lead to grow of cumulation buffer. Modifications: - Correctly implement methods and forward to the internal ByteToMessageDecoder - Add unit test. Result: Correct behaviour
This commit is contained in:
parent
a0bde17eff
commit
5b4a96a7ac
@ -108,6 +108,34 @@ public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler {
|
|||||||
encoder.write(ctx, msg, promise);
|
encoder.write(ctx, msg, promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
decoder.channelReadComplete(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
decoder.channelInactive(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
try {
|
||||||
|
decoder.handlerAdded(ctx);
|
||||||
|
} finally {
|
||||||
|
encoder.handlerAdded(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
try {
|
||||||
|
decoder.handlerRemoved(ctx);
|
||||||
|
} finally {
|
||||||
|
encoder.handlerRemoved(ctx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see MessageToByteEncoder#encode(ChannelHandlerContext, Object, ByteBuf)
|
* @see MessageToByteEncoder#encode(ChannelHandlerContext, Object, ByteBuf)
|
||||||
*/
|
*/
|
||||||
|
@ -16,12 +16,16 @@
|
|||||||
package io.netty.handler.codec;
|
package io.netty.handler.codec;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandler;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
public class ByteToMessageCodecTest {
|
public class ByteToMessageCodecTest {
|
||||||
|
|
||||||
@Test(expected = IllegalStateException.class)
|
@Test(expected = IllegalStateException.class)
|
||||||
@ -34,6 +38,39 @@ public class ByteToMessageCodecTest {
|
|||||||
new InvalidByteToMessageCodec2();
|
new InvalidByteToMessageCodec2();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testForwardPendingData() {
|
||||||
|
ByteToMessageCodec<Integer> codec = new ByteToMessageCodec<Integer>() {
|
||||||
|
@Override
|
||||||
|
protected void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception {
|
||||||
|
out.writeInt(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||||
|
if (in.readableBytes() >= 4) {
|
||||||
|
out.add(in.readInt());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
ByteBuf buffer = Unpooled.buffer();
|
||||||
|
buffer.writeInt(1);
|
||||||
|
buffer.writeByte('0');
|
||||||
|
|
||||||
|
EmbeddedChannel ch = new EmbeddedChannel(codec);
|
||||||
|
assertTrue(ch.writeInbound(buffer));
|
||||||
|
ch.pipeline().remove(codec);
|
||||||
|
assertTrue(ch.finish());
|
||||||
|
assertEquals(1, ch.readInbound());
|
||||||
|
|
||||||
|
ByteBuf buf = (ByteBuf) ch.readInbound();
|
||||||
|
assertEquals(Unpooled.wrappedBuffer(new byte[]{'0'}), buf);
|
||||||
|
buf.release();
|
||||||
|
assertNull(ch.readInbound());
|
||||||
|
assertNull(ch.readOutbound());
|
||||||
|
}
|
||||||
|
|
||||||
@ChannelHandler.Sharable
|
@ChannelHandler.Sharable
|
||||||
private static final class InvalidByteToMessageCodec extends ByteToMessageCodec<Integer> {
|
private static final class InvalidByteToMessageCodec extends ByteToMessageCodec<Integer> {
|
||||||
InvalidByteToMessageCodec() {
|
InvalidByteToMessageCodec() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user