Revert "Add support for heartbeat in STOMP decoder/encoder. (#10695)"

This reverts commit 78d5ab4027e49a8931478270e27c532156a591b8.
This commit is contained in:
Norman Maurer 2020-11-03 09:36:40 +01:00
parent 3dce4de50f
commit d16c23d3f5
7 changed files with 30 additions and 122 deletions

View File

@ -32,6 +32,5 @@ public enum StompCommand {
MESSAGE, MESSAGE,
RECEIPT, RECEIPT,
ERROR, ERROR,
HEARTBEAT, // not an actual STOMP command, but used for 'heart-beat' functionality
UNKNOWN UNKNOWN
} }

View File

@ -57,6 +57,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
private static final int DEFAULT_MAX_LINE_LENGTH = 1024; private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
enum State { enum State {
SKIP_CONTROL_CHARACTERS,
READ_HEADERS, READ_HEADERS,
READ_CONTENT, READ_CONTENT,
FINALIZE_FRAME_READ, FINALIZE_FRAME_READ,
@ -70,7 +71,6 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
private int alreadyReadChunkSize; private int alreadyReadChunkSize;
private LastStompContentSubframe lastContent; private LastStompContentSubframe lastContent;
private long contentLength = -1; private long contentLength = -1;
private boolean expectNulCharacter = true;
public StompSubframeDecoder() { public StompSubframeDecoder() {
this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE); this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
@ -85,7 +85,8 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
} }
public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) { public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
super(State.READ_HEADERS); super(State.SKIP_CONTROL_CHARACTERS);
ObjectUtil.checkPositive(maxLineLength, "maxLineLength");
ObjectUtil.checkPositive(maxChunkSize, "maxChunkSize"); ObjectUtil.checkPositive(maxChunkSize, "maxChunkSize");
this.maxChunkSize = maxChunkSize; this.maxChunkSize = maxChunkSize;
commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength); commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength);
@ -95,19 +96,17 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
@Override @Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { protected void decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
switch (state()) { switch (state()) {
case SKIP_CONTROL_CHARACTERS:
skipControlCharacters(in);
checkpoint(State.READ_HEADERS);
// Fall through.
case READ_HEADERS: case READ_HEADERS:
StompCommand command = StompCommand.UNKNOWN; StompCommand command = StompCommand.UNKNOWN;
StompHeadersSubframe frame = null; StompHeadersSubframe frame = null;
try { try {
command = readCommand(in); command = readCommand(in);
frame = new DefaultStompHeadersSubframe(command); frame = new DefaultStompHeadersSubframe(command);
if (command == StompCommand.HEARTBEAT) {
// heart-beat don't have headers nor nul character, so just skip to end frame
expectNulCharacter = false;
checkpoint(State.FINALIZE_FRAME_READ);
} else {
checkpoint(readHeaders(in, frame.headers())); checkpoint(readHeaders(in, frame.headers()));
}
ctx.fireChannelRead(frame); ctx.fireChannelRead(frame);
} catch (Exception e) { } catch (Exception e) {
if (frame == null) { if (frame == null) {
@ -169,9 +168,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
} }
// Fall through. // Fall through.
case FINALIZE_FRAME_READ: case FINALIZE_FRAME_READ:
if (expectNulCharacter) {
skipNullCharacter(in); skipNullCharacter(in);
}
if (lastContent == null) { if (lastContent == null) {
lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT; lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
} }
@ -193,7 +190,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
} }
String commandStr = commandSequence.toString(); String commandStr = commandSequence.toString();
try { try {
return isHeartbeat(commandStr) ? StompCommand.HEARTBEAT : StompCommand.valueOf(commandStr); return StompCommand.valueOf(commandStr);
} catch (IllegalArgumentException iae) { } catch (IllegalArgumentException iae) {
throw new DecoderException("Cannot to parse command " + commandStr); throw new DecoderException("Cannot to parse command " + commandStr);
} }
@ -222,10 +219,6 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
return contentLength; return contentLength;
} }
private static boolean isHeartbeat(String command) {
return command.isEmpty() || (command.length() == 1 && command.charAt(0) == StompConstants.CR);
}
private static void skipNullCharacter(ByteBuf buffer) { private static void skipNullCharacter(ByteBuf buffer) {
byte b = buffer.readByte(); byte b = buffer.readByte();
if (b != StompConstants.NUL) { if (b != StompConstants.NUL) {
@ -233,12 +226,22 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
} }
} }
private static void skipControlCharacters(ByteBuf buffer) {
byte b;
for (;;) {
b = buffer.readByte();
if (b != StompConstants.CR && b != StompConstants.LF) {
buffer.readerIndex(buffer.readerIndex() - 1);
break;
}
}
}
private void resetDecoder() { private void resetDecoder() {
checkpoint(State.READ_HEADERS); checkpoint(State.SKIP_CONTROL_CHARACTERS);
contentLength = -1; contentLength = -1;
alreadyReadChunkSize = 0; alreadyReadChunkSize = 0;
lastContent = null; lastContent = null;
expectNulCharacter = true;
} }
private static class Utf8LineParser implements ByteProcessor { private static class Utf8LineParser implements ByteProcessor {

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.stomp;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
@ -29,17 +28,11 @@ import java.util.Map.Entry;
* Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}. * Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}.
*/ */
public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> { public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
private static final ByteBuf HEARTBEAT_FRAME =
Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { StompConstants.LF }));
@Override @Override
protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception { protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
if (msg instanceof StompFrame) { if (msg instanceof StompFrame) {
StompFrame frame = (StompFrame) msg; StompFrame frame = (StompFrame) msg;
// HEARTBEAT command isn't a real command and should be translated differently
if (frame.command() == StompCommand.HEARTBEAT) {
out.add(HEARTBEAT_FRAME.duplicate());
} else {
ByteBuf frameBuf = encodeFrame(frame, ctx); ByteBuf frameBuf = encodeFrame(frame, ctx);
if (frame.content().isReadable()) { if (frame.content().isReadable()) {
out.add(frameBuf); out.add(frameBuf);
@ -49,7 +42,6 @@ public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe>
frameBuf.writeByte(StompConstants.NUL); frameBuf.writeByte(StompConstants.NUL);
out.add(frameBuf); out.add(frameBuf);
} }
}
} else if (msg instanceof StompHeadersSubframe) { } else if (msg instanceof StompHeadersSubframe) {
StompHeadersSubframe frame = (StompHeadersSubframe) msg; StompHeadersSubframe frame = (StompHeadersSubframe) msg;
ByteBuf buf = encodeFrame(frame, ctx); ByteBuf buf = encodeFrame(frame, ctx);

View File

@ -25,8 +25,6 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*;
public class StompSubframeAggregatorTest { public class StompSubframeAggregatorTest {
private EmbeddedChannel channel; private EmbeddedChannel channel;
@ -65,9 +63,6 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals("hello, queue a!!!", frame.content().toString(CharsetUtil.UTF_8)); Assert.assertEquals("hello, queue a!!!", frame.content().toString(CharsetUtil.UTF_8));
frame.release(); frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound()); Assert.assertNull(channel.readInbound());
} }
@ -117,9 +112,6 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals(StompCommand.SEND, frame.command()); Assert.assertEquals(StompCommand.SEND, frame.command());
frame.release(); frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound()); Assert.assertNull(channel.readInbound());
} }
@ -139,28 +131,16 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals(StompCommand.CONNECTED, frame.command()); Assert.assertEquals(StompCommand.CONNECTED, frame.command());
frame.release(); frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
frame = channel.readInbound(); frame = channel.readInbound();
Assert.assertEquals(StompCommand.SEND, frame.command()); Assert.assertEquals(StompCommand.SEND, frame.command());
frame.release(); frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound()); Assert.assertNull(channel.readInbound());
} }
@Test(expected = TooLongFrameException.class) @Test(expected = TooLongFrameException.class)
public void testTooLongFrameException() { public void testTooLongFrameException() {
EmbeddedChannel channel = new EmbeddedChannel(new StompSubframeDecoder(), new StompSubframeAggregator(10)); EmbeddedChannel channel = new EmbeddedChannel(new StompSubframeDecoder(), new StompSubframeAggregator(10));
channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.TOO_LONG_FRAME.getBytes())); channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes()));
}
private static void assertHeartbeatFrame(EmbeddedChannel channel) {
StompFrame frame = channel.readInbound();
assertEquals(StompCommand.HEARTBEAT, frame.command());
frame.release();
} }
} }

