Modify HttpObjectDecoder to allow parsing the HTTP headers in multiple steps.

Motivation:
At the moment the whole HTTP header must be parsed at once which can lead to multiple parsing of the same bytes. We can do better here and allow to parse it in multiple steps.

Modifications:

 - Not parse headers multiple times
 - Simplify the code
 - Eliminate uncessary String[] creations
 - Use readSlice(...).retain() when possible.

Result:

Performance improvements as shown in the included benchmark below.

Before change:
[nmaurer@xxx]~% ./wrk-benchmark
Running 2m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    21.55ms   15.10ms 245.02ms   90.26%
    Req/Sec   196.33k    30.17k  297.29k    76.03%
  373954750 requests in 2.00m, 50.15GB read
Requests/sec: 3116466.08
Transfer/sec:    427.98MB

After change:
[nmaurer@xxx]~% ./wrk-benchmark
Running 2m test @ http://xxx:8080/plaintext
  16 threads and 256 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    20.91ms   36.79ms   1.26s    98.24%
    Req/Sec   206.67k    21.69k  243.62k    94.96%
  393071191 requests in 2.00m, 52.71GB read
Requests/sec: 3275971.50
Transfer/sec:    449.89MB
This commit is contained in:
Norman Maurer 2014-08-27 08:29:25 +02:00 committed by Trustin Lee
parent a7a654c82f
commit cf4c464d99

View File

