Add a line-based frame decoder with good performance.
Using DelimiterBasedFrameDecoder with Delimiters.lineDelimiter() has quadratic performance in the size of the input buffer. Needless to say, the performance degrades pretty quickly as the size of the buffer increases. Larger MTUs or loopback connections can make it so bad that it appears that the code is "busy waiting", when in fact it's spending almost 100% of the CPU time in DelimiterBasedFrameDecoder.indexOf(). Add a new LineBasedFrameDecoder that decodes line-delimited frames in O(n) instead of DelimiterBasedFrameDecoder's O(n^2) implementation. In OpenTSDB's telnet-style protocol decoder this resulted in throughput increases of an order of magnitude. Change DelimiterBasedFrameDecoder to automatically detect when the frames are delimited by line endings, and automatically switch to using LineBasedFrameDecoder under the hood. This means that all Netty applications out there that using the combo DelimiterBasedFrameDecoder with Delimiters.lineDelimiter() will automatically benefit from the better performance of LineBasedFrameDecoder, without requiring a code change.
This commit is contained in:
parent
26943f189c
commit
d4155ad518
@ -55,6 +55,7 @@ import org.jboss.netty.channel.Channels;
|
||||
* +----------+
|
||||
* </pre>
|
||||
*
|
||||
*
|
||||
* @apiviz.uses org.jboss.netty.handler.codec.frame.Delimiters - - useful
|
||||
*/
|
||||
public class DelimiterBasedFrameDecoder extends FrameDecoder {
|
||||
@ -65,6 +66,8 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder {
|
||||
private final boolean failFast;
|
||||
private boolean discardingTooLongFrame;
|
||||
private int tooLongFrameLength;
|
||||
/** Set only when decoding with "\n" and "\r\n" as the delimiter. */
|
||||
private LineBasedFrameDecoder lineBasedDecoder;
|
||||
|
||||
/**
|
||||
* Creates a new instance.
|
||||
@ -113,15 +116,9 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder {
|
||||
public DelimiterBasedFrameDecoder(
|
||||
int maxFrameLength, boolean stripDelimiter, boolean failFast,
|
||||
ChannelBuffer delimiter) {
|
||||
validateMaxFrameLength(maxFrameLength);
|
||||
validateDelimiter(delimiter);
|
||||
delimiters = new ChannelBuffer[] {
|
||||
this(maxFrameLength, stripDelimiter, failFast, new ChannelBuffer[] {
|
||||
delimiter.slice(
|
||||
delimiter.readerIndex(), delimiter.readableBytes())
|
||||
};
|
||||
this.maxFrameLength = maxFrameLength;
|
||||
this.stripDelimiter = stripDelimiter;
|
||||
this.failFast = failFast;
|
||||
delimiter.readerIndex(), delimiter.readableBytes()) });
|
||||
}
|
||||
|
||||
/**
|
||||
@ -176,21 +173,52 @@ public class DelimiterBasedFrameDecoder extends FrameDecoder {
|
||||
}
|
||||
if (delimiters.length == 0) {
|
||||
throw new IllegalArgumentException("empty delimiters");
|
||||
}
|
||||
this.delimiters = new ChannelBuffer[delimiters.length];
|
||||
for (int i = 0; i < delimiters.length; i ++) {
|
||||
ChannelBuffer d = delimiters[i];
|
||||
validateDelimiter(d);
|
||||
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
|
||||
} else if (isLineBased(delimiters) && !isSubclass()) {
|
||||
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
|
||||
this.delimiters = null;
|
||||
} else {
|
||||
this.delimiters = new ChannelBuffer[delimiters.length];
|
||||
for (int i = 0; i < delimiters.length; i ++) {
|
||||
ChannelBuffer d = delimiters[i];
|
||||
validateDelimiter(d);
|
||||
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
|
||||
}
|
||||
lineBasedDecoder = null;
|
||||
}
|
||||
this.maxFrameLength = maxFrameLength;
|
||||
this.stripDelimiter = stripDelimiter;
|
||||
this.failFast = failFast;
|
||||
}
|
||||
|
||||
/** Returns true if the delimiters are "\n" and "\r\n". */
|
||||
private static boolean isLineBased(final ChannelBuffer[] delimiters) {
|
||||
if (delimiters.length != 2) {
|
||||
return false;
|
||||
}
|
||||
ChannelBuffer a = delimiters[0];
|
||||
ChannelBuffer b = delimiters[1];
|
||||
if (a.capacity() < b.capacity()) {
|
||||
a = delimiters[1];
|
||||
b = delimiters[0];
|
||||
}
|
||||
return a.capacity() == 2 && b.capacity() == 1
|
||||
&& a.getByte(0) == '\r' && a.getByte(1) == '\n'
|
||||
&& b.getByte(0) == '\n';
|
||||
}
|
||||
|
||||
/**
|
||||
* Return <code>true</code> if the current instance is a subclass of DelimiterBasedFrameDecoder
|
||||
*/
|
||||
private boolean isSubclass() {
|
||||
return this.getClass() != DelimiterBasedFrameDecoder.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(
|
||||
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
if (lineBasedDecoder != null) {
|
||||
return lineBasedDecoder.decode(ctx, channel, buffer);
|
||||
}
|
||||
// Try all delimiters and choose the delimiter which yields the shortest frame.
|
||||
int minFrameLength = Integer.MAX_VALUE;
|
||||
ChannelBuffer minDelim = null;
|
||||
|
@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.frame;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
|
||||
/**
|
||||
* A decoder that splits the received {@link ChannelBuffer}s on line endings.
|
||||
* <p>
|
||||
* Both {@code "\n"} and {@code "\r\n"} are handled.
|
||||
* For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}.
|
||||
*/
|
||||
public class LineBasedFrameDecoder extends FrameDecoder {
|
||||
|
||||
/** Maximum length of a frame we're willing to decode. */
|
||||
private final int maxLength;
|
||||
/** Whether or not to throw an exception as soon as we exceed maxLength. */
|
||||
private final boolean failFast;
|
||||
private final boolean stripDelimiter;
|
||||
|
||||
/** True if we're discarding input because we're already over maxLength. */
|
||||
private boolean discarding;
|
||||
|
||||
/**
|
||||
* Creates a new decoder.
|
||||
* @param maxLength the maximum length of the decoded frame.
|
||||
* A {@link TooLongFrameException} is thrown if
|
||||
* the length of the frame exceeds this value.
|
||||
*/
|
||||
public LineBasedFrameDecoder(final int maxLength) {
|
||||
this(maxLength, true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new decoder.
|
||||
* @param maxLength the maximum length of the decoded frame.
|
||||
* A {@link TooLongFrameException} is thrown if
|
||||
* the length of the frame exceeds this value.
|
||||
* @param stripDelimiter whether the decoded frame should strip out the
|
||||
* delimiter or not
|
||||
* @param failFast If <tt>true</tt>, a {@link TooLongFrameException} is
|
||||
* thrown as soon as the decoder notices the length of the
|
||||
* frame will exceed <tt>maxFrameLength</tt> regardless of
|
||||
* whether the entire frame has been read.
|
||||
* If <tt>false</tt>, a {@link TooLongFrameException} is
|
||||
* thrown after the entire frame that exceeds
|
||||
* <tt>maxFrameLength</tt> has been read.
|
||||
*/
|
||||
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
|
||||
this.maxLength = maxLength;
|
||||
this.failFast = failFast;
|
||||
this.stripDelimiter = stripDelimiter;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object decode(final ChannelHandlerContext ctx,
|
||||
final Channel channel,
|
||||
final ChannelBuffer buffer) throws Exception {
|
||||
final int eol = findEndOfLine(buffer);
|
||||
if (eol != -1) {
|
||||
final ChannelBuffer frame;
|
||||
final int length = eol - buffer.readerIndex();
|
||||
assert length >= 0: "Invalid length=" + length;
|
||||
if (discarding) {
|
||||
frame = null;
|
||||
buffer.skipBytes(length);
|
||||
if (!failFast) {
|
||||
fail(ctx, "over " + (maxLength + length) + " bytes");
|
||||
}
|
||||
} else {
|
||||
int delimLength;
|
||||
final byte delim = buffer.getByte(buffer.readerIndex() + length);
|
||||
if (delim == '\r') {
|
||||
delimLength = 2; // Skip the \r\n.
|
||||
} else {
|
||||
delimLength = 1;
|
||||
}
|
||||
if (stripDelimiter) {
|
||||
frame = buffer.readBytes(length);
|
||||
buffer.skipBytes(delimLength);
|
||||
} else {
|
||||
frame = buffer.readBytes(length + delimLength);
|
||||
}
|
||||
}
|
||||
return frame;
|
||||
}
|
||||
|
||||
final int buffered = buffer.readableBytes();
|
||||
if (!discarding && buffered > maxLength) {
|
||||
discarding = true;
|
||||
if (failFast) {
|
||||
fail(ctx, buffered + " bytes buffered already");
|
||||
}
|
||||
}
|
||||
if (discarding) {
|
||||
buffer.skipBytes(buffer.readableBytes());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private void fail(final ChannelHandlerContext ctx, final String msg) {
|
||||
Channels.fireExceptionCaught(ctx.getChannel(),
|
||||
new TooLongFrameException("Frame length exceeds " + maxLength + " ("
|
||||
+ msg + ')'));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the index in the buffer of the end of line found.
|
||||
* Returns -1 if no end of line was found in the buffer.
|
||||
*/
|
||||
private static int findEndOfLine(final ChannelBuffer buffer) {
|
||||
final int n = buffer.writerIndex();
|
||||
for (int i = buffer.readerIndex(); i < n; i ++) {
|
||||
final byte b = buffer.getByte(i);
|
||||
if (b == '\n') {
|
||||
return i;
|
||||
} else if (b == '\r' && i < n - 1 && buffer.getByte(i + 1) == '\n') {
|
||||
return i; // \r\n
|
||||
}
|
||||
}
|
||||
return -1; // Not found.
|
||||
}
|
||||
|
||||
}
|
@ -64,4 +64,17 @@ public class DelimiterBasedFrameDecoderTest {
|
||||
Assert.assertEquals("A", buf.toString(CharsetUtil.ISO_8859_1));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDecode() throws Exception {
|
||||
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>(
|
||||
new DelimiterBasedFrameDecoder(8192, true, Delimiters.lineDelimiter()));
|
||||
|
||||
Assert.assertTrue(embedder.offer(ChannelBuffers.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII)));
|
||||
Assert.assertTrue(embedder.finish());
|
||||
Assert.assertEquals("first", embedder.poll().toString(CharsetUtil.US_ASCII));
|
||||
Assert.assertEquals("second", embedder.poll().toString(CharsetUtil.US_ASCII));
|
||||
Assert.assertNull(embedder.poll());
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,52 @@
|
||||
/*
|
||||
* Copyright 2012 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.jboss.netty.handler.codec.frame;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.handler.codec.embedder.CodecEmbedderException;
|
||||
import org.jboss.netty.handler.codec.embedder.DecoderEmbedder;
|
||||
import org.jboss.netty.util.CharsetUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class LineBasedFrameDecoderTest {
|
||||
@Test
|
||||
public void testDecodeWithStrip() throws Exception {
|
||||
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>(
|
||||
new LineBasedFrameDecoder(8192, true, false));
|
||||
|
||||
Assert.assertTrue(embedder.offer(ChannelBuffers.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII)));
|
||||
Assert.assertTrue(embedder.finish());
|
||||
Assert.assertEquals("first", embedder.poll().toString(CharsetUtil.US_ASCII));
|
||||
Assert.assertEquals("second", embedder.poll().toString(CharsetUtil.US_ASCII));
|
||||
Assert.assertNull(embedder.poll());
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testDecodeWithoutStrip() throws Exception {
|
||||
DecoderEmbedder<ChannelBuffer> embedder = new DecoderEmbedder<ChannelBuffer>(
|
||||
new LineBasedFrameDecoder(8192, false, false));
|
||||
|
||||
Assert.assertTrue(embedder.offer(ChannelBuffers.copiedBuffer("first\r\nsecond\nthird", CharsetUtil.US_ASCII)));
|
||||
Assert.assertTrue(embedder.finish());
|
||||
Assert.assertEquals("first\r\n", embedder.poll().toString(CharsetUtil.US_ASCII));
|
||||
Assert.assertEquals("second\n", embedder.poll().toString(CharsetUtil.US_ASCII));
|
||||
Assert.assertNull(embedder.poll());
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user