View File

@ -74,9 +74,6 @@ public class StompSubframeDecoderTest {
assertEquals("hello, queue a!!!", s); assertEquals("hello, queue a!!!", s);
content.release(); content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound()); assertNull(channel.readInbound());
} }
@ -96,9 +93,6 @@ public class StompSubframeDecoderTest {
assertEquals("hello, queue a!", s); assertEquals("hello, queue a!", s);
content.release(); content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound()); assertNull(channel.readInbound());
} }
@ -134,9 +128,6 @@ public class StompSubframeDecoderTest {
assertEquals("!!", s); assertEquals("!!", s);
content.release(); content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound()); assertNull(channel.readInbound());
} }
@ -163,9 +154,6 @@ public class StompSubframeDecoderTest {
assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content2); assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content2);
content2.release(); content2.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound()); assertNull(channel.readInbound());
} }
@ -233,34 +221,4 @@ public class StompSubframeDecoderTest {
assertEquals("body", contentSubFrame.content().toString(UTF_8)); assertEquals("body", contentSubFrame.content().toString(UTF_8));
assertTrue(contentSubFrame.release()); assertTrue(contentSubFrame.release());
} }
@Test
public void testHeartbeatFrame() {
channel = new EmbeddedChannel(new StompSubframeDecoder(true));
ByteBuf incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
// should have three heartbeat frames
assertHeartbeatFrame(channel);
assertHeartbeatFrame(channel);
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
private static void assertHeartbeatFrame(EmbeddedChannel channel) {
StompHeadersSubframe frame = channel.readInbound();
assertEquals(StompCommand.HEARTBEAT, frame.command());
StompContentSubframe content = channel.readInbound();
assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content);
content.release();
}
} }

