Better parsing for STOMP body with no length.

Motivation:

The STOMP decoder used to fail when parsing a frame with no content-length
and a body split across multiple packets.

Modifications:

Support contentLength of -1 (indicating indeterminate length) and added a
check to getContentLength.  Moved the NUL byte searching from the
readHeaders() method out to the main decoder loop.

Result:

A STOMP frame can be properly parsed even if it's missing the
content-length header and the NUL byte is in a later packet.
This commit is contained in:
Scott Van Wart 2015-09-25 09:29:11 -03:00 committed by Norman Maurer
parent c8a941d01e
commit 2adf6e5358
3 changed files with 93 additions and 31 deletions

View File

@ -29,7 +29,8 @@ import io.netty.util.internal.StringUtil;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import static io.netty.buffer.ByteBufUtil.*; import static io.netty.buffer.ByteBufUtil.indexOf;
import static io.netty.buffer.ByteBufUtil.readBytes;
/** /**
* Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and * Decodes {@link ByteBuf}s into {@link StompHeadersSubframe}s and
@ -73,7 +74,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
private final int maxChunkSize; private final int maxChunkSize;
private int alreadyReadChunkSize; private int alreadyReadChunkSize;
private LastStompContentSubframe lastContent; private LastStompContentSubframe lastContent;
private long contentLength; private long contentLength = -1;
public StompSubframeDecoder() { public StompSubframeDecoder() {
this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE); this(DEFAULT_MAX_LINE_LENGTH, DEFAULT_CHUNK_SIZE);
@ -134,21 +135,39 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
if (toRead > maxChunkSize) { if (toRead > maxChunkSize) {
toRead = maxChunkSize; toRead = maxChunkSize;
} }
int remainingLength = (int) (contentLength - alreadyReadChunkSize); if (this.contentLength >= 0) {
if (toRead > remainingLength) { int remainingLength = (int) (contentLength - alreadyReadChunkSize);
toRead = remainingLength; if (toRead > remainingLength) {
} toRead = remainingLength;
ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead); }
if ((alreadyReadChunkSize += toRead) >= contentLength) { ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
lastContent = new DefaultLastStompContentSubframe(chunkBuffer); if ((alreadyReadChunkSize += toRead) >= contentLength) {
checkpoint(State.FINALIZE_FRAME_READ); lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
checkpoint(State.FINALIZE_FRAME_READ);
} else {
out.add(new DefaultStompContentSubframe(chunkBuffer));
return;
}
} else { } else {
DefaultStompContentSubframe chunk; int nulIndex = indexOf(in, in.readerIndex(), in.writerIndex(), StompConstants.NUL);
chunk = new DefaultStompContentSubframe(chunkBuffer); if (nulIndex == in.readerIndex()) {
out.add(chunk); checkpoint(State.FINALIZE_FRAME_READ);
} } else {
if (alreadyReadChunkSize < contentLength) { if (nulIndex > 0) {
return; toRead = nulIndex - in.readerIndex();
} else {
toRead = in.writerIndex() - in.readerIndex();
}
ByteBuf chunkBuffer = readBytes(ctx.alloc(), in, toRead);
alreadyReadChunkSize += toRead;
if (nulIndex > 0) {
lastContent = new DefaultLastStompContentSubframe(chunkBuffer);
checkpoint(State.FINALIZE_FRAME_READ);
} else {
out.add(new DefaultStompContentSubframe(chunkBuffer));
return;
}
}
} }
// Fall through. // Fall through.
case FINALIZE_FRAME_READ: case FINALIZE_FRAME_READ:
@ -198,28 +217,23 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
headers.add(split[0], split[1]); headers.add(split[0], split[1]);
} }
} else { } else {
long contentLength = -1;
if (headers.contains(StompHeaders.CONTENT_LENGTH)) { if (headers.contains(StompHeaders.CONTENT_LENGTH)) {
contentLength = getContentLength(headers, 0); this.contentLength = getContentLength(headers, 0);
} else { if (this.contentLength == 0) {
int globalIndex = indexOf(buffer, buffer.readerIndex(), return State.FINALIZE_FRAME_READ;
buffer.writerIndex(), StompConstants.NUL);
if (globalIndex != -1) {
contentLength = globalIndex - buffer.readerIndex();
} }
} }
if (contentLength > 0) { return State.READ_CONTENT;
this.contentLength = contentLength;
return State.READ_CONTENT;
} else {
return State.FINALIZE_FRAME_READ;
}
} }
} }
} }
private static long getContentLength(StompHeaders headers, long defaultValue) { private static long getContentLength(StompHeaders headers, long defaultValue) {
return headers.getLong(StompHeaders.CONTENT_LENGTH, defaultValue); long contentLength = headers.getLong(StompHeaders.CONTENT_LENGTH, defaultValue);
if (contentLength < 0) {
throw new DecoderException(StompHeaders.CONTENT_LENGTH + " must be non-negative");
}
return contentLength;
} }
private static void skipNullCharacter(ByteBuf buffer) { private static void skipNullCharacter(ByteBuf buffer) {
@ -264,7 +278,7 @@ public class StompSubframeDecoder extends ReplayingDecoder<State> {
private void resetDecoder() { private void resetDecoder() {
checkpoint(State.SKIP_CONTROL_CHARACTERS); checkpoint(State.SKIP_CONTROL_CHARACTERS);
contentLength = 0; contentLength = -1;
alreadyReadChunkSize = 0; alreadyReadChunkSize = 0;
lastContent = null; lastContent = null;
} }

View File

@ -66,6 +66,39 @@ public class StompSubframeAggregatorTest {
Assert.assertNull(channel.readInbound()); Assert.assertNull(channel.readInbound());
} }
@Test
public void testSingleFrameWithBodyAndNoContentLength() {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.SEND_FRAME_4.getBytes());
channel.writeInbound(incoming);
StompFrame frame = channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.SEND, frame.command());
Assert.assertEquals("body", frame.content().toString(CharsetUtil.UTF_8));
frame.release();
Assert.assertNull(channel.readInbound());
}
@Test
public void testSingleFrameWithSplitBodyAndNoContentLength() {
for (int n = 0; n < StompTestConstants.SEND_FRAMES_3.length; ++n) {
ByteBuf incoming = Unpooled.buffer();
incoming.writeBytes(StompTestConstants.SEND_FRAMES_3[n].getBytes());
channel.writeInbound(incoming);
channel.flush();
}
StompFrame frame = channel.readInbound();
Assert.assertNotNull(frame);
Assert.assertEquals(StompCommand.SEND, frame.command());
Assert.assertEquals("first part of body\nsecond part of body", frame.content().toString(CharsetUtil.UTF_8));
frame.release();
Assert.assertNull(channel.readInbound());
}
@Test @Test
public void testSingleFrameChunked() { public void testSingleFrameChunked() {
EmbeddedChannel channel = new EmbeddedChannel( EmbeddedChannel channel = new EmbeddedChannel(

View File

@ -42,6 +42,21 @@ public final class StompTestConstants {
'\n' + '\n' +
"hello, queue a!!!" + "hello, queue a!!!" +
"\0\n"; "\0\n";
public static final String[] SEND_FRAMES_3 = {
"SEND\n" +
"destination:/queue/a\n" +
"content-type:text/plain\n" +
'\n' +
"first part of body\n",
"second part of body\0"
};
public static final String SEND_FRAME_4 = "SEND\n" +
"destination:/queue/a\n" +
"content-type:text/plain\n" +
'\n' +
"body\0";
private StompTestConstants() { } private StompTestConstants() { }
} }