Improve error handling in ByteToMessageDecoder when expand fails (#9822)
Motivation: The buffer which the decoder allocates for the expansion can be leaked if there is a subsequent issue writing to it. Modifications: The error handling has been improved so that the new buffer always is released on failure in the expand. Result: The decoder will not leak in this scenario any more. Fixes: https://github.com/netty/netty/issues/9812
This commit is contained in:
parent
d0f94200e8
commit
88712a9ea2
@ -79,7 +79,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
@Override
|
||||
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
|
||||
try {
|
||||
final ByteBuf buffer;
|
||||
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|
||||
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
|
||||
// Expand cumulation (by replace it) when either there is not more room in the buffer
|
||||
@ -89,12 +88,11 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
// See:
|
||||
// - https://github.com/netty/netty/issues/2327
|
||||
// - https://github.com/netty/netty/issues/1764
|
||||
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
|
||||
cumulation = expandCumulation(alloc, cumulation, in);
|
||||
} else {
|
||||
buffer = cumulation;
|
||||
cumulation.writeBytes(in);
|
||||
}
|
||||
buffer.writeBytes(in);
|
||||
return buffer;
|
||||
return cumulation;
|
||||
} finally {
|
||||
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
|
||||
// for whatever release (for example because of OutOfMemoryError)
|
||||
@ -120,8 +118,7 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
// See:
|
||||
// - https://github.com/netty/netty/issues/2327
|
||||
// - https://github.com/netty/netty/issues/1764
|
||||
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
|
||||
buffer.writeBytes(in);
|
||||
buffer = expandCumulation(alloc, cumulation, in);
|
||||
} else {
|
||||
CompositeByteBuf composite;
|
||||
if (cumulation instanceof CompositeByteBuf) {
|
||||
@ -527,12 +524,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
}
|
||||
}
|
||||
|
||||
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
|
||||
ByteBuf oldCumulation = cumulation;
|
||||
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
|
||||
cumulation.writeBytes(oldCumulation);
|
||||
oldCumulation.release();
|
||||
return cumulation;
|
||||
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf oldCumulation, ByteBuf in) {
|
||||
ByteBuf newCumulation = alloc.buffer(oldCumulation.readableBytes() + in.readableBytes());
|
||||
ByteBuf toRelease = newCumulation;
|
||||
try {
|
||||
newCumulation.writeBytes(oldCumulation);
|
||||
newCumulation.writeBytes(in);
|
||||
toRelease = oldCumulation;
|
||||
return newCumulation;
|
||||
} finally {
|
||||
toRelease.release();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -15,7 +15,9 @@
|
||||
*/
|
||||
package io.netty.handler.codec;
|
||||
|
||||
import io.netty.buffer.AbstractByteBufAllocator;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
@ -32,7 +34,12 @@ import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
|
||||
import static io.netty.buffer.Unpooled.wrappedBuffer;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class ByteToMessageDecoderTest {
|
||||
|
||||
@ -308,23 +315,94 @@ public class ByteToMessageDecoderTest {
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void releaseWhenMergeCumulateThrows() {
|
||||
final Error error = new Error();
|
||||
static class WriteFailingByteBuf extends UnpooledHeapByteBuf {
|
||||
private final Error error = new Error();
|
||||
private int untilFailure;
|
||||
|
||||
ByteBuf cumulation = new UnpooledHeapByteBuf(UnpooledByteBufAllocator.DEFAULT, 0, 64) {
|
||||
@Override
|
||||
public ByteBuf writeBytes(ByteBuf src) {
|
||||
WriteFailingByteBuf(int untilFailure, int capacity) {
|
||||
super(UnpooledByteBufAllocator.DEFAULT, capacity, capacity);
|
||||
this.untilFailure = untilFailure;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuf writeBytes(ByteBuf src) {
|
||||
if (--untilFailure <= 0) {
|
||||
throw error;
|
||||
}
|
||||
};
|
||||
return super.writeBytes(src);
|
||||
}
|
||||
|
||||
Error writeError() {
|
||||
return error;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void releaseWhenMergeCumulateThrows() {
|
||||
WriteFailingByteBuf oldCumulation = new WriteFailingByteBuf(1, 64);
|
||||
ByteBuf in = Unpooled.buffer().writeZero(12);
|
||||
|
||||
Throwable thrown = null;
|
||||
try {
|
||||
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, cumulation, in);
|
||||
fail();
|
||||
} catch (Error expected) {
|
||||
assertSame(error, expected);
|
||||
assertEquals(0, in.refCnt());
|
||||
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(UnpooledByteBufAllocator.DEFAULT, oldCumulation, in);
|
||||
} catch (Throwable t) {
|
||||
thrown = t;
|
||||
}
|
||||
|
||||
assertSame(oldCumulation.writeError(), thrown);
|
||||
assertEquals(0, in.refCnt());
|
||||
assertEquals(1, oldCumulation.refCnt());
|
||||
oldCumulation.release();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void releaseWhenMergeCumulateThrowsInExpand() {
|
||||
releaseWhenMergeCumulateThrowsInExpand(1, true);
|
||||
releaseWhenMergeCumulateThrowsInExpand(2, true);
|
||||
releaseWhenMergeCumulateThrowsInExpand(3, false); // sentinel test case
|
||||
}
|
||||
|
||||
private void releaseWhenMergeCumulateThrowsInExpand(int untilFailure, boolean shouldFail) {
|
||||
ByteBuf oldCumulation = UnpooledByteBufAllocator.DEFAULT.heapBuffer(8, 8);
|
||||
final WriteFailingByteBuf newCumulation = new WriteFailingByteBuf(untilFailure, 16);
|
||||
|
||||
ByteBufAllocator allocator = new AbstractByteBufAllocator(false) {
|
||||
@Override
|
||||
public boolean isDirectBufferPooled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
|
||||
return newCumulation;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
|
||||
ByteBuf in = Unpooled.buffer().writeZero(12);
|
||||
Throwable thrown = null;
|
||||
try {
|
||||
ByteToMessageDecoder.MERGE_CUMULATOR.cumulate(allocator, oldCumulation, in);
|
||||
} catch (Throwable t) {
|
||||
thrown = t;
|
||||
}
|
||||
|
||||
assertEquals(0, in.refCnt());
|
||||
|
||||
if (shouldFail) {
|
||||
assertSame(newCumulation.writeError(), thrown);
|
||||
assertEquals(1, oldCumulation.refCnt());
|
||||
oldCumulation.release();
|
||||
assertEquals(0, newCumulation.refCnt());
|
||||
} else {
|
||||
assertNull(thrown);
|
||||
assertEquals(0, oldCumulation.refCnt());
|
||||
assertEquals(1, newCumulation.refCnt());
|
||||
newCumulation.release();
|
||||
}
|
||||
}
|
||||
|
||||
@ -345,6 +423,7 @@ public class ByteToMessageDecoderTest {
|
||||
} catch (Error expected) {
|
||||
assertSame(error, expected);
|
||||
assertEquals(0, in.refCnt());
|
||||
cumulation.release();
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user