Improve error handling in ByteToMessageDecoder when expand fails (#9822)

Motivation:

The buffer which the decoder allocates for the expansion can be
leaked if there is a subsequent issue writing to it.

Modifications:
The error handling has been improved so that the new buffer always
is released on failure in the expand.

Result:
The decoder will not leak in this scenario any more.

Fixes: https://github.com/netty/netty/issues/9812
This commit is contained in:
Martin Furmanski 2019-11-28 12:17:44 +01:00 committed by Norman Maurer
parent 0a252b48b1
commit 585ed4d08f
2 changed files with 111 additions and 30 deletions

View File

@ -25,14 +25,14 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.internal.StringUtil;
import java.util.List;
/**
* {@link ChannelInboundHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
* {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
* other Message type.
*
* For example here is an implementation which reads all readable bytes from
@ -72,14 +72,13 @@ import java.util.List;
* is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
* to avoid leaking memory.
*/
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter implements ChannelInboundHandler {
public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
/**
* Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
*/
public static final Cumulator MERGE_CUMULATOR = (alloc, cumulation, in) -> {
try {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
@ -89,12 +88,11 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
cumulation = expandCumulation(alloc, cumulation, in);
} else {
buffer = cumulation;
cumulation.writeBytes(in);
}
buffer.writeBytes(in);
return buffer;
return cumulation;
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
@ -117,8 +115,7 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
// See:
// - https://github.com/netty/netty/issues/2327
// - https://github.com/netty/netty/issues/1764
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
buffer.writeBytes(in);
buffer = expandCumulation(alloc, cumulation, in);
} else {
CompositeByteBuf composite;
if (cumulation instanceof CompositeByteBuf) {
@ -521,12 +518,17 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter impleme
}
}
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes());
ByteBuf toRelease = newCumulation;
try {
newCumulation.writeBytes(oldCumulation);
newCumulation.writeBytes(in);
toRelease = oldCumulation;
return newCumulation;
} finally {
toRelease.release();
}
}
/**

View File

@ -15,7 +15,9 @@
*/
package io.netty.handler.codec;
import io.netty.buffer.AbstractByteBufAllocator;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
@ -32,7 +34,12 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ByteToMessageDecoderTest {
@ -308,23 +315,94 @@ public class ByteToMessageDecoderTest {
assertFalse(channel.finish());
}
@Test
public void releaseWhenMergeCumulateThrows() {
final Error error = new Error();
static class WriteFailingByteBuf extends UnpooledHeapByteBuf {
private final Error error = new Error();
private int untilFailure;
WriteFailingByteBuf(int untilFailure, int capacity) {
super(UnpooledByteBufAllocator.DEFAULT, capacity, capacity);
this.untilFailure = untilFailure;
}
ByteBuf cumulation = new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, 0, 64) {
@Override
public ByteBuf writeBytes(ByteBuf src) {
if (--untilFailure <= 0) {
throw error;
}
};
return super.writeBytes(src);
}
Error writeError() {
return error;
}
}
@Test
public void releaseWhenMergeCumulateThrows() {
WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64);
ByteBuf in = Unpooled.buffer().writeZero(12);
Throwable thrown = null;
try {
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in);
fail();
} catch (Error expected) {
assertSame(error, expected);
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, oldCumulation, in);
} catch (Throwable t) {
thrown = t;
}
assertSame(oldCumulation.writeError(), thrown);
assertEquals(0, in.refCnt());
assertEquals(1, oldCumulation.refCnt());
oldCumulation.release();
}
@Test
public void releaseWhenMergeCumulateThrowsInExpand() {
releaseWhenMergeCumulateThrowsInExpand(1, true);
releaseWhenMergeCumulateThrowsInExpand(2, true);
releaseWhenMergeCumulateThrowsInExpand(3, false); // sentinel test case
}
private void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) {
ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8);
final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16);
ByteBufAllocator allocator = new AbstractByteBufAllocator(false) {
@Override
public boolean isDirectBufferPooled() {
return false;
}
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return newCumulation;
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
throw new UnsupportedOperationException();
}
};
ByteBuf in = Unpooled.buffer().writeZero(12);
Throwable thrown = null;
try {
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(allocator, oldCumulation, in);
} catch (Throwable t) {
thrown = t;
}
assertEquals(0, in.refCnt());
if (shouldFail) {
assertSame(newCumulation.writeError(), thrown);
assertEquals(1, oldCumulation.refCnt());
oldCumulation.release();
assertEquals(0, newCumulation.refCnt());
} else {
assertNull(thrown);
assertEquals(0, oldCumulation.refCnt());
assertEquals(1, newCumulation.refCnt());
newCumulation.release();
}
}
@ -345,6 +423,7 @@ public class ByteToMessageDecoderTest {
} catch (Error expected) {
assertSame(error, expected);
assertEquals(0, in.refCnt());
cumulation.release();
}
}