From ac4dacd84f5412effa59635d9280839ef207b1c5 Mon Sep 17 00:00:00 2001 From: norman Date: Tue, 3 Jul 2012 10:37:05 +0200 Subject: [PATCH] Minimize byte copies by using a CompositeByteBuf to concat the chunks. See #413 --- .../codec/http/HttpChunkAggregator.java | 67 ++++++++- .../codec/http/HttpChunkAggregatorTest.java | 140 ++++++++++++++++++ 2 files changed, 205 insertions(+), 2 deletions(-) create mode 100644 codec-http/src/test/java/io/netty/handler/codec/http/HttpChunkAggregatorTest.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java index e65bb5d95e..44f218506f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkAggregator.java @@ -17,6 +17,7 @@ package io.netty.handler.codec.http; import static io.netty.handler.codec.http.HttpHeaders.*; import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; @@ -47,13 +48,16 @@ import java.util.Map.Entry; * @apiviz.has io.netty.handler.codec.http.HttpChunk oneway - - filters out */ public class HttpChunkAggregator extends MessageToMessageDecoder { - + public static final int DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS = 1024; private static final ByteBuf CONTINUE = Unpooled.copiedBuffer( "HTTP/1.1 100 Continue\r\n\r\n", CharsetUtil.US_ASCII); private final int maxContentLength; private HttpMessage currentMessage; + private int maxCumulationBufferComponents = DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS; + private ChannelHandlerContext ctx; + /** * Creates a new instance. * @@ -71,6 +75,38 @@ public class HttpChunkAggregator extends MessageToMessageDecoder= 2)"); + } + + if (ctx == null) { + this.maxCumulationBufferComponents = maxCumulationBufferComponents; + } else { + throw new IllegalStateException( + "decoder properties cannot be changed once the decoder is added to a pipeline."); + } + } + @Override public boolean isDecodable(Object msg) throws Exception { return msg instanceof HttpMessage || msg instanceof HttpChunk; @@ -131,7 +167,9 @@ public class HttpChunkAggregator extends MessageToMessageDecoder= maxCumulationBufferComponents) { + currentMessage.setContent(Unpooled.wrappedBuffer(composite.copy(), input)); + } else { + List decomposed = composite.decompose(0, composite.readableBytes()); + ByteBuf[] buffers = decomposed.toArray(new ByteBuf[decomposed.size() + 1]); + buffers[buffers.length - 1] = input; + + currentMessage.setContent(Unpooled.wrappedBuffer(buffers)); + } + } else { + currentMessage.setContent(Unpooled.wrappedBuffer(cumulation, input)); + } + + } + + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; + } + } diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/HttpChunkAggregatorTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/HttpChunkAggregatorTest.java new file mode 100644 index 0000000000..5662fbf3dc --- /dev/null +++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpChunkAggregatorTest.java @@ -0,0 +1,140 @@ +/* + * Copyright 2012 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http; + +import static org.junit.Assert.*; + +import java.util.List; + +import org.easymock.EasyMock; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.buffer.CompositeByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.embedded.EmbeddedMessageChannel; +import io.netty.handler.codec.TooLongFrameException; + +import io.netty.util.CharsetUtil; +import org.junit.Test; + +public class HttpChunkAggregatorTest { + + @Test + public void testAggregate() { + HttpChunkAggregator aggr = new HttpChunkAggregator(1024 * 1024); + EmbeddedMessageChannel embedder = new EmbeddedMessageChannel(aggr); + + HttpMessage message = new DefaultHttpMessage(HttpVersion.HTTP_1_1); + HttpHeaders.setHeader(message, "X-Test", true); + message.setChunked(true); + HttpChunk chunk1 = new DefaultHttpChunk(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)); + HttpChunk chunk2 = new DefaultHttpChunk(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII)); + HttpChunk chunk3 = new DefaultHttpChunk(Unpooled.EMPTY_BUFFER); + assertFalse(embedder.writeInbound(message)); + assertFalse(embedder.writeInbound(chunk1)); + assertFalse(embedder.writeInbound(chunk2)); + + // this should trigger a messageReceived event so return true + assertTrue(embedder.writeInbound(chunk3)); + assertTrue(embedder.finish()); + HttpMessage aggratedMessage = (HttpMessage) embedder.readInbound(); + assertNotNull(aggratedMessage); + + assertEquals(chunk1.getContent().readableBytes() + chunk2.getContent().readableBytes(), HttpHeaders.getContentLength(aggratedMessage)); + assertEquals(aggratedMessage.getHeader("X-Test"), Boolean.TRUE.toString()); + checkContentBuffer(aggratedMessage); + assertNull(embedder.readInbound()); + + } + + private void checkContentBuffer(HttpMessage aggregatedMessage) { + CompositeByteBuf buffer = (CompositeByteBuf) aggregatedMessage.getContent(); + assertEquals(2, buffer.numComponents()); + List buffers = buffer.decompose(0, buffer.capacity()); + assertEquals(2, buffers.size()); + for (ByteBuf buf: buffers) { + // This should be false as we decompose the buffer before to not have deep hierarchy + assertFalse(buf instanceof CompositeByteBuf); + } + } + + @Test + public void testAggregateWithTrailer() { + HttpChunkAggregator aggr = new HttpChunkAggregator(1024 * 1024); + EmbeddedMessageChannel embedder = new EmbeddedMessageChannel(aggr); + HttpMessage message = new DefaultHttpMessage(HttpVersion.HTTP_1_1); + HttpHeaders.setHeader(message, "X-Test", true); + message.setChunked(true); + HttpChunk chunk1 = new DefaultHttpChunk(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)); + HttpChunk chunk2 = new DefaultHttpChunk(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII)); + HttpChunkTrailer trailer = new DefaultHttpChunkTrailer(); + trailer.setHeader("X-Trailer", true); + + assertFalse(embedder.writeInbound(message)); + assertFalse(embedder.writeInbound(chunk1)); + assertFalse(embedder.writeInbound(chunk2)); + + // this should trigger a messageReceived event so return true + assertTrue(embedder.writeInbound(trailer)); + assertTrue(embedder.finish()); + HttpMessage aggratedMessage = (HttpMessage) embedder.readInbound(); + assertNotNull(aggratedMessage); + + assertEquals(chunk1.getContent().readableBytes() + chunk2.getContent().readableBytes(), HttpHeaders.getContentLength(aggratedMessage)); + assertEquals(aggratedMessage.getHeader("X-Test"), Boolean.TRUE.toString()); + assertEquals(aggratedMessage.getHeader("X-Trailer"), Boolean.TRUE.toString()); + checkContentBuffer(aggratedMessage); + + assertNull(embedder.readInbound()); + + } + + + @Test(expected = TooLongFrameException.class) + public void testTooLongFrameException() { + HttpChunkAggregator aggr = new HttpChunkAggregator(4); + EmbeddedMessageChannel embedder = new EmbeddedMessageChannel(aggr); + HttpMessage message = new DefaultHttpMessage(HttpVersion.HTTP_1_1); + message.setChunked(true); + HttpChunk chunk1 = new DefaultHttpChunk(Unpooled.copiedBuffer("test", CharsetUtil.US_ASCII)); + HttpChunk chunk2 = new DefaultHttpChunk(Unpooled.copiedBuffer("test2", CharsetUtil.US_ASCII)); + assertFalse(embedder.writeInbound(message)); + assertFalse(embedder.writeInbound(chunk1)); + embedder.writeInbound(chunk2); + fail(); + + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidConstructorUsage() { + new HttpChunkAggregator(0); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidMaxCumulationBufferComponents() { + HttpChunkAggregator aggr= new HttpChunkAggregator(Integer.MAX_VALUE); + aggr.setMaxCumulationBufferComponents(1); + } + + @Test(expected = IllegalStateException.class) + public void testSetMaxCumulationBufferComponentsAfterInit() throws Exception { + HttpChunkAggregator aggr = new HttpChunkAggregator(Integer.MAX_VALUE); + ChannelHandlerContext ctx = EasyMock.createMock(ChannelHandlerContext.class); + EasyMock.replay(ctx); + aggr.beforeAdd(ctx); + aggr.setMaxCumulationBufferComponents(10); + } +}