Add support for heartbeat in STOMP decoder/encoder. (#10695)

Motivation:

Heart-beat is a functionality of STOMP enabling clients and servers to know the healthiness of the connection. The current decoder didn't allow for heart-beat messages to be forwarded to the decoder and were simply swallowed as part of the frame decoding.

Modifications:

Adding support for heartbeat message parsing by introducing a new HEARTBEAT command (not a real STOMP command).
Heartbeat received on the channel will trigger a StompFrame with the command set to HEARTBEAT.
Sending heartbeat on the channel is achieved by creating a StompFrame with the command set to HEARTBEAT.

Result:

Heartbeat can now be received/sent and acted upon to determine the healthiness of the connection and terminate it if needed.
This commit is contained in:
Benjamin Roux 2020-10-30 06:32:57 -07:00 committed by GitHub
parent 26976310d2
commit 81544ab94f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 122 additions and 29 deletions

View File

@ -32,5 +32,6 @@ public enum StompCommand {
MESSAGE,
RECEIPT,
ERROR,
HEARTBEAT, // not an actual STOMP command, but used for 'heart-beat' functionality
UNKNOWN
}

View File

@ -56,7 +56,6 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
enum State {
SKIP_CONTROL_CHARACTERS,
READ_HEADERS,
READ_CONTENT,
FINALIZE_FRAME_READ,
@ -70,6 +69,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
private int alreadyReadChunkSize;
private LastStompContentSubframe lastContent;
private long contentLength = -1;
private boolean expectNulCharacter = true;
public StompSubframeDecoder() {
this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
@ -84,7 +84,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
}
public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
super(State.SKIP_CONTROL_CHARACTERS);
super(State.READ_HEADERS);
checkPositive(maxLineLength, "maxLineLength");
checkPositive(maxChunkSize, "maxChunkSize");
this.maxChunkSize = maxChunkSize;
@ -95,17 +95,19 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case SKIP_CONTROL_CHARACTERS:
skipControlCharacters(in);
checkpoint(State.READ_HEADERS);
// Fall through.
case READ_HEADERS:
StompCommand command = StompCommand.UNKNOWN;
StompHeadersSubframe frame = null;
try {
command = readCommand(in);
frame = new DefaultStompHeadersSubframe(command);
checkpoint(readHeaders(in, frame.headers()));
if (command == StompCommand.HEARTBEAT) {
// heart-beat don't have headers nor nul character, so just skip to end frame
expectNulCharacter = false;
checkpoint(State.FINALIZE_FRAME_READ);
} else {
checkpoint(readHeaders(in, frame.headers()));
}
out.add(frame);
} catch (Exception e) {
if (frame == null) {
@ -167,7 +169,9 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
}
// Fall through.
case FINALIZE_FRAME_READ:
skipNullCharacter(in);
if (expectNulCharacter) {
skipNullCharacter(in);
}
if (lastContent == null) {
lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
}
@ -189,7 +193,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
}
String commandStr = commandSequence.toString();
try {
return StompCommand.valueOf(commandStr);
return isHeartbeat(commandStr) ? StompCommand.HEARTBEAT : StompCommand.valueOf(commandStr);
} catch (IllegalArgumentException iae) {
throw new DecoderException("Cannot to parse command " + commandStr);
}
@ -218,6 +222,10 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
return contentLength;
}
private static boolean isHeartbeat(String command) {
return command.isEmpty() || (command.length() == 1 && command.charAt(0) == StompConstants.CR);
}
private static void skipNullCharacter(ByteBuf buffer) {
byte b = buffer.readByte();
if (b != StompConstants.NUL) {
@ -225,22 +233,12 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
}
}
private static void skipControlCharacters(ByteBuf buffer) {
byte b;
for (;;) {
b = buffer.readByte();
if (b != StompConstants.CR && b != StompConstants.LF) {
buffer.readerIndex(buffer.readerIndex() - 1);
break;
}
}
}
private void resetDecoder() {
checkpoint(State.SKIP_CONTROL_CHARACTERS);
checkpoint(State.READ_HEADERS);
contentLength = -1;
alreadyReadChunkSize = 0;
lastContent = null;
expectNulCharacter = true;
}
private static class Utf8LineParser implements ByteProcessor {

View File

@ -17,6 +17,7 @@ package io.netty.handler.codec.stomp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
@ -28,19 +29,26 @@ import java.util.Map.Entry;
* Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}.
*/
public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
private static final ByteBuf HEARTBEAT_FRAME =
Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { StompConstants.LF }));
@Override
protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
if (msg instanceof StompFrame) {
StompFrame frame = (StompFrame) msg;
ByteBuf frameBuf = encodeFrame(frame, ctx);
if (frame.content().isReadable()) {
out.add(frameBuf);
ByteBuf contentBuf = encodeContent(frame, ctx);
out.add(contentBuf);
// HEARTBEAT command isn't a real command and should be translated differently
if (frame.command() == StompCommand.HEARTBEAT) {
out.add(HEARTBEAT_FRAME.duplicate());
} else {
frameBuf.writeByte(StompConstants.NUL);
out.add(frameBuf);
ByteBuf frameBuf = encodeFrame(frame, ctx);
if (frame.content().isReadable()) {
out.add(frameBuf);
ByteBuf contentBuf = encodeContent(frame, ctx);
out.add(contentBuf);
} else {
frameBuf.writeByte(StompConstants.NUL);
out.add(frameBuf);
}
}
} else if (msg instanceof StompHeadersSubframe) {
StompHeadersSubframe frame = (StompHeadersSubframe) msg;

View File

@ -25,6 +25,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class StompSubframeAggregatorTest {
private EmbeddedChannel channel;
@ -63,6 +65,9 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals("hello, queue a!!!", frame.content().toString(CharsetUtil.UTF_8));
frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound());
}
@ -112,6 +117,9 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals(StompCommand.SEND, frame.command());
frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound());
}
@ -131,16 +139,28 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals(StompCommand.CONNECTED, frame.command());
frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
frame = channel.readInbound();
Assert.assertEquals(StompCommand.SEND, frame.command());
frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound());
}
@Test(expected = TooLongFrameException.class)
public void testTooLongFrameException() {
EmbeddedChannel channel = new EmbeddedChannel(new StompSubframeDecoder(), new StompSubframeAggregator(10));
channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes()));
channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.TOO_LONG_FRAME.getBytes()));
}
private static void assertHeartbeatFrame(EmbeddedChannel channel) {
StompFrame frame = channel.readInbound();
assertEquals(StompCommand.HEARTBEAT, frame.command());
frame.release();
}
}

