Merge pull request #381 from fredericBregier/master
Improve HTTP message streaming in decoder same as #380
This commit is contained in:
commit
c2e3d305b4
@ -106,6 +106,7 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
|
|||||||
private ChannelBuffer content;
|
private ChannelBuffer content;
|
||||||
private long chunkSize;
|
private long chunkSize;
|
||||||
private int headerSize;
|
private int headerSize;
|
||||||
|
private int contentRead;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The internal state of {@link HttpMessageDecoder}.
|
* The internal state of {@link HttpMessageDecoder}.
|
||||||
@ -235,17 +236,24 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
case READ_VARIABLE_LENGTH_CONTENT: {
|
case READ_VARIABLE_LENGTH_CONTENT: {
|
||||||
if (content == null) {
|
int toRead = actualReadableBytes();
|
||||||
content = ChannelBuffers.dynamicBuffer();
|
if (toRead > maxChunkSize) {
|
||||||
|
toRead = maxChunkSize;
|
||||||
|
}
|
||||||
|
if (!message.isChunked()) {
|
||||||
|
message.setChunked(true);
|
||||||
|
return new Object[] {message, new DefaultHttpChunk(buffer.readBytes(toRead))};
|
||||||
|
} else {
|
||||||
|
return new DefaultHttpChunk(buffer.readBytes(toRead));
|
||||||
}
|
}
|
||||||
//this will cause a replay error until the channel is closed where this will read what's left in the buffer
|
|
||||||
content.writeBytes(buffer.readBytes(buffer.readableBytes()));
|
|
||||||
return reset();
|
|
||||||
}
|
}
|
||||||
case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: {
|
case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: {
|
||||||
// Keep reading data as a chunk until the end of connection is reached.
|
// Keep reading data as a chunk until the end of connection is reached.
|
||||||
int chunkSize = Math.min(maxChunkSize, buffer.readableBytes());
|
int toRead = actualReadableBytes();
|
||||||
HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize));
|
if (toRead > maxChunkSize) {
|
||||||
|
toRead = maxChunkSize;
|
||||||
|
}
|
||||||
|
HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(toRead));
|
||||||
|
|
||||||
if (!buffer.readable()) {
|
if (!buffer.readable()) {
|
||||||
// Reached to the end of the connection.
|
// Reached to the end of the connection.
|
||||||
@ -258,19 +266,23 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
|
|||||||
return chunk;
|
return chunk;
|
||||||
}
|
}
|
||||||
case READ_FIXED_LENGTH_CONTENT: {
|
case READ_FIXED_LENGTH_CONTENT: {
|
||||||
//we have a content-length so we just read the correct number of bytes
|
return readFixedLengthContent(buffer);
|
||||||
readFixedLengthContent(buffer);
|
|
||||||
return reset();
|
|
||||||
}
|
}
|
||||||
case READ_FIXED_LENGTH_CONTENT_AS_CHUNKS: {
|
case READ_FIXED_LENGTH_CONTENT_AS_CHUNKS: {
|
||||||
long chunkSize = this.chunkSize;
|
assert this.chunkSize <= Integer.MAX_VALUE;
|
||||||
HttpChunk chunk;
|
int chunkSize = (int) this.chunkSize;
|
||||||
if (chunkSize > maxChunkSize) {
|
int readLimit = actualReadableBytes();
|
||||||
chunk = new DefaultHttpChunk(buffer.readBytes(maxChunkSize));
|
int toRead = chunkSize;
|
||||||
chunkSize -= maxChunkSize;
|
if (toRead > maxChunkSize) {
|
||||||
|
toRead = maxChunkSize;
|
||||||
|
}
|
||||||
|
if (toRead > readLimit) {
|
||||||
|
toRead = readLimit;
|
||||||
|
}
|
||||||
|
HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(toRead));
|
||||||
|
if (chunkSize > toRead) {
|
||||||
|
chunkSize -= toRead;
|
||||||
} else {
|
} else {
|
||||||
assert chunkSize <= Integer.MAX_VALUE;
|
|
||||||
chunk = new DefaultHttpChunk(buffer.readBytes((int) chunkSize));
|
|
||||||
chunkSize = 0;
|
chunkSize = 0;
|
||||||
}
|
}
|
||||||
this.chunkSize = chunkSize;
|
this.chunkSize = chunkSize;
|
||||||
@ -310,14 +322,20 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
|
|||||||
return chunk;
|
return chunk;
|
||||||
}
|
}
|
||||||
case READ_CHUNKED_CONTENT_AS_CHUNKS: {
|
case READ_CHUNKED_CONTENT_AS_CHUNKS: {
|
||||||
long chunkSize = this.chunkSize;
|
assert this.chunkSize <= Integer.MAX_VALUE;
|
||||||
HttpChunk chunk;
|
int chunkSize = (int) this.chunkSize;
|
||||||
if (chunkSize > maxChunkSize) {
|
int readLimit = actualReadableBytes();
|
||||||
chunk = new DefaultHttpChunk(buffer.readBytes(maxChunkSize));
|
int toRead = chunkSize;
|
||||||
chunkSize -= maxChunkSize;
|
if (toRead > maxChunkSize) {
|
||||||
|
toRead = maxChunkSize;
|
||||||
|
}
|
||||||
|
if (toRead > readLimit) {
|
||||||
|
toRead = readLimit;
|
||||||
|
}
|
||||||
|
HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(toRead));
|
||||||
|
if (chunkSize > toRead) {
|
||||||
|
chunkSize -= toRead;
|
||||||
} else {
|
} else {
|
||||||
assert chunkSize <= Integer.MAX_VALUE;
|
|
||||||
chunk = new DefaultHttpChunk(buffer.readBytes((int) chunkSize));
|
|
||||||
chunkSize = 0;
|
chunkSize = 0;
|
||||||
}
|
}
|
||||||
this.chunkSize = chunkSize;
|
this.chunkSize = chunkSize;
|
||||||
@ -414,15 +432,29 @@ public abstract class HttpMessageDecoder extends ReplayingDecoder<Object, HttpMe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readFixedLengthContent(ChannelBuffer buffer) {
|
private Object readFixedLengthContent(ChannelBuffer buffer) {
|
||||||
|
//we have a content-length so we just read the correct number of bytes
|
||||||
long length = HttpHeaders.getContentLength(message, -1);
|
long length = HttpHeaders.getContentLength(message, -1);
|
||||||
assert length <= Integer.MAX_VALUE;
|
assert length <= Integer.MAX_VALUE;
|
||||||
|
int toRead = (int) length - contentRead;
|
||||||
|
if (toRead > actualReadableBytes()) {
|
||||||
|
toRead = actualReadableBytes();
|
||||||
|
}
|
||||||
|
contentRead = contentRead + toRead;
|
||||||
|
if (length < contentRead) {
|
||||||
|
if (!message.isChunked()) {
|
||||||
|
message.setChunked(true);
|
||||||
|
return new Object[] {message, new DefaultHttpChunk(buffer.readBytes(toRead))};
|
||||||
|
} else {
|
||||||
|
return new DefaultHttpChunk(buffer.readBytes(toRead));
|
||||||
|
}
|
||||||
|
}
|
||||||
if (content == null) {
|
if (content == null) {
|
||||||
content = buffer.readBytes((int) length);
|
content = buffer.readBytes((int) length);
|
||||||
} else {
|
} else {
|
||||||
content.writeBytes(buffer.readBytes((int) length));
|
content.writeBytes(buffer.readBytes((int) length));
|
||||||
}
|
}
|
||||||
|
return reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
private State readHeaders(ChannelBuffer buffer) throws TooLongFrameException {
|
private State readHeaders(ChannelBuffer buffer) throws TooLongFrameException {
|
||||||
|
Loading…
Reference in New Issue
Block a user