@ -29,8 +29,6 @@ import io.netty.util.internal.AppendableCharSequence;
import java.util.List; import java.util.List;
import static io.netty.buffer.ByteBufUtil.*;
/** /**
* Decodes {@link ByteBuf}s into {@link HttpMessage}s and * Decodes {@link ByteBuf}s into {@link HttpMessage}s and
* {@link HttpContent}s. * {@link HttpContent}s.
@ -103,22 +101,23 @@ import static io.netty.buffer.ByteBufUtil.*;
* implement all abstract methods properly. * implement all abstract methods properly.
*/ */
public abstract class HttpObjectDecoder extends ReplayingDecoder<State> { public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
private static final String EMPTY_VALUE = "";
private final int maxInitialLineLength;
private final int maxHeaderSize;
private final int maxChunkSize; private final int maxChunkSize;
private final boolean chunkedSupported; private final boolean chunkedSupported;
protected final boolean validateHeaders; protected final boolean validateHeaders;
private final AppendableCharSequence seq = new AppendableCharSequence(128); private final HeaderParser headerParser;
private final HeaderParser headerParser = new HeaderParser(seq); private final LineParser lineParser;
private final LineParser lineParser = new LineParser(seq);
private HttpMessage message; private HttpMessage message;
private long chunkSize; private long chunkSize;
private int headerSize;
private long contentLength = Long.MIN_VALUE; private long contentLength = Long.MIN_VALUE;
private volatile boolean resetRequested; private volatile boolean resetRequested;
// These will be updated by splitHeader(...)
private CharSequence name;
private CharSequence value;
/** /**
* The internal state of {@link HttpObjectDecoder}. * The internal state of {@link HttpObjectDecoder}.
* <em>Internal use only</em>. * <em>Internal use only</em>.
@ -178,11 +177,12 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
"maxChunkSize must be a positive integer: " + "maxChunkSize must be a positive integer: " +
maxChunkSize); maxChunkSize);
} }
this.maxInitialLineLength = maxInitialLineLength;
this.maxHeaderSize = maxHeaderSize;
this.maxChunkSize = maxChunkSize; this.maxChunkSize = maxChunkSize;
this.chunkedSupported = chunkedSupported; this.chunkedSupported = chunkedSupported;
this.validateHeaders = validateHeaders; this.validateHeaders = validateHeaders;
AppendableCharSequence seq = new AppendableCharSequence(128);
lineParser = new LineParser(seq, maxInitialLineLength);
headerParser = new HeaderParser(seq, maxHeaderSize);
} }
@Override @Override
@ -199,6 +199,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
} finally { } finally {
checkpoint(); checkpoint();
} }
// fall-through
} }
case READ_INITIAL: try { case READ_INITIAL: try {
String[] initialLine = splitInitialLine(lineParser.parse(buffer)); String[] initialLine = splitInitialLine(lineParser.parse(buffer));
@ -210,7 +211,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
message = createMessage(initialLine); message = createMessage(initialLine);
checkpoint(State.READ_HEADER); checkpoint(State.READ_HEADER);
// fall-through
} catch (Exception e) { } catch (Exception e) {
out.add(invalidMessage(e)); out.add(invalidMessage(e));
return; return;
@ -218,40 +219,43 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
case READ_HEADER: try { case READ_HEADER: try {
State nextState = readHeaders(buffer); State nextState = readHeaders(buffer);
checkpoint(nextState); checkpoint(nextState);
if (nextState == State.READ_CHUNK_SIZE) { switch (nextState) {
if (!chunkedSupported) { case SKIP_CONTROL_CHARS:
throw new IllegalArgumentException("Chunked messages not supported"); // fast-path
} // No content is expected.
// Chunked encoding - generate HttpMessage first. HttpChunks will follow. out.add(message);
out.add(message); out.add(LastHttpContent.EMPTY_LAST_CONTENT);
return; resetNow();
} return;
if (nextState == State.SKIP_CONTROL_CHARS) { case READ_CHUNK_SIZE:
// No content is expected. if (!chunkedSupported) {
out.add(message); throw new IllegalArgumentException("Chunked messages not supported");
out.add(LastHttpContent.EMPTY_LAST_CONTENT); }
resetNow(); // Chunked encoding - generate HttpMessage first. HttpChunks will follow.
return; out.add(message);
} return;
long contentLength = contentLength(); default:
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) { long contentLength = contentLength();
out.add(message); if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
out.add(LastHttpContent.EMPTY_LAST_CONTENT); out.add(message);
resetNow(); out.add(LastHttpContent.EMPTY_LAST_CONTENT);
return; resetNow();
} return;
}
assert nextState == State.READ_FIXED_LENGTH_CONTENT || nextState == State.READ_VARIABLE_LENGTH_CONTENT; assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
nextState == State.READ_VARIABLE_LENGTH_CONTENT;
out.add(message); out.add(message);
if (nextState == State.READ_FIXED_LENGTH_CONTENT) { if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk. // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
chunkSize = contentLength; chunkSize = contentLength;
}
// We return here, this forces decode to be called again where we will decode the content
return;
} }
// We return here, this forces decode to be called again where we will decode the content
return;
} catch (Exception e) { } catch (Exception e) {
out.add(invalidMessage(e)); out.add(invalidMessage(e));
return; return;
@ -260,18 +264,8 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
// Keep reading data as a chunk until the end of connection is reached. // Keep reading data as a chunk until the end of connection is reached.
int toRead = Math.min(actualReadableBytes(), maxChunkSize); int toRead = Math.min(actualReadableBytes(), maxChunkSize);
if (toRead > 0) { if (toRead > 0) {
ByteBuf content = readBytes(ctx.alloc(), buffer, toRead); ByteBuf content = buffer.readSlice(toRead).retain();
if (buffer.isReadable()) { out.add(new DefaultHttpContent(content));
out.add(new DefaultHttpContent(content));
} else {
// End of connection.
out.add(new DefaultLastHttpContent(content, validateHeaders));
resetNow();
}
} else if (!buffer.isReadable()) {
// End of connection.
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
resetNow();
} }
return; return;
} }
@ -292,7 +286,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
if (toRead > chunkSize) { if (toRead > chunkSize) {
toRead = (int) chunkSize; toRead = (int) chunkSize;
} }
ByteBuf content = readBytes(ctx.alloc(), buffer, toRead); ByteBuf content = buffer.readSlice(toRead).retain();
chunkSize -= toRead; chunkSize -= toRead;
if (chunkSize == 0) { if (chunkSize == 0) {
@ -315,9 +309,9 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
if (chunkSize == 0) { if (chunkSize == 0) {
checkpoint(State.READ_CHUNK_FOOTER); checkpoint(State.READ_CHUNK_FOOTER);
return; return;
} else {
checkpoint(State.READ_CHUNKED_CONTENT);
} }
checkpoint(State.READ_CHUNKED_CONTENT);
// fall-through
} catch (Exception e) { } catch (Exception e) {
out.add(invalidChunk(e)); out.add(invalidChunk(e));
return; return;
@ -325,18 +319,20 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
case READ_CHUNKED_CONTENT: { case READ_CHUNKED_CONTENT: {
assert chunkSize <= Integer.MAX_VALUE; assert chunkSize <= Integer.MAX_VALUE;
int toRead = Math.min((int) chunkSize, maxChunkSize); int toRead = Math.min((int) chunkSize, maxChunkSize);
toRead = Math.min(toRead, actualReadableBytes());
HttpContent chunk = new DefaultHttpContent(readBytes(ctx.alloc(), buffer, toRead)); if (toRead == 0) {
return;
}
HttpContent chunk = new DefaultHttpContent(buffer.readSlice(toRead).retain());
chunkSize -= toRead; chunkSize -= toRead;
out.add(chunk); out.add(chunk);
if (chunkSize == 0) { if (chunkSize != 0) {
// Read all content.
checkpoint(State.READ_CHUNK_DELIMITER);
} else {
return; return;
} }
checkpoint(State.READ_CHUNK_DELIMITER);
// fall-through
} }
case READ_CHUNK_DELIMITER: { case READ_CHUNK_DELIMITER: {
for (;;) { for (;;) {
@ -388,10 +384,16 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
// Handle the last unfinished message. // Handle the last unfinished message.
if (message != null) { if (message != null) {
boolean chunked = HttpHeaders.isTransferEncodingChunked(message);
if (state() == State.READ_VARIABLE_LENGTH_CONTENT && !in.isReadable() && !chunked) {
// End of connection.
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
reset();
return;
}
// Check if the closure of the connection signifies the end of the content. // Check if the closure of the connection signifies the end of the content.
boolean prematureClosure; boolean prematureClosure;
if (isDecodingRequest()) { if (isDecodingRequest() || chunked) {
// The last request did not wait for a response. // The last request did not wait for a response.
prematureClosure = true; prematureClosure = true;
} else { } else {
@ -442,7 +444,11 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
private void resetNow() { private void resetNow() {
HttpMessage message = this.message; HttpMessage message = this.message;
this.message = null; this.message = null;
name = null;
value = null;
contentLength = Long.MIN_VALUE; contentLength = Long.MIN_VALUE;
lineParser.reset();
headerParser.reset();
if (!isDecodingRequest()) { if (!isDecodingRequest()) {
HttpResponse res = (HttpResponse) message; HttpResponse res = (HttpResponse) message;
if (res != null && res.status().code() == 101) { if (res != null && res.status().code() == 101) {
@ -488,37 +494,34 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
} }
private State readHeaders(ByteBuf buffer) { private State readHeaders(ByteBuf buffer) {
headerSize = 0;
final HttpMessage message = this.message; final HttpMessage message = this.message;
final HttpHeaders headers = message.headers(); final HttpHeaders headers = message.headers();
AppendableCharSequence line = headerParser.parse(buffer); AppendableCharSequence line = headerParser.parse(buffer);
String name = null;
String value = null;
if (line.length() > 0) { if (line.length() > 0) {
headers.clear();
do { do {
char firstChar = line.charAt(0); char firstChar = line.charAt(0);
if (name != null && (firstChar == ' ' || firstChar == '\t')) { if (name != null && (firstChar == ' ' || firstChar == '\t')) {
value = value + ' ' + line.toString().trim(); value = value.toString() + ' ' + line.toString().trim();
} else { } else {
if (name != null) { if (name != null) {
headers.add(name, value); headers.add(name, value);
} }
String[] header = splitHeader(line); splitHeader(line);
name = header[0];
value = header[1];
} }
line = headerParser.parse(buffer); line = headerParser.parse(buffer);
} while (line.length() > 0); } while (line.length() > 0);
// Add the last header.
if (name != null) {
headers.add(name, value);
}
} }
// Add the last header.
if (name != null) {
headers.add(name, value);
}
// reset name and value fields
name = null;
value = null;
State nextState; State nextState;
if (isContentAlwaysEmpty(message)) { if (isContentAlwaysEmpty(message)) {
@ -542,9 +545,8 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
} }
private LastHttpContent readTrailingHeaders(ByteBuf buffer) { private LastHttpContent readTrailingHeaders(ByteBuf buffer) {
headerSize = 0;
AppendableCharSequence line = headerParser.parse(buffer); AppendableCharSequence line = headerParser.parse(buffer);
String lastHeader = null; CharSequence lastHeader = null;
if (line.length() > 0) { if (line.length() > 0) {
LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders); LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
do { do {
@ -559,14 +561,17 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
// Content-Length, Transfer-Encoding, or Trailer // Content-Length, Transfer-Encoding, or Trailer
} }
} else { } else {
String[] header = splitHeader(line); splitHeader(line);
String name = header[0]; CharSequence headerName = name;
if (!AsciiString.equalsIgnoreCase(name, HttpHeaders.Names.CONTENT_LENGTH) && if (!AsciiString.equalsIgnoreCase(headerName, HttpHeaders.Names.CONTENT_LENGTH) &&
!AsciiString.equalsIgnoreCase(name, HttpHeaders.Names.TRANSFER_ENCODING) && !AsciiString.equalsIgnoreCase(headerName, HttpHeaders.Names.TRANSFER_ENCODING) &&
!AsciiString.equalsIgnoreCase(name, HttpHeaders.Names.TRAILER)) { !AsciiString.equalsIgnoreCase(headerName, HttpHeaders.Names.TRAILER)) {
trailer.trailingHeaders().add(name, header[1]); trailer.trailingHeaders().add(headerName, value);
} }
lastHeader = name; lastHeader = name;
// reset name and value fields
name = null;
value = null;
} }
line = headerParser.parse(buffer); line = headerParser.parse(buffer);
@ -618,7 +623,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
cStart < cEnd? sb.substring(cStart, cEnd) : "" }; cStart < cEnd? sb.substring(cStart, cEnd) : "" };
} }
private static String[] splitHeader(AppendableCharSequence sb) { private void splitHeader(AppendableCharSequence sb) {
final int length = sb.length(); final int length = sb.length();
int nameStart; int nameStart;
int nameEnd; int nameEnd;
@ -641,19 +646,14 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
} }
} }
name = sb.substring(nameStart, nameEnd);
valueStart = findNonWhitespace(sb, colonEnd); valueStart = findNonWhitespace(sb, colonEnd);
if (valueStart == length) { if (valueStart == length) {
return new String[] { value = EMPTY_VALUE;
sb.substring(nameStart, nameEnd), } else {
"" valueEnd = findEndOfString(sb);
}; value = sb.substring(valueStart, valueEnd);
} }
valueEnd = findEndOfString(sb);
return new String[] {
sb.substring(nameStart, nameEnd),
sb.substring(valueStart, valueEnd)
};
} }
private static int findNonWhitespace(CharSequence sb, int offset) { private static int findNonWhitespace(CharSequence sb, int offset) {
@ -686,85 +686,75 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
return result; return result;
} }
private final class HeaderParser implements ByteBufProcessor { private class HeaderParser implements ByteBufProcessor {
private final AppendableCharSequence seq; private final AppendableCharSequence seq;
private final int maxLength;
private int size;
HeaderParser(AppendableCharSequence seq) { HeaderParser(AppendableCharSequence seq, int maxLength) {
this.seq = seq; this.seq = seq;
this.maxLength = maxLength;
} }
public AppendableCharSequence parse(ByteBuf buffer) { public AppendableCharSequence parse(ByteBuf buffer) {
seq.reset(); seq.reset();
headerSize = 0;
int i = buffer.forEachByte(this); int i = buffer.forEachByte(this);
buffer.readerIndex(i + 1); buffer.readerIndex(i + 1);
// Call checkpoint to make sure the readerIndex is updated correctly
checkpoint();
return seq; return seq;
} }
public void reset() {
size = 0;
}
@Override @Override
public boolean process(byte value) throws Exception { public boolean process(byte value) throws Exception {
char nextByte = (char) value; char nextByte = (char) value;
headerSize++;
if (nextByte == HttpConstants.CR) { if (nextByte == HttpConstants.CR) {
return true; return true;
} }
if (nextByte == HttpConstants.LF) { if (nextByte == HttpConstants.LF) {
return false; return false;
} }
if (size >= maxLength) {
// Abort decoding if the header part is too large.
if (headerSize >= maxHeaderSize) {
// TODO: Respond with Bad Request and discard the traffic // TODO: Respond with Bad Request and discard the traffic
// or close the connection. // or close the connection.
// No need to notify the upstream handlers - just log. // No need to notify the upstream handlers - just log.
// If decoding a response, just throw an exception. // If decoding a response, just throw an exception.
throw new TooLongFrameException( throw newException(maxLength);
"HTTP header is larger than " +
maxHeaderSize + " bytes.");
} }
size ++;
seq.append(nextByte); seq.append(nextByte);
return true; return true;
} }
protected TooLongFrameException newException(int maxLength) {
return new TooLongFrameException(
"HTTP header is larger than " + maxLength +
" bytes.");
}
} }
private final class LineParser implements ByteBufProcessor { private final class LineParser extends HeaderParser {
private final AppendableCharSequence seq;
private int size;
LineParser(AppendableCharSequence seq) { LineParser(AppendableCharSequence seq, int maxLength) {
this.seq = seq; super(seq, maxLength);
}
public AppendableCharSequence parse(ByteBuf buffer) {
seq.reset();
size = 0;
int i = buffer.forEachByte(this);
buffer.readerIndex(i + 1);
return seq;
} }
@Override @Override
public boolean process(byte value) throws Exception { public AppendableCharSequence parse(ByteBuf buffer) {
char nextByte = (char) value; reset();
if (nextByte == HttpConstants.CR) { return super.parse(buffer);
return true; }
} else if (nextByte == HttpConstants.LF) {
return false; @Override
} else { protected TooLongFrameException newException(int maxLength) {
if (size >= maxInitialLineLength) { return new TooLongFrameException(
// TODO: Respond with Bad Request and discard the traffic "An HTTP line is larger than " + maxLength +
// or close the connection. " bytes.");
// No need to notify the upstream handlers - just log.
// If decoding a response, just throw an exception.
throw new TooLongFrameException(
"An HTTP line is larger than " + maxInitialLineLength +
" bytes.");
}
size ++;
seq.append(nextByte);
return true;
}
} }
} }
} }