Motivation: I need to control WebSockets inbound flow manually, when autoRead=false Modification: Add missed ctx.read() call into WebSocketProtocolHandler, where read request has been swallowed. Result: Fixes #9257
This commit is contained in:
parent
517a93d87d
commit
41c1ab2e82
@ -48,15 +48,23 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
|
||||
if (frame instanceof PingWebSocketFrame) {
|
||||
frame.content().retain();
|
||||
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content()));
|
||||
readIfNeeded(ctx);
|
||||
return;
|
||||
}
|
||||
if (frame instanceof PongWebSocketFrame && dropPongFrames) {
|
||||
readIfNeeded(ctx);
|
||||
return;
|
||||
}
|
||||
|
||||
out.add(frame.retain());
|
||||
}
|
||||
|
||||
private static void readIfNeeded(ChannelHandlerContext ctx) {
|
||||
if (!ctx.channel().config().isAutoRead()) {
|
||||
ctx.read();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
ctx.fireExceptionCaught(cause);
|
||||
|
@ -19,9 +19,10 @@ package io.netty.handler.codec.http.websocketx;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import io.netty.handler.flow.FlowControlHandler;
|
||||
import org.junit.Test;
|
||||
|
||||
import static io.netty.util.CharsetUtil.UTF_8;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
@ -31,7 +32,7 @@ public class WebSocketProtocolHandlerTest {
|
||||
|
||||
@Test
|
||||
public void testPingFrame() {
|
||||
ByteBuf pingData = Unpooled.copiedBuffer("Hello, world", CharsetUtil.UTF_8);
|
||||
ByteBuf pingData = Unpooled.copiedBuffer("Hello, world", UTF_8);
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new WebSocketProtocolHandler() { });
|
||||
|
||||
PingWebSocketFrame inputMessage = new PingWebSocketFrame(pingData);
|
||||
@ -45,6 +46,65 @@ public class WebSocketProtocolHandlerTest {
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPingPongFlowControlWhenAutoReadIsDisabled() {
|
||||
String text1 = "Hello, world #1";
|
||||
String text2 = "Hello, world #2";
|
||||
String text3 = "Hello, world #3";
|
||||
String text4 = "Hello, world #4";
|
||||
|
||||
EmbeddedChannel channel = new EmbeddedChannel();
|
||||
channel.config().setAutoRead(false);
|
||||
channel.pipeline().addLast(new FlowControlHandler());
|
||||
channel.pipeline().addLast(new WebSocketProtocolHandler() { });
|
||||
|
||||
// When
|
||||
assertFalse(channel.writeInbound(
|
||||
new PingWebSocketFrame(Unpooled.copiedBuffer(text1, UTF_8)),
|
||||
new TextWebSocketFrame(text2),
|
||||
new TextWebSocketFrame(text3),
|
||||
new PingWebSocketFrame(Unpooled.copiedBuffer(text4, UTF_8))
|
||||
));
|
||||
|
||||
// Then - no messages were handled or propagated
|
||||
assertNull(channel.readInbound());
|
||||
assertNull(channel.readOutbound());
|
||||
|
||||
// When
|
||||
channel.read();
|
||||
|
||||
// Then - pong frame was written to the outbound
|
||||
PongWebSocketFrame response1 = channel.readOutbound();
|
||||
assertEquals(text1, response1.content().toString(UTF_8));
|
||||
|
||||
// And - one requested message was handled and propagated inbound
|
||||
TextWebSocketFrame message2 = channel.readInbound();
|
||||
assertEquals(text2, message2.text());
|
||||
|
||||
// And - no more messages were handled or propagated
|
||||
assertNull(channel.readInbound());
|
||||
assertNull(channel.readOutbound());
|
||||
|
||||
// When
|
||||
channel.read();
|
||||
|
||||
// Then - one requested message was handled and propagated inbound
|
||||
TextWebSocketFrame message3 = channel.readInbound();
|
||||
assertEquals(text3, message3.text());
|
||||
|
||||
// And - no more messages were handled or propagated
|
||||
// Precisely, ping frame 'text4' was NOT read or handled.
|
||||
// It would be handle ONLY on the next 'channel.read()' call.
|
||||
assertNull(channel.readInbound());
|
||||
assertNull(channel.readOutbound());
|
||||
|
||||
// Cleanup
|
||||
response1.release();
|
||||
message2.release();
|
||||
message3.release();
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPongFrameDropFrameFalse() {
|
||||
EmbeddedChannel channel = new EmbeddedChannel(new WebSocketProtocolHandler(false) { });
|
||||
|
Loading…
Reference in New Issue
Block a user