HttpObjectDecoder half close behavior
Motivation: In the event an HTTP message does not include either a content-length or a transfer-encoding header [RFC 7230](https://tools.ietf.org/html/rfc7230#section-3.3.3) states the behavior must be treated differently for requests and responses. If the channel is half closed then the HttpObjectDecoder is not invoking decodeLast and thus not checking if messages should be sent up the pipeline. Modifications: - Add comments to clarify regular decode default case. - Handle the ChannelInputShutdownEvent in the HttpObjectDecoder and evaluate if messages need to be generated. Result: Messages are generated on half closed, and comments clarify existing logic.
This commit is contained in:
parent
6525240236
commit
fd5db7fa08
@ -20,6 +20,7 @@ import io.netty.buffer.ByteBufProcessor;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.ChannelInputShutdownEvent;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
import io.netty.handler.codec.DecoderResult;
|
||||
import io.netty.handler.codec.TooLongFrameException;
|
||||
@ -239,6 +240,13 @@ public abstract class HttpObjectDecoder extends ByteToMessageDecoder {
|
||||
out.add(message);
|
||||
return;
|
||||
default:
|
||||
/**
|
||||
* <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a> states that
|
||||
* if a request does not have either a transfer-encoding or a content-length header then the
|
||||
* message body length is 0. However for a response the body length is the number of octets
|
||||
* received prior to the server closing the connection. So we treat this as variable length
|
||||
* chunked encoding.
|
||||
*/
|
||||
long contentLength = contentLength();
|
||||
if (contentLength == 0 || contentLength == -1 && isDecodingRequest()) {
|
||||
out.add(message);
|
||||
@ -418,6 +426,17 @@ public abstract class HttpObjectDecoder extends ByteToMessageDecoder {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
if (evt instanceof ChannelInputShutdownEvent) {
|
||||
// The decodeLast method is invoked when a channelInactive event is encountered.
|
||||
// This method is responsible for ending requests in some situations and must be called
|
||||
// when the input has been shutdown.
|
||||
super.channelInactive(ctx);
|
||||
}
|
||||
super.userEventTriggered(ctx, evt);
|
||||
}
|
||||
|
||||
protected boolean isContentAlwaysEmpty(HttpMessage msg) {
|
||||
if (msg instanceof HttpResponse) {
|
||||
HttpResponse res = (HttpResponse) msg;
|
||||
|
@ -15,16 +15,39 @@
|
||||
*/
|
||||
package io.netty.handler.codec.http;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.codec.CodecException;
|
||||
import io.netty.handler.codec.PrematureChannelClosureException;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import io.netty.util.NetUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
import static io.netty.util.ReferenceCountUtil.*;
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static io.netty.util.ReferenceCountUtil.release;
|
||||
import static io.netty.util.ReferenceCountUtil.releaseLater;
|
||||
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class HttpClientCodecTest {
|
||||
|
||||
@ -124,4 +147,88 @@ public class HttpClientCodecTest {
|
||||
assertTrue(e instanceof PrematureChannelClosureException);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testServerCloseSocketInputProvidesData() throws InterruptedException {
|
||||
ServerBootstrap sb = new ServerBootstrap();
|
||||
Bootstrap cb = new Bootstrap();
|
||||
final CountDownLatch serverChannelLatch = new CountDownLatch(1);
|
||||
final CountDownLatch responseRecievedLatch = new CountDownLatch(1);
|
||||
try {
|
||||
sb.group(new NioEventLoopGroup(2));
|
||||
sb.channel(NioServerSocketChannel.class);
|
||||
sb.childHandler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
// Don't use the HttpServerCodec, because we don't want to have content-length or anything added.
|
||||
ch.pipeline().addLast(new HttpRequestDecoder(4096, 8192, 8192, true));
|
||||
ch.pipeline().addLast(new HttpObjectAggregator(4096));
|
||||
ch.pipeline().addLast(new SimpleChannelInboundHandler<FullHttpRequest>() {
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
|
||||
// This is just a simple demo...don't block in IO
|
||||
assertTrue(ctx.channel() instanceof SocketChannel);
|
||||
final SocketChannel sChannel = (SocketChannel) ctx.channel();
|
||||
/**
|
||||
* The point of this test is to not add any content-length or content-encoding headers
|
||||
* and the client should still handle this.
|
||||
* See <a href="https://tools.ietf.org/html/rfc7230#section-3.3.3">RFC 7230, 3.3.3</a>.
|
||||
*/
|
||||
sChannel.writeAndFlush(Unpooled.wrappedBuffer(("HTTP/1.0 200 OK\r\n" +
|
||||
"Date: Fri, 31 Dec 1999 23:59:59 GMT\r\n" +
|
||||
"Content-Type: text/html\r\n\r\n").getBytes(CharsetUtil.ISO_8859_1)))
|
||||
.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
assertTrue(future.isSuccess());
|
||||
sChannel.writeAndFlush(Unpooled.wrappedBuffer(
|
||||
"<html><body>hello half closed!</body></html>\r\n"
|
||||
.getBytes(CharsetUtil.ISO_8859_1)))
|
||||
.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
assertTrue(future.isSuccess());
|
||||
sChannel.shutdownOutput();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
serverChannelLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
cb.group(new NioEventLoopGroup(1));
|
||||
cb.channel(NioSocketChannel.class);
|
||||
cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true);
|
||||
cb.handler(new ChannelInitializer<Channel>() {
|
||||
@Override
|
||||
protected void initChannel(Channel ch) throws Exception {
|
||||
ch.pipeline().addLast(new HttpClientCodec(4096, 8192, 8192, true, true));
|
||||
ch.pipeline().addLast(new HttpObjectAggregator(4096));
|
||||
ch.pipeline().addLast(new SimpleChannelInboundHandler<FullHttpResponse>() {
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
|
||||
responseRecievedLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
Channel serverChannel = sb.bind(new InetSocketAddress(0)).sync().channel();
|
||||
int port = ((InetSocketAddress) serverChannel.localAddress()).getPort();
|
||||
|
||||
ChannelFuture ccf = cb.connect(new InetSocketAddress(NetUtil.LOCALHOST, port));
|
||||
assertTrue(ccf.awaitUninterruptibly().isSuccess());
|
||||
Channel clientChannel = ccf.channel();
|
||||
assertTrue(serverChannelLatch.await(5, SECONDS));
|
||||
clientChannel.writeAndFlush(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"));
|
||||
assertTrue(responseRecievedLatch.await(5, SECONDS));
|
||||
} finally {
|
||||
sb.group().shutdownGracefully();
|
||||
sb.childGroup().shutdownGracefully();
|
||||
cb.group().shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user