Rewrite HttpObjectDecoder to make use of proper state machine
Motivation: HttpObjectDecoder extended ReplayDecoder which is slightly slower then ByteToMessageDecoder. Modifications: - Changed super class of HttpObjectDecoder from ReplayDecoder to ByteToMessageDecoder. - Rewrote decode() method of HttpObjectDecoder to use proper state machine. - Changed private methods HeaderParser.parse(ByteBuf), readHeaders(ByteBuf) and readTrailingHeaders(ByteBuf), skipControlCharacters(ByteBuf) to consider available bytes. - Set HeaderParser and LineParser as static inner classes. - Replaced not safe actualReadableBytes() with buffer.readableBytes(). Result: Improved performance of HttpObjectDecoder by approximately 177%.
This commit is contained in:
parent
59f222a821
commit
cc97be6002
@ -20,10 +20,9 @@ import io.netty.buffer.ByteBufProcessor;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import io.netty.handler.codec.DecoderResult;
|
||||
import io.netty.handler.codec.ReplayingDecoder;
|
||||
import io.netty.handler.codec.TooLongFrameException;
|
||||
import io.netty.handler.codec.http.HttpObjectDecoder.State;
|
||||
import io.netty.util.internal.AppendableCharSequence;
|
||||
|
||||
import java.util.List;
|
||||
@ -99,7 +98,7 @@ import java.util.List;
|
||||
* To implement the decoder of such a derived protocol, extend this class and
|
||||
* implement all abstract methods properly.
|
||||
*/
|
||||
public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
public abstract class HttpObjectDecoder extends ByteToMessageDecoder {
|
||||
private static final String EMPTY_VALUE = "";
|
||||
|
||||
private final int maxChunkSize;
|
||||
@ -111,16 +110,19 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
private HttpMessage message;
|
||||
private long chunkSize;
|
||||
private long contentLength = Long.MIN_VALUE;
|
||||
private volatile boolean resetRequested;
|
||||
|
||||
// These will be updated by splitHeader(...)
|
||||
private CharSequence name;
|
||||
private CharSequence value;
|
||||
|
||||
private LastHttpContent trailer;
|
||||
|
||||
/**
|
||||
* The internal state of {@link HttpObjectDecoder}.
|
||||
* <em>Internal use only</em>.
|
||||
*/
|
||||
enum State {
|
||||
private enum State {
|
||||
SKIP_CONTROL_CHARS,
|
||||
READ_INITIAL,
|
||||
READ_HEADER,
|
||||
@ -134,6 +136,8 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
UPGRADED
|
||||
}
|
||||
|
||||
private State currentState = State.SKIP_CONTROL_CHARS;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the default
|
||||
* {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
|
||||
@ -158,22 +162,20 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize,
|
||||
boolean chunkedSupported, boolean validateHeaders) {
|
||||
|
||||
super(State.SKIP_CONTROL_CHARS);
|
||||
|
||||
if (maxInitialLineLength <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"maxInitialLineLength must be a positive integer: " +
|
||||
maxInitialLineLength);
|
||||
maxInitialLineLength);
|
||||
}
|
||||
if (maxHeaderSize <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"maxHeaderSize must be a positive integer: " +
|
||||
maxHeaderSize);
|
||||
maxHeaderSize);
|
||||
}
|
||||
if (maxChunkSize <= 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"maxChunkSize must be a positive integer: " +
|
||||
maxChunkSize);
|
||||
maxChunkSize);
|
||||
}
|
||||
this.maxChunkSize = maxChunkSize;
|
||||
this.chunkedSupported = chunkedSupported;
|
||||
@ -185,190 +187,202 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
|
||||
switch (state()) {
|
||||
case SKIP_CONTROL_CHARS: {
|
||||
try {
|
||||
skipControlCharacters(buffer);
|
||||
checkpoint(State.READ_INITIAL);
|
||||
} finally {
|
||||
checkpoint();
|
||||
}
|
||||
// fall-through
|
||||
if (resetRequested) {
|
||||
resetNow();
|
||||
}
|
||||
case READ_INITIAL: try {
|
||||
String[] initialLine = splitInitialLine(lineParser.parse(buffer));
|
||||
if (initialLine.length < 3) {
|
||||
// Invalid initial line - ignore.
|
||||
checkpoint(State.SKIP_CONTROL_CHARS);
|
||||
return;
|
||||
}
|
||||
|
||||
message = createMessage(initialLine);
|
||||
checkpoint(State.READ_HEADER);
|
||||
// fall-through
|
||||
} catch (Exception e) {
|
||||
out.add(invalidMessage(e));
|
||||
return;
|
||||
}
|
||||
case READ_HEADER: try {
|
||||
State nextState = readHeaders(buffer);
|
||||
checkpoint(nextState);
|
||||
switch (nextState) {
|
||||
case SKIP_CONTROL_CHARS:
|
||||
// fast-path
|
||||
// No content is expected.
|
||||
out.add(message);
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
reset();
|
||||
return;
|
||||
case READ_CHUNK_SIZE:
|
||||
if (!chunkedSupported) {
|
||||
throw new IllegalArgumentException("Chunked messages not supported");
|
||||
switch (currentState) {
|
||||
case SKIP_CONTROL_CHARS: {
|
||||
if (!skipControlCharacters(buffer)) {
|
||||
return;
|
||||
}
|
||||
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
|
||||
out.add(message);
|
||||
return;
|
||||
default:
|
||||
long contentLength = contentLength();
|
||||
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
|
||||
out.add(message);
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
reset();
|
||||
currentState = State.READ_INITIAL;
|
||||
}
|
||||
case READ_INITIAL: try {
|
||||
AppendableCharSequence line = lineParser.parse(buffer);
|
||||
if (line == null) {
|
||||
return;
|
||||
}
|
||||
String[] initialLine = splitInitialLine(line);
|
||||
if (initialLine.length < 3) {
|
||||
// Invalid initial line - ignore.
|
||||
currentState = State.SKIP_CONTROL_CHARS;
|
||||
return;
|
||||
}
|
||||
|
||||
assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
|
||||
nextState == State.READ_VARIABLE_LENGTH_CONTENT;
|
||||
|
||||
out.add(message);
|
||||
|
||||
if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
|
||||
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by chunk.
|
||||
chunkSize = contentLength;
|
||||
message = createMessage(initialLine);
|
||||
currentState = State.READ_HEADER;
|
||||
// fall-through
|
||||
} catch (Exception e) {
|
||||
out.add(invalidMessage(e));
|
||||
return;
|
||||
}
|
||||
case READ_HEADER: try {
|
||||
State nextState = readHeaders(buffer);
|
||||
if (nextState == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// We return here, this forces decode to be called again where we will decode the content
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
out.add(invalidMessage(e));
|
||||
return;
|
||||
}
|
||||
case READ_VARIABLE_LENGTH_CONTENT: {
|
||||
// Keep reading data as a chunk until the end of connection is reached.
|
||||
int toRead = Math.min(actualReadableBytes(), maxChunkSize);
|
||||
if (toRead > 0) {
|
||||
ByteBuf content = buffer.readSlice(toRead).retain();
|
||||
out.add(new DefaultHttpContent(content));
|
||||
}
|
||||
return;
|
||||
}
|
||||
case READ_FIXED_LENGTH_CONTENT: {
|
||||
int readLimit = actualReadableBytes();
|
||||
|
||||
// Check if the buffer is readable first as we use the readable byte count
|
||||
// to create the HttpChunk. This is needed as otherwise we may end up with
|
||||
// create a HttpChunk instance that contains an empty buffer and so is
|
||||
// handled like it is the last HttpChunk.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/433
|
||||
if (readLimit == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int toRead = Math.min(readLimit, maxChunkSize);
|
||||
if (toRead > chunkSize) {
|
||||
toRead = (int) chunkSize;
|
||||
}
|
||||
ByteBuf content = buffer.readSlice(toRead).retain();
|
||||
chunkSize -= toRead;
|
||||
|
||||
if (chunkSize == 0) {
|
||||
// Read all content.
|
||||
out.add(new DefaultLastHttpContent(content, validateHeaders));
|
||||
reset();
|
||||
} else {
|
||||
out.add(new DefaultHttpContent(content));
|
||||
}
|
||||
return;
|
||||
}
|
||||
/**
|
||||
* everything else after this point takes care of reading chunked content. basically, read chunk size,
|
||||
* read chunk, read and ignore the CRLF and repeat until 0
|
||||
*/
|
||||
case READ_CHUNK_SIZE: try {
|
||||
AppendableCharSequence line = lineParser.parse(buffer);
|
||||
int chunkSize = getChunkSize(line.toString());
|
||||
this.chunkSize = chunkSize;
|
||||
if (chunkSize == 0) {
|
||||
checkpoint(State.READ_CHUNK_FOOTER);
|
||||
return;
|
||||
}
|
||||
checkpoint(State.READ_CHUNKED_CONTENT);
|
||||
// fall-through
|
||||
} catch (Exception e) {
|
||||
out.add(invalidChunk(e));
|
||||
return;
|
||||
}
|
||||
case READ_CHUNKED_CONTENT: {
|
||||
assert chunkSize <= Integer.MAX_VALUE;
|
||||
int toRead = Math.min((int) chunkSize, maxChunkSize);
|
||||
toRead = Math.min(toRead, actualReadableBytes());
|
||||
if (toRead == 0) {
|
||||
return;
|
||||
}
|
||||
HttpContent chunk = new DefaultHttpContent(buffer.readSlice(toRead).retain());
|
||||
chunkSize -= toRead;
|
||||
|
||||
out.add(chunk);
|
||||
|
||||
if (chunkSize != 0) {
|
||||
return;
|
||||
}
|
||||
checkpoint(State.READ_CHUNK_DELIMITER);
|
||||
// fall-through
|
||||
}
|
||||
case READ_CHUNK_DELIMITER: {
|
||||
for (;;) {
|
||||
byte next = buffer.readByte();
|
||||
if (next == HttpConstants.CR) {
|
||||
if (buffer.readByte() == HttpConstants.LF) {
|
||||
checkpoint(State.READ_CHUNK_SIZE);
|
||||
currentState = nextState;
|
||||
switch (nextState) {
|
||||
case SKIP_CONTROL_CHARS:
|
||||
// fast-path
|
||||
// No content is expected.
|
||||
out.add(message);
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
resetNow();
|
||||
return;
|
||||
case READ_CHUNK_SIZE:
|
||||
if (!chunkedSupported) {
|
||||
throw new IllegalArgumentException("Chunked messages not supported");
|
||||
}
|
||||
// Chunked encoding - generate HttpMessage first. HttpChunks will follow.
|
||||
out.add(message);
|
||||
return;
|
||||
default:
|
||||
long contentLength = contentLength();
|
||||
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
|
||||
out.add(message);
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
resetNow();
|
||||
return;
|
||||
}
|
||||
|
||||
assert nextState == State.READ_FIXED_LENGTH_CONTENT ||
|
||||
nextState == State.READ_VARIABLE_LENGTH_CONTENT;
|
||||
|
||||
out.add(message);
|
||||
|
||||
if (nextState == State.READ_FIXED_LENGTH_CONTENT) {
|
||||
// chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT state reads data chunk by
|
||||
// chunk.
|
||||
chunkSize = contentLength;
|
||||
}
|
||||
|
||||
// We return here, this forces decode to be called again where we will decode the content
|
||||
return;
|
||||
}
|
||||
} else if (next == HttpConstants.LF) {
|
||||
checkpoint(State.READ_CHUNK_SIZE);
|
||||
return;
|
||||
} else {
|
||||
checkpoint();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
out.add(invalidMessage(e));
|
||||
return;
|
||||
}
|
||||
}
|
||||
case READ_CHUNK_FOOTER: try {
|
||||
LastHttpContent trailer = readTrailingHeaders(buffer);
|
||||
out.add(trailer);
|
||||
reset();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
out.add(invalidChunk(e));
|
||||
return;
|
||||
}
|
||||
case BAD_MESSAGE: {
|
||||
// Keep discarding until disconnection.
|
||||
buffer.skipBytes(actualReadableBytes());
|
||||
break;
|
||||
}
|
||||
case UPGRADED: {
|
||||
int readableBytes = actualReadableBytes();
|
||||
if (readableBytes > 0) {
|
||||
// Keep on consuming as otherwise we may trigger an DecoderException,
|
||||
// other handler will replace this codec with the upgraded protocol codec to
|
||||
// take the traffic over at some point then.
|
||||
// See https://github.com/netty/netty/issues/2173
|
||||
out.add(buffer.readBytes(actualReadableBytes()));
|
||||
case READ_VARIABLE_LENGTH_CONTENT: {
|
||||
// Keep reading data as a chunk until the end of connection is reached.
|
||||
int toRead = Math.min(buffer.readableBytes(), maxChunkSize);
|
||||
if (toRead > 0) {
|
||||
ByteBuf content = buffer.readSlice(toRead).retain();
|
||||
out.add(new DefaultHttpContent(content));
|
||||
}
|
||||
return;
|
||||
}
|
||||
case READ_FIXED_LENGTH_CONTENT: {
|
||||
int readLimit = buffer.readableBytes();
|
||||
|
||||
// Check if the buffer is readable first as we use the readable byte count
|
||||
// to create the HttpChunk. This is needed as otherwise we may end up with
|
||||
// create a HttpChunk instance that contains an empty buffer and so is
|
||||
// handled like it is the last HttpChunk.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/433
|
||||
if (readLimit == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int toRead = Math.min(readLimit, maxChunkSize);
|
||||
if (toRead > chunkSize) {
|
||||
toRead = (int) chunkSize;
|
||||
}
|
||||
ByteBuf content = buffer.readSlice(toRead).retain();
|
||||
chunkSize -= toRead;
|
||||
|
||||
if (chunkSize == 0) {
|
||||
// Read all content.
|
||||
out.add(new DefaultLastHttpContent(content, validateHeaders));
|
||||
resetNow();
|
||||
} else {
|
||||
out.add(new DefaultHttpContent(content));
|
||||
}
|
||||
return;
|
||||
}
|
||||
/**
|
||||
* everything else after this point takes care of reading chunked content. basically, read chunk size,
|
||||
* read chunk, read and ignore the CRLF and repeat until 0
|
||||
*/
|
||||
case READ_CHUNK_SIZE: try {
|
||||
AppendableCharSequence line = lineParser.parse(buffer);
|
||||
if (line == null) {
|
||||
return;
|
||||
}
|
||||
int chunkSize = getChunkSize(line.toString());
|
||||
this.chunkSize = chunkSize;
|
||||
if (chunkSize == 0) {
|
||||
currentState = State.READ_CHUNK_FOOTER;
|
||||
return;
|
||||
}
|
||||
currentState = State.READ_CHUNKED_CONTENT;
|
||||
// fall-through
|
||||
} catch (Exception e) {
|
||||
out.add(invalidChunk(e));
|
||||
return;
|
||||
}
|
||||
case READ_CHUNKED_CONTENT: {
|
||||
assert chunkSize <= Integer.MAX_VALUE;
|
||||
int toRead = Math.min((int) chunkSize, maxChunkSize);
|
||||
toRead = Math.min(toRead, buffer.readableBytes());
|
||||
if (toRead == 0) {
|
||||
return;
|
||||
}
|
||||
HttpContent chunk = new DefaultHttpContent(buffer.readSlice(toRead).retain());
|
||||
chunkSize -= toRead;
|
||||
|
||||
out.add(chunk);
|
||||
|
||||
if (chunkSize != 0) {
|
||||
return;
|
||||
}
|
||||
currentState = State.READ_CHUNK_DELIMITER;
|
||||
// fall-through
|
||||
}
|
||||
case READ_CHUNK_DELIMITER: {
|
||||
final int wIdx = buffer.writerIndex();
|
||||
int rIdx = buffer.readerIndex();
|
||||
while (wIdx > rIdx) {
|
||||
byte next = buffer.getByte(rIdx++);
|
||||
if (next == HttpConstants.LF) {
|
||||
currentState = State.READ_CHUNK_SIZE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
buffer.readerIndex(rIdx);
|
||||
return;
|
||||
}
|
||||
case READ_CHUNK_FOOTER: try {
|
||||
LastHttpContent trailer = readTrailingHeaders(buffer);
|
||||
if (trailer == null) {
|
||||
return;
|
||||
}
|
||||
out.add(trailer);
|
||||
resetNow();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
out.add(invalidChunk(e));
|
||||
return;
|
||||
}
|
||||
case BAD_MESSAGE: {
|
||||
// Keep discarding until disconnection.
|
||||
buffer.skipBytes(buffer.readableBytes());
|
||||
break;
|
||||
}
|
||||
case UPGRADED: {
|
||||
int readableBytes = buffer.readableBytes();
|
||||
if (readableBytes > 0) {
|
||||
// Keep on consuming as otherwise we may trigger an DecoderException,
|
||||
// other handler will replace this codec with the upgraded protocol codec to
|
||||
// take the traffic over at some point then.
|
||||
// See https://github.com/netty/netty/issues/2173
|
||||
out.add(buffer.readBytes(readableBytes));
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -379,7 +393,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
// Handle the last unfinished message.
|
||||
if (message != null) {
|
||||
boolean chunked = HttpHeaders.isTransferEncodingChunked(message);
|
||||
if (state() == State.READ_VARIABLE_LENGTH_CONTENT && !in.isReadable() && !chunked) {
|
||||
if (currentState == State.READ_VARIABLE_LENGTH_CONTENT && !in.isReadable() && !chunked) {
|
||||
// End of connection.
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
reset();
|
||||
@ -396,7 +410,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
// connection, so it is perfectly fine.
|
||||
prematureClosure = contentLength() > 0;
|
||||
}
|
||||
reset();
|
||||
resetNow();
|
||||
|
||||
if (!prematureClosure) {
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
@ -420,14 +434,22 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
}
|
||||
|
||||
switch (code) {
|
||||
case 204: case 205: case 304:
|
||||
return true;
|
||||
case 204: case 205: case 304:
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void reset() {
|
||||
/**
|
||||
* Resets the state of the decoder so that it is ready to decode a new message.
|
||||
* This method is useful for handling a rejected request with {@code Expect: 100-continue} header.
|
||||
*/
|
||||
public void reset() {
|
||||
resetRequested = true;
|
||||
}
|
||||
|
||||
private void resetNow() {
|
||||
HttpMessage message = this.message;
|
||||
this.message = null;
|
||||
name = null;
|
||||
@ -435,19 +457,20 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
contentLength = Long.MIN_VALUE;
|
||||
lineParser.reset();
|
||||
headerParser.reset();
|
||||
trailer = null;
|
||||
if (!isDecodingRequest()) {
|
||||
HttpResponse res = (HttpResponse) message;
|
||||
if (res != null && res.getStatus().code() == 101) {
|
||||
checkpoint(State.UPGRADED);
|
||||
currentState = State.UPGRADED;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
checkpoint(State.SKIP_CONTROL_CHARS);
|
||||
currentState = State.SKIP_CONTROL_CHARS;
|
||||
}
|
||||
|
||||
private HttpMessage invalidMessage(Exception cause) {
|
||||
checkpoint(State.BAD_MESSAGE);
|
||||
currentState = State.BAD_MESSAGE;
|
||||
if (message != null) {
|
||||
message.setDecoderResult(DecoderResult.failure(cause));
|
||||
} else {
|
||||
@ -461,22 +484,28 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
}
|
||||
|
||||
private HttpContent invalidChunk(Exception cause) {
|
||||
checkpoint(State.BAD_MESSAGE);
|
||||
currentState = State.BAD_MESSAGE;
|
||||
HttpContent chunk = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER);
|
||||
chunk.setDecoderResult(DecoderResult.failure(cause));
|
||||
message = null;
|
||||
trailer = null;
|
||||
return chunk;
|
||||
}
|
||||
|
||||
private static void skipControlCharacters(ByteBuf buffer) {
|
||||
for (;;) {
|
||||
char c = (char) buffer.readUnsignedByte();
|
||||
if (!Character.isISOControl(c) &&
|
||||
!Character.isWhitespace(c)) {
|
||||
buffer.readerIndex(buffer.readerIndex() - 1);
|
||||
private static boolean skipControlCharacters(ByteBuf buffer) {
|
||||
boolean skiped = false;
|
||||
final int wIdx = buffer.writerIndex();
|
||||
int rIdx = buffer.readerIndex();
|
||||
while (wIdx > rIdx) {
|
||||
int c = buffer.getUnsignedByte(rIdx++);
|
||||
if (!Character.isISOControl(c) && !Character.isWhitespace(c)) {
|
||||
rIdx--;
|
||||
skiped = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
buffer.readerIndex(rIdx);
|
||||
return skiped;
|
||||
}
|
||||
|
||||
private State readHeaders(ByteBuf buffer) {
|
||||
@ -484,6 +513,9 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
final HttpHeaders headers = message.headers();
|
||||
|
||||
AppendableCharSequence line = headerParser.parse(buffer);
|
||||
if (line == null) {
|
||||
return null;
|
||||
}
|
||||
if (line.length() > 0) {
|
||||
do {
|
||||
char firstChar = line.charAt(0);
|
||||
@ -501,6 +533,9 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
}
|
||||
|
||||
line = headerParser.parse(buffer);
|
||||
if (line == null) {
|
||||
return null;
|
||||
}
|
||||
} while (line.length() > 0);
|
||||
}
|
||||
|
||||
@ -536,26 +571,36 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
|
||||
private LastHttpContent readTrailingHeaders(ByteBuf buffer) {
|
||||
AppendableCharSequence line = headerParser.parse(buffer);
|
||||
if (line == null) {
|
||||
return null;
|
||||
}
|
||||
CharSequence lastHeader = null;
|
||||
if (line.length() > 0) {
|
||||
LastHttpContent trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
|
||||
LastHttpContent trailer = this.trailer;
|
||||
if (trailer == null) {
|
||||
trailer = this.trailer = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
|
||||
}
|
||||
do {
|
||||
char firstChar = line.charAt(0);
|
||||
if (lastHeader != null && (firstChar == ' ' || firstChar == '\t')) {
|
||||
List<String> current = trailer.trailingHeaders().getAll(lastHeader);
|
||||
if (!current.isEmpty()) {
|
||||
int lastPos = current.size() - 1;
|
||||
String newString = current.get(lastPos) + line.toString().trim();
|
||||
current.set(lastPos, newString);
|
||||
String lineTrimmed = line.toString().trim();
|
||||
CharSequence currentLastPos = current.get(lastPos);
|
||||
StringBuilder b = new StringBuilder(currentLastPos.length() + lineTrimmed.length());
|
||||
b.append(currentLastPos);
|
||||
b.append(lineTrimmed);
|
||||
current.set(lastPos, b.toString());
|
||||
} else {
|
||||
// Content-Length, Transfer-Encoding, or Trailer
|
||||
}
|
||||
} else {
|
||||
splitHeader(line);
|
||||
CharSequence headerName = name;
|
||||
if (!HttpHeaders.equalsIgnoreCase(headerName, HttpHeaders.Names.CONTENT_LENGTH) &&
|
||||
!HttpHeaders.equalsIgnoreCase(headerName, HttpHeaders.Names.TRANSFER_ENCODING) &&
|
||||
!HttpHeaders.equalsIgnoreCase(headerName, HttpHeaders.Names.TRAILER)) {
|
||||
if (!HttpHeaders.equalsIgnoreCase(HttpHeaders.Names.CONTENT_LENGTH, headerName) &&
|
||||
!HttpHeaders.equalsIgnoreCase(HttpHeaders.Names.TRANSFER_ENCODING, headerName) &&
|
||||
!HttpHeaders.equalsIgnoreCase(HttpHeaders.Names.TRAILER, headerName)) {
|
||||
trailer.trailingHeaders().add(headerName, value);
|
||||
}
|
||||
lastHeader = name;
|
||||
@ -565,8 +610,12 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
}
|
||||
|
||||
line = headerParser.parse(buffer);
|
||||
if (line == null) {
|
||||
return null;
|
||||
}
|
||||
} while (line.length() > 0);
|
||||
|
||||
this.trailer = null;
|
||||
return trailer;
|
||||
}
|
||||
|
||||
@ -676,7 +725,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
return result;
|
||||
}
|
||||
|
||||
private class HeaderParser implements ByteBufProcessor {
|
||||
private static class HeaderParser implements ByteBufProcessor {
|
||||
private final AppendableCharSequence seq;
|
||||
private final int maxLength;
|
||||
private int size;
|
||||
@ -689,10 +738,10 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
public AppendableCharSequence parse(ByteBuf buffer) {
|
||||
seq.reset();
|
||||
int i = buffer.forEachByte(this);
|
||||
if (i == -1) {
|
||||
return null;
|
||||
}
|
||||
buffer.readerIndex(i + 1);
|
||||
|
||||
// Call checkpoint to make sure the readerIndex is updated correctly
|
||||
checkpoint();
|
||||
return seq;
|
||||
}
|
||||
|
||||
@ -716,7 +765,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
// If decoding a response, just throw an exception.
|
||||
throw newException(maxLength);
|
||||
}
|
||||
size ++;
|
||||
size++;
|
||||
seq.append(nextByte);
|
||||
return true;
|
||||
}
|
||||
@ -726,7 +775,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<State> {
|
||||
}
|
||||
}
|
||||
|
||||
private final class LineParser extends HeaderParser {
|
||||
private static final class LineParser extends HeaderParser {
|
||||
|
||||
LineParser(AppendableCharSequence seq, int maxLength) {
|
||||
super(seq, maxLength);
|
||||
|
@ -56,7 +56,7 @@ public class HttpRequestDecoder extends HttpObjectDecoder {
|
||||
|
||||
/**
|
||||
* Creates a new instance with the default
|
||||
* {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
|
||||
* {@code maxInitialLineLength (4096)}, {@code maxHeaderSize (8192)}, and
|
||||
* {@code maxChunkSize (8192)}.
|
||||
*/
|
||||
public HttpRequestDecoder() {
|
||||
|
@ -87,7 +87,7 @@ public class HttpResponseDecoder extends HttpObjectDecoder {
|
||||
|
||||
/**
|
||||
* Creates a new instance with the default
|
||||
* {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
|
||||
* {@code maxInitialLineLength (4096)}, {@code maxHeaderSize (8192)}, and
|
||||
* {@code maxChunkSize (8192)}.
|
||||
*/
|
||||
public HttpResponseDecoder() {
|
||||
|
@ -52,7 +52,7 @@ public abstract class RtspObjectDecoder extends HttpObjectDecoder {
|
||||
|
||||
/**
|
||||
* Creates a new instance with the default
|
||||
* {@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
|
||||
* {@code maxInitialLineLength (4096)}, {@code maxHeaderSize (8192)}, and
|
||||
* {@code maxContentLength (8192)}.
|
||||
*/
|
||||
protected RtspObjectDecoder() {
|
||||
|
Loading…
Reference in New Issue
Block a user