Revert "Suppress channelReadComplete until MessageAggregator completes aggregating a message. https://github.com/netty/netty/issues/3168"
This reverts commit cb95a1331d2dfed63d9f8732657beb6c2a6778fd.
This commit is contained in:
parent
cb95a1331d
commit
99a703be88
@ -18,33 +18,20 @@ package io.netty.handler.codec.http;
|
|||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.CompositeByteBuf;
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerAdapter;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.handler.codec.DecoderResultProvider;
|
import io.netty.handler.codec.DecoderResultProvider;
|
||||||
import io.netty.handler.codec.TooLongFrameException;
|
import io.netty.handler.codec.TooLongFrameException;
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.util.CharsetUtil;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.nio.channels.ClosedChannelException;
|
import java.nio.channels.ClosedChannelException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import static io.netty.util.ReferenceCountUtil.releaseLater;
|
import static io.netty.util.ReferenceCountUtil.*;
|
||||||
import static org.easymock.EasyMock.anyObject;
|
import static org.hamcrest.CoreMatchers.*;
|
||||||
import static org.easymock.EasyMock.createMock;
|
import static org.junit.Assert.*;
|
||||||
import static org.easymock.EasyMock.createMockBuilder;
|
|
||||||
import static org.easymock.EasyMock.expectLastCall;
|
|
||||||
import static org.easymock.EasyMock.replay;
|
|
||||||
import static org.easymock.EasyMock.verify;
|
|
||||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertThat;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
|
|
||||||
public class HttpObjectAggregatorTest {
|
public class HttpObjectAggregatorTest {
|
||||||
|
|
||||||
@ -345,8 +332,8 @@ public class HttpObjectAggregatorTest {
|
|||||||
@Test(expected = IllegalStateException.class)
|
@Test(expected = IllegalStateException.class)
|
||||||
public void testSetMaxCumulationBufferComponentsAfterInit() throws Exception {
|
public void testSetMaxCumulationBufferComponentsAfterInit() throws Exception {
|
||||||
HttpObjectAggregator aggr = new HttpObjectAggregator(Integer.MAX_VALUE);
|
HttpObjectAggregator aggr = new HttpObjectAggregator(Integer.MAX_VALUE);
|
||||||
ChannelHandlerContext ctx = createMock(ChannelHandlerContext.class);
|
ChannelHandlerContext ctx = EasyMock.createMock(ChannelHandlerContext.class);
|
||||||
replay(ctx);
|
EasyMock.replay(ctx);
|
||||||
aggr.handlerAdded(ctx);
|
aggr.handlerAdded(ctx);
|
||||||
aggr.setMaxCumulationBufferComponents(10);
|
aggr.setMaxCumulationBufferComponents(10);
|
||||||
}
|
}
|
||||||
@ -400,40 +387,4 @@ public class HttpObjectAggregatorTest {
|
|||||||
assertNull(ch.readInbound());
|
assertNull(ch.readInbound());
|
||||||
ch.finish();
|
ch.finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadCompleteSuppression() throws Exception {
|
|
||||||
ChannelHandlerAdapter sink = createMockBuilder(ChannelHandlerAdapter.class)
|
|
||||||
.addMockedMethod("channelReadComplete")
|
|
||||||
.createStrictMock();
|
|
||||||
|
|
||||||
sink.channelReadComplete(anyObject(ChannelHandlerContext.class));
|
|
||||||
expectLastCall();
|
|
||||||
replay(sink);
|
|
||||||
|
|
||||||
HttpObjectAggregator aggr = new HttpObjectAggregator(1024 * 1024);
|
|
||||||
EmbeddedChannel embedder = new EmbeddedChannel(aggr, sink);
|
|
||||||
|
|
||||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost");
|
|
||||||
HttpContent chunk1 = new DefaultHttpContent(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII));
|
|
||||||
HttpContent chunk2 = new DefaultHttpContent(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII));
|
|
||||||
HttpContent chunk3 = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
|
|
||||||
assertFalse(embedder.writeInbound(message));
|
|
||||||
assertFalse(embedder.writeInbound(chunk1));
|
|
||||||
assertFalse(embedder.writeInbound(chunk2));
|
|
||||||
|
|
||||||
// this should trigger a channelRead event so return true
|
|
||||||
assertTrue(embedder.writeInbound(chunk3));
|
|
||||||
assertTrue(embedder.finish());
|
|
||||||
FullHttpRequest aggratedMessage = embedder.readInbound();
|
|
||||||
assertNotNull(aggratedMessage);
|
|
||||||
|
|
||||||
assertEquals(chunk1.content().readableBytes() + chunk2.content().readableBytes(),
|
|
||||||
HttpHeaderUtil.getContentLength(aggratedMessage));
|
|
||||||
checkContentBuffer(aggratedMessage);
|
|
||||||
assertNull(embedder.readInbound());
|
|
||||||
|
|
||||||
verify(sink);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -17,17 +17,13 @@ package io.netty.handler.codec.http.websocketx;
|
|||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerAdapter;
|
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.handler.codec.TooLongFrameException;
|
import io.netty.handler.codec.TooLongFrameException;
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.util.CharsetUtil;
|
||||||
import io.netty.util.ReferenceCountUtil;
|
import io.netty.util.ReferenceCountUtil;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.easymock.EasyMock.*;
|
|
||||||
|
|
||||||
public class WebSocketFrameAggregatorTest {
|
public class WebSocketFrameAggregatorTest {
|
||||||
private final ByteBuf content1 = ReferenceCountUtil.releaseLater(
|
private final ByteBuf content1 = ReferenceCountUtil.releaseLater(
|
||||||
@ -150,34 +146,4 @@ public class WebSocketFrameAggregatorTest {
|
|||||||
}
|
}
|
||||||
channel.finish();
|
channel.finish();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testReadCompleteSuppression() throws Exception {
|
|
||||||
ChannelHandlerAdapter sink = createMockBuilder(ChannelHandlerAdapter.class)
|
|
||||||
.addMockedMethod("channelReadComplete")
|
|
||||||
.createStrictMock();
|
|
||||||
|
|
||||||
sink.channelReadComplete(anyObject(ChannelHandlerContext.class));
|
|
||||||
expectLastCall();
|
|
||||||
replay(sink);
|
|
||||||
|
|
||||||
WebSocketFrameAggregator aggr = new WebSocketFrameAggregator(Integer.MAX_VALUE);
|
|
||||||
EmbeddedChannel channel = new EmbeddedChannel(aggr, sink);
|
|
||||||
|
|
||||||
Assert.assertFalse(channel.writeInbound(new TextWebSocketFrame(false, 0, content1.copy())));
|
|
||||||
Assert.assertFalse(channel.writeInbound(new ContinuationWebSocketFrame(false, 0, content2.copy())));
|
|
||||||
Assert.assertTrue(channel.writeInbound(new ContinuationWebSocketFrame(true, 0, content3.copy())));
|
|
||||||
Assert.assertTrue(channel.finish());
|
|
||||||
|
|
||||||
WebSocketFrame frame = channel.readInbound();
|
|
||||||
Assert.assertTrue(frame.isFinalFragment());
|
|
||||||
Assert.assertEquals(0, frame.rsv());
|
|
||||||
Assert.assertEquals(aggregatedContent, frame.content());
|
|
||||||
frame.release();
|
|
||||||
|
|
||||||
Assert.assertNull(channel.readInbound());
|
|
||||||
|
|
||||||
verify(sink);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -184,15 +184,6 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
|
|||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
|
||||||
// if currentMessage is null when this callback fires, aggregation
|
|
||||||
// of a message already completed.
|
|
||||||
if (currentMessage == null) {
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
|
protected void decode(final ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception {
|
||||||
O currentMessage = this.currentMessage;
|
O currentMessage = this.currentMessage;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user