HttpObjectEncoder and MessageAggregator EMPTY_BUFFER usage

Motivation:
HttpObjectEncoder and MessageAggregator treat buffers that are not readable special. If a buffer is not readable, then an EMPTY_BUFFER is written and the actual buffer is ignored. If the buffer has already been released then this will not be correct as the promise will be completed, but in reality the original content shouldn't have resulted in any write because it was invalid.

Modifications:
- HttpObjectEncoder should retain/write the original buffer instead of using EMPTY_BUFFER
- MessageAggregator should retain/write the original ByteBufHolder instead of using EMPTY_BUFFER

Result:
Invalid write operations which happen to not be readable correctly reflect failed status in the promise, and do not result in any writes to the channel.
This commit is contained in:
Scott Mitchell 2017-11-02 15:08:28 -07:00 committed by Norman Maurer
parent fbe0e3506e
commit 93130b172a
3 changed files with 42 additions and 7 deletions

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.FileRegion; import io.netty.channel.FileRegion;
import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.util.Iterator; import java.util.Iterator;
@ -109,10 +110,12 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
// ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); // ch.write(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
// //
// See https://github.com/netty/netty/issues/2983 for more information. // See https://github.com/netty/netty/issues/2983 for more information.
if (msg instanceof ByteBuf) {
if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) { final ByteBuf potentialEmptyBuf = (ByteBuf) msg;
out.add(EMPTY_BUFFER); if (!potentialEmptyBuf.isReadable()) {
return; out.add(potentialEmptyBuf.retain());
return;
}
} }
if (msg instanceof HttpContent || msg instanceof ByteBuf || msg instanceof FileRegion) { if (msg instanceof HttpContent || msg instanceof ByteBuf || msg instanceof FileRegion) {
@ -210,7 +213,7 @@ public abstract class HttpObjectEncoder<H extends HttpMessage> extends MessageTo
} else if (contentLength == 0) { } else if (contentLength == 0) {
// Need to produce some output otherwise an // Need to produce some output otherwise an
// IllegalStateException will be thrown // IllegalStateException will be thrown
out.add(EMPTY_BUFFER); out.add(ReferenceCountUtil.retain(msg));
} }
} }

View File

@ -17,13 +17,20 @@ package io.netty.handler.codec.http;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.IllegalReferenceCountException;
import org.junit.Test; import org.junit.Test;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder; import java.nio.ByteOrder;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.*; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
/** /**
*/ */
@ -122,4 +129,29 @@ public class HttpRequestEncoderTest {
buffer.release(); buffer.release();
} }
} }
@Test
public void testEmptyReleasedBufferShouldNotWriteEmptyBufferToChannel() throws Exception {
HttpRequestEncoder encoder = new HttpRequestEncoder();
EmbeddedChannel channel = new EmbeddedChannel(encoder);
ByteBuf buf = Unpooled.buffer();
buf.release();
try {
channel.writeAndFlush(buf).get();
fail();
} catch (ExecutionException e) {
assertThat(e.getCause().getCause(), is(instanceOf(IllegalReferenceCountException.class)));
}
channel.finishAndReleaseAll();
}
@Test
public void testEmptydBufferShouldPassThrough() throws Exception {
HttpRequestEncoder encoder = new HttpRequestEncoder();
EmbeddedChannel channel = new EmbeddedChannel(encoder);
ByteBuf buffer = Unpooled.buffer();
channel.writeAndFlush(buffer).get();
channel.finishAndReleaseAll();
assertEquals(0, buffer.refCnt());
}
} }

View File

@ -241,7 +241,7 @@ public abstract class MessageAggregator<I, S, C extends ByteBufHolder, O extends
if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) { if (m instanceof DecoderResultProvider && !((DecoderResultProvider) m).decoderResult().isSuccess()) {
O aggregated; O aggregated;
if (m instanceof ByteBufHolder && ((ByteBufHolder) m).content().isReadable()) { if (m instanceof ByteBufHolder) {
aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain()); aggregated = beginAggregation(m, ((ByteBufHolder) m).content().retain());
} else { } else {
aggregated = beginAggregation(m, EMPTY_BUFFER); aggregated = beginAggregation(m, EMPTY_BUFFER);