View File

@ -74,6 +74,9 @@ public class StompSubframeDecoderTest {
assertEquals("hello, queue a!!!", s);
content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
@ -93,6 +96,9 @@ public class StompSubframeDecoderTest {
assertEquals("hello, queue a!", s);
content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
@ -128,6 +134,9 @@ public class StompSubframeDecoderTest {
assertEquals("!!", s);
content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
@ -154,6 +163,9 @@ public class StompSubframeDecoderTest {
assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content2);
content2.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
@ -221,4 +233,34 @@ public class StompSubframeDecoderTest {
assertEquals("body", contentSubFrame.content().toString(UTF_8));
assertTrue(contentSubFrame.release());
}
@Test
public void testHeartbeatFrame() {
channel = new EmbeddedChannel(new StompSubframeDecoder(true));
ByteBuf incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
// should have three heartbeat frames
assertHeartbeatFrame(channel);
assertHeartbeatFrame(channel);
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
private static void assertHeartbeatFrame(EmbeddedChannel channel) {
StompHeadersSubframe frame = channel.readInbound();
assertEquals(StompCommand.HEARTBEAT, frame.command());
StompContentSubframe content = channel.readInbound();
assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content);
content.release();
}
}

View File

@ -97,4 +97,17 @@ public class StompSubframeEncoderTest {
assertEquals("CONNECTED\nversion:1.2\n\n\0", stompBuffer.toString(CharsetUtil.UTF_8));
assertTrue(stompBuffer.release());
}
@Test
public void testEncodeHeartbeatFrame() {
StompFrame heartbeatFrame = new DefaultStompFrame(StompCommand.HEARTBEAT);
assertTrue(channel.writeOutbound(heartbeatFrame));
ByteBuf stompBuffer = channel.readOutbound();
assertNotNull(stompBuffer);
assertEquals("\n", stompBuffer.toString(CharsetUtil.UTF_8));
assertNull(channel.readOutbound());
}
}

View File

@ -77,5 +77,16 @@ public final class StompTestConstants {
'\n' +
"body\0";
public static final String TOO_LONG_FRAME = "SEND\n" +
"destination:/queue/a\n" +
"content-type:text/plain\n" +
'\n' +
"this is too long!" +
"\0";
public static final String HEARTBEAT_FRAME_CR_LF = "\r\n";
public static final String HEARTBEAT_FRAME_LF = "\n";
private StompTestConstants() { }
}