Add an operation that resets the state of HttpObjectDecoder
Motivation: Currently, it is impossible to give a user the full control over what to do in response to the request with 'Expect: 100-continue' header. Currently, a user have to do one of the following: - Accept the request and respond with 100 Continue, or - Send the reject response and close the connection. .. which means it is impossible to send the reject response and keep the connection alive so that the client sends additional requests. Modification: Added a public method called 'reset()' to HttpObjectDecoder so that a user can reset the state of the decoder easily. Once called, the decoder will assume the next input will be the beginning of a new request. HttpObjectAggregator now calls `reset()`right after calling 'handleOversizedMessage()' so that the decoder can continue to decode the subsequent request even after the request with 'Expect: 100-continue' header is rejected. Added relevant unit tests / Minor clean-up Result: This commit completes the fix of #2211
This commit is contained in:
parent
e278b57489
commit
2c4aff13c7
@ -242,7 +242,17 @@ public class HttpObjectAggregator extends MessageToMessageDecoder<HttpObject> {
|
||||
try {
|
||||
handleOversizedMessage(ctx, msg);
|
||||
} finally {
|
||||
// Release the message in case it is a full one.
|
||||
ReferenceCountUtil.release(msg);
|
||||
|
||||
if (msg instanceof HttpRequest) {
|
||||
// If an oversized request was handled properly and the connection is still alive
|
||||
// (i.e. rejected 100-continue). the decoder should prepare to handle a new message.
|
||||
HttpObjectDecoder decoder = ctx.pipeline().get(HttpObjectDecoder.class);
|
||||
if (decoder != null) {
|
||||
decoder.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -115,6 +115,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
||||
private long chunkSize;
|
||||
private int headerSize;
|
||||
private long contentLength = Long.MIN_VALUE;
|
||||
private volatile boolean resetRequested;
|
||||
|
||||
/**
|
||||
* The internal state of {@link HttpObjectDecoder}.
|
||||
@ -184,6 +185,10 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
|
||||
if (resetRequested) {
|
||||
resetNow();
|
||||
}
|
||||
|
||||
switch (state()) {
|
||||
case SKIP_CONTROL_CHARS: {
|
||||
try {
|
||||
@ -223,14 +228,14 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
||||
// No content is expected.
|
||||
out.add(message);
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
reset();
|
||||
resetNow();
|
||||
return;
|
||||
}
|
||||
long contentLength = contentLength();
|
||||
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
|
||||
out.add(message);
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
reset();
|
||||
resetNow();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -259,12 +264,12 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
||||
} else {
|
||||
// End of connection.
|
||||
out.add(new DefaultLastHttpContent(content, validateHeaders));
|
||||
reset();
|
||||
resetNow();
|
||||
}
|
||||
} else if (!buffer.isReadable()) {
|
||||
// End of connection.
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
reset();
|
||||
resetNow();
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -291,7 +296,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
||||
if (chunkSize == 0) {
|
||||
// Read all content.
|
||||
out.add(new DefaultLastHttpContent(content, validateHeaders));
|
||||
reset();
|
||||
resetNow();
|
||||
} else {
|
||||
out.add(new DefaultHttpContent(content));
|
||||
}
|
||||
@ -350,7 +355,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
||||
case READ_CHUNK_FOOTER: try {
|
||||
LastHttpContent trailer = readTrailingHeaders(buffer);
|
||||
out.add(trailer);
|
||||
reset();
|
||||
resetNow();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
out.add(invalidChunk(e));
|
||||
@ -393,7 +398,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
||||
// connection, so it is perfectly fine.
|
||||
prematureClosure = contentLength() > 0;
|
||||
}
|
||||
reset();
|
||||
resetNow();
|
||||
|
||||
if (!prematureClosure) {
|
||||
out.add(LastHttpContent.EMPTY_LAST_CONTENT);
|
||||
@ -424,7 +429,15 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
||||
return false;
|
||||
}
|
||||
|
||||
private void reset() {
|
||||
/**
|
||||
* Resets the state of the decoder so that it is ready to decode a new message.
|
||||
* This method is useful for handling a rejected request with {@code Expect: 100-continue} header.
|
||||
*/
|
||||
public void reset() {
|
||||
resetRequested = true;
|
||||
}
|
||||
|
||||
private void resetNow() {
|
||||
HttpMessage message = this.message;
|
||||
this.message = null;
|
||||
contentLength = Long.MIN_VALUE;
|
||||
|
@ -107,7 +107,7 @@ public class HttpObjectAggregatorTest {
|
||||
@Test
|
||||
public void testOversizedRequest() {
|
||||
EmbeddedChannel embedder = new EmbeddedChannel(new HttpObjectAggregator(4));
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost");
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, "http://localhost");
|
||||
HttpContent chunk1 = new DefaultHttpContent(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII));
|
||||
HttpContent chunk2 = new DefaultHttpContent(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII));
|
||||
HttpContent chunk3 = LastHttpContent.EMPTY_LAST_CONTENT;
|
||||
@ -134,24 +134,24 @@ public class HttpObjectAggregatorTest {
|
||||
@Test
|
||||
public void testOversizedRequestWithoutKeepAlive() {
|
||||
// send a HTTP/1.0 request with no keep-alive header
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.GET, "http://localhost");
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_0, HttpMethod.PUT, "http://localhost");
|
||||
HttpHeaders.setContentLength(message, 5);
|
||||
checkOversizedRequest(message);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOversizedRequestWithContentLength() {
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost");
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, "http://localhost");
|
||||
HttpHeaders.setContentLength(message, 5);
|
||||
checkOversizedRequest(message);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOversizedMessageWith100Continue() {
|
||||
public void testOversizedRequestWith100Continue() {
|
||||
EmbeddedChannel embedder = new EmbeddedChannel(new HttpObjectAggregator(8));
|
||||
|
||||
// send an oversized request with 100 continue
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost");
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, "http://localhost");
|
||||
HttpHeaders.set100ContinueExpected(message);
|
||||
HttpHeaders.setContentLength(message, 16);
|
||||
|
||||
@ -174,7 +174,7 @@ public class HttpObjectAggregatorTest {
|
||||
assertTrue(embedder.isOpen());
|
||||
|
||||
// Now send a valid request.
|
||||
HttpRequest message2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost");
|
||||
HttpRequest message2 = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, "http://localhost");
|
||||
|
||||
assertFalse(embedder.writeInbound(message2));
|
||||
assertFalse(embedder.writeInbound(chunk2));
|
||||
@ -192,6 +192,35 @@ public class HttpObjectAggregatorTest {
|
||||
assertFalse(embedder.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOversizedRequestWith100ContinueAndDecoder() {
|
||||
EmbeddedChannel embedder = new EmbeddedChannel(new HttpRequestDecoder(), new HttpObjectAggregator(4));
|
||||
embedder.writeInbound(Unpooled.copiedBuffer(
|
||||
"PUT /upload HTTP/1.1\r\n" +
|
||||
"Expect: 100-continue\r\n" +
|
||||
"Content-Length: 100\r\n\r\n", CharsetUtil.US_ASCII));
|
||||
|
||||
assertNull(embedder.readInbound());
|
||||
|
||||
FullHttpResponse response = embedder.readOutbound();
|
||||
assertEquals(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, response.getStatus());
|
||||
assertEquals("0", response.headers().get(Names.CONTENT_LENGTH));
|
||||
|
||||
// Keep-alive is on by default in HTTP/1.1, so the connection should be still alive.
|
||||
assertTrue(embedder.isOpen());
|
||||
|
||||
// The decoder should be reset by the aggregator at this point and be able to decode the next request.
|
||||
embedder.writeInbound(Unpooled.copiedBuffer("GET /max-upload-size HTTP/1.1\r\n\r\n", CharsetUtil.US_ASCII));
|
||||
|
||||
FullHttpRequest request = embedder.readInbound();
|
||||
assertThat(request.getMethod(), is(HttpMethod.GET));
|
||||
assertThat(request.getUri(), is("/max-upload-size"));
|
||||
assertThat(request.content().readableBytes(), is(0));
|
||||
request.release();
|
||||
|
||||
assertFalse(embedder.finish());
|
||||
}
|
||||
|
||||
private static void checkOversizedRequest(HttpRequest message) {
|
||||
EmbeddedChannel embedder = new EmbeddedChannel(new HttpObjectAggregator(4));
|
||||
|
||||
@ -258,7 +287,7 @@ public class HttpObjectAggregatorTest {
|
||||
HttpObjectAggregator aggr = new HttpObjectAggregator(1024 * 1024);
|
||||
EmbeddedChannel embedder = new EmbeddedChannel(aggr);
|
||||
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "http://localhost");
|
||||
HttpRequest message = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, "http://localhost");
|
||||
HttpHeaders.setHeader(message, "X-Test", true);
|
||||
HttpHeaders.setHeader(message, "Transfer-Encoding", "Chunked");
|
||||
HttpContent chunk1 = new DefaultHttpContent(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII));
|
||||
|
@ -23,6 +23,7 @@ import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class HttpRequestDecoderTest {
|
||||
@ -156,7 +157,7 @@ public class HttpRequestDecoderTest {
|
||||
c.release();
|
||||
}
|
||||
|
||||
LastHttpContent c = (LastHttpContent) channel.readInbound();
|
||||
LastHttpContent c = channel.readInbound();
|
||||
assertEquals(1, c.content().readableBytes());
|
||||
assertEquals(content[content.length - 1], c.content().readByte());
|
||||
c.release();
|
||||
@ -176,4 +177,58 @@ public class HttpRequestDecoderTest {
|
||||
HttpRequest req = channel.readInbound();
|
||||
assertEquals("", req.headers().get("EmptyHeader"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test100Continue() {
|
||||
HttpRequestDecoder decoder = new HttpRequestDecoder();
|
||||
EmbeddedChannel channel = new EmbeddedChannel(decoder);
|
||||
String oversized =
|
||||
"PUT /file HTTP/1.1\r\n" +
|
||||
"Expect: 100-continue\r\n" +
|
||||
"Content-Length: 1048576000\r\n\r\n";
|
||||
|
||||
channel.writeInbound(Unpooled.copiedBuffer(oversized, CharsetUtil.US_ASCII));
|
||||
assertThat(channel.readInbound(), is(instanceOf(HttpRequest.class)));
|
||||
|
||||
// At this point, we assume that we sent '413 Entity Too Large' to the peer without closing the connection
|
||||
// so that the client can try again.
|
||||
decoder.reset();
|
||||
|
||||
String query = "GET /max-file-size HTTP/1.1\r\n\r\n";
|
||||
channel.writeInbound(Unpooled.copiedBuffer(query, CharsetUtil.US_ASCII));
|
||||
assertThat(channel.readInbound(), is(instanceOf(HttpRequest.class)));
|
||||
assertThat(channel.readInbound(), is(instanceOf(LastHttpContent.class)));
|
||||
|
||||
assertThat(channel.finish(), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test100ContinueWithBadClient() {
|
||||
HttpRequestDecoder decoder = new HttpRequestDecoder();
|
||||
EmbeddedChannel channel = new EmbeddedChannel(decoder);
|
||||
String oversized =
|
||||
"PUT /file HTTP/1.1\r\n" +
|
||||
"Expect: 100-continue\r\n" +
|
||||
"Content-Length: 1048576000\r\n\r\n" +
|
||||
"WAY_TOO_LARGE_DATA_BEGINS";
|
||||
|
||||
channel.writeInbound(Unpooled.copiedBuffer(oversized, CharsetUtil.US_ASCII));
|
||||
assertThat(channel.readInbound(), is(instanceOf(HttpRequest.class)));
|
||||
|
||||
HttpContent prematureData = channel.readInbound();
|
||||
prematureData.release();
|
||||
|
||||
assertThat(channel.readInbound(), is(nullValue()));
|
||||
|
||||
// At this point, we assume that we sent '413 Entity Too Large' to the peer without closing the connection
|
||||
// so that the client can try again.
|
||||
decoder.reset();
|
||||
|
||||
String query = "GET /max-file-size HTTP/1.1\r\n\r\n";
|
||||
channel.writeInbound(Unpooled.copiedBuffer(query, CharsetUtil.US_ASCII));
|
||||
assertThat(channel.readInbound(), is(instanceOf(HttpRequest.class)));
|
||||
assertThat(channel.readInbound(), is(instanceOf(LastHttpContent.class)));
|
||||
|
||||
assertThat(channel.finish(), is(false));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user