From fd5316ed6fcde05274df38c26b9481771c182c5c Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 23 Dec 2015 10:05:19 -0800 Subject: [PATCH] ChunkedInput.readChunk parameter of type ByteBufAllocator Motivation: ChunkedInput.readChunk currently takes a ChannelHandlerContext object as a parameters. All current implementations of this interface only use this object to get the ByteBufAllocator object. Thus taking a ChannelHandlerContext as a parameter is more restrictive for users of this API than necessary. Modifications: - Add a new method readChunk(ByteBufAllocator) - Deprecate readChunk(ChannelHandlerContext) and updates all implementations to call readChunk(ByteBufAllocator) Result: API that only requires ByteBufAllocator to use ChunkedInput. --- .../handler/codec/http/HttpChunkedInput.java | 9 +++++++- .../multipart/HttpPostRequestEncoder.java | 9 +++++++- .../multipart/HttpPostRequestEncoderTest.java | 10 ++++++--- .../io/netty/handler/stream/ChunkedFile.java | 9 +++++++- .../io/netty/handler/stream/ChunkedInput.java | 21 ++++++++++++++++++- .../netty/handler/stream/ChunkedNioFile.java | 9 +++++++- .../handler/stream/ChunkedNioStream.java | 9 +++++++- .../netty/handler/stream/ChunkedStream.java | 9 +++++++- .../handler/stream/ChunkedWriteHandler.java | 4 +++- .../stream/ChunkedWriteHandlerTest.java | 19 +++++++++++++++-- 10 files changed, 95 insertions(+), 13 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java index 652ba6ae6f..23926525fb 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/HttpChunkedInput.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.stream.ChunkedInput; @@ -81,8 +82,14 @@ public class HttpChunkedInput implements ChunkedInput { input.close(); } + @Deprecated @Override public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public HttpContent readChunk(ByteBufAllocator allocator) throws Exception { if (input.isEndOfInput()) { if (sentLastChunk) { return null; @@ -92,7 +99,7 @@ public class HttpChunkedInput implements ChunkedInput { return lastHttpContent; } } else { - ByteBuf buf = input.readChunk(ctx); + ByteBuf buf = input.readChunk(allocator); return new DefaultHttpContent(buf); } } diff --git a/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java b/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java index f365ede795..ad05b0958f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java +++ b/codec-http/src/main/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http.multipart; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderResult; import io.netty.handler.codec.http.DefaultFullHttpRequest; @@ -997,6 +998,12 @@ public class HttpPostRequestEncoder implements ChunkedInput { // cleanFiles(); } + @Deprecated + @Override + public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + /** * Returns the next available HttpChunk. The caller is responsible to test if this chunk is the last one (isLast()), * in order to stop calling this getMethod. @@ -1006,7 +1013,7 @@ public class HttpPostRequestEncoder implements ChunkedInput { * if the encoding is in error */ @Override - public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception { + public HttpContent readChunk(ByteBufAllocator allocator) throws Exception { if (isLastChunkSent) { return null; } else { diff --git a/codec-http/src/test/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoderTest.java b/codec-http/src/test/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoderTest.java index 71e77cda78..0624436cb5 100644 --- a/codec-http/src/test/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoderTest.java +++ b/codec-http/src/test/java/io/netty/handler/codec/http/multipart/HttpPostRequestEncoderTest.java @@ -16,6 +16,7 @@ package io.netty.handler.codec.http.multipart; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.SlicedByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.DefaultFullHttpRequest; @@ -31,8 +32,11 @@ import java.io.File; import java.util.Arrays; import java.util.List; -import static io.netty.handler.codec.http.HttpHeaderNames.*; -import static org.junit.Assert.*; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_DISPOSITION; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TRANSFER_ENCODING; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static org.junit.Assert.assertEquals; /** {@link HttpPostRequestEncoder} test case. */ public class HttpPostRequestEncoderTest { @@ -220,7 +224,7 @@ public class HttpPostRequestEncoderTest { encoder.addBodyFileUpload("myfile", file1, "application/x-zip-compressed", false); encoder.finalizeRequest(); while (! encoder.isEndOfInput()) { - HttpContent httpContent = encoder.readChunk(null); + HttpContent httpContent = encoder.readChunk((ByteBufAllocator) null); if (httpContent.content() instanceof SlicedByteBuf) { assertEquals(2, httpContent.content().refCnt()); } else { diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java b/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java index 69d1efb141..3e12e4ae54 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedFile.java @@ -16,6 +16,7 @@ package io.netty.handler.stream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; @@ -137,8 +138,14 @@ public class ChunkedFile implements ChunkedInput { file.close(); } + @Deprecated @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { long offset = this.offset; if (offset >= endOffset) { return null; @@ -147,7 +154,7 @@ public class ChunkedFile implements ChunkedInput { int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset); // Check if the buffer is backed by an byte array. If so we can optimize it a bit an safe a copy - ByteBuf buf = ctx.alloc().heapBuffer(chunkSize); + ByteBuf buf = allocator.heapBuffer(chunkSize); boolean release = true; try { file.readFully(buf.array(), buf.arrayOffset(), chunkSize); diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java b/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java index 4c44bf9efc..1756ce1940 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedInput.java @@ -16,6 +16,7 @@ package io.netty.handler.stream; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; /** @@ -35,18 +36,36 @@ public interface ChunkedInput { void close() throws Exception; /** - * Fetches a chunked data from the stream. Once this method returns the last chunk + * @deprecated Use {@link #readChunk(ByteBufAllocator)}. + * + *

Fetches a chunked data from the stream. Once this method returns the last chunk * and thus the stream has reached at its end, any subsequent {@link #isEndOfInput()} * call must return {@code true}. * + * @param ctx The context which provides a {@link ByteBufAllocator} if buffer allocation is necessary. * @return the fetched chunk. * {@code null} if there is no data left in the stream. * Please note that {@code null} does not necessarily mean that the * stream has reached at its end. In a slow stream, the next chunk * might be unavailable just momentarily. */ + @Deprecated B readChunk(ChannelHandlerContext ctx) throws Exception; + /** + * Fetches a chunked data from the stream. Once this method returns the last chunk + * and thus the stream has reached at its end, any subsequent {@link #isEndOfInput()} + * call must return {@code true}. + * + * @param a {@link ByteBufAllocator} if buffer allocation is necessary. + * @return the fetched chunk. + * {@code null} if there is no data left in the stream. + * Please note that {@code null} does not necessarily mean that the + * stream has reached at its end. In a slow stream, the next chunk + * might be unavailable just momentarily. + */ + B readChunk(ByteBufAllocator allocator) throws Exception; + /** * Returns the length of the input. * @return the length of the input if the length of the input is known. diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java b/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java index dbb0521d4d..339a3e5776 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedNioFile.java @@ -16,6 +16,7 @@ package io.netty.handler.stream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; @@ -141,15 +142,21 @@ public class ChunkedNioFile implements ChunkedInput { in.close(); } + @Deprecated @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { long offset = this.offset; if (offset >= endOffset) { return null; } int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset); - ByteBuf buffer = ctx.alloc().buffer(chunkSize); + ByteBuf buffer = allocator.buffer(chunkSize); boolean release = true; try { int readBytes = 0; diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java b/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java index fd59e74477..22feb6369d 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedNioStream.java @@ -16,6 +16,7 @@ package io.netty.handler.stream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import java.nio.ByteBuffer; @@ -96,8 +97,14 @@ public class ChunkedNioStream implements ChunkedInput { in.close(); } + @Deprecated @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { if (isEndOfInput()) { return null; } @@ -116,7 +123,7 @@ public class ChunkedNioStream implements ChunkedInput { } byteBuffer.flip(); boolean release = true; - ByteBuf buffer = ctx.alloc().buffer(byteBuffer.remaining()); + ByteBuf buffer = allocator.buffer(byteBuffer.remaining()); try { buffer.writeBytes(byteBuffer); byteBuffer.clear(); diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java b/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java index e3625dfea7..f476c03e64 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedStream.java @@ -16,6 +16,7 @@ package io.netty.handler.stream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import java.io.InputStream; @@ -99,8 +100,14 @@ public class ChunkedStream implements ChunkedInput { in.close(); } + @Deprecated @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { if (isEndOfInput()) { return null; } @@ -114,7 +121,7 @@ public class ChunkedStream implements ChunkedInput { } boolean release = true; - ByteBuf buffer = ctx.alloc().buffer(chunkSize); + ByteBuf buffer = allocator.buffer(chunkSize); try { // transfer to buffer offset += buffer.writeBytes(in, chunkSize); diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index 6f5c647bc0..0d25cdfb2b 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -15,6 +15,7 @@ */ package io.netty.handler.stream; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; @@ -203,6 +204,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { } boolean flushed = false; + ByteBufAllocator allocator = ctx.alloc(); while (channel.isWritable()) { if (currentWrite == null) { currentWrite = queue.poll(); @@ -220,7 +222,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { boolean suspend; Object message = null; try { - message = chunks.readChunk(ctx); + message = chunks.readChunk(allocator); endOfInput = chunks.isEndOfInput(); if (message == null) { diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java index 204e140f4b..859303135a 100644 --- a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java @@ -16,6 +16,7 @@ package io.netty.handler.stream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -31,8 +32,10 @@ import java.io.IOException; import java.nio.channels.Channels; import java.util.concurrent.atomic.AtomicBoolean; -import static io.netty.util.ReferenceCountUtil.*; -import static org.junit.Assert.*; +import static io.netty.util.ReferenceCountUtil.releaseLater; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class ChunkedWriteHandlerTest { private static final byte[] BYTES = new byte[1024 * 64]; @@ -116,8 +119,14 @@ public class ChunkedWriteHandlerTest { // NOOP } + @Deprecated @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { if (done) { return null; } @@ -173,8 +182,14 @@ public class ChunkedWriteHandlerTest { // NOOP } + @Deprecated @Override public Object readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public Object readChunk(ByteBufAllocator ctx) throws Exception { if (done) { return false; }