Add a line-based frame decoder with good performance.

Using DelimiterBasedFrameDecoder with Delimiters.lineDelimiter() has
quadratic performance in the size of the input buffer.  Needless to
say, the performance degrades pretty quickly as the size of the buffer
increases.  Larger MTUs or loopback connections can make it so bad that
it appears that the code is "busy waiting", when in fact it's spending
almost 100% of the CPU time in DelimiterBasedFrameDecoder.indexOf().

Add a new LineBasedFrameDecoder that decodes line-delimited frames
in O(n) instead of DelimiterBasedFrameDecoder's O(n^2) implementation.
In OpenTSDB's telnet-style protocol decoder this resulted in throughput
increases of an order of magnitude.

Change DelimiterBasedFrameDecoder to automatically detect when the
frames are delimited by line endings, and automatically switch to
using LineBasedFrameDecoder under the hood.  This means that all Netty
applications out there that using the combo DelimiterBasedFrameDecoder
with Delimiters.lineDelimiter() will automatically benefit from the
better performance of LineBasedFrameDecoder, without requiring a code
change.
This commit is contained in:
Norman Maurer 2012-11-25 13:36:55 +01:00
parent b1cd953d02
commit d7d8503f5c
4 changed files with 234 additions and 15 deletions

View File

@ -62,6 +62,8 @@ public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder<Object> {
private final boolean failFast;
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
/** Set only when decoding with "\n" and "\r\n" as the delimiter. */
private final LineBasedFrameDecoder lineBasedDecoder;
/**
* Creates a new instance.
@ -110,15 +112,8 @@ public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder<Object> {
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast,
ByteBuf delimiter) {
validateMaxFrameLength(maxFrameLength);
validateDelimiter(delimiter);
delimiters = new ByteBuf[] {
delimiter.slice(
delimiter.readerIndex(), delimiter.readableBytes())
};
this.maxFrameLength = maxFrameLength;
this.stripDelimiter = stripDelimiter;
this.failFast = failFast;
this(maxFrameLength, stripDelimiter, failFast, new ByteBuf[] {
delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())});
}
/**
@ -173,20 +168,51 @@ public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder<Object> {
}
if (delimiters.length == 0) {
throw new IllegalArgumentException("empty delimiters");
}
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ByteBuf d = delimiters[i];
validateDelimiter(d);
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
} else if (isLineBased(delimiters) && !isSubclass()) {
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
this.delimiters = null;
} else {
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ByteBuf d = delimiters[i];
validateDelimiter(d);
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
}
lineBasedDecoder = null;
}
this.maxFrameLength = maxFrameLength;
this.stripDelimiter = stripDelimiter;
this.failFast = failFast;
}
/** Returns true if the delimiters are "\n" and "\r\n". */
private static boolean isLineBased(final ByteBuf[] delimiters) {
if (delimiters.length != 2) {
return false;
}
ByteBuf a = delimiters[0];
ByteBuf b = delimiters[1];
if (a.capacity() < b.capacity()) {
a = delimiters[1];
b = delimiters[0];
}
return a.capacity() == 2 && b.capacity() == 1
&& a.getByte(0) == '\r' && a.getByte(1) == '\n'
&& b.getByte(0) == '\n';
}
/**
* Return <code>true</code> if the current instance is a subclass of DelimiterBasedFrameDecoder
*/
private boolean isSubclass() {
return this.getClass() != DelimiterBasedFrameDecoder.class;
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;

View File

@ -0,0 +1,135 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
/**
* A decoder that splits the received {@link ByteBuf}s on line endings.
* <p>
* Both {@code "\n"} and {@code "\r\n"} are handled.
* For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}.
*/
public class LineBasedFrameDecoder extends ByteToMessageDecoder<ByteBuf> {
/** Maximum length of a frame we're willing to decode. */
private final int maxLength;
/** Whether or not to throw an exception as soon as we exceed maxLength. */
private final boolean failFast;
private final boolean stripDelimiter;
/** True if we're discarding input because we're already over maxLength. */
private boolean discarding;
/**
* Creates a new decoder.
* @param maxLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
*/
public LineBasedFrameDecoder(final int maxLength) {
this(maxLength, true, false);
}
/**
* Creates a new decoder.
* @param maxLength the maximum length of the decoded frame.
* A {@link TooLongFrameException} is thrown if
* the length of the frame exceeds this value.
* @param stripDelimiter whether the decoded frame should strip out the
* delimiter or not
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
* thrown as soon as the decoder notices the length of the
* frame will exceed <tt>maxFrameLength</tt> regardless of
* whether the entire frame has been read.
* If <tt>false</tt>, a {@link TooLongFrameException} is
* thrown after the entire frame that exceeds
* <tt>maxFrameLength</tt> has been read.
*/
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
this.maxLength = maxLength;
this.failFast = failFast;
this.stripDelimiter = stripDelimiter;
}
@Override
public ByteBuf decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
final int eol = findEndOfLine(buffer);
if (eol != -1) {
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
assert length >= 0: "Invalid length=" + length;
if (discarding) {
frame = null;
buffer.skipBytes(length);
if (!failFast) {
fail(ctx, "over " + (maxLength + length) + " bytes");
}
} else {
int delimLength;
final byte delim = buffer.getByte(buffer.readerIndex() + length);
if (delim == '\r') {
delimLength = 2; // Skip the \r\n.
} else {
delimLength = 1;
}
if (stripDelimiter) {
frame = buffer.readBytes(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readBytes(length + delimLength);
}
}
return frame;
}
final int buffered = buffer.readableBytes();
if (!discarding && buffered > maxLength) {
discarding = true;
if (failFast) {
fail(ctx, buffered + " bytes buffered already");
}
}
if (discarding) {
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
private void fail(final ChannelHandlerContext ctx, final String msg) {
ctx.fireExceptionCaught(new TooLongFrameException("Frame length exceeds " + maxLength + " ("
+ msg + ')'));
}
/**
* Returns the index in the buffer of the end of line found.
* Returns -1 if no end of line was found in the buffer.
*/
private static int findEndOfLine(final ByteBuf buffer) {
final int n = buffer.writerIndex();
for (int i = buffer.readerIndex(); i < n; i ++) {
final byte b = buffer.getByte(i);
if (b == '\n') {
return i;
} else if (b == '\r' && i < n - 1 && buffer.getByte(i + 1) == '\n') {
return i; // \r\n
}
}
return -1; // Not found.
}
}

View File

@ -19,6 +19,7 @@ import static org.junit.Assert.*;
import java.nio.charset.Charset;
import io.netty.util.CharsetUtil;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
@ -66,4 +67,16 @@ public class DelimiterBasedFrameDecoderTest {
assertEquals("g\r\n", ((ByteBuf)ch.readInbound()).toString(Charset.defaultCharset()));
assertNull(ch.readInbound());
}
@Test
public void testDecode() throws Exception {
EmbeddedByteChannel ch = new EmbeddedByteChannel(
new DelimiterBasedFrameDecoder(8192, true, Delimiters.lineDelimiter()));
ch.writeInbound(Unpooled.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII));
assertEquals("first", ((ByteBuf)ch.readInbound()).toString(CharsetUtil.US_ASCII));
assertEquals("second", ((ByteBuf)ch.readInbound()).toString(CharsetUtil.US_ASCII));
assertNull(ch.readInbound());
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedByteChannel;
import io.netty.util.CharsetUtil;
import org.junit.Assert;
import org.junit.Test;
public class LineBasedFrameDecoderTest {
@Test
public void testDecodeWithStrip() throws Exception {
EmbeddedByteChannel ch = new EmbeddedByteChannel(new LineBasedFrameDecoder(8192, true, false));
ch.writeInbound(Unpooled.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII));
Assert.assertEquals("first", ((ByteBuf)ch.readInbound()).toString(CharsetUtil.US_ASCII));
Assert.assertEquals("second", ((ByteBuf)ch.readInbound()).toString(CharsetUtil.US_ASCII));
Assert.assertNull(ch.readInbound());
}
@Test
public void testDecodeWithoutStrip() throws Exception {
EmbeddedByteChannel ch = new EmbeddedByteChannel(new LineBasedFrameDecoder(8192, false, false));
ch.writeInbound(Unpooled.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII));
Assert.assertEquals("first\r\n", ((ByteBuf)ch.readInbound()).toString(CharsetUtil.US_ASCII));
Assert.assertEquals("second\n", ((ByteBuf)ch.readInbound()).toString(CharsetUtil.US_ASCII));
Assert.assertNull(ch.readInbound());
}
}