Add an option to tell a user if there was a missing response (#256)

- Contributed by @normanmaurer
This commit is contained in:
Trustin Lee 2012-05-30 15:35:14 -07:00
parent 24eb85cdf5
commit 922cec1f9b

View File

@ -16,12 +16,15 @@
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext; import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.CombinedChannelHandler; import io.netty.channel.CombinedChannelHandler;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.util.internal.QueueFactory; import io.netty.util.internal.QueueFactory;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder} * A combination of {@link HttpRequestEncoder} and {@link HttpResponseDecoder}
@ -31,6 +34,10 @@ import java.util.Queue;
* {@link HttpResponseDecoder} to learn what additional state management needs * {@link HttpResponseDecoder} to learn what additional state management needs
* to be done for <tt>HEAD</tt> and <tt>CONNECT</tt> and why * to be done for <tt>HEAD</tt> and <tt>CONNECT</tt> and why
* {@link HttpResponseDecoder} can not handle it by itself. * {@link HttpResponseDecoder} can not handle it by itself.
*
* If the {@link Channel} is closed and there are missing responses,
* a {@link PrematureChannelClosureException} is thrown.
*
* @see HttpServerCodec * @see HttpServerCodec
* *
* @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder * @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder
@ -44,13 +51,16 @@ public class HttpClientCodec extends CombinedChannelHandler {
/** If true, decoding stops (i.e. pass-through) */ /** If true, decoding stops (i.e. pass-through) */
volatile boolean done; volatile boolean done;
private final AtomicLong requestResponseCounter = new AtomicLong();
private final boolean failOnMissingResponse;
/** /**
* Creates a new instance with the default decoder options * Creates a new instance with the default decoder options
* ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and * ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
* {@code maxChunkSize (8192)}). * {@code maxChunkSize (8192)}).
*/ */
public HttpClientCodec() { public HttpClientCodec() {
this(4096, 8192, 8192); this(4096, 8192, 8192, false);
} }
/** /**
@ -58,9 +68,17 @@ public class HttpClientCodec extends CombinedChannelHandler {
*/ */
public HttpClientCodec( public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false);
}
public HttpClientCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize,
boolean failOnMissingResponse) {
init( init(
new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize),
new Encoder()); new Encoder());
this.failOnMissingResponse = failOnMissingResponse;
} }
private final class Encoder extends HttpRequestEncoder { private final class Encoder extends HttpRequestEncoder {
@ -70,7 +88,18 @@ public class HttpClientCodec extends CombinedChannelHandler {
if (msg instanceof HttpRequest && !done) { if (msg instanceof HttpRequest && !done) {
queue.offer(((HttpRequest) msg).getMethod()); queue.offer(((HttpRequest) msg).getMethod());
} }
super.encode(ctx, msg, out); super.encode(ctx, msg, out);
if (failOnMissingResponse) {
// check if the request is chunked if so do not increment
if (msg instanceof HttpRequest && !((HttpRequest) msg).isChunked()) {
requestResponseCounter.incrementAndGet();
} else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
// increment as its the last chunk
requestResponseCounter.incrementAndGet();
}
}
} }
} }
@ -86,7 +115,28 @@ public class HttpClientCodec extends CombinedChannelHandler {
if (done) { if (done) {
return buffer.readBytes(actualReadableBytes()); return buffer.readBytes(actualReadableBytes());
} else { } else {
return super.decode(ctx, buffer); Object msg = super.decode(ctx, buffer);
if (failOnMissingResponse) {
decrement(msg);
}
return msg;
}
}
private void decrement(Object msg) {
if (msg == null) {
return;
}
// check if its a HttpMessage and its not chunked
if (msg instanceof HttpMessage && !((HttpMessage) msg).isChunked()) {
requestResponseCounter.decrementAndGet();
} else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
requestResponseCounter.decrementAndGet();
} else if (msg instanceof Object[]) {
// we just decrement it here as we only use this if the end of the chunk is reached
// It would be more safe to check all the objects in the array but would also be slower
requestResponseCounter.decrementAndGet();
} }
} }
@ -141,5 +191,20 @@ public class HttpClientCodec extends CombinedChannelHandler {
return super.isContentAlwaysEmpty(msg); return super.isContentAlwaysEmpty(msg);
} }
@Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx)
throws Exception {
super.channelInactive(ctx);
if (failOnMissingResponse) {
long missingResponses = requestResponseCounter.get();
if (missingResponses > 0) {
ctx.fireExceptionCaught(new PrematureChannelClosureException(
"channel gone inactive with " + missingResponses +
" missing response(s)"));
}
}
}
} }
} }