diff --git a/codec/src/main/java/io/netty/handler/codec/MessageAggregator.java b/codec/src/main/java/io/netty/handler/codec/MessageAggregator.java index 38e5bd7575..2bede688bd 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageAggregator.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageAggregator.java @@ -399,6 +399,17 @@ public abstract class MessageAggregator { + + protected MockMessageAggregator() { + super(1024); + } + + @Override + protected ByteBufHolder beginAggregation(ByteBufHolder start, ByteBuf content) throws Exception { + return start.replace(content); + } + } + + private static ByteBufHolder message(String string) { + return new DefaultByteBufHolder( + Unpooled.copiedBuffer(string, CharsetUtil.US_ASCII)); + } + + @SuppressWarnings("unchecked") + @Test + public void testReadFlowManagement() throws Exception { + ReadCounter counter = new ReadCounter(); + ByteBufHolder first = message("first"); + ByteBufHolder chunk = message("chunk"); + ByteBufHolder last = message("last"); + + MockMessageAggregator agg = spy(MockMessageAggregator.class); + when(agg.isStartMessage(first)).thenReturn(true); + when(agg.isContentMessage(chunk)).thenReturn(true); + when(agg.isContentMessage(last)).thenReturn(true); + when(agg.isLastContentMessage(last)).thenReturn(true); + + EmbeddedChannel embedded = new EmbeddedChannel(counter, agg); + embedded.config().setAutoRead(false); + + assertFalse(embedded.writeInbound(first)); + assertFalse(embedded.writeInbound(chunk)); + assertTrue(embedded.writeInbound(last)); + + assertEquals(3, counter.value); // 2 reads issued from MockMessageAggregator + // 1 read issued from EmbeddedChannel constructor + + ByteBufHolder all = new DefaultByteBufHolder(Unpooled.wrappedBuffer( + first.content().retain(), chunk.content().retain(), last.content().retain())); + ByteBufHolder out = embedded.readInbound(); + + assertEquals(all, out); + assertTrue(all.release() && out.release()); + assertFalse(embedded.finish()); + } +}