Release message when validation of passed in ChannelPromise fails when calling write(...) / writeAndFlush(...) (#8769)

Motivation:

We need to release the message when we throw an IllegalArgumentException because of a validation failure of the promise to eliminate the risk of a memory leak.

Modifications:

- Consistently release the message before rethrow
- Add testcase.

Result:

Fixes https://github.com/netty/netty/issues/8765.
This commit is contained in:
Norman Maurer 2019-01-24 07:43:04 +01:00 committed by GitHub
parent 57012dddb4
commit 3c2b86303a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 25 deletions

View File

@ -706,20 +706,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
@ -781,18 +767,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
write(msg, true, promise);
return promise;
}
@ -806,6 +781,18 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();

View File

@ -1223,6 +1223,51 @@ public class DefaultChannelPipelineTest {
}
}
@Test
public void testWriteThrowsReleaseMessage() {
testWriteThrowsReleaseMessage0(false);
}
@Test
public void testWriteAndFlushThrowsReleaseMessage() {
testWriteThrowsReleaseMessage0(true);
}
private void testWriteThrowsReleaseMessage0(boolean flush) {
ReferenceCounted referenceCounted = new AbstractReferenceCounted() {
@Override
protected void deallocate() {
// NOOP
}
@Override
public ReferenceCounted touch(Object hint) {
return this;
}
};
assertEquals(1, referenceCounted.refCnt());
Channel channel = new LocalChannel();
Channel channel2 = new LocalChannel();
group.register(channel).syncUninterruptibly();
group.register(channel2).syncUninterruptibly();
try {
if (flush) {
channel.writeAndFlush(referenceCounted, channel2.newPromise());
} else {
channel.write(referenceCounted, channel2.newPromise());
}
fail();
} catch (IllegalArgumentException expected) {
// expected
}
assertEquals(0, referenceCounted.refCnt());
channel.close().syncUninterruptibly();
channel2.close().syncUninterruptibly();
}
@Test(timeout = 5000)
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);