From 88712a9ea2efd0f1decfad7d785c6b88026f6f96 Mon Sep 17 00:00:00 2001 From: Martin Furmanski Date: Thu, 28 Nov 2019 12:17:44 +0100 Subject: [PATCH] Improve error handling in ByteToMessageDecoder when expand fails (#9822) Motivation: The buffer which the decoder allocates for the expansion can be leaked if there is a subsequent issue writing to it. Modifications: The error handling has been improved so that the new buffer always is released on failure in the expand. Result: The decoder will not leak in this scenario any more. Fixes: https://github.com/netty/netty/issues/9812 --- .../handler/codec/ByteToMessageDecoder.java | 28 ++--- .../codec/ByteToMessageDecoderTest.java | 105 +++++++++++++++--- 2 files changed, 107 insertions(+), 26 deletions(-) diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 81ae3b2c75..e1a68c58e8 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -79,7 +79,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { try { - final ByteBuf buffer; if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) { // Expand cumulation (by replace it) when either there is not more room in the buffer @@ -89,12 +88,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, cumulation, in.readableBytes()); + cumulation = expandCumulation(alloc, cumulation, in); } else { - buffer = cumulation; + cumulation.writeBytes(in); } - buffer.writeBytes(in); - return buffer; + return cumulation; } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) @@ -120,8 +118,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, cumulation, in.readableBytes()); - buffer.writeBytes(in); + buffer = expandCumulation(alloc, cumulation, in); } else { CompositeByteBuf composite; if (cumulation instanceof CompositeByteBuf) { @@ -527,12 +524,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter } } - static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { - ByteBuf oldCumulation = cumulation; - cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); - cumulation.writeBytes(oldCumulation); - oldCumulation.release(); - return cumulation; + static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) { + ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes()); + ByteBuf toRelease = newCumulation; + try { + newCumulation.writeBytes(oldCumulation); + newCumulation.writeBytes(in); + toRelease = oldCumulation; + return newCumulation; + } finally { + toRelease.release(); + } } /** diff --git a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java index cd9ee37083..4279ece838 100644 --- a/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/ByteToMessageDecoderTest.java @@ -15,7 +15,9 @@ */ package io.netty.handler.codec; +import io.netty.buffer.AbstractByteBufAllocator; import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; @@ -32,7 +34,12 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import static io.netty.buffer.Unpooled.wrappedBuffer; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class ByteToMessageDecoderTest { @@ -308,23 +315,94 @@ public class ByteToMessageDecoderTest { assertFalse(channel.finish()); } - @Test - public void releaseWhenMergeCumulateThrows() { - final Error error = new Error(); + static class WriteFailingByteBuf extends UnpooledHeapByteBuf { + private final Error error = new Error(); + private int untilFailure; - ByteBuf cumulation = new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, 0, 64) { - @Override - public ByteBuf writeBytes(ByteBuf src) { + WriteFailingByteBuf(int untilFailure, int capacity) { + super(UnpooledByteBufAllocator.DEFAULT, capacity, capacity); + this.untilFailure = untilFailure; + } + + @Override + public ByteBuf writeBytes(ByteBuf src) { + if (--untilFailure <= 0) { throw error; } - }; + return super.writeBytes(src); + } + + Error writeError() { + return error; + } + } + + @Test + public void releaseWhenMergeCumulateThrows() { + WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64); ByteBuf in = Unpooled.buffer().writeZero(12); + + Throwable thrown = null; try { - ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in); - fail(); - } catch (Error expected) { - assertSame(error, expected); - assertEquals(0, in.refCnt()); + ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, oldCumulation, in); + } catch (Throwable t) { + thrown = t; + } + + assertSame(oldCumulation.writeError(), thrown); + assertEquals(0, in.refCnt()); + assertEquals(1, oldCumulation.refCnt()); + oldCumulation.release(); + } + + @Test + public void releaseWhenMergeCumulateThrowsInExpand() { + releaseWhenMergeCumulateThrowsInExpand(1, true); + releaseWhenMergeCumulateThrowsInExpand(2, true); + releaseWhenMergeCumulateThrowsInExpand(3, false); // sentinel test case + } + + private void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) { + ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8); + final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16); + + ByteBufAllocator allocator = new AbstractByteBufAllocator(false) { + @Override + public boolean isDirectBufferPooled() { + return false; + } + + @Override + protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { + return newCumulation; + } + + @Override + protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException(); + } + }; + + ByteBuf in = Unpooled.buffer().writeZero(12); + Throwable thrown = null; + try { + ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(allocator, oldCumulation, in); + } catch (Throwable t) { + thrown = t; + } + + assertEquals(0, in.refCnt()); + + if (shouldFail) { + assertSame(newCumulation.writeError(), thrown); + assertEquals(1, oldCumulation.refCnt()); + oldCumulation.release(); + assertEquals(0, newCumulation.refCnt()); + } else { + assertNull(thrown); + assertEquals(0, oldCumulation.refCnt()); + assertEquals(1, newCumulation.refCnt()); + newCumulation.release(); } } @@ -345,6 +423,7 @@ public class ByteToMessageDecoderTest { } catch (Error expected) { assertSame(error, expected); assertEquals(0, in.refCnt()); + cumulation.release(); } }