[#1880] Use ByteBufAllocator when read bytes into new chunks
This commit is contained in:
parent
7e6649fd66
commit
ee192f0321
@ -257,6 +257,23 @@ public final class ByteBufUtil {
|
|||||||
return Long.reverseBytes(value);
|
return Long.reverseBytes(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the given amount of bytes into a new {@link ByteBuf} that is allocated from the {@link ByteBufAllocator}.
|
||||||
|
*/
|
||||||
|
public static ByteBuf readBytes(ByteBufAllocator alloc, ByteBuf buffer, int length) {
|
||||||
|
boolean release = true;
|
||||||
|
ByteBuf dst = alloc.buffer(length);
|
||||||
|
try {
|
||||||
|
buffer.readBytes(dst);
|
||||||
|
release = false;
|
||||||
|
return dst;
|
||||||
|
} finally {
|
||||||
|
if (release) {
|
||||||
|
dst.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static int firstIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
|
private static int firstIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
|
||||||
fromIndex = Math.max(fromIndex, 0);
|
fromIndex = Math.max(fromIndex, 0);
|
||||||
if (fromIndex >= toIndex || buffer.capacity() == 0) {
|
if (fromIndex >= toIndex || buffer.capacity() == 0) {
|
||||||
|
@ -25,6 +25,8 @@ import io.netty.handler.codec.TooLongFrameException;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import static io.netty.buffer.ByteBufUtil.readBytes;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decodes {@link ByteBuf}s into {@link HttpMessage}s and
|
* Decodes {@link ByteBuf}s into {@link HttpMessage}s and
|
||||||
* {@link HttpContent}s.
|
* {@link HttpContent}s.
|
||||||
@ -265,7 +267,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
toRead = maxChunkSize;
|
toRead = maxChunkSize;
|
||||||
}
|
}
|
||||||
out.add(message);
|
out.add(message);
|
||||||
out.add(new DefaultHttpContent(buffer.readBytes(toRead)));
|
out.add(new DefaultHttpContent(readBytes(ctx.alloc(), buffer, toRead)));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: {
|
case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: {
|
||||||
@ -274,7 +276,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
if (toRead > maxChunkSize) {
|
if (toRead > maxChunkSize) {
|
||||||
toRead = maxChunkSize;
|
toRead = maxChunkSize;
|
||||||
}
|
}
|
||||||
ByteBuf content = buffer.readBytes(toRead);
|
ByteBuf content = readBytes(ctx.alloc(), buffer, toRead);
|
||||||
if (!buffer.isReadable()) {
|
if (!buffer.isReadable()) {
|
||||||
reset();
|
reset();
|
||||||
out.add(new DefaultLastHttpContent(content));
|
out.add(new DefaultLastHttpContent(content));
|
||||||
@ -284,7 +286,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
case READ_FIXED_LENGTH_CONTENT: {
|
case READ_FIXED_LENGTH_CONTENT: {
|
||||||
readFixedLengthContent(buffer, out);
|
readFixedLengthContent(ctx, buffer, out);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
case READ_FIXED_LENGTH_CONTENT_AS_CHUNKS: {
|
case READ_FIXED_LENGTH_CONTENT_AS_CHUNKS: {
|
||||||
@ -308,7 +310,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
if (toRead > chunkSize) {
|
if (toRead > chunkSize) {
|
||||||
toRead = (int) chunkSize;
|
toRead = (int) chunkSize;
|
||||||
}
|
}
|
||||||
ByteBuf content = buffer.readBytes(toRead);
|
ByteBuf content = readBytes(ctx.alloc(), buffer, toRead);
|
||||||
if (chunkSize > toRead) {
|
if (chunkSize > toRead) {
|
||||||
chunkSize -= toRead;
|
chunkSize -= toRead;
|
||||||
} else {
|
} else {
|
||||||
@ -348,7 +350,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
}
|
}
|
||||||
case READ_CHUNKED_CONTENT: {
|
case READ_CHUNKED_CONTENT: {
|
||||||
assert chunkSize <= Integer.MAX_VALUE;
|
assert chunkSize <= Integer.MAX_VALUE;
|
||||||
HttpContent chunk = new DefaultHttpContent(buffer.readBytes((int) chunkSize));
|
HttpContent chunk = new DefaultHttpContent(readBytes(ctx.alloc(), buffer, (int) chunkSize));
|
||||||
checkpoint(State.READ_CHUNK_DELIMITER);
|
checkpoint(State.READ_CHUNK_DELIMITER);
|
||||||
out.add(chunk);
|
out.add(chunk);
|
||||||
return;
|
return;
|
||||||
@ -375,7 +377,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
if (toRead > readLimit) {
|
if (toRead > readLimit) {
|
||||||
toRead = readLimit;
|
toRead = readLimit;
|
||||||
}
|
}
|
||||||
HttpContent chunk = new DefaultHttpContent(buffer.readBytes(toRead));
|
HttpContent chunk = new DefaultHttpContent(readBytes(ctx.alloc(), buffer, toRead));
|
||||||
if (chunkSize > toRead) {
|
if (chunkSize > toRead) {
|
||||||
chunkSize -= toRead;
|
chunkSize -= toRead;
|
||||||
} else {
|
} else {
|
||||||
@ -553,7 +555,7 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void readFixedLengthContent(ByteBuf buffer, List<Object> out) {
|
private void readFixedLengthContent(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
|
||||||
//we have a content-length so we just read the correct number of bytes
|
//we have a content-length so we just read the correct number of bytes
|
||||||
long length = HttpHeaders.getContentLength(message, -1);
|
long length = HttpHeaders.getContentLength(message, -1);
|
||||||
assert length <= Integer.MAX_VALUE;
|
assert length <= Integer.MAX_VALUE;
|
||||||
@ -564,11 +566,11 @@ public abstract class HttpObjectDecoder extends ReplayingDecoder<HttpObjectDecod
|
|||||||
contentRead += toRead;
|
contentRead += toRead;
|
||||||
if (length < contentRead) {
|
if (length < contentRead) {
|
||||||
out.add(message);
|
out.add(message);
|
||||||
out.add(new DefaultHttpContent(buffer.readBytes(toRead)));
|
out.add(new DefaultHttpContent(readBytes(ctx.alloc(), buffer, toRead)));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (content == null) {
|
if (content == null) {
|
||||||
content = buffer.readBytes((int) length);
|
content = readBytes(ctx.alloc(), buffer, (int) length);
|
||||||
} else {
|
} else {
|
||||||
content.writeBytes(buffer, (int) length);
|
content.writeBytes(buffer, (int) length);
|
||||||
}
|
}
|
||||||
|
@ -82,10 +82,12 @@ public class HttpContentCompressorTest {
|
|||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk, is(instanceOf(HttpContent.class)));
|
assertThat(chunk, is(instanceOf(HttpContent.class)));
|
||||||
assertThat(chunk.content().isReadable(), is(true));
|
assertThat(chunk.content().isReadable(), is(true));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||||
assertThat(chunk.content().isReadable(), is(false));
|
assertThat(chunk.content().isReadable(), is(false));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
assertThat(ch.readOutbound(), is(nullValue()));
|
assertThat(ch.readOutbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,8 @@ public class HttpContentEncoderTest {
|
|||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().isReadable(), is(false));
|
assertThat(chunk.content().isReadable(), is(false));
|
||||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
assertThat(ch.readOutbound(), is(nullValue()));
|
assertThat(ch.readOutbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,8 +90,12 @@ public class HttpContentEncoderTest {
|
|||||||
HttpContent chunk;
|
HttpContent chunk;
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("3"));
|
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("3"));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("2"));
|
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("2"));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("1"));
|
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("1"));
|
||||||
|
|
||||||
@ -97,6 +103,7 @@ public class HttpContentEncoderTest {
|
|||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().isReadable(), is(false));
|
assertThat(chunk.content().isReadable(), is(false));
|
||||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
assertThat(ch.readOutbound(), is(nullValue()));
|
assertThat(ch.readOutbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
@ -121,16 +128,23 @@ public class HttpContentEncoderTest {
|
|||||||
HttpContent chunk;
|
HttpContent chunk;
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("3"));
|
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("3"));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("2"));
|
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("2"));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("1"));
|
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("1"));
|
||||||
|
|
||||||
assertThat(chunk, is(instanceOf(HttpContent.class)));
|
assertThat(chunk, is(instanceOf(HttpContent.class)));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().isReadable(), is(false));
|
assertThat(chunk.content().isReadable(), is(false));
|
||||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||||
assertEquals("Netty", ((LastHttpContent) chunk).trailingHeaders().get("X-Test"));
|
assertEquals("Netty", ((LastHttpContent) chunk).trailingHeaders().get("X-Test"));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
assertThat(ch.readOutbound(), is(nullValue()));
|
assertThat(ch.readOutbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,8 +162,12 @@ public class HttpContentEncoderTest {
|
|||||||
HttpContent c = (HttpContent) ch.readOutbound();
|
HttpContent c = (HttpContent) ch.readOutbound();
|
||||||
assertThat(c.content().readableBytes(), is(2));
|
assertThat(c.content().readableBytes(), is(2));
|
||||||
assertThat(c.content().toString(CharsetUtil.US_ASCII), is("42"));
|
assertThat(c.content().toString(CharsetUtil.US_ASCII), is("42"));
|
||||||
|
c.release();
|
||||||
|
|
||||||
LastHttpContent last = (LastHttpContent) ch.readOutbound();
|
LastHttpContent last = (LastHttpContent) ch.readOutbound();
|
||||||
assertThat(last.content().readableBytes(), is(0));
|
assertThat(last.content().readableBytes(), is(0));
|
||||||
|
last.release();
|
||||||
|
|
||||||
assertThat(ch.readOutbound(), is(nullValue()));
|
assertThat(ch.readOutbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -169,10 +187,13 @@ public class HttpContentEncoderTest {
|
|||||||
HttpContent chunk = (HttpContent) ch.readOutbound();
|
HttpContent chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("0"));
|
assertThat(chunk.content().toString(CharsetUtil.US_ASCII), is("0"));
|
||||||
assertThat(chunk, is(instanceOf(HttpContent.class)));
|
assertThat(chunk, is(instanceOf(HttpContent.class)));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
chunk = (HttpContent) ch.readOutbound();
|
chunk = (HttpContent) ch.readOutbound();
|
||||||
assertThat(chunk.content().isReadable(), is(false));
|
assertThat(chunk.content().isReadable(), is(false));
|
||||||
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
assertThat(chunk, is(instanceOf(LastHttpContent.class)));
|
||||||
|
chunk.release();
|
||||||
|
|
||||||
assertThat(ch.readOutbound(), is(nullValue()));
|
assertThat(ch.readOutbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -198,6 +219,7 @@ public class HttpContentEncoderTest {
|
|||||||
assertThat(res.headers().get(Names.CONTENT_ENCODING), is(nullValue()));
|
assertThat(res.headers().get(Names.CONTENT_ENCODING), is(nullValue()));
|
||||||
assertThat(res.content().readableBytes(), is(0));
|
assertThat(res.content().readableBytes(), is(0));
|
||||||
assertThat(res.content().toString(CharsetUtil.US_ASCII), is(""));
|
assertThat(res.content().toString(CharsetUtil.US_ASCII), is(""));
|
||||||
|
res.release();
|
||||||
|
|
||||||
assertThat(ch.readOutbound(), is(nullValue()));
|
assertThat(ch.readOutbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
@ -68,6 +68,7 @@ public class HttpObjectAggregatorTest {
|
|||||||
// This should be false as we decompose the buffer before to not have deep hierarchy
|
// This should be false as we decompose the buffer before to not have deep hierarchy
|
||||||
assertFalse(buf instanceof CompositeByteBuf);
|
assertFalse(buf instanceof CompositeByteBuf);
|
||||||
}
|
}
|
||||||
|
aggregatedMessage.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -38,6 +38,7 @@ public class HttpResponseDecoderTest {
|
|||||||
|
|
||||||
LastHttpContent content = (LastHttpContent) ch.readInbound();
|
LastHttpContent content = (LastHttpContent) ch.readInbound();
|
||||||
assertThat(content.content().isReadable(), is(false));
|
assertThat(content.content().isReadable(), is(false));
|
||||||
|
content.release();
|
||||||
|
|
||||||
assertThat(ch.readInbound(), is(nullValue()));
|
assertThat(ch.readInbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
@ -55,11 +56,13 @@ public class HttpResponseDecoderTest {
|
|||||||
ch.writeInbound(Unpooled.wrappedBuffer(new byte[1024]));
|
ch.writeInbound(Unpooled.wrappedBuffer(new byte[1024]));
|
||||||
HttpContent content = (HttpContent) ch.readInbound();
|
HttpContent content = (HttpContent) ch.readInbound();
|
||||||
assertThat(content.content().readableBytes(), is(1024));
|
assertThat(content.content().readableBytes(), is(1024));
|
||||||
|
content.release();
|
||||||
|
|
||||||
assertThat(ch.finish(), is(true));
|
assertThat(ch.finish(), is(true));
|
||||||
|
|
||||||
LastHttpContent lastContent = (LastHttpContent) ch.readInbound();
|
LastHttpContent lastContent = (LastHttpContent) ch.readInbound();
|
||||||
assertThat(lastContent.content().isReadable(), is(false));
|
assertThat(lastContent.content().isReadable(), is(false));
|
||||||
|
lastContent.release();
|
||||||
|
|
||||||
assertThat(ch.readInbound(), is(nullValue()));
|
assertThat(ch.readInbound(), is(nullValue()));
|
||||||
}
|
}
|
||||||
|
@ -57,6 +57,7 @@ public class HttpServerCodecTest {
|
|||||||
empty = false;
|
empty = false;
|
||||||
totalBytesPolled += httpChunk.content().readableBytes();
|
totalBytesPolled += httpChunk.content().readableBytes();
|
||||||
Assert.assertFalse(httpChunk instanceof LastHttpContent);
|
Assert.assertFalse(httpChunk instanceof LastHttpContent);
|
||||||
|
httpChunk.release();
|
||||||
}
|
}
|
||||||
Assert.assertFalse(empty);
|
Assert.assertFalse(empty);
|
||||||
Assert.assertEquals(offeredContentLength, totalBytesPolled);
|
Assert.assertEquals(offeredContentLength, totalBytesPolled);
|
||||||
|
Loading…
Reference in New Issue
Block a user