View File

@ -97,17 +97,4 @@ public class StompSubframeEncoderTest {
assertEquals("CONNECTED\nversion:1.2\n\n\0", stompBuffer.toString(CharsetUtil.UTF_8)); assertEquals("CONNECTED\nversion:1.2\n\n\0", stompBuffer.toString(CharsetUtil.UTF_8));
assertTrue(stompBuffer.release()); assertTrue(stompBuffer.release());
} }
@Test
public void testEncodeHeartbeatFrame() {
StompFrame heartbeatFrame = new DefaultStompFrame(StompCommand.HEARTBEAT);
assertTrue(channel.writeOutbound(heartbeatFrame));
ByteBuf stompBuffer = channel.readOutbound();
assertNotNull(stompBuffer);
assertEquals("\n", stompBuffer.toString(CharsetUtil.UTF_8));
assertNull(channel.readOutbound());
}
} }

View File

@ -77,16 +77,5 @@ public final class StompTestConstants {
'\n' + '\n' +
"body\0"; "body\0";
public static final String TOO_LONG_FRAME = "SEND\n" +
"destination:/queue/a\n" +
"content-type:text/plain\n" +
'\n' +
"this is too long!" +
"\0";
public static final String HEARTBEAT_FRAME_CR_LF = "\r\n";
public static final String HEARTBEAT_FRAME_LF = "\n";
private StompTestConstants() { } private StompTestConstants() { }
} }