#2183 Fix for releasing of the internal cumulation buffer in ByteToMessageDecoder
This commit is contained in:
parent
ff771c0fcb
commit
fc6fa2774e
@ -109,6 +109,8 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
ByteBuf bytes = buf.readBytes(readable);
|
ByteBuf bytes = buf.readBytes(readable);
|
||||||
buf.release();
|
buf.release();
|
||||||
ctx.fireChannelRead(bytes);
|
ctx.fireChannelRead(bytes);
|
||||||
|
} else {
|
||||||
|
buf.release();
|
||||||
}
|
}
|
||||||
cumulation = null;
|
cumulation = null;
|
||||||
ctx.fireChannelReadComplete();
|
ctx.fireChannelReadComplete();
|
||||||
|
@ -19,6 +19,7 @@ import io.netty.buffer.ByteBuf;
|
|||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
|
import io.netty.util.ReferenceCountUtil;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@ -72,4 +73,50 @@ public class ByteToMessageDecoderTest {
|
|||||||
buf.release();
|
buf.release();
|
||||||
b.release();
|
b.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that internal buffer of the ByteToMessageDecoder is released once decoder is removed from pipeline. In
|
||||||
|
* this case input is read fully.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testInternalBufferClearReadAll() {
|
||||||
|
final ByteBuf buf = ReferenceCountUtil.releaseLater(Unpooled.buffer().writeBytes(new byte[]{'a'}));
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||||
|
@Override
|
||||||
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||||
|
ByteBuf byteBuf = internalBuffer();
|
||||||
|
Assert.assertEquals(1, byteBuf.refCnt());
|
||||||
|
in.readByte();
|
||||||
|
// Removal from pipeline should clear internal buffer
|
||||||
|
ctx.pipeline().remove(this);
|
||||||
|
Assert.assertEquals(0, byteBuf.refCnt());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.assertFalse(channel.writeInbound(buf));
|
||||||
|
Assert.assertFalse(channel.finish());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that internal buffer of the ByteToMessageDecoder is released once decoder is removed from pipeline. In
|
||||||
|
* this case input was not fully read.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testInternalBufferClearReadPartly() {
|
||||||
|
final ByteBuf buf = ReferenceCountUtil.releaseLater(Unpooled.buffer().writeBytes(new byte[]{'a', 'b'}));
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel(new ByteToMessageDecoder() {
|
||||||
|
@Override
|
||||||
|
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
|
||||||
|
ByteBuf byteBuf = internalBuffer();
|
||||||
|
Assert.assertEquals(1, byteBuf.refCnt());
|
||||||
|
in.readByte();
|
||||||
|
// Removal from pipeline should clear internal buffer
|
||||||
|
ctx.pipeline().remove(this);
|
||||||
|
Assert.assertEquals(0, byteBuf.refCnt());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.assertTrue(channel.writeInbound(buf));
|
||||||
|
Assert.assertTrue(channel.finish());
|
||||||
|
Assert.assertEquals(channel.readInbound(), Unpooled.wrappedBuffer(new byte[] {'b'}));
|
||||||
|
Assert.assertNull(channel.readInbound());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user