Motivation: I need to control WebSockets inbound flow manually, when autoRead=false Modification: Add missed ctx.read() call into WebSocketProtocolHandler, where read request has been swallowed. Result: Fixes #9257
This commit is contained in:
parent
9b37be9550
commit
a05adceae8
@ -48,15 +48,23 @@ abstract class WebSocketProtocolHandler extends MessageToMessageDecoder<WebSocke
|
|||||||
if (frame instanceof PingWebSocketFrame) {
|
if (frame instanceof PingWebSocketFrame) {
|
||||||
frame.content().retain();
|
frame.content().retain();
|
||||||
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content()));
|
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content()));
|
||||||
|
readIfNeeded(ctx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (frame instanceof PongWebSocketFrame && dropPongFrames) {
|
if (frame instanceof PongWebSocketFrame && dropPongFrames) {
|
||||||
|
readIfNeeded(ctx);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
out.add(frame.retain());
|
out.add(frame.retain());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void readIfNeeded(ChannelHandlerContext ctx) {
|
||||||
|
if (!ctx.channel().config().isAutoRead()) {
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||||
ctx.fireExceptionCaught(cause);
|
ctx.fireExceptionCaught(cause);
|
||||||
|
@ -19,9 +19,10 @@ package io.netty.handler.codec.http.websocketx;
|
|||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.embedded.EmbeddedChannel;
|
import io.netty.channel.embedded.EmbeddedChannel;
|
||||||
import io.netty.util.CharsetUtil;
|
import io.netty.handler.flow.FlowControlHandler;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static io.netty.util.CharsetUtil.UTF_8;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -31,7 +32,7 @@ public class WebSocketProtocolHandlerTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPingFrame() {
|
public void testPingFrame() {
|
||||||
ByteBuf pingData = Unpooled.copiedBuffer("Hello, world", CharsetUtil.UTF_8);
|
ByteBuf pingData = Unpooled.copiedBuffer("Hello, world", UTF_8);
|
||||||
EmbeddedChannel channel = new EmbeddedChannel(new WebSocketProtocolHandler() { });
|
EmbeddedChannel channel = new EmbeddedChannel(new WebSocketProtocolHandler() { });
|
||||||
|
|
||||||
PingWebSocketFrame inputMessage = new PingWebSocketFrame(pingData);
|
PingWebSocketFrame inputMessage = new PingWebSocketFrame(pingData);
|
||||||
@ -45,6 +46,65 @@ public class WebSocketProtocolHandlerTest {
|
|||||||
assertFalse(channel.finish());
|
assertFalse(channel.finish());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPingPongFlowControlWhenAutoReadIsDisabled() {
|
||||||
|
String text1 = "Hello, world #1";
|
||||||
|
String text2 = "Hello, world #2";
|
||||||
|
String text3 = "Hello, world #3";
|
||||||
|
String text4 = "Hello, world #4";
|
||||||
|
|
||||||
|
EmbeddedChannel channel = new EmbeddedChannel();
|
||||||
|
channel.config().setAutoRead(false);
|
||||||
|
channel.pipeline().addLast(new FlowControlHandler());
|
||||||
|
channel.pipeline().addLast(new WebSocketProtocolHandler() { });
|
||||||
|
|
||||||
|
// When
|
||||||
|
assertFalse(channel.writeInbound(
|
||||||
|
new PingWebSocketFrame(Unpooled.copiedBuffer(text1, UTF_8)),
|
||||||
|
new TextWebSocketFrame(text2),
|
||||||
|
new TextWebSocketFrame(text3),
|
||||||
|
new PingWebSocketFrame(Unpooled.copiedBuffer(text4, UTF_8))
|
||||||
|
));
|
||||||
|
|
||||||
|
// Then - no messages were handled or propagated
|
||||||
|
assertNull(channel.readInbound());
|
||||||
|
assertNull(channel.readOutbound());
|
||||||
|
|
||||||
|
// When
|
||||||
|
channel.read();
|
||||||
|
|
||||||
|
// Then - pong frame was written to the outbound
|
||||||
|
PongWebSocketFrame response1 = channel.readOutbound();
|
||||||
|
assertEquals(text1, response1.content().toString(UTF_8));
|
||||||
|
|
||||||
|
// And - one requested message was handled and propagated inbound
|
||||||
|
TextWebSocketFrame message2 = channel.readInbound();
|
||||||
|
assertEquals(text2, message2.text());
|
||||||
|
|
||||||
|
// And - no more messages were handled or propagated
|
||||||
|
assertNull(channel.readInbound());
|
||||||
|
assertNull(channel.readOutbound());
|
||||||
|
|
||||||
|
// When
|
||||||
|
channel.read();
|
||||||
|
|
||||||
|
// Then - one requested message was handled and propagated inbound
|
||||||
|
TextWebSocketFrame message3 = channel.readInbound();
|
||||||
|
assertEquals(text3, message3.text());
|
||||||
|
|
||||||
|
// And - no more messages were handled or propagated
|
||||||
|
// Precisely, ping frame 'text4' was NOT read or handled.
|
||||||
|
// It would be handle ONLY on the next 'channel.read()' call.
|
||||||
|
assertNull(channel.readInbound());
|
||||||
|
assertNull(channel.readOutbound());
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
response1.release();
|
||||||
|
message2.release();
|
||||||
|
message3.release();
|
||||||
|
assertFalse(channel.finish());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPongFrameDropFrameFalse() {
|
public void testPongFrameDropFrameFalse() {
|
||||||
EmbeddedChannel channel = new EmbeddedChannel(new WebSocketProtocolHandler(false) { });
|
EmbeddedChannel channel = new EmbeddedChannel(new WebSocketProtocolHandler(false) { });
|
||||||
|
Loading…
Reference in New Issue
Block a user