ChunkedInput.readChunk parameter of type ByteBufAllocator

Motivation:
ChunkedInput.readChunk currently takes a ChannelHandlerContext object as a parameter. All current implementations of this interface use this object only to get the ByteBufAllocator, so requiring a ChannelHandlerContext is more restrictive for users of this API than necessary.

Modifications:
- Add a new method readChunk(ByteBufAllocator)
- Deprecate readChunk(ChannelHandlerContext) and update all implementations to call readChunk(ByteBufAllocator)

Result:
API that requires only a ByteBufAllocator to use ChunkedInput.
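For illustration (not part of the commit), the migration pattern applied to every implementation below can be sketched as a standalone class. ExampleChunkedInput is a hypothetical name, and the remaining ChunkedInput members (isEndOfInput, length, progress, close) are stubbed out under the assumption that they match the interface at the time of this commit:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.stream.ChunkedInput;

// Hypothetical ChunkedInput that emits a single 8-byte chunk, showing the new
// readChunk(ByteBufAllocator) and the deprecated overload delegating to it.
public class ExampleChunkedInput implements ChunkedInput<ByteBuf> {

    private boolean done;

    @Deprecated
    @Override
    public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
        // Old entry point: only the allocator was ever needed, so delegate.
        return readChunk(ctx.alloc());
    }

    @Override
    public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
        if (done) {
            return null; // no data left in the stream
        }
        done = true;
        ByteBuf buf = allocator.buffer(8);
        buf.writeLong(42L);
        return buf;
    }

    @Override
    public boolean isEndOfInput() {
        return done;
    }

    @Override
    public long length() {
        return 8;
    }

    @Override
    public long progress() {
        return done ? 8 : 0;
    }

    @Override
    public void close() {
        // nothing to release in this sketch
    }
}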
Scott Mitchell 2015-12-23 10:05:19 -08:00
parent d59bf84ef2
commit fd5316ed6f
10 changed files with 95 additions and 13 deletions

io/netty/handler/codec/http/HttpChunkedInput.java

@@ -16,6 +16,7 @@
 package io.netty.handler.codec.http;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.stream.ChunkedInput;
@@ -81,8 +82,14 @@ public class HttpChunkedInput implements ChunkedInput<HttpContent> {
         input.close();
     }

+    @Deprecated
     @Override
     public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception {
+        return readChunk(ctx.alloc());
+    }
+
+    @Override
+    public HttpContent readChunk(ByteBufAllocator allocator) throws Exception {
         if (input.isEndOfInput()) {
             if (sentLastChunk) {
                 return null;
@@ -92,7 +99,7 @@ public class HttpChunkedInput implements ChunkedInput<HttpContent> {
                 return lastHttpContent;
             }
         } else {
-            ByteBuf buf = input.readChunk(ctx);
+            ByteBuf buf = input.readChunk(allocator);
             return new DefaultHttpContent(buf);
         }
     }

io/netty/handler/codec/http/multipart/HttpPostRequestEncoder.java

@@ -16,6 +16,7 @@
 package io.netty.handler.codec.http.multipart;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.DecoderResult;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
@@ -997,6 +998,12 @@ public class HttpPostRequestEncoder implements ChunkedInput<HttpContent> {
         // cleanFiles();
     }

+    @Deprecated
+    @Override
+    public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception {
+        return readChunk(ctx.alloc());
+    }
+
     /**
      * Returns the next available HttpChunk. The caller is responsible to test if this chunk is the last one (isLast()),
      * in order to stop calling this getMethod.
@@ -1006,7 +1013,7 @@ public class HttpPostRequestEncoder implements ChunkedInput<HttpContent> {
      * if the encoding is in error
      */
     @Override
-    public HttpContent readChunk(ChannelHandlerContext ctx) throws Exception {
+    public HttpContent readChunk(ByteBufAllocator allocator) throws Exception {
         if (isLastChunkSent) {
             return null;
         } else {

io/netty/handler/codec/http/multipart/HttpPostRequestEncoderTest.java

@@ -16,6 +16,7 @@
 package io.netty.handler.codec.http.multipart;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.SlicedByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
@@ -31,8 +32,11 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.List;

-import static io.netty.handler.codec.http.HttpHeaderNames.*;
-import static org.junit.Assert.*;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_DISPOSITION;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TRANSFER_ENCODING;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static org.junit.Assert.assertEquals;

 /** {@link HttpPostRequestEncoder} test case. */
 public class HttpPostRequestEncoderTest {
@@ -220,7 +224,7 @@ public class HttpPostRequestEncoderTest {
         encoder.addBodyFileUpload("myfile", file1, "application/x-zip-compressed", false);
         encoder.finalizeRequest();
         while (! encoder.isEndOfInput()) {
-            HttpContent httpContent = encoder.readChunk(null);
+            HttpContent httpContent = encoder.readChunk((ByteBufAllocator) null);
             if (httpContent.content() instanceof SlicedByteBuf) {
                 assertEquals(2, httpContent.content().refCnt());
             } else {

io/netty/handler/stream/ChunkedFile.java

@@ -16,6 +16,7 @@
 package io.netty.handler.stream;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
@@ -137,8 +138,14 @@ public class ChunkedFile implements ChunkedInput<ByteBuf> {
         file.close();
     }

+    @Deprecated
     @Override
     public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+        return readChunk(ctx.alloc());
+    }
+
+    @Override
+    public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
         long offset = this.offset;
         if (offset >= endOffset) {
             return null;
@@ -147,7 +154,7 @@ public class ChunkedFile implements ChunkedInput<ByteBuf> {
         int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
         // Check if the buffer is backed by an byte array. If so we can optimize it a bit an safe a copy
-        ByteBuf buf = ctx.alloc().heapBuffer(chunkSize);
+        ByteBuf buf = allocator.heapBuffer(chunkSize);
         boolean release = true;
         try {
             file.readFully(buf.array(), buf.arrayOffset(), chunkSize);

io/netty/handler/stream/ChunkedInput.java

@@ -16,6 +16,7 @@
 package io.netty.handler.stream;

+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;

 /**
@@ -35,18 +36,36 @@ public interface ChunkedInput<B> {
     void close() throws Exception;

     /**
-     * Fetches a chunked data from the stream. Once this method returns the last chunk
+     * @deprecated Use {@link #readChunk(ByteBufAllocator)}.
+     *
+     * <p>Fetches a chunked data from the stream. Once this method returns the last chunk
      * and thus the stream has reached at its end, any subsequent {@link #isEndOfInput()}
      * call must return {@code true}.
      *
+     * @param ctx The context which provides a {@link ByteBufAllocator} if buffer allocation is necessary.
      * @return the fetched chunk.
      *         {@code null} if there is no data left in the stream.
      *         Please note that {@code null} does not necessarily mean that the
      *         stream has reached at its end. In a slow stream, the next chunk
      *         might be unavailable just momentarily.
      */
+    @Deprecated
     B readChunk(ChannelHandlerContext ctx) throws Exception;

+    /**
+     * Fetches a chunked data from the stream. Once this method returns the last chunk
+     * and thus the stream has reached at its end, any subsequent {@link #isEndOfInput()}
+     * call must return {@code true}.
+     *
+     * @param a {@link ByteBufAllocator} if buffer allocation is necessary.
+     * @return the fetched chunk.
+     *         {@code null} if there is no data left in the stream.
+     *         Please note that {@code null} does not necessarily mean that the
+     *         stream has reached at its end. In a slow stream, the next chunk
+     *         might be unavailable just momentarily.
+     */
+    B readChunk(ByteBufAllocator allocator) throws Exception;
+
     /**
      * Returns the length of the input.
      * @return the length of the input if the length of the input is known.

io/netty/handler/stream/ChunkedNioFile.java

@@ -16,6 +16,7 @@
 package io.netty.handler.stream;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.FileRegion;
@@ -141,15 +142,21 @@ public class ChunkedNioFile implements ChunkedInput<ByteBuf> {
         in.close();
     }

+    @Deprecated
     @Override
     public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+        return readChunk(ctx.alloc());
+    }
+
+    @Override
+    public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
         long offset = this.offset;
         if (offset >= endOffset) {
             return null;
         }

         int chunkSize = (int) Math.min(this.chunkSize, endOffset - offset);
-        ByteBuf buffer = ctx.alloc().buffer(chunkSize);
+        ByteBuf buffer = allocator.buffer(chunkSize);
         boolean release = true;
         try {
             int readBytes = 0;

io/netty/handler/stream/ChunkedNioStream.java

@@ -16,6 +16,7 @@
 package io.netty.handler.stream;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;

 import java.nio.ByteBuffer;
@@ -96,8 +97,14 @@ public class ChunkedNioStream implements ChunkedInput<ByteBuf> {
         in.close();
     }

+    @Deprecated
     @Override
     public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+        return readChunk(ctx.alloc());
+    }
+
+    @Override
+    public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
         if (isEndOfInput()) {
             return null;
         }
@@ -116,7 +123,7 @@ public class ChunkedNioStream implements ChunkedInput<ByteBuf> {
         }
         byteBuffer.flip();
         boolean release = true;
-        ByteBuf buffer = ctx.alloc().buffer(byteBuffer.remaining());
+        ByteBuf buffer = allocator.buffer(byteBuffer.remaining());
         try {
             buffer.writeBytes(byteBuffer);
             byteBuffer.clear();

io/netty/handler/stream/ChunkedStream.java

@@ -16,6 +16,7 @@
 package io.netty.handler.stream;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.ChannelHandlerContext;

 import java.io.InputStream;
@@ -99,8 +100,14 @@ public class ChunkedStream implements ChunkedInput<ByteBuf> {
         in.close();
     }

+    @Deprecated
     @Override
     public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+        return readChunk(ctx.alloc());
+    }
+
+    @Override
+    public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
         if (isEndOfInput()) {
             return null;
         }
@@ -114,7 +121,7 @@ public class ChunkedStream implements ChunkedInput<ByteBuf> {
         }
         boolean release = true;
-        ByteBuf buffer = ctx.alloc().buffer(chunkSize);
+        ByteBuf buffer = allocator.buffer(chunkSize);
         try {
             // transfer to buffer
             offset += buffer.writeBytes(in, chunkSize);

io/netty/handler/stream/ChunkedWriteHandler.java

@@ -15,6 +15,7 @@
  */
 package io.netty.handler.stream;

+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
@@ -203,6 +204,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
         }

         boolean flushed = false;
+        ByteBufAllocator allocator = ctx.alloc();
         while (channel.isWritable()) {
             if (currentWrite == null) {
                 currentWrite = queue.poll();
@@ -220,7 +222,7 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
             boolean suspend;
             Object message = null;
             try {
-                message = chunks.readChunk(ctx);
+                message = chunks.readChunk(allocator);
                 endOfInput = chunks.isEndOfInput();
                 if (message == null) {
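ChunkedWriteHandler now resolves the allocator once from the context and hands it to readChunk. A side benefit, per the Motivation above, is that code without a ChannelHandlerContext can drive a ChunkedInput directly. A minimal sketch (not part of the commit), assuming ByteBufAllocator.DEFAULT as the allocator and a hypothetical class name:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.stream.ChunkedStream;

import java.io.ByteArrayInputStream;

public final class DrainChunkedStream {
    public static void main(String[] args) throws Exception {
        // Read 256 bytes in chunks of 64 without any Netty channel or context.
        ChunkedStream chunks = new ChunkedStream(new ByteArrayInputStream(new byte[256]), 64);
        ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
        try {
            while (!chunks.isEndOfInput()) {
                ByteBuf chunk = chunks.readChunk(allocator);
                if (chunk == null) {
                    break; // slow stream: next chunk momentarily unavailable
                }
                try {
                    System.out.println("read " + chunk.readableBytes() + " bytes");
                } finally {
                    chunk.release();
                }
            }
        } finally {
            chunks.close();
        }
    }
}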

io/netty/handler/stream/ChunkedWriteHandlerTest.java

@@ -16,6 +16,7 @@
 package io.netty.handler.stream;

 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
@@ -31,8 +32,10 @@ import java.io.IOException;
 import java.nio.channels.Channels;
 import java.util.concurrent.atomic.AtomicBoolean;

-import static io.netty.util.ReferenceCountUtil.*;
-import static org.junit.Assert.*;
+import static io.netty.util.ReferenceCountUtil.releaseLater;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;

 public class ChunkedWriteHandlerTest {
     private static final byte[] BYTES = new byte[1024 * 64];
@@ -116,8 +119,14 @@ public class ChunkedWriteHandlerTest {
             // NOOP
         }

+        @Deprecated
         @Override
         public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
+            return readChunk(ctx.alloc());
+        }
+
+        @Override
+        public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
             if (done) {
                 return null;
             }
@@ -173,8 +182,14 @@ public class ChunkedWriteHandlerTest {
             // NOOP
         }

+        @Deprecated
         @Override
         public Object readChunk(ChannelHandlerContext ctx) throws Exception {
+            return readChunk(ctx.alloc());
+        }
+
+        @Override
+        public Object readChunk(ByteBufAllocator ctx) throws Exception {
             if (done) {
                 return false;
             }