Throw a PrematureChannelClosureException if the channel was closed before all responses were received for the sent requests. See #256

This commit is contained in:
norman 2012-04-12 10:22:10 +02:00
parent 941e71de36
commit b9c60bd518
3 changed files with 112 additions and 4 deletions

View File

@ -16,13 +16,16 @@
package io.netty.handler.codec.http; package io.netty.handler.codec.http;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDownstreamHandler; import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent; import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler; import io.netty.channel.ChannelUpstreamHandler;
import io.netty.handler.codec.PrematureChannelClosureException;
import io.netty.util.internal.QueueFactory; import io.netty.util.internal.QueueFactory;
/** /**
@ -33,6 +36,10 @@ import io.netty.util.internal.QueueFactory;
* {@link HttpResponseDecoder} to learn what additional state management needs * {@link HttpResponseDecoder} to learn what additional state management needs
* to be done for <tt>HEAD</tt> and <tt>CONNECT</tt> and why * to be done for <tt>HEAD</tt> and <tt>CONNECT</tt> and why
* {@link HttpResponseDecoder} can not handle it by itself. * {@link HttpResponseDecoder} can not handle it by itself.
*
* If the {@link Channel} gets closed and there are requests missing for a response
* a {@link PrematureChannelClosureException} is thrown.
*
* @see HttpServerCodec * @see HttpServerCodec
* *
* @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder * @apiviz.has io.netty.handler.codec.http.HttpResponseDecoder
@ -49,7 +56,8 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
private final HttpRequestEncoder encoder = new Encoder(); private final HttpRequestEncoder encoder = new Encoder();
private final HttpResponseDecoder decoder; private final HttpResponseDecoder decoder;
private final AtomicLong requestResponseCounter = new AtomicLong(0);
/** /**
* Creates a new instance with the default decoder options * Creates a new instance with the default decoder options
* ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and * ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
@ -87,8 +95,17 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
@Override @Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception { Object msg) throws Exception {
if (msg instanceof HttpRequest && !done) { if (msg instanceof HttpRequest) {
queue.offer(((HttpRequest) msg).getMethod()); if (!done) {
queue.offer(((HttpRequest) msg).getMethod());
}
requestResponseCounter.incrementAndGet();
} else if (msg instanceof HttpChunk) {
// increment only if its the last chunk
if (((HttpChunk) msg).isLast()) {
requestResponseCounter.incrementAndGet();
}
} }
return super.encode(ctx, channel, msg); return super.encode(ctx, channel, msg);
} }
@ -106,7 +123,17 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
if (done) { if (done) {
return buffer.readBytes(actualReadableBytes()); return buffer.readBytes(actualReadableBytes());
} else { } else {
return super.decode(ctx, channel, buffer, state); Object msg = super.decode(ctx, channel, buffer, state);
if (msg != null) {
if (msg instanceof HttpMessage) {
requestResponseCounter.decrementAndGet();
} else if (msg instanceof HttpChunk && ((HttpChunk) msg).isLast()) {
requestResponseCounter.decrementAndGet();
}
}
return msg;
} }
} }
@ -161,5 +188,17 @@ public class HttpClientCodec implements ChannelUpstreamHandler,
return super.isContentAlwaysEmpty(msg); return super.isContentAlwaysEmpty(msg);
} }
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelClosed(ctx, e);
long missingResponses = requestResponseCounter.get();
if (missingResponses > 0) {
throw new PrematureChannelClosureException("Channel closed but still missing " + missingResponses + " response(s)");
}
}
} }
} }

View File

@ -0,0 +1,48 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec;
/**
* Exception which should get thrown if a Channel got closed before it is expected
*/
public class PrematureChannelClosureException extends Exception {
/**
*
*/
private static final long serialVersionUID = 233460005724966593L;
public PrematureChannelClosureException() {
super();
}
public PrematureChannelClosureException(String msg) {
super(msg);
}
public PrematureChannelClosureException(String msg, Throwable t) {
super(msg, t);
}
public PrematureChannelClosureException(Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Base package for codecs
*
*/
package io.netty.handler.codec;