From acf6693b1909ab5a04978bc03610934de3a0563b Mon Sep 17 00:00:00 2001 From: Scott Van Wart Date: Fri, 25 Sep 2015 09:29:11 -0300 Subject: [PATCH] Better parsing for STOMP body with no length. Motivation: The STOMP decoder used to fail when parsing a frame with no content-length and a body split across multiple packets. Modifications: Support contentLength of -1 (indicating indeterminate length) and added a check to getContentLength. Moved the NUL byte searching from the readHeaders() method out to the main decoder loop. Result: A STOMP frame can be properly parsed even if it's missing the content-length header and the NUL byte is in a later packet. --- .../codec/stomp/StompSubframeDecoder.java | 76 +++++++++++-------- .../stomp/StompSubframeAggregatorTest.java | 33 ++++++++ .../codec/stomp/StompTestConstants.java | 15 ++++ 3 files changed, 93 insertions(+), 31 deletions(-) diff --git a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java index 101ea6aaca..0172f7921e 100644 --- a/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java +++ b/codec-stomp/src/main/java/io/netty/handler/codec/stomp/StompSubframeDecoder.java @@ -29,7 +29,8 @@ import io.netty.util.internal.StringUtil; import java.util.List; import java.util.Locale; -import static io.netty.buffer.ByteBufUtil.*; +import static io.netty.buffer.ByteBufUtil.indexOf; +import static io.netty.buffer.ByteBufUtil.readBytes; /** * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and @@ -73,7 +74,7 @@ public class StompSubframeDecoder extends ReplayingDecoder { private final int maxChunkSize; private int alreadyReadChunkSize; private LastStompContentSubframe lastContent; - private long contentLength; + private long contentLength = -1; public StompSubframeDecoder() { this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE); @@ -134,21 +135,39 @@ public class StompSubframeDecoder extends ReplayingDecoder { if (toRead > maxChunkSize) { toRead = maxChunkSize; } - int remainingLength = (int) (contentLength - alreadyReadChunkSize); - if (toRead > remainingLength) { - toRead = remainingLength; - } - ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead); - if ((alreadyReadChunkSize += toRead) >= contentLength) { - lastContent = new DefaultLastStompContentSubframe(chunkBuffer); - checkpoint(State.FINALIZE_FRAME_READ); + if (this.contentLength >= 0) { + int remainingLength = (int) (contentLength - alreadyReadChunkSize); + if (toRead > remainingLength) { + toRead = remainingLength; + } + ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead); + if ((alreadyReadChunkSize += toRead) >= contentLength) { + lastContent = new DefaultLastStompContentSubframe(chunkBuffer); + checkpoint(State.FINALIZE_FRAME_READ); + } else { + out.add(new DefaultStompContentSubframe(chunkBuffer)); + return; + } } else { - DefaultStompContentSubframe chunk; - chunk = new DefaultStompContentSubframe(chunkBuffer); - out.add(chunk); - } - if (alreadyReadChunkSize < contentLength) { - return; + int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL); + if (nulIndex == in.readerIndex()) { + checkpoint(State.FINALIZE_FRAME_READ); + } else { + if (nulIndex > 0) { + toRead = nulIndex - in.readerIndex(); + } else { + toRead = in.writerIndex() - in.readerIndex(); + } + ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead); + alreadyReadChunkSize += toRead; + if (nulIndex > 0) { + lastContent = new DefaultLastStompContentSubframe(chunkBuffer); + checkpoint(State.FINALIZE_FRAME_READ); + } else { + out.add(new DefaultStompContentSubframe(chunkBuffer)); + return; + } + } } // Fall through. case FINALIZE_FRAME_READ: @@ -198,28 +217,23 @@ public class StompSubframeDecoder extends ReplayingDecoder { headers.add(split[0], split[1]); } } else { - long contentLength = -1; if (headers.contains(StompHeaders.CONTENT_LENGTH)) { - contentLength = getContentLength(headers, 0); - } else { - int globalIndex = indexOf(buffer, buffer.readerIndex(), - buffer.writerIndex(), StompConstants.NUL); - if (globalIndex != -1) { - contentLength = globalIndex - buffer.readerIndex(); + this.contentLength = getContentLength(headers, 0); + if (this.contentLength == 0) { + return State.FINALIZE_FRAME_READ; } } - if (contentLength > 0) { - this.contentLength = contentLength; - return State.READ_CONTENT; - } else { - return State.FINALIZE_FRAME_READ; - } + return State.READ_CONTENT; } } } private static long getContentLength(StompHeaders headers, long defaultValue) { - return headers.getLong(StompHeaders.CONTENT_LENGTH, defaultValue); + long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, defaultValue); + if (contentLength < 0) { + throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative"); + } + return contentLength; } private static void skipNullCharacter(ByteBuf buffer) { @@ -264,7 +278,7 @@ public class StompSubframeDecoder extends ReplayingDecoder { private void resetDecoder() { checkpoint(State.SKIP_CONTROL_CHARACTERS); - contentLength = 0; + contentLength = -1; alreadyReadChunkSize = 0; lastContent = null; } diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java index 5f896b9691..9c116938c5 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompSubframeAggregatorTest.java @@ -66,6 +66,39 @@ public class StompSubframeAggregatorTest { Assert.assertNull(channel.readInbound()); } + @Test + public void testSingleFrameWithBodyAndNoContentLength() { + ByteBuf incoming = Unpooled.buffer(); + incoming.writeBytes(StompTestConstants.SEND_FRAME_4.getBytes()); + channel.writeInbound(incoming); + + StompFrame frame = channel.readInbound(); + Assert.assertNotNull(frame); + Assert.assertEquals(StompCommand.SEND, frame.command()); + Assert.assertEquals("body", frame.content().toString(CharsetUtil.UTF_8)); + frame.release(); + + Assert.assertNull(channel.readInbound()); + } + + @Test + public void testSingleFrameWithSplitBodyAndNoContentLength() { + for (int n = 0; n < StompTestConstants.SEND_FRAMES_3.length; ++n) { + ByteBuf incoming = Unpooled.buffer(); + incoming.writeBytes(StompTestConstants.SEND_FRAMES_3[n].getBytes()); + channel.writeInbound(incoming); + channel.flush(); + } + + StompFrame frame = channel.readInbound(); + Assert.assertNotNull(frame); + Assert.assertEquals(StompCommand.SEND, frame.command()); + Assert.assertEquals("first part of body\nsecond part of body", frame.content().toString(CharsetUtil.UTF_8)); + frame.release(); + + Assert.assertNull(channel.readInbound()); + } + @Test public void testSingleFrameChunked() { EmbeddedChannel channel = new EmbeddedChannel( diff --git a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java index a427b944cc..7d0de42912 100644 --- a/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java +++ b/codec-stomp/src/test/java/io/netty/handler/codec/stomp/StompTestConstants.java @@ -42,6 +42,21 @@ public final class StompTestConstants { '\n' + "hello, queue a!!!" + "\0\n"; + public static final String[] SEND_FRAMES_3 = { + "SEND\n" + + "destination:/queue/a\n" + + "content-type:text/plain\n" + + '\n' + + "first part of body\n", + "second part of body\0" + }; + + public static final String SEND_FRAME_4 = "SEND\n" + + "destination:/queue/a\n" + + "content-type:text/plain\n" + + '\n' + + "body\0"; private StompTestConstants() { } + }