diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java index fd8903bc4e..af47b1a4b8 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompCommand.java @@ -32,6 +32,5 @@ public enum StompCommand { MESSAGE, RECEIPT, ERROR, - HEARTBEAT, // not an actual STOMP command, but used for 'heart-beat' functionality UNKNOWN } diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java index 17b57fe74e..4ecb09b32e 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java @@ -57,6 +57,7 @@ public class StompSubframeDecoder extends ReplayingDecoder { private static final int DEFAULT_MAX_LINE_LENGTH = 1024; enum State { + SKIP_CONTROL_CHARACTERS, READ_HEADERS, READ_CONTENT, FINALIZE_FRAME_READ, @@ -70,7 +71,6 @@ public class StompSubframeDecoder extends ReplayingDecoder { private int alreadyReadChunkSize; private LastStompContentSubframe lastContent; private long contentLength = -1; - private boolean expectNulCharacter = true; public StompSubframeDecoder() { this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE); @@ -85,7 +85,8 @@ public class StompSubframeDecoder extends ReplayingDecoder { } public StompSubframeDecoder(int maxLineLength, int maxChunkSize, boolean validateHeaders) { - super(State.READ_HEADERS); + super(State.SKIP_CONTROL_CHARACTERS); + ObjectUtil.checkPositive(maxLineLength, "maxLineLength"); ObjectUtil.checkPositive(maxChunkSize, "maxChunkSize"); this.maxChunkSize = maxChunkSize; commandParser = new Utf8LineParser(new AppendableCharSequence(16), maxLineLength); @@ -95,19 +96,17 @@ public class StompSubframeDecoder extends ReplayingDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { switch (state()) { + case SKIP_CONTROL_CHARACTERS: + skipControlCharacters(in); + checkpoint(State.READ_HEADERS); + // Fall through. case READ_HEADERS: StompCommand command = StompCommand.UNKNOWN; StompHeadersSubframe frame = null; try { command = readCommand(in); frame = new DefaultStompHeadersSubframe(command); - if (command == StompCommand.HEARTBEAT) { - // heart-beat don't have headers nor nul character, so just skip to end frame - expectNulCharacter = false; - checkpoint(State.FINALIZE_FRAME_READ); - } else { - checkpoint(readHeaders(in, frame.headers())); - } + checkpoint(readHeaders(in, frame.headers())); ctx.fireChannelRead(frame); } catch (Exception e) { if (frame == null) { @@ -169,9 +168,7 @@ public class StompSubframeDecoder extends ReplayingDecoder { } // Fall through. case FINALIZE_FRAME_READ: - if (expectNulCharacter) { - skipNullCharacter(in); - } + skipNullCharacter(in); if (lastContent == null) { lastContent = LastStompContentSubframe.EMPTY_LAST_CONTENT; } @@ -193,7 +190,7 @@ public class StompSubframeDecoder extends ReplayingDecoder { } String commandStr = commandSequence.toString(); try { - return isHeartbeat(commandStr) ? StompCommand.HEARTBEAT : StompCommand.valueOf(commandStr); + return StompCommand.valueOf(commandStr); } catch (IllegalArgumentException iae) { throw new DecoderException("Cannot to parse command " + commandStr); } @@ -222,10 +219,6 @@ public class StompSubframeDecoder extends ReplayingDecoder { return contentLength; } - private static boolean isHeartbeat(String command) { - return command.isEmpty() || (command.length() == 1 && command.charAt(0) == StompConstants.CR); - } - private static void skipNullCharacter(ByteBuf buffer) { byte b = buffer.readByte(); if (b != StompConstants.NUL) { @@ -233,12 +226,22 @@ public class StompSubframeDecoder extends ReplayingDecoder { } } + private static void skipControlCharacters(ByteBuf buffer) { + byte b; + for (;;) { + b = buffer.readByte(); + if (b != StompConstants.CR && b != StompConstants.LF) { + buffer.readerIndex(buffer.readerIndex() - 1); + break; + } + } + } + private void resetDecoder() { - checkpoint(State.READ_HEADERS); + checkpoint(State.SKIP_CONTROL_CHARACTERS); contentLength = -1; alreadyReadChunkSize = 0; lastContent = null; - expectNulCharacter = true; } private static class Utf8LineParser implements ByteProcessor { diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java index ed63304fe6..38e69f89df 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeEncoder.java @@ -17,7 +17,6 @@ package io.netty.handler.codec.stomp; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.CharsetUtil; @@ -29,26 +28,19 @@ import java.util.Map.Entry; * Encodes a {@link StompFrame} or a {@link StompSubframe} into a {@link ByteBuf}. */ public class StompSubframeEncoder extends MessageToMessageEncoder { - private static final ByteBuf HEARTBEAT_FRAME = - Unpooled.unreleasableBuffer(Unpooled.wrappedBuffer(new byte[] { StompConstants.LF })); @Override protected void encode(ChannelHandlerContext ctx, StompSubframe msg, List out) throws Exception { if (msg instanceof StompFrame) { StompFrame frame = (StompFrame) msg; - // HEARTBEAT command isn't a real command and should be translated differently - if (frame.command() == StompCommand.HEARTBEAT) { - out.add(HEARTBEAT_FRAME.duplicate()); + ByteBuf frameBuf = encodeFrame(frame, ctx); + if (frame.content().isReadable()) { + out.add(frameBuf); + ByteBuf contentBuf = encodeContent(frame, ctx); + out.add(contentBuf); } else { - ByteBuf frameBuf = encodeFrame(frame, ctx); - if (frame.content().isReadable()) { - out.add(frameBuf); - ByteBuf contentBuf = encodeContent(frame, ctx); - out.add(contentBuf); - } else { - frameBuf.writeByte(StompConstants.NUL); - out.add(frameBuf); - } + frameBuf.writeByte(StompConstants.NUL); + out.add(frameBuf); } } else if (msg instanceof StompHeadersSubframe) { StompHeadersSubframe frame = (StompHeadersSubframe) msg; diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java index 160b1c7d52..e88c17a4f4 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java @@ -25,8 +25,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.junit.Assert.*; - public class StompSubframeAggregatorTest { private EmbeddedChannel channel; @@ -65,9 +63,6 @@ public class StompSubframeAggregatorTest { Assert.assertEquals("hello, queue a!!!", frame.content().toString(CharsetUtil.UTF_8)); frame.release(); - // frames ending with \n also trigger a heartbeat frame after normal frame parsing - assertHeartbeatFrame(channel); - Assert.assertNull(channel.readInbound()); } @@ -117,9 +112,6 @@ public class StompSubframeAggregatorTest { Assert.assertEquals(StompCommand.SEND, frame.command()); frame.release(); - // frames ending with \n also trigger a heartbeat frame after normal frame parsing - assertHeartbeatFrame(channel); - Assert.assertNull(channel.readInbound()); } @@ -139,28 +131,16 @@ public class StompSubframeAggregatorTest { Assert.assertEquals(StompCommand.CONNECTED, frame.command()); frame.release(); - // frames ending with \n also trigger a heartbeat frame after normal frame parsing - assertHeartbeatFrame(channel); - frame = channel.readInbound(); Assert.assertEquals(StompCommand.SEND, frame.command()); frame.release(); - // frames ending with \n also trigger a heartbeat frame after normal frame parsing - assertHeartbeatFrame(channel); - Assert.assertNull(channel.readInbound()); } @Test(expected = TooLongFrameException.class) public void testTooLongFrameException() { EmbeddedChannel channel = new EmbeddedChannel(new StompSubframeDecoder(), new StompSubframeAggregator(10)); - channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.TOO_LONG_FRAME.getBytes())); - } - - private static void assertHeartbeatFrame(EmbeddedChannel channel) { - StompFrame frame = channel.readInbound(); - assertEquals(StompCommand.HEARTBEAT, frame.command()); - frame.release(); + channel.writeInbound(Unpooled.wrappedBuffer(StompTestConstants.SEND_FRAME_1.getBytes())); } } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java index 59f8abe825..dfea1dfae3 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeDecoderTest.java @@ -74,9 +74,6 @@ public class StompSubframeDecoderTest { assertEquals("hello, queue a!!!", s); content.release(); - // frames ending with \n also trigger a heartbeat frame after normal frame parsing - assertHeartbeatFrame(channel); - assertNull(channel.readInbound()); } @@ -96,9 +93,6 @@ public class StompSubframeDecoderTest { assertEquals("hello, queue a!", s); content.release(); - // frames ending with \n also trigger a heartbeat frame after normal frame parsing - assertHeartbeatFrame(channel); - assertNull(channel.readInbound()); } @@ -134,9 +128,6 @@ public class StompSubframeDecoderTest { assertEquals("!!", s); content.release(); - // frames ending with \n also trigger a heartbeat frame after normal frame parsing - assertHeartbeatFrame(channel); - assertNull(channel.readInbound()); } @@ -163,9 +154,6 @@ public class StompSubframeDecoderTest { assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content2); content2.release(); - // frames ending with \n also trigger a heartbeat frame after normal frame parsing - assertHeartbeatFrame(channel); - assertNull(channel.readInbound()); } @@ -233,34 +221,4 @@ public class StompSubframeDecoderTest { assertEquals("body", contentSubFrame.content().toString(UTF_8)); assertTrue(contentSubFrame.release()); } - - @Test - public void testHeartbeatFrame() { - channel = new EmbeddedChannel(new StompSubframeDecoder(true)); - - ByteBuf incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8)); - assertTrue(channel.writeInbound(incoming)); - - incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_LF.getBytes(UTF_8)); - assertTrue(channel.writeInbound(incoming)); - - incoming = Unpooled.wrappedBuffer(HEARTBEAT_FRAME_CR_LF.getBytes(UTF_8)); - assertTrue(channel.writeInbound(incoming)); - - // should have three heartbeat frames - assertHeartbeatFrame(channel); - assertHeartbeatFrame(channel); - assertHeartbeatFrame(channel); - - assertNull(channel.readInbound()); - } - - private static void assertHeartbeatFrame(EmbeddedChannel channel) { - StompHeadersSubframe frame = channel.readInbound(); - assertEquals(StompCommand.HEARTBEAT, frame.command()); - - StompContentSubframe content = channel.readInbound(); - assertSame(LastStompContentSubframe.EMPTY_LAST_CONTENT, content); - content.release(); - } } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java index 01a83f2c41..738a360d77 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeEncoderTest.java @@ -97,17 +97,4 @@ public class StompSubframeEncoderTest { assertEquals("CONNECTED\nversion:1.2\n\n\0", stompBuffer.toString(CharsetUtil.UTF_8)); assertTrue(stompBuffer.release()); } - - @Test - public void testEncodeHeartbeatFrame() { - StompFrame heartbeatFrame = new DefaultStompFrame(StompCommand.HEARTBEAT); - - assertTrue(channel.writeOutbound(heartbeatFrame)); - - ByteBuf stompBuffer = channel.readOutbound(); - assertNotNull(stompBuffer); - assertEquals("\n", stompBuffer.toString(CharsetUtil.UTF_8)); - - assertNull(channel.readOutbound()); - } } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java index cac23d0e37..bf2efcfc6a 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java @@ -77,16 +77,5 @@ public final class StompTestConstants { '\n' + "body\0"; - public static final String TOO_LONG_FRAME = "SEND\n" + - "destination:/queue/a\n" + - "content-type:text/plain\n" + - '\n' + - "this is too long!" + - "\0"; - - public static final String HEARTBEAT_FRAME_CR_LF = "\r\n"; - - public static final String HEARTBEAT_FRAME_LF = "\n"; - private StompTestConstants() { } }