diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java index 8049b79a90..cd16cca75f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpClientCodec.java @@ -16,13 +16,16 @@ package io.netty.handler.codec.http; import java.util.Queue; +import java.util.concurrent.atomic.AtomicLong; import io.netty.buffer.ChannelBuffer; import io.netty.channel.Channel; import io.netty.channel.ChannelDownstreamHandler; import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelStateEvent; import io.netty.channel.ChannelUpstreamHandler; +import io.netty.handler.codec.PrematureChannelClosureException; import io.netty.util.internal.QueueFactory; /** @@ -33,6 +36,10 @@ import io.netty.util.internal.QueueFactory; * {@link HttpResponseDecoder} to learn what additional state management needs * to be done for HEAD and CONNECT and why * {@link HttpResponseDecoder} can not handle it by itself. + * + * If the {@link Channel} gets closed and there are requests missing for a response + * a {@link PrematureChannelClosureException} is thrown. + * * @see HttpServerCodec * * @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder @@ -49,7 +56,8 @@ public class HttpClientCodec implements ChannelUpstreamHandler, private final HttpRequestEncoder encoder = new Encoder(); private final HttpResponseDecoder decoder; - + private final AtomicLong requestResponseCounter = new AtomicLong(0); + /** * Creates a new instance with the default decoder options * ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and @@ -87,8 +95,17 @@ public class HttpClientCodec implements ChannelUpstreamHandler, @Override protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { - if (msg instanceof HttpRequest && !done) { - queue.offer(((HttpRequest) msg).getMethod()); + if (msg instanceof HttpRequest) { + if (!done) { + queue.offer(((HttpRequest) msg).getMethod()); + } + requestResponseCounter.incrementAndGet(); + } else if (msg instanceof HttpChunk) { + + // increment only if its the last chunk + if (((HttpChunk) msg).isLast()) { + requestResponseCounter.incrementAndGet(); + } } return super.encode(ctx, channel, msg); } @@ -106,7 +123,17 @@ public class HttpClientCodec implements ChannelUpstreamHandler, if (done) { return buffer.readBytes(actualReadableBytes()); } else { - return super.decode(ctx, channel, buffer, state); + Object msg = super.decode(ctx, channel, buffer, state); + + if (msg != null) { + if (msg instanceof HttpMessage) { + requestResponseCounter.decrementAndGet(); + } else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) { + requestResponseCounter.decrementAndGet(); + } + } + + return msg; } } @@ -161,5 +188,17 @@ public class HttpClientCodec implements ChannelUpstreamHandler, return super.isContentAlwaysEmpty(msg); } + + @Override + public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + super.channelClosed(ctx, e); + + long missingResponses = requestResponseCounter.get(); + if (missingResponses > 0) { + throw new PrematureChannelClosureException("Channel closed but still missing " + missingResponses + " response(s)"); + } + } + + } } diff --git a/codec/src/main/java/io/netty/handler/codec/PrematureChannelClosureException.java b/codec/src/main/java/io/netty/handler/codec/PrematureChannelClosureException.java new file mode 100644 index 0000000000..64be04fcbe --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/PrematureChannelClosureException.java @@ -0,0 +1,48 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec; + +/** + * Exception which should get thrown if a Channel got closed before it is expected + */ +public class PrematureChannelClosureException extends Exception { + + /** + * + */ + private static final long serialVersionUID = 233460005724966593L; + + public PrematureChannelClosureException() { + super(); + } + + public PrematureChannelClosureException(String msg) { + super(msg); + } + + + public PrematureChannelClosureException(String msg, Throwable t) { + super(msg, t); + } + + + public PrematureChannelClosureException(Throwable t) { + super(t); + } + + + +} diff --git a/codec/src/main/java/io/netty/handler/codec/package-info.java b/codec/src/main/java/io/netty/handler/codec/package-info.java new file mode 100644 index 0000000000..99f835a366 --- /dev/null +++ b/codec/src/main/java/io/netty/handler/codec/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Base package for codecs + * + */ +package io.netty.handler.codec;