From fda8808210c4f4f092a93da5929df43062c3aa6c Mon Sep 17 00:00:00 2001 From: Idel Pivnitskiy Date: Tue, 4 Nov 2014 19:02:51 +0300 Subject: [PATCH] Rewrite HttpObjectDecoder to make use of proper state machine Motivation: HttpObjectDecoder extended ReplayDecoder which is slightly slower then ByteToMessageDecoder. Modifications: - Changed super class of HttpObjectDecoder from ReplayDecoder to ByteToMessageDecoder. - Rewrote decode() method of HttpObjectDecoder to use proper state machine. - Changed private methods HeaderParser.parse(ByteBuf), readHeaders(ByteBuf) and readTrailingHeaders(ByteBuf), skipControlCharacters(ByteBuf) to consider available bytes. - Set HeaderParser and LineParser as static inner classes. - Replaced not safe actualReadableBytes() with buffer.readableBytes(). Result: Improved performance of HttpObjectDecoder by approximately 177%. --- .../handler/codec/http/HttpObjectDecoder.java | 143 +++++++++++------- .../codec/http/HttpRequestDecoder.java | 2 +- .../codec/http/HttpResponseDecoder.java | 2 +- .../handler/codec/rtsp/RtspObjectDecoder.java | 2 +- 4 files changed, 90 insertions(+), 59 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java index e2035245c0..5190a9b46d 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpObjectDecoder.java @@ -20,10 +20,9 @@ import io.netty.buffer.ByteBufProcessor; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.DecoderResult; -import io.netty.handler.codec.ReplayingDecoder; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.http.HttpObjectDecoder.State; import io.netty.util.internal.AppendableCharSequence; import java.util.List; @@ -99,7 +98,7 @@ import java.util.List; * To implement the decoder of such a derived protocol, extend this class and * implement all abstract methods properly. */ -public abstract class HttpObjectDecoder extends ReplayingDecoder { +public abstract class HttpObjectDecoder extends ByteToMessageDecoder { private static final String EMPTY_VALUE = ""; private final int maxChunkSize; @@ -117,11 +116,13 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { private CharSequence name; private CharSequence value; + private LastHttpContent trailer; + /** * The internal state of {@link HttpObjectDecoder}. * Internal use only. */ - enum State { + private enum State { SKIP_CONTROL_CHARS, READ_INITIAL, READ_HEADER, @@ -135,6 +136,8 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { UPGRADED } + private State currentState = State.SKIP_CONTROL_CHARS; + /** * Creates a new instance with the default * {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and @@ -159,8 +162,6 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean chunkedSupported, boolean validateHeaders) { - super(State.SKIP_CONTROL_CHARS); - if (maxInitialLineLength <= 0) { throw new IllegalArgumentException( "maxInitialLineLength must be a positive integer: " + @@ -190,26 +191,27 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { resetNow(); } - switch (state()) { + switch (currentState) { case SKIP_CONTROL_CHARS: { - try { - skipControlCharacters(buffer); - checkpoint(State.READ_INITIAL); - } finally { - checkpoint(); + if (!skipControlCharacters(buffer)) { + return; } - // fall-through + currentState = State.READ_INITIAL; } case READ_INITIAL: try { - String[] initialLine = splitInitialLine(lineParser.parse(buffer)); + AppendableCharSequence line = lineParser.parse(buffer); + if (line == null) { + return; + } + String[] initialLine = splitInitialLine(line); if (initialLine.length < 3) { // Invalid initial line - ignore. - checkpoint(State.SKIP_CONTROL_CHARS); + currentState = State.SKIP_CONTROL_CHARS; return; } message = createMessage(initialLine); - checkpoint(State.READ_HEADER); + currentState = State.READ_HEADER; // fall-through } catch (Exception e) { out.add(invalidMessage(e)); @@ -217,7 +219,10 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } case READ_HEADER: try { State nextState = readHeaders(buffer); - checkpoint(nextState); + if (nextState == null) { + return; + } + currentState = nextState; switch (nextState) { case SKIP_CONTROL_CHARS: // fast-path @@ -261,7 +266,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } case READ_VARIABLE_LENGTH_CONTENT: { // Keep reading data as a chunk until the end of connection is reached. - int toRead = Math.min(actualReadableBytes(), maxChunkSize); + int toRead = Math.min(buffer.readableBytes(), maxChunkSize); if (toRead > 0) { ByteBuf content = buffer.readSlice(toRead).retain(); out.add(new DefaultHttpContent(content)); @@ -269,7 +274,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { return; } case READ_FIXED_LENGTH_CONTENT: { - int readLimit = actualReadableBytes(); + int readLimit = buffer.readableBytes(); // Check if the buffer is readable first as we use the readable byte count // to create the HttpChunk. This is needed as otherwise we may end up with @@ -303,13 +308,16 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { */ case READ_CHUNK_SIZE: try { AppendableCharSequence line = lineParser.parse(buffer); + if (line == null) { + return; + } int chunkSize = getChunkSize(line.toString()); this.chunkSize = chunkSize; if (chunkSize == 0) { - checkpoint(State.READ_CHUNK_FOOTER); + currentState = State.READ_CHUNK_FOOTER; return; } - checkpoint(State.READ_CHUNKED_CONTENT); + currentState = State.READ_CHUNKED_CONTENT; // fall-through } catch (Exception e) { out.add(invalidChunk(e)); @@ -318,7 +326,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { case READ_CHUNKED_CONTENT: { assert chunkSize <= Integer.MAX_VALUE; int toRead = Math.min((int) chunkSize, maxChunkSize); - toRead = Math.min(toRead, actualReadableBytes()); + toRead = Math.min(toRead, buffer.readableBytes()); if (toRead == 0) { return; } @@ -330,27 +338,27 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { if (chunkSize != 0) { return; } - checkpoint(State.READ_CHUNK_DELIMITER); + currentState = State.READ_CHUNK_DELIMITER; // fall-through } case READ_CHUNK_DELIMITER: { - for (;;) { - byte next = buffer.readByte(); - if (next == HttpConstants.CR) { - if (buffer.readByte() == HttpConstants.LF) { - checkpoint(State.READ_CHUNK_SIZE); - return; - } - } else if (next == HttpConstants.LF) { - checkpoint(State.READ_CHUNK_SIZE); - return; - } else { - checkpoint(); + final int wIdx = buffer.writerIndex(); + int rIdx = buffer.readerIndex(); + while (wIdx > rIdx) { + byte next = buffer.getByte(rIdx++); + if (next == HttpConstants.LF) { + currentState = State.READ_CHUNK_SIZE; + break; } } + buffer.readerIndex(rIdx); + return; } case READ_CHUNK_FOOTER: try { LastHttpContent trailer = readTrailingHeaders(buffer); + if (trailer == null) { + return; + } out.add(trailer); resetNow(); return; @@ -360,17 +368,17 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } case BAD_MESSAGE: { // Keep discarding until disconnection. - buffer.skipBytes(actualReadableBytes()); + buffer.skipBytes(buffer.readableBytes()); break; } case UPGRADED: { - int readableBytes = actualReadableBytes(); + int readableBytes = buffer.readableBytes(); if (readableBytes > 0) { // Keep on consuming as otherwise we may trigger an DecoderException, // other handler will replace this codec with the upgraded protocol codec to // take the traffic over at some point then. // See https://github.com/netty/netty/issues/2173 - out.add(buffer.readBytes(actualReadableBytes())); + out.add(buffer.readBytes(readableBytes)); } break; } @@ -384,7 +392,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { // Handle the last unfinished message. if (message != null) { boolean chunked = HttpHeaderUtil.isTransferEncodingChunked(message); - if (state() == State.READ_VARIABLE_LENGTH_CONTENT && !in.isReadable() && !chunked) { + if (currentState == State.READ_VARIABLE_LENGTH_CONTENT && !in.isReadable() && !chunked) { // End of connection. out.add(LastHttpContent.EMPTY_LAST_CONTENT); reset(); @@ -448,19 +456,20 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { contentLength = Long.MIN_VALUE; lineParser.reset(); headerParser.reset(); + trailer = null; if (!isDecodingRequest()) { HttpResponse res = (HttpResponse) message; if (res != null && res.status().code() == 101) { - checkpoint(State.UPGRADED); + currentState = State.UPGRADED; return; } } - checkpoint(State.SKIP_CONTROL_CHARS); + currentState = State.SKIP_CONTROL_CHARS; } private HttpMessage invalidMessage(Exception cause) { - checkpoint(State.BAD_MESSAGE); + currentState = State.BAD_MESSAGE; if (message != null) { message.setDecoderResult(DecoderResult.failure(cause)); } else { @@ -474,22 +483,28 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } private HttpContent invalidChunk(Exception cause) { - checkpoint(State.BAD_MESSAGE); + currentState = State.BAD_MESSAGE; HttpContent chunk = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER); chunk.setDecoderResult(DecoderResult.failure(cause)); message = null; + trailer = null; return chunk; } - private static void skipControlCharacters(ByteBuf buffer) { - for (;;) { - char c = (char) buffer.readUnsignedByte(); - if (!Character.isISOControl(c) && - !Character.isWhitespace(c)) { - buffer.readerIndex(buffer.readerIndex() - 1); + private static boolean skipControlCharacters(ByteBuf buffer) { + boolean skiped = false; + final int wIdx = buffer.writerIndex(); + int rIdx = buffer.readerIndex(); + while (wIdx > rIdx) { + int c = buffer.getUnsignedByte(rIdx++); + if (!Character.isISOControl(c) && !Character.isWhitespace(c)) { + rIdx--; + skiped = true; break; } } + buffer.readerIndex(rIdx); + return skiped; } private State readHeaders(ByteBuf buffer) { @@ -497,6 +512,9 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { final HttpHeaders headers = message.headers(); AppendableCharSequence line = headerParser.parse(buffer); + if (line == null) { + return null; + } if (line.length() > 0) { do { char firstChar = line.charAt(0); @@ -514,6 +532,9 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } line = headerParser.parse(buffer); + if (line == null) { + return null; + } } while (line.length() > 0); } @@ -549,9 +570,15 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { private LastHttpContent readTrailingHeaders(ByteBuf buffer) { AppendableCharSequence line = headerParser.parse(buffer); + if (line == null) { + return null; + } CharSequence lastHeader = null; if (line.length() > 0) { - LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders); + LastHttpContent trailer = this.trailer; + if (trailer == null) { + trailer = this.trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders); + } do { char firstChar = line.charAt(0); if (lastHeader != null && (firstChar == ' ' || firstChar == '\t')) { @@ -582,8 +609,12 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } line = headerParser.parse(buffer); + if (line == null) { + return null; + } } while (line.length() > 0); + this.trailer = null; return trailer; } @@ -693,7 +724,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { return result; } - private class HeaderParser implements ByteBufProcessor { + private static class HeaderParser implements ByteBufProcessor { private final AppendableCharSequence seq; private final int maxLength; private int size; @@ -706,10 +737,10 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { public AppendableCharSequence parse(ByteBuf buffer) { seq.reset(); int i = buffer.forEachByte(this); + if (i == -1) { + return null; + } buffer.readerIndex(i + 1); - - // Call checkpoint to make sure the readerIndex is updated correctly - checkpoint(); return seq; } @@ -733,7 +764,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { // If decoding a response, just throw an exception. throw newException(maxLength); } - size ++; + size++; seq.append(nextByte); return true; } @@ -743,7 +774,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder { } } - private final class LineParser extends HeaderParser { + private static final class LineParser extends HeaderParser { LineParser(AppendableCharSequence seq, int maxLength) { super(seq, maxLength); diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestDecoder.java index a1d073ec35..bae740864f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpRequestDecoder.java @@ -56,7 +56,7 @@ public class HttpRequestDecoder extends HttpObjectDecoder { /** * Creates a new instance with the default - * {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and + * {@code maxInitialLineLength (4096)}, {@code maxHeaderSize (8192)}, and * {@code maxChunkSize (8192)}. */ public HttpRequestDecoder() { diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseDecoder.java index 340d6251ab..982f98286b 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpResponseDecoder.java @@ -87,7 +87,7 @@ public class HttpResponseDecoder extends HttpObjectDecoder { /** * Creates a new instance with the default - * {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and + * {@code maxInitialLineLength (4096)}, {@code maxHeaderSize (8192)}, and * {@code maxChunkSize (8192)}. */ public HttpResponseDecoder() { diff --git a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspObjectDecoder.java b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspObjectDecoder.java index f8bed912c2..e73320d297 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspObjectDecoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/rtsp/RtspObjectDecoder.java @@ -52,7 +52,7 @@ public abstract class RtspObjectDecoder extends HttpObjectDecoder { /** * Creates a new instance with the default - * {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and + * {@code maxInitialLineLength (4096)}, {@code maxHeaderSize (8192)}, and * {@code maxContentLength (8192)}. */ protected RtspObjectDecoder() {