From 81544ab94f831dfe1782ab5cad917626c6b4a169 Mon Sep 17 00:00:00 2001 From: Benjamin Roux Date: Fri, 30 Oct 2020 06:32:57 -0700 Subject: [PATCH] Add support for heartbeat in STOMP decoder/encoder. (#10695) Motivation: Heart-beat is a functionality of STOMP enabling clients and servers to know the healthiness of the connection. The current decoder didn't allow for heart-beat messages to be forwarded to the decoder and were simply swallowed as part of the frame decoding. Modifications: Adding support for heartbeat message parsing by introducing a new HEARTBEAT command (not a real STOMP command). Heartbeat received on the channel will trigger a StompFrame with the command set to HEARTBEAT. Sending heartbeat on the channel is achieved by creating a StompFrame with the command set to HEARTBEAT. Result: Heartbeat can now be received/sent and acted upon to determine the healthiness of the connection and terminate it if needed. --- .../handler/codec/stomp/StompCommand.java | 1 + .../codec/stomp/StompSubframeDecoder.java | 40 +++++++++--------- .../codec/stomp/StompSubframeEncoder.java | 22 ++++++---- .../stomp/StompSubframeAggregatorTest.java | 22 +++++++++- .../codec/stomp/StompSubframeDecoderTest.java | 42 +++++++++++++++++++ .../codec/stomp/StompSubframeEncoderTest.java | 13 ++++++ .../codec/stomp/StompTestConstants.java | 11 +++++ 7 files changed, 122 insertions(+), 29 deletions(-) diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java index af47b1a4b8..fd8903bc4e 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java @@ -32,5 +32,6 @@ public enum StompCommand { MESSAGE, RECEIPT, ERROR, + HEARTBEAT, // not an actual STOMP command, but used for 'heart-beat' functionality UNKNOWN } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java index a786e69969..1fe0fd225e 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java @@ -56,7 +56,6 @@ public class StompSubframeDecoder extends ReplayingDecoder { private static final int DEFAULT_MAX_LINE_LENGTH = 1024; enum State { - SKIP_CONTROL_CHARACTERS, READ_HEADERS, READ_CONTENT, FINALIZE_FRAME_READ, @@ -70,6 +69,7 @@ public class StompSubframeDecoder extends ReplayingDecoder { private int alreadyReadChunkSize; private LastStompContentSubframe lastContent; private long contentLength = -1; + private boolean expectNulCharacter = true; public StompSubframeDecoder() { this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE); @@ -84,7 +84,7 @@ public class StompSubframeDecoder extends ReplayingDecoder { } public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) { - super(State.SKIP_CONTROL_CHARACTERS); + super(State.READ_HEADERS); checkPositive(maxLineLength, "maxLineLength"); checkPositive(maxChunkSize, "maxChunkSize"); this.maxChunkSize = maxChunkSize; @@ -95,17 +95,19 @@ public class StompSubframeDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { switch (state()) { - case SKIP_CONTROL_CHARACTERS: - skipControlCharacters(in); - checkpoint(State.READ_HEADERS); - // Fall through. case READ_HEADERS: StompCommand command = StompCommand.UNKNOWN; StompHeadersSubframe frame = null; try { command = readCommand(in); frame = new DefaultStompHeadersSubframe(command); - checkpoint(readHeaders(in, frame.headers())); + if (command == StompCommand.HEARTBEAT) { + // heart-beat don't have headers nor nul character, so just skip to end frame + expectNulCharacter = false; + checkpoint(State.FINALIZE_FRAME_READ); + } else { + checkpoint(readHeaders(in, frame.headers())); + } out.add(frame); } catch (Exception e) { if (frame == null) { @@ -167,7 +169,9 @@ public class StompSubframeDecoder extends ReplayingDecoder { } // Fall through. case FINALIZE_FRAME_READ: - skipNullCharacter(in); + if (expectNulCharacter) { + skipNullCharacter(in); + } if (lastContent == null) { lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT; } @@ -189,7 +193,7 @@ public class StompSubframeDecoder extends ReplayingDecoder { } String commandStr = commandSequence.toString(); try { - return StompCommand.valueOf(commandStr); + return isHeartbeat(commandStr) ? StompCommand.HEARTBEAT : StompCommand.valueOf(commandStr); } catch (IllegalArgumentException iae) { throw new DecoderException("Cannot to parse command " + commandStr); } @@ -218,6 +222,10 @@ public class StompSubframeDecoder extends ReplayingDecoder { return contentLength; } + private static boolean isHeartbeat(String command) { + return command.isEmpty() || (command.length() == 1 && command.charAt(0) == StompConstants.CR); + } + private static void skipNullCharacter(ByteBuf buffer) { byte b = buffer.readByte(); if (b != StompConstants.NUL) { @@ -225,22 +233,12 @@ public class StompSubframeDecoder extends ReplayingDecoder { } } - private static void skipControlCharacters(ByteBuf buffer) { - byte b; - for (;;) { - b = buffer.readByte(); - if (b != StompConstants.CR && b != StompConstants.LF) { - buffer.readerIndex(buffer.readerIndex() - 1); - break; - } - } - } - private void resetDecoder() { - checkpoint(State.SKIP_CONTROL_CHARACTERS); + checkpoint(State.READ_HEADERS); contentLength = -1; alreadyReadChunkSize = 0; lastContent = null; + expectNulCharacter = true; } private static class Utf8LineParser implements ByteProcessor { diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java index 38e69f89df..ed63304fe6 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.stomp; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.CharsetUtil; @@ -28,19 +29,26 @@ import java.util.Map.Entry; * Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}. */ public class StompSubframeEncoder extends MessageToMessageEncoder { + private static final ByteBuf HEARTBEAT_FRAME = + Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { StompConstants.LF })); @Override protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List out) throws Exception { if (msg instanceof StompFrame) { StompFrame frame = (StompFrame) msg; - ByteBuf frameBuf = encodeFrame(frame, ctx); - if (frame.content().isReadable()) { - out.add(frameBuf); - ByteBuf contentBuf = encodeContent(frame, ctx); - out.add(contentBuf); + // HEARTBEAT command isn't a real command and should be translated differently + if (frame.command() == StompCommand.HEARTBEAT) { + out.add(HEARTBEAT_FRAME.duplicate()); } else { - frameBuf.writeByte(StompConstants.NUL); - out.add(frameBuf); + ByteBuf frameBuf = encodeFrame(frame, ctx); + if (frame.content().isReadable()) { + out.add(frameBuf); + ByteBuf contentBuf = encodeContent(frame, ctx); + out.add(contentBuf); + } else { + frameBuf.writeByte(StompConstants.NUL); + out.add(frameBuf); + } } } else if (msg instanceof StompHeadersSubframe) { StompHeadersSubframe frame = (StompHeadersSubframe) msg; diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java index e88c17a4f4..160b1c7d52 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java @@ -25,6 +25,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.*; + public class StompSubframeAggregatorTest { private EmbeddedChannel channel; @@ -63,6 +65,9 @@ public class StompSubframeAggregatorTest { Assert.assertEquals("hello, queue a!!!", frame.content().toString(CharsetUtil.UTF_8)); frame.release(); + // frames ending with \n also trigger a heartbeat frame after normal frame parsing + assertHeartbeatFrame(channel); + Assert.assertNull(channel.readInbound()); } @@ -112,6 +117,9 @@ public class StompSubframeAggregatorTest { Assert.assertEquals(StompCommand.SEND, frame.command()); frame.release(); + // frames ending with \n also trigger a heartbeat frame after normal frame parsing + assertHeartbeatFrame(channel); + Assert.assertNull(channel.readInbound()); } @@ -131,16 +139,28 @@ public class StompSubframeAggregatorTest { Assert.assertEquals(StompCommand.CONNECTED, frame.command()); frame.release(); + // frames ending with \n also trigger a heartbeat frame after normal frame parsing + assertHeartbeatFrame(channel); + frame = channel.readInbound(); Assert.assertEquals(StompCommand.SEND, frame.command()); frame.release(); + // frames ending with \n also trigger a heartbeat frame after normal frame parsing + assertHeartbeatFrame(channel); + Assert.assertNull(channel.readInbound()); } @Test(expected = TooLongFrameException.class) public void testTooLongFrameException() { EmbeddedChannel channel = new EmbeddedChannel(new StompSubframeDecoder(), new StompSubframeAggregator(10)); - channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes())); + channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.TOO_LONG_FRAME.getBytes())); + } + + private static void assertHeartbeatFrame(EmbeddedChannel channel) { + StompFrame frame = channel.readInbound(); + assertEquals(StompCommand.HEARTBEAT, frame.command()); + frame.release(); } } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java index dfea1dfae3..59f8abe825 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java @@ -74,6 +74,9 @@ public class StompSubframeDecoderTest { assertEquals("hello, queue a!!!", s); content.release(); + // frames ending with \n also trigger a heartbeat frame after normal frame parsing + assertHeartbeatFrame(channel); + assertNull(channel.readInbound()); } @@ -93,6 +96,9 @@ public class StompSubframeDecoderTest { assertEquals("hello, queue a!", s); content.release(); + // frames ending with \n also trigger a heartbeat frame after normal frame parsing + assertHeartbeatFrame(channel); + assertNull(channel.readInbound()); } @@ -128,6 +134,9 @@ public class StompSubframeDecoderTest { assertEquals("!!", s); content.release(); + // frames ending with \n also trigger a heartbeat frame after normal frame parsing + assertHeartbeatFrame(channel); + assertNull(channel.readInbound()); } @@ -154,6 +163,9 @@ public class StompSubframeDecoderTest { assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content2); content2.release(); + // frames ending with \n also trigger a heartbeat frame after normal frame parsing + assertHeartbeatFrame(channel); + assertNull(channel.readInbound()); } @@ -221,4 +233,34 @@ public class StompSubframeDecoderTest { assertEquals("body", contentSubFrame.content().toString(UTF_8)); assertTrue(contentSubFrame.release()); } + + @Test + public void testHeartbeatFrame() { + channel = new EmbeddedChannel(new StompSubframeDecoder(true)); + + ByteBuf incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8)); + assertTrue(channel.writeInbound(incoming)); + + incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_LF.getBytes(UTF_8)); + assertTrue(channel.writeInbound(incoming)); + + incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8)); + assertTrue(channel.writeInbound(incoming)); + + // should have three heartbeat frames + assertHeartbeatFrame(channel); + assertHeartbeatFrame(channel); + assertHeartbeatFrame(channel); + + assertNull(channel.readInbound()); + } + + private static void assertHeartbeatFrame(EmbeddedChannel channel) { + StompHeadersSubframe frame = channel.readInbound(); + assertEquals(StompCommand.HEARTBEAT, frame.command()); + + StompContentSubframe content = channel.readInbound(); + assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content); + content.release(); + } } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java index 8cdf4422bb..d5ae1e26a9 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java @@ -97,4 +97,17 @@ public class StompSubframeEncoderTest { assertEquals("CONNECTED\nversion:1.2\n\n\0", stompBuffer.toString(CharsetUtil.UTF_8)); assertTrue(stompBuffer.release()); } + + @Test + public void testEncodeHeartbeatFrame() { + StompFrame heartbeatFrame = new DefaultStompFrame(StompCommand.HEARTBEAT); + + assertTrue(channel.writeOutbound(heartbeatFrame)); + + ByteBuf stompBuffer = channel.readOutbound(); + assertNotNull(stompBuffer); + assertEquals("\n", stompBuffer.toString(CharsetUtil.UTF_8)); + + assertNull(channel.readOutbound()); + } } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java index bf2efcfc6a..cac23d0e37 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java @@ -77,5 +77,16 @@ public final class StompTestConstants { '\n' + "body\0"; + public static final String TOO_LONG_FRAME = "SEND\n" + + "destination:/queue/a\n" + + "content-type:text/plain\n" + + '\n' + + "this is too long!" + + "\0"; + + public static final String HEARTBEAT_FRAME_CR_LF = "\r\n"; + + public static final String HEARTBEAT_FRAME_LF = "\n"; + private StompTestConstants() { } }