diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java index 35d7d6cf82..ed4bfbec70 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java @@ -16,12 +16,15 @@ package io.netty.handler.codec.http; import io.netty.buffer.ChannelBuffer; +import io.netty.channel.Channel; import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelOutboundHandlerContext; import io.netty.channel.CombinedChannelHandler; +import io.netty.handler.codec.PrematureChannelClosureException; import io.netty.util.internal.QueueFactory; import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; /** * A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder} @@ -31,6 +34,10 @@ import java.util.Queue; * {@link HttpResponseDecoder} to learn what additional state management needs * to be done for HEAD and CONNECT and why * {@link HttpResponseDecoder} can not handle it by itself. + * + * If the {@link Channel} is closed and there are missing responses, + * a {@link PrematureChannelClosureException} is thrown. + * * @see HttpServerCodec * * @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder @@ -44,13 +51,16 @@ public class HttpClientCodec extends CombinedChannelHandler { /** If true, decoding stops (i.e. pass-through) */ volatile boolean done; + private final AtomicLong requestResponseCounter = new AtomicLong(); + private final boolean failOnMissingResponse; + /** * Creates a new instance with the default decoder options * ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and * {@code maxChunkSize (8192)}). */ public HttpClientCodec() { - this(4096, 8192, 8192); + this(4096, 8192, 8192, false); } /** @@ -58,9 +68,17 @@ public class HttpClientCodec extends CombinedChannelHandler { */ public HttpClientCodec( int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { + this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false); + } + + public HttpClientCodec( + int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, + boolean failOnMissingResponse) { + init( new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), new Encoder()); + this.failOnMissingResponse = failOnMissingResponse; } private final class Encoder extends HttpRequestEncoder { @@ -70,7 +88,18 @@ public class HttpClientCodec extends CombinedChannelHandler { if (msg instanceof HttpRequest && !done) { queue.offer(((HttpRequest) msg).getMethod()); } + super.encode(ctx, msg, out); + + if (failOnMissingResponse) { + // check if the request is chunked if so do not increment + if (msg instanceof HttpRequest && !((HttpRequest) msg).isChunked()) { + requestResponseCounter.incrementAndGet(); + } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) { + // increment as its the last chunk + requestResponseCounter.incrementAndGet(); + } + } } } @@ -86,7 +115,28 @@ public class HttpClientCodec extends CombinedChannelHandler { if (done) { return buffer.readBytes(actualReadableBytes()); } else { - return super.decode(ctx, buffer); + Object msg = super.decode(ctx, buffer); + if (failOnMissingResponse) { + decrement(msg); + } + return msg; + } + } + + private void decrement(Object msg) { + if (msg == null) { + return; + } + + // check if its a HttpMessage and its not chunked + if (msg instanceof HttpMessage && !((HttpMessage) msg).isChunked()) { + requestResponseCounter.decrementAndGet(); + } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) { + requestResponseCounter.decrementAndGet(); + } else if (msg instanceof Object[]) { + // we just decrement it here as we only use this if the end of the chunk is reached + // It would be more safe to check all the objects in the array but would also be slower + requestResponseCounter.decrementAndGet(); } } @@ -141,5 +191,20 @@ public class HttpClientCodec extends CombinedChannelHandler { return super.isContentAlwaysEmpty(msg); } + + @Override + public void channelInactive(ChannelInboundHandlerContext ctx) + throws Exception { + super.channelInactive(ctx); + + if (failOnMissingResponse) { + long missingResponses = requestResponseCounter.get(); + if (missingResponses > 0) { + ctx.fireExceptionCaught(new PrematureChannelClosureException( + "channel gone inactive with " + missingResponses + + " missing response(s)")); + } + } + } } }