Revert "Add support for heartbeat in STOMP decoder/encoder. (#10695)" (#10766)

This reverts commit 81544ab94f.
This commit is contained in:
Benjamin Roux 2020-11-03 00:36:15 -08:00 committed by GitHub
parent 1492374f99
commit 175d1368e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 29 additions and 122 deletions

View File

@ -32,6 +32,5 @@ public enum StompCommand {
MESSAGE,
RECEIPT,
ERROR,
HEARTBEAT, // not an actual STOMP command, but used for 'heart-beat' functionality
UNKNOWN
}

View File

@ -56,6 +56,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
private static final int DEFAULT_MAX_LINE_LENGTH = 1024;
enum State {
SKIP_CONTROL_CHARACTERS,
READ_HEADERS,
READ_CONTENT,
FINALIZE_FRAME_READ,
@ -69,7 +70,6 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
private int alreadyReadChunkSize;
private LastStompContentSubframe lastContent;
private long contentLength = -1;
private boolean expectNulCharacter = true;
public StompSubframeDecoder() {
this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
@ -84,7 +84,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
}
public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) {
super(State.READ_HEADERS);
super(State.SKIP_CONTROL_CHARACTERS);
checkPositive(maxLineLength, "maxLineLength");
checkPositive(maxChunkSize, "maxChunkSize");
this.maxChunkSize = maxChunkSize;
@ -95,19 +95,17 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case SKIP_CONTROL_CHARACTERS:
skipControlCharacters(in);
checkpoint(State.READ_HEADERS);
// Fall through.
case READ_HEADERS:
StompCommand command = StompCommand.UNKNOWN;
StompHeadersSubframe frame = null;
try {
command = readCommand(in);
frame = new DefaultStompHeadersSubframe(command);
if (command == StompCommand.HEARTBEAT) {
// heart-beat don't have headers nor nul character, so just skip to end frame
expectNulCharacter = false;
checkpoint(State.FINALIZE_FRAME_READ);
} else {
checkpoint(readHeaders(in, frame.headers()));
}
checkpoint(readHeaders(in, frame.headers()));
out.add(frame);
} catch (Exception e) {
if (frame == null) {
@ -169,9 +167,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
}
// Fall through.
case FINALIZE_FRAME_READ:
if (expectNulCharacter) {
skipNullCharacter(in);
}
skipNullCharacter(in);
if (lastContent == null) {
lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT;
}
@ -193,7 +189,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
}
String commandStr = commandSequence.toString();
try {
return isHeartbeat(commandStr) ? StompCommand.HEARTBEAT : StompCommand.valueOf(commandStr);
return StompCommand.valueOf(commandStr);
} catch (IllegalArgumentException iae) {
throw new DecoderException("Cannot to parse command " + commandStr);
}
@ -222,10 +218,6 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
return contentLength;
}
private static boolean isHeartbeat(String command) {
return command.isEmpty() || (command.length() == 1 && command.charAt(0) == StompConstants.CR);
}
private static void skipNullCharacter(ByteBuf buffer) {
byte b = buffer.readByte();
if (b != StompConstants.NUL) {
@ -233,12 +225,22 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
}
}
private static void skipControlCharacters(ByteBuf buffer) {
byte b;
for (;;) {
b = buffer.readByte();
if (b != StompConstants.CR && b != StompConstants.LF) {
buffer.readerIndex(buffer.readerIndex() - 1);
break;
}
}
}
private void resetDecoder() {
checkpoint(State.READ_HEADERS);
checkpoint(State.SKIP_CONTROL_CHARACTERS);
contentLength = -1;
alreadyReadChunkSize = 0;
lastContent = null;
expectNulCharacter = true;
}
private static class Utf8LineParser implements ByteProcessor {

View File

@ -17,7 +17,6 @@ package io.netty.handler.codec.stomp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
@ -29,26 +28,19 @@ import java.util.Map.Entry;
* Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}.
*/
public class StompSubframeEncoder extends MessageToMessageEncoder<StompSubframe> {
private static final ByteBuf HEARTBEAT_FRAME =
Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { StompConstants.LF }));
@Override
protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List<Object> out) throws Exception {
if (msg instanceof StompFrame) {
StompFrame frame = (StompFrame) msg;
// HEARTBEAT command isn't a real command and should be translated differently
if (frame.command() == StompCommand.HEARTBEAT) {
out.add(HEARTBEAT_FRAME.duplicate());
ByteBuf frameBuf = encodeFrame(frame, ctx);
if (frame.content().isReadable()) {
out.add(frameBuf);
ByteBuf contentBuf = encodeContent(frame, ctx);
out.add(contentBuf);
} else {
ByteBuf frameBuf = encodeFrame(frame, ctx);
if (frame.content().isReadable()) {
out.add(frameBuf);
ByteBuf contentBuf = encodeContent(frame, ctx);
out.add(contentBuf);
} else {
frameBuf.writeByte(StompConstants.NUL);
out.add(frameBuf);
}
frameBuf.writeByte(StompConstants.NUL);
out.add(frameBuf);
}
} else if (msg instanceof StompHeadersSubframe) {
StompHeadersSubframe frame = (StompHeadersSubframe) msg;

View File

@ -25,8 +25,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
public class StompSubframeAggregatorTest {
private EmbeddedChannel channel;
@ -65,9 +63,6 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals("hello, queue a!!!", frame.content().toString(CharsetUtil.UTF_8));
frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound());
}
@ -117,9 +112,6 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals(StompCommand.SEND, frame.command());
frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound());
}
@ -139,28 +131,16 @@ public class StompSubframeAggregatorTest {
Assert.assertEquals(StompCommand.CONNECTED, frame.command());
frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
frame = channel.readInbound();
Assert.assertEquals(StompCommand.SEND, frame.command());
frame.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
Assert.assertNull(channel.readInbound());
}
@Test(expected = TooLongFrameException.class)
public void testTooLongFrameException() {
EmbeddedChannel channel = new EmbeddedChannel(new StompSubframeDecoder(), new StompSubframeAggregator(10));
channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.TOO_LONG_FRAME.getBytes()));
}
private static void assertHeartbeatFrame(EmbeddedChannel channel) {
StompFrame frame = channel.readInbound();
assertEquals(StompCommand.HEARTBEAT, frame.command());
frame.release();
channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes()));
}
}

