From 4e989e20d4c12ce08b00aca4610e72ac3cf84cbe Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 10 Mar 2009 08:42:19 +0000 Subject: [PATCH] Added HttpChunkAggregator --- .../http/HttpClientPipelineFactory.java | 1 + .../http/HttpServerPipelineFactory.java | 1 + .../codec/http/HttpChunkAggregator.java | 103 ++++++++++++++++++ 3 files changed, 105 insertions(+) create mode 100644 src/main/java/org/jboss/netty/handler/codec/http/HttpChunkAggregator.java diff --git a/src/main/java/org/jboss/netty/example/http/HttpClientPipelineFactory.java b/src/main/java/org/jboss/netty/example/http/HttpClientPipelineFactory.java index 04dc3498c6..7e30fdd7c5 100644 --- a/src/main/java/org/jboss/netty/example/http/HttpClientPipelineFactory.java +++ b/src/main/java/org/jboss/netty/example/http/HttpClientPipelineFactory.java @@ -44,6 +44,7 @@ public class HttpClientPipelineFactory implements ChannelPipelineFactory { // Create a default pipeline implementation. ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new HttpResponseDecoder(8192, 8192, 8192)); + //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); pipeline.addLast("encoder", new HttpRequestEncoder()); pipeline.addLast("handler", handler); return pipeline; diff --git a/src/main/java/org/jboss/netty/example/http/HttpServerPipelineFactory.java b/src/main/java/org/jboss/netty/example/http/HttpServerPipelineFactory.java index 3bf4508387..e3e9163a57 100644 --- a/src/main/java/org/jboss/netty/example/http/HttpServerPipelineFactory.java +++ b/src/main/java/org/jboss/netty/example/http/HttpServerPipelineFactory.java @@ -45,6 +45,7 @@ public class HttpServerPipelineFactory implements ChannelPipelineFactory { ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new HttpRequestDecoder(8192, 8192, 8192)); + //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576)); pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("handler", handler); return pipeline; diff --git a/src/main/java/org/jboss/netty/handler/codec/http/HttpChunkAggregator.java b/src/main/java/org/jboss/netty/handler/codec/http/HttpChunkAggregator.java new file mode 100644 index 0000000000..2dd7e3c4b4 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/codec/http/HttpChunkAggregator.java @@ -0,0 +1,103 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.handler.codec.http; + +import java.util.List; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.handler.codec.frame.TooLongFrameException; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * @version $Rev$, $Date$ + */ +@ChannelPipelineCoverage("one") +public class HttpChunkAggregator extends SimpleChannelUpstreamHandler { + + private final int maxContentLength; + private volatile HttpMessage currentMessage; + + public HttpChunkAggregator(int maxContentLength) { + if (maxContentLength <= 0) { + throw new IllegalArgumentException( + "maxContentLength must be a positive integer: " + + maxContentLength); + } + this.maxContentLength = maxContentLength; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + Object msg = e.getMessage(); + if (!(msg instanceof HttpMessage) && !(msg instanceof HttpChunk)) { + ctx.sendUpstream(e); + return; + } + + HttpMessage currentMessage = this.currentMessage; + if (currentMessage == null) { + HttpMessage m = (HttpMessage) msg; + if (m.isChunked()) { + // A chunked message - remove 'Transfer-Encoding' header, + // initialize the cumulative buffer, and wait for incoming chunks. + List encodings = m.getHeaders(HttpHeaders.Names.TRANSFER_ENCODING); + encodings.remove(HttpHeaders.Values.CHUNKED); + if (encodings.isEmpty()) { + m.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING); + } + m.setContent(ChannelBuffers.dynamicBuffer(e.getChannel().getConfig().getBufferFactory())); + this.currentMessage = m; + } else { + // Not a chunked message - pass through. + ctx.sendUpstream(e); + } + } else { + // Merge the received chunk into the content of the current message. + HttpChunk chunk = (HttpChunk) msg; + ChannelBuffer content = currentMessage.getContent(); + + if (content.readableBytes() > maxContentLength - chunk.getContent().readableBytes()) { + throw new TooLongFrameException( + "HTTP content length exceeded " + maxContentLength + + " bytes."); + } + + content.writeBytes(chunk.getContent()); + if (chunk.isLast()) { + this.currentMessage = null; + currentMessage.setHeader( + HttpHeaders.Names.CONTENT_LENGTH, + String.valueOf(content.readableBytes())); + Channels.fireMessageReceived(ctx, currentMessage, e.getRemoteAddress()); + } + } + } +}