From 4c0be9b57ccba5fe655532eb9f4d2361e72a4f83 Mon Sep 17 00:00:00 2001 From: Veebs Date: Mon, 13 Jan 2014 15:07:35 -0800 Subject: [PATCH] Add HttpChunkedInput for easier streaming of chunked content --- .../handler/codec/http/HttpChunkedInput.java | 99 ++++++++++++++ .../codec/http/HttpChunkedInputTest.java | 125 ++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java create mode 100644 codec-http/src/test/java/io/netty/handler/codec/http/HttpChunkedInputTest.java diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java new file mode 100644 index 0000000000..c9e33c0214 --- /dev/null +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java @@ -0,0 +1,99 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.stream.ChunkedInput; + +/** + * A {@link ChunkedInput} that fetches data chunk by chunk for use with HTTP chunked transfers. + *

+ * Each chunk from the input data will be wrapped within a {@link HttpContent}. At the end of the input data, + * {@link LastHttpContent} will be written. + *

+ * Ensure that your HTTP response header contains {@code Transfer-Encoding: chunked}. + *

+ *

+ * public void messageReceived(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+ *     HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ *     response.headers().set(TRANSFER_ENCODING, CHUNKED);
+ *     ctx.write(response);
+ *
+ *     HttpContentChunkedInput httpChunkWriter = new HttpChunkedInput(
+ *         new ChunkedFile("/tmp/myfile.txt"));
+ *     ChannelFuture sendFileFuture = ctx.write(httpChunkWriter);
+ * }
+ * 
+ */ +public class HttpChunkedInput implements ChunkedInput { + + private ChunkedInput input; + private boolean sentLastChunk; + private LastHttpContent lastHttpContent; + + /** + * Creates a new instance using the specified input. + * @param input {@link ChunkedInput} containing data to write + */ + public HttpChunkedInput(ChunkedInput input) { + this.input = input; + this.lastHttpContent = LastHttpContent.EMPTY_LAST_CONTENT; + } + + /** + * Creates a new instance using the specified input. {@code lastHttpContent} will be written as the terminating + * chunk. + * @param input {@link ChunkedInput} containing data to write + * @param lastHttpContent {@link LastHttpContent} that will be written as the terminating chunk. Use this for + * training headers. + */ + public HttpChunkedInput(ChunkedInput input, LastHttpContent lastHttpContent) { + this.input = input; + this.lastHttpContent = lastHttpContent; + } + + @Override + public boolean isEndOfInput() throws Exception { + if (input.isEndOfInput()) { + // Only end of input after last HTTP chunk has been sent + return sentLastChunk; + } else { + return false; + } + } + + @Override + public void close() throws Exception { + input.close(); + } + + @Override + public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception { + if (input.isEndOfInput()) { + if (sentLastChunk) { + return null; + } else { + // Send last chunk for this input + sentLastChunk = true; + return lastHttpContent; + } + } else { + ByteBuf buf = input.readChunk(ctx); + return new DefaultHttpContent(buf); + } + } +} diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/HttpChunkedInputTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/HttpChunkedInputTest.java new file mode 100644 index 0000000000..49b111796e --- /dev/null +++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpChunkedInputTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2014 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.http; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import io.netty.buffer.ByteBuf; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.stream.ChunkedFile; +import io.netty.handler.stream.ChunkedInput; +import io.netty.handler.stream.ChunkedNioFile; +import io.netty.handler.stream.ChunkedNioStream; +import io.netty.handler.stream.ChunkedStream; +import io.netty.handler.stream.ChunkedWriteHandler; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; + +import org.junit.Test; + +public class HttpChunkedInputTest { + private static final byte[] BYTES = new byte[1024 * 64]; + private static final File TMP; + + static { + for (int i = 0; i < BYTES.length; i++) { + BYTES[i] = (byte) i; + } + + FileOutputStream out = null; + try { + TMP = File.createTempFile("netty-chunk-", ".tmp"); + TMP.deleteOnExit(); + out = new FileOutputStream(TMP); + out.write(BYTES); + out.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (out != null) { + try { + out.close(); + } catch (IOException e) { + // ignore + } + } + } + } + + @Test + public void testChunkedStream() { + check(new HttpChunkedInput(new ChunkedStream(new ByteArrayInputStream(BYTES)))); + } + + @Test + public void testChunkedNioStream() { + check(new HttpChunkedInput(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))))); + } + + @Test + public void testChunkedFile() throws IOException { + check(new HttpChunkedInput(new ChunkedFile(TMP))); + } + + @Test + public void testChunkedNioFile() throws IOException { + check(new HttpChunkedInput(new ChunkedNioFile(TMP))); + } + + private static void check(ChunkedInput... inputs) { + EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler()); + + for (ChunkedInput input : inputs) { + ch.writeOutbound(input); + } + + assertTrue(ch.finish()); + + int i = 0; + int read = 0; + HttpContent lastHttpContent = null; + for (;;) { + HttpContent httpContent = ch.readOutbound(); + if (httpContent == null) { + break; + } else { + if (lastHttpContent != null) { + assertTrue("Chunk must be DefaultHttpContent", lastHttpContent instanceof DefaultHttpContent); + } + } + + ByteBuf buffer = httpContent.content(); + while (buffer.isReadable()) { + assertEquals(BYTES[i++], buffer.readByte()); + read++; + if (i == BYTES.length) { + i = 0; + } + } + buffer.release(); + + // Save last chunk + lastHttpContent = httpContent; + } + + assertEquals(BYTES.length * inputs.length, read); + assertTrue("Last chunk must be DefaultLastHttpContent", lastHttpContent == LastHttpContent.EMPTY_LAST_CONTENT); + } +}