Make use of ByteBufProcessor for extract initial line and headers
This gives some nice performance boost as readByte() is quite expensive because of the index / replay checks.
This commit is contained in:
parent
8930709940
commit
faf8becf2e
@ -16,6 +16,7 @@
|
|||||||
package io.netty.handler.codec.http;
|
package io.netty.handler.codec.http;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.ByteBufProcessor;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.ChannelPipeline;
|
||||||
@ -106,12 +107,14 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
private final int maxChunkSize;
|
private final int maxChunkSize;
|
||||||
private final boolean chunkedSupported;
|
private final boolean chunkedSupported;
|
||||||
protected final boolean validateHeaders;
|
protected final boolean validateHeaders;
|
||||||
|
private final AppendableCharSequence seq = new AppendableCharSequence(128);
|
||||||
|
private final HeaderParser headerParser = new HeaderParser(seq);
|
||||||
|
private final LineParser lineParser = new LineParser(seq);
|
||||||
|
|
||||||
private HttpMessage message;
|
private HttpMessage message;
|
||||||
private long chunkSize;
|
private long chunkSize;
|
||||||
private int headerSize;
|
private int headerSize;
|
||||||
private long contentLength = Long.MIN_VALUE;
|
private long contentLength = Long.MIN_VALUE;
|
||||||
private final AppendableCharSequence sb = new AppendableCharSequence(128);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The internal state of {@link HttpObjectDecoder}.
|
* The internal state of {@link HttpObjectDecoder}.
|
||||||
@ -191,7 +194,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
case READ_INITIAL: try {
|
case READ_INITIAL: try {
|
||||||
String[] initialLine = splitInitialLine(readLine(buffer, maxInitialLineLength));
|
String[] initialLine = splitInitialLine(lineParser.parse(buffer));
|
||||||
if (initialLine.length < 3) {
|
if (initialLine.length < 3) {
|
||||||
// Invalid initial line - ignore.
|
// Invalid initial line - ignore.
|
||||||
checkpoint(State.SKIP_CONTROL_CHARS);
|
checkpoint(State.SKIP_CONTROL_CHARS);
|
||||||
@ -299,7 +302,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
* read chunk, read and ignore the CRLF and repeat until 0
|
* read chunk, read and ignore the CRLF and repeat until 0
|
||||||
*/
|
*/
|
||||||
case READ_CHUNK_SIZE: try {
|
case READ_CHUNK_SIZE: try {
|
||||||
AppendableCharSequence line = readLine(buffer, maxInitialLineLength);
|
AppendableCharSequence line = lineParser.parse(buffer);
|
||||||
int chunkSize = getChunkSize(line.toString());
|
int chunkSize = getChunkSize(line.toString());
|
||||||
this.chunkSize = chunkSize;
|
this.chunkSize = chunkSize;
|
||||||
if (chunkSize == 0) {
|
if (chunkSize == 0) {
|
||||||
@ -467,7 +470,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
final HttpMessage message = this.message;
|
final HttpMessage message = this.message;
|
||||||
final HttpHeaders headers = message.headers();
|
final HttpHeaders headers = message.headers();
|
||||||
|
|
||||||
AppendableCharSequence line = readHeader(buffer);
|
AppendableCharSequence line = headerParser.parse(buffer);
|
||||||
String name = null;
|
String name = null;
|
||||||
String value = null;
|
String value = null;
|
||||||
if (line.length() > 0) {
|
if (line.length() > 0) {
|
||||||
@ -485,7 +488,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
value = header[1];
|
value = header[1];
|
||||||
}
|
}
|
||||||
|
|
||||||
line = readHeader(buffer);
|
line = headerParser.parse(buffer);
|
||||||
} while (line.length() > 0);
|
} while (line.length() > 0);
|
||||||
|
|
||||||
// Add the last header.
|
// Add the last header.
|
||||||
@ -518,7 +521,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
|
|
||||||
private LastHttpContent readTrailingHeaders(ByteBuf buffer) {
|
private LastHttpContent readTrailingHeaders(ByteBuf buffer) {
|
||||||
headerSize = 0;
|
headerSize = 0;
|
||||||
AppendableCharSequence line = readHeader(buffer);
|
AppendableCharSequence line = headerParser.parse(buffer);
|
||||||
String lastHeader = null;
|
String lastHeader = null;
|
||||||
if (line.length() > 0) {
|
if (line.length() > 0) {
|
||||||
LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
|
LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
|
||||||
@ -544,7 +547,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
lastHeader = name;
|
lastHeader = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
line = readHeader(buffer);
|
line = headerParser.parse(buffer);
|
||||||
} while (line.length() > 0);
|
} while (line.length() > 0);
|
||||||
|
|
||||||
return trailer;
|
return trailer;
|
||||||
@ -553,46 +556,6 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
return LastHttpContent.EMPTY_LAST_CONTENT;
|
return LastHttpContent.EMPTY_LAST_CONTENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
private AppendableCharSequence readHeader(ByteBuf buffer) {
|
|
||||||
AppendableCharSequence sb = this.sb;
|
|
||||||
sb.reset();
|
|
||||||
int headerSize = this.headerSize;
|
|
||||||
|
|
||||||
loop:
|
|
||||||
for (;;) {
|
|
||||||
char nextByte = (char) buffer.readByte();
|
|
||||||
headerSize ++;
|
|
||||||
|
|
||||||
switch (nextByte) {
|
|
||||||
case HttpConstants.CR:
|
|
||||||
nextByte = (char) buffer.readByte();
|
|
||||||
headerSize ++;
|
|
||||||
if (nextByte == HttpConstants.LF) {
|
|
||||||
break loop;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case HttpConstants.LF:
|
|
||||||
break loop;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Abort decoding if the header part is too large.
|
|
||||||
if (headerSize >= maxHeaderSize) {
|
|
||||||
// TODO: Respond with Bad Request and discard the traffic
|
|
||||||
// or close the connection.
|
|
||||||
// No need to notify the upstream handlers - just log.
|
|
||||||
// If decoding a response, just throw an exception.
|
|
||||||
throw new TooLongFrameException(
|
|
||||||
"HTTP header is larger than " +
|
|
||||||
maxHeaderSize + " bytes.");
|
|
||||||
}
|
|
||||||
|
|
||||||
sb.append(nextByte);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.headerSize = headerSize;
|
|
||||||
return sb;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract boolean isDecodingRequest();
|
protected abstract boolean isDecodingRequest();
|
||||||
protected abstract HttpMessage createMessage(String[] initialLine) throws Exception;
|
protected abstract HttpMessage createMessage(String[] initialLine) throws Exception;
|
||||||
protected abstract HttpMessage createInvalidMessage();
|
protected abstract HttpMessage createInvalidMessage();
|
||||||
@ -610,35 +573,6 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
return Integer.parseInt(hex, 16);
|
return Integer.parseInt(hex, 16);
|
||||||
}
|
}
|
||||||
|
|
||||||
private AppendableCharSequence readLine(ByteBuf buffer, int maxLineLength) {
|
|
||||||
AppendableCharSequence sb = this.sb;
|
|
||||||
sb.reset();
|
|
||||||
int lineLength = 0;
|
|
||||||
while (true) {
|
|
||||||
byte nextByte = buffer.readByte();
|
|
||||||
if (nextByte == HttpConstants.CR) {
|
|
||||||
nextByte = buffer.readByte();
|
|
||||||
if (nextByte == HttpConstants.LF) {
|
|
||||||
return sb;
|
|
||||||
}
|
|
||||||
} else if (nextByte == HttpConstants.LF) {
|
|
||||||
return sb;
|
|
||||||
} else {
|
|
||||||
if (lineLength >= maxLineLength) {
|
|
||||||
// TODO: Respond with Bad Request and discard the traffic
|
|
||||||
// or close the connection.
|
|
||||||
// No need to notify the upstream handlers - just log.
|
|
||||||
// If decoding a response, just throw an exception.
|
|
||||||
throw new TooLongFrameException(
|
|
||||||
"An HTTP line is larger than " + maxLineLength +
|
|
||||||
" bytes.");
|
|
||||||
}
|
|
||||||
lineLength ++;
|
|
||||||
sb.append((char) nextByte);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String[] splitInitialLine(AppendableCharSequence sb) {
|
private static String[] splitInitialLine(AppendableCharSequence sb) {
|
||||||
int aStart;
|
int aStart;
|
||||||
int aEnd;
|
int aEnd;
|
||||||
@ -729,4 +663,86 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class HeaderParser implements ByteBufProcessor {
|
||||||
|
private final AppendableCharSequence seq;
|
||||||
|
|
||||||
|
HeaderParser(AppendableCharSequence seq) {
|
||||||
|
this.seq = seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AppendableCharSequence parse(ByteBuf buffer) {
|
||||||
|
seq.reset();
|
||||||
|
headerSize = 0;
|
||||||
|
int i = buffer.forEachByte(this);
|
||||||
|
buffer.readerIndex(i + 1);
|
||||||
|
return seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean process(byte value) throws Exception {
|
||||||
|
char nextByte = (char) value;
|
||||||
|
headerSize++;
|
||||||
|
if (nextByte == HttpConstants.CR) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (nextByte == HttpConstants.LF) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abort decoding if the header part is too large.
|
||||||
|
if (headerSize >= maxHeaderSize) {
|
||||||
|
// TODO: Respond with Bad Request and discard the traffic
|
||||||
|
// or close the connection.
|
||||||
|
// No need to notify the upstream handlers - just log.
|
||||||
|
// If decoding a response, just throw an exception.
|
||||||
|
throw new TooLongFrameException(
|
||||||
|
"HTTP header is larger than " +
|
||||||
|
maxHeaderSize + " bytes.");
|
||||||
|
}
|
||||||
|
|
||||||
|
seq.append(nextByte);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final class LineParser implements ByteBufProcessor {
|
||||||
|
private final AppendableCharSequence seq;
|
||||||
|
private int size;
|
||||||
|
|
||||||
|
LineParser(AppendableCharSequence seq) {
|
||||||
|
this.seq = seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AppendableCharSequence parse(ByteBuf buffer) {
|
||||||
|
seq.reset();
|
||||||
|
size = 0;
|
||||||
|
int i = buffer.forEachByte(this);
|
||||||
|
buffer.readerIndex(i + 1);
|
||||||
|
return seq;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean process(byte value) throws Exception {
|
||||||
|
char nextByte = (char) value;
|
||||||
|
if (nextByte == HttpConstants.CR) {
|
||||||
|
return true;
|
||||||
|
} else if (nextByte == HttpConstants.LF) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
if (size >= maxInitialLineLength) {
|
||||||
|
// TODO: Respond with Bad Request and discard the traffic
|
||||||
|
// or close the connection.
|
||||||
|
// No need to notify the upstream handlers - just log.
|
||||||
|
// If decoding a response, just throw an exception.
|
||||||
|
throw new TooLongFrameException(
|
||||||
|
"An HTTP line is larger than " + maxInitialLineLength +
|
||||||
|
" bytes.");
|
||||||
|
}
|
||||||
|
size ++;
|
||||||
|
seq.append(nextByte);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -189,7 +189,6 @@ public class HttpResponseDecoderTest {
|
|||||||
|
|
||||||
// Close the connection without sending anything.
|
// Close the connection without sending anything.
|
||||||
ch.finish();
|
ch.finish();
|
||||||
|
|
||||||
// The decoder should not generate the last chunk because it's closed prematurely.
|
// The decoder should not generate the last chunk because it's closed prematurely.
|
||||||
assertThat(ch.readInbound(), is(nullValue()));
|
assertThat(ch.readInbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
@ -345,7 +345,7 @@ final class ReplayingDecoderBuffer extends ByteBuf {
|
|||||||
@Override
|
@Override
|
||||||
public int forEachByte(ByteBufProcessor processor) {
|
public int forEachByte(ByteBufProcessor processor) {
|
||||||
int ret = buffer.forEachByte(processor);
|
int ret = buffer.forEachByte(processor);
|
||||||
if (!terminated && ret < 0) {
|
if (ret < 0) {
|
||||||
throw REPLAY;
|
throw REPLAY;
|
||||||
} else {
|
} else {
|
||||||
return ret;
|
return ret;
|
||||||
|
Loading…
Reference in New Issue
Block a user