View File

@ -74,9 +74,6 @@ public class StompSubframeDecoderTest {
assertEquals("hello, queue a!!!", s);
content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
@ -96,9 +93,6 @@ public class StompSubframeDecoderTest {
assertEquals("hello, queue a!", s);
content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
@ -134,9 +128,6 @@ public class StompSubframeDecoderTest {
assertEquals("!!", s);
content.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
@ -163,9 +154,6 @@ public class StompSubframeDecoderTest {
assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content2);
content2.release();
// frames ending with \n also trigger a heartbeat frame after normal frame parsing
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
@ -233,34 +221,4 @@ public class StompSubframeDecoderTest {
assertEquals("body", contentSubFrame.content().toString(UTF_8));
assertTrue(contentSubFrame.release());
}
@Test
public void testHeartbeatFrame() {
channel = new EmbeddedChannel(new StompSubframeDecoder(true));
ByteBuf incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8));
assertTrue(channel.writeInbound(incoming));
// should have three heartbeat frames
assertHeartbeatFrame(channel);
assertHeartbeatFrame(channel);
assertHeartbeatFrame(channel);
assertNull(channel.readInbound());
}
private static void assertHeartbeatFrame(EmbeddedChannel channel) {
StompHeadersSubframe frame = channel.readInbound();
assertEquals(StompCommand.HEARTBEAT, frame.command());
StompContentSubframe content = channel.readInbound();
assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content);
content.release();
}
}

View File

@ -97,17 +97,4 @@ public class StompSubframeEncoderTest {
assertEquals("CONNECTED\nversion:1.2\n\n\0", stompBuffer.toString(CharsetUtil.UTF_8));
assertTrue(stompBuffer.release());
}
@Test
public void testEncodeHeartbeatFrame() {
StompFrame heartbeatFrame = new DefaultStompFrame(StompCommand.HEARTBEAT);
assertTrue(channel.writeOutbound(heartbeatFrame));
ByteBuf stompBuffer = channel.readOutbound();
assertNotNull(stompBuffer);
assertEquals("\n", stompBuffer.toString(CharsetUtil.UTF_8));
assertNull(channel.readOutbound());
}
}

View File

@ -77,16 +77,5 @@ public final class StompTestConstants {
'\n' +
"body\0";
public static final String TOO_LONG_FRAME = "SEND\n" +
"destination:/queue/a\n" +
"content-type:text/plain\n" +
'\n' +
"this is too long!" +
"\0";
public static final String HEARTBEAT_FRAME_CR_LF = "\r\n";
public static final String HEARTBEAT_FRAME_LF = "\n";
private StompTestConstants() { }
}