Release message when validation of passed in ChannelPromise fails when calling write(...) / writeAndFlush(...) (#8769)
Motivation: We need to release the message when we throw an IllegalArgumentException because of a validation failure of the promise to eliminate the risk of a memory leak. Modifications: - Consistently release the message before rethrow - Add testcase. Result: Fixes https://github.com/netty/netty/issues/8765.
This commit is contained in:
parent
4a10357fd8
commit
c5c318f56c
|
@ -892,20 +892,6 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
|
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
|
||||||
if (msg == null) {
|
|
||||||
throw new NullPointerException("msg");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
if (isNotValidPromise(promise, true)) {
|
|
||||||
ReferenceCountUtil.release(msg);
|
|
||||||
// cancelled
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
} catch (RuntimeException e) {
|
|
||||||
ReferenceCountUtil.release(msg);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
write(msg, false, promise);
|
write(msg, false, promise);
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
|
@ -967,18 +953,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
|
||||||
if (msg == null) {
|
|
||||||
throw new NullPointerException("msg");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (isNotValidPromise(promise, true)) {
|
|
||||||
ReferenceCountUtil.release(msg);
|
|
||||||
// cancelled
|
|
||||||
return promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
write(msg, true, promise);
|
write(msg, true, promise);
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -992,6 +967,18 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||||
}
|
}
|
||||||
|
|
||||||
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
private void write(Object msg, boolean flush, ChannelPromise promise) {
|
||||||
|
ObjectUtil.checkNotNull(msg, "msg");
|
||||||
|
try {
|
||||||
|
if (isNotValidPromise(promise, true)) {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
// cancelled
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
|
final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
|
||||||
final Object m = pipeline.touch(msg, next);
|
final Object m = pipeline.touch(msg, next);
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
|
|
|
@ -1398,7 +1398,7 @@ public class DefaultChannelPipelineTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
|
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
|
||||||
executionMask |= MASK_DISCONNECT;
|
executionMask |= MASK_DISCONNECT;
|
||||||
promise.setSuccess();
|
promise.setSuccess();
|
||||||
}
|
}
|
||||||
|
@ -1422,7 +1422,7 @@ public class DefaultChannelPipelineTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void read(ChannelHandlerContext ctx) {
|
public void read(ChannelHandlerContext ctx) {
|
||||||
executionMask |= MASK_READ;
|
executionMask |= MASK_READ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1563,8 +1563,10 @@ public class DefaultChannelPipelineTest {
|
||||||
|
|
||||||
pipeline.register().syncUninterruptibly();
|
pipeline.register().syncUninterruptibly();
|
||||||
pipeline.deregister().syncUninterruptibly();
|
pipeline.deregister().syncUninterruptibly();
|
||||||
pipeline.bind(new SocketAddress() { }).syncUninterruptibly();
|
pipeline.bind(new SocketAddress() {
|
||||||
pipeline.connect(new SocketAddress() { }).syncUninterruptibly();
|
}).syncUninterruptibly();
|
||||||
|
pipeline.connect(new SocketAddress() {
|
||||||
|
}).syncUninterruptibly();
|
||||||
pipeline.disconnect().syncUninterruptibly();
|
pipeline.disconnect().syncUninterruptibly();
|
||||||
pipeline.close().syncUninterruptibly();
|
pipeline.close().syncUninterruptibly();
|
||||||
pipeline.write("");
|
pipeline.write("");
|
||||||
|
@ -1582,6 +1584,51 @@ public class DefaultChannelPipelineTest {
|
||||||
skipHandler.assertSkipped();
|
skipHandler.assertSkipped();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteThrowsReleaseMessage() {
|
||||||
|
testWriteThrowsReleaseMessage0(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteAndFlushThrowsReleaseMessage() {
|
||||||
|
testWriteThrowsReleaseMessage0(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testWriteThrowsReleaseMessage0(boolean flush) {
|
||||||
|
ReferenceCounted referenceCounted = new AbstractReferenceCounted() {
|
||||||
|
@Override
|
||||||
|
protected void deallocate() {
|
||||||
|
// NOOP
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ReferenceCounted touch(Object hint) {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
assertEquals(1, referenceCounted.refCnt());
|
||||||
|
|
||||||
|
Channel channel = new LocalChannel(group.next());
|
||||||
|
Channel channel2 = new LocalChannel(group.next());
|
||||||
|
channel.register().syncUninterruptibly();
|
||||||
|
channel2.register().syncUninterruptibly();
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (flush) {
|
||||||
|
channel.writeAndFlush(referenceCounted, channel2.newPromise());
|
||||||
|
} else {
|
||||||
|
channel.write(referenceCounted, channel2.newPromise());
|
||||||
|
}
|
||||||
|
fail();
|
||||||
|
} catch (IllegalArgumentException expected) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
assertEquals(0, referenceCounted.refCnt());
|
||||||
|
|
||||||
|
channel.close().syncUninterruptibly();
|
||||||
|
channel2.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 5000)
|
@Test(timeout = 5000)
|
||||||
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
|
public void handlerAddedStateUpdatedBeforeHandlerAddedDoneForceEventLoop() throws InterruptedException {
|
||||||
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);
|
handlerAddedStateUpdatedBeforeHandlerAddedDone(true);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user