Suppress channelReadComplete until MessageAggregator completes aggregating a message. https://github.com/netty/netty/issues/3168

Motivation:
This fixes issue 3168 where HttpObjectAggregator does not suppress
channelReadComplete() when aggregation is not yet finished.

Modifications:
Ignore channelReadComplete until a message completes aggregation.
MessageAggregator currently tracks the currentMessage being aggregated.
This variable transitions to non-null when aggregation begins and back
to null when aggregation completes or fails. When the currentMessage is
null, it is safe to issue a channelReadComplete because the
corresponding channelRead will have completed aggregation.

Result:
channelReadComplete will only fire one time on each completed message
aggregation.
This commit is contained in:
Sam Young 2014-12-11 22:50:04 -08:00 committed by Norman Maurer
parent cc8140b41c
commit cb95a1331d
3 changed files with 98 additions and 6 deletions

View File

@ -18,20 +18,33 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.DecoderResultProvider;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.CharsetUtil;
import org.easymock.EasyMock;
import org.junit.Test;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import static io.netty.util.ReferenceCountUtil.*;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createMockBuilder;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class HttpObjectAggregatorTest {
@ -332,8 +345,8 @@ public class HttpObjectAggregatorTest {
@Test(expected = IllegalStateException.class)
public void testSetMaxCumulationBufferComponentsAfterInit() throws Exception {
HttpObjectAggregator aggr = new HttpObjectAggregator(Integer.MAX_VALUE);
ChannelHandlerContext ctx = EasyMock.createMock(ChannelHandlerContext.class);
EasyMock.replay(ctx);
ChannelHandlerContext ctx = createMock(ChannelHandlerContext.class);
replay(ctx);
aggr.handlerAdded(ctx);
aggr.setMaxCumulationBufferComponents(10);
}
@ -387,4 +400,40 @@ public class HttpObjectAggregatorTest {
assertNull(ch.readInbound());
ch.finish();
}
@Test
public void testReadCompleteSuppression() throws Exception {
ChannelHandlerAdapter sink = createMockBuilder(ChannelHandlerAdapter.class)
.addMockedMethod("channelReadComplete")
.createStrictMock();
sink.channelReadComplete(anyObject(ChannelHandlerContext.class));
expectLastCall();
replay(sink);
HttpObjectAggregator aggr = new HttpObjectAggregator(1024 * 1024);
EmbeddedChannel embedder = new EmbeddedChannel(aggr, sink);
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost");
HttpContent chunk1 = new DefaultHttpContent(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII));
HttpContent chunk2 = new DefaultHttpContent(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII));
HttpContent chunk3 = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
assertFalse(embedder.writeInbound(message));
assertFalse(embedder.writeInbound(chunk1));
assertFalse(embedder.writeInbound(chunk2));
// this should trigger a channelRead event so return true
assertTrue(embedder.writeInbound(chunk3));
assertTrue(embedder.finish());
FullHttpRequest aggratedMessage = embedder.readInbound();
assertNotNull(aggratedMessage);
assertEquals(chunk1.content().readableBytes() + chunk2.content().readableBytes(),
HttpHeaderUtil.getContentLength(aggratedMessage));
checkContentBuffer(aggratedMessage);
assertNull(embedder.readInbound());
verify(sink);
}
}

View File

@ -17,13 +17,17 @@ package io.netty.handler.codec.http.websocketx;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.Assert;
import org.junit.Test;
import static org.easymock.EasyMock.*;
public class WebSocketFrameAggregatorTest {
private final ByteBuf content1 = ReferenceCountUtil.releaseLater(
@ -146,4 +150,34 @@ public class WebSocketFrameAggregatorTest {
}
channel.finish();
}
@Test
public void testReadCompleteSuppression() throws Exception {
ChannelHandlerAdapter sink = createMockBuilder(ChannelHandlerAdapter.class)
.addMockedMethod("channelReadComplete")
.createStrictMock();
sink.channelReadComplete(anyObject(ChannelHandlerContext.class));
expectLastCall();
replay(sink);
WebSocketFrameAggregator aggr = new WebSocketFrameAggregator(Integer.MAX_VALUE);
EmbeddedChannel channel = new EmbeddedChannel(aggr, sink);
Assert.assertFalse(channel.writeInbound(new TextWebSocketFrame(false, 0, content1.copy())));
Assert.assertFalse(channel.writeInbound(new ContinuationWebSocketFrame(false, 0, content2.copy())));
Assert.assertTrue(channel.writeInbound(new ContinuationWebSocketFrame(true, 0, content3.copy())));
Assert.assertTrue(channel.finish());
WebSocketFrame frame = channel.readInbound();
Assert.assertTrue(frame.isFinalFragment());
Assert.assertEquals(0, frame.rsv());
Assert.assertEquals(aggregatedContent, frame.content());
frame.release();
Assert.assertNull(channel.readInbound());
verify(sink);
}
}

View File

@ -184,6 +184,15 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
return ctx;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// if currentMessage is null when this callback fires, aggregation
// of a message already completed.
if (currentMessage == null) {
ctx.fireChannelReadComplete();
}
}
@Override
protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
O currentMessage = this.currentMessage;