[#2349] Correctly handle cancelled ChannelPromise in DefaultChannelHandlerContext
Motivation: At the moment an IllegalArgumentException will be thrown if a ChannelPromise is cancelled while propagate through the ChannelPipeline. This is not correct, we should just stop to propagate it as it is valid to cancel at any time. Modifications: Stop propagate the operation through the ChannelPipeline once a ChannelPromise is cancelled. Result: No more IllegalArgumentException when cancel a ChannelPromise while moving through the ChannelPipeline.
This commit is contained in:
parent
14b962a434
commit
aa74f00006
@ -437,7 +437,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
if (localAddress == null) {
|
if (localAddress == null) {
|
||||||
throw new NullPointerException("localAddress");
|
throw new NullPointerException("localAddress");
|
||||||
}
|
}
|
||||||
validatePromise(promise, false);
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
@ -475,7 +478,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
if (remoteAddress == null) {
|
if (remoteAddress == null) {
|
||||||
throw new NullPointerException("remoteAddress");
|
throw new NullPointerException("remoteAddress");
|
||||||
}
|
}
|
||||||
validatePromise(promise, false);
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
@ -503,7 +509,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture disconnect(final ChannelPromise promise) {
|
public ChannelFuture disconnect(final ChannelPromise promise) {
|
||||||
validatePromise(promise, false);
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
@ -541,7 +550,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture close(final ChannelPromise promise) {
|
public ChannelFuture close(final ChannelPromise promise) {
|
||||||
validatePromise(promise, false);
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
@ -569,7 +581,10 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ChannelFuture deregister(final ChannelPromise promise) {
|
public ChannelFuture deregister(final ChannelPromise promise) {
|
||||||
validatePromise(promise, false);
|
if (!validatePromise(promise, false)) {
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||||
EventExecutor executor = next.executor();
|
EventExecutor executor = next.executor();
|
||||||
@ -636,8 +651,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
throw new NullPointerException("msg");
|
throw new NullPointerException("msg");
|
||||||
}
|
}
|
||||||
|
|
||||||
validatePromise(promise, true);
|
if (!validatePromise(promise, true)) {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
write(msg, false, promise);
|
write(msg, false, promise);
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
@ -687,7 +705,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
throw new NullPointerException("msg");
|
throw new NullPointerException("msg");
|
||||||
}
|
}
|
||||||
|
|
||||||
validatePromise(promise, true);
|
if (!validatePromise(promise, true)) {
|
||||||
|
ReferenceCountUtil.release(msg);
|
||||||
|
// cancelled
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
write(msg, true, promise);
|
write(msg, true, promise);
|
||||||
|
|
||||||
@ -798,12 +820,19 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
return new FailedChannelFuture(channel(), executor(), cause);
|
return new FailedChannelFuture(channel(), executor(), cause);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validatePromise(ChannelPromise promise, boolean allowVoidPromise) {
|
private boolean validatePromise(ChannelPromise promise, boolean allowVoidPromise) {
|
||||||
if (promise == null) {
|
if (promise == null) {
|
||||||
throw new NullPointerException("promise");
|
throw new NullPointerException("promise");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (promise.isDone()) {
|
if (promise.isDone()) {
|
||||||
|
// Check if the promise was cancelled and if so signal that the processing of the operation
|
||||||
|
// should not be performed.
|
||||||
|
//
|
||||||
|
// See https://github.com/netty/netty/issues/2349
|
||||||
|
if (promise.isCancelled()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
throw new IllegalArgumentException("promise already done: " + promise);
|
throw new IllegalArgumentException("promise already done: " + promise);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -813,7 +842,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (promise.getClass() == DefaultChannelPromise.class) {
|
if (promise.getClass() == DefaultChannelPromise.class) {
|
||||||
return;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
|
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
|
||||||
@ -825,6 +854,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
|||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
|
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private DefaultChannelHandlerContext findContextInbound() {
|
private DefaultChannelHandlerContext findContextInbound() {
|
||||||
|
@ -18,6 +18,8 @@ package io.netty.channel;
|
|||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.bootstrap.ServerBootstrap;
|
||||||
|
import io.netty.buffer.ByteBuf;
|
||||||
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.ChannelHandler.Sharable;
|
import io.netty.channel.ChannelHandler.Sharable;
|
||||||
import io.netty.channel.local.LocalAddress;
|
import io.netty.channel.local.LocalAddress;
|
||||||
import io.netty.channel.local.LocalChannel;
|
import io.netty.channel.local.LocalChannel;
|
||||||
@ -434,6 +436,81 @@ public class DefaultChannelPipelineTest {
|
|||||||
}).sync();
|
}).sync();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests for https://github.com/netty/netty/issues/2349
|
||||||
|
@Test
|
||||||
|
public void testCancelBind() throws Exception {
|
||||||
|
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||||
|
|
||||||
|
ChannelPromise promise = pipeline.channel().newPromise();
|
||||||
|
assertTrue(promise.cancel(false));
|
||||||
|
ChannelFuture future = pipeline.bind(new LocalAddress("test"), promise);
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelConnect() throws Exception {
|
||||||
|
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||||
|
|
||||||
|
ChannelPromise promise = pipeline.channel().newPromise();
|
||||||
|
assertTrue(promise.cancel(false));
|
||||||
|
ChannelFuture future = pipeline.connect(new LocalAddress("test"), promise);
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelDisconnect() throws Exception {
|
||||||
|
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||||
|
|
||||||
|
ChannelPromise promise = pipeline.channel().newPromise();
|
||||||
|
assertTrue(promise.cancel(false));
|
||||||
|
ChannelFuture future = pipeline.disconnect(promise);
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelClose() throws Exception {
|
||||||
|
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||||
|
|
||||||
|
ChannelPromise promise = pipeline.channel().newPromise();
|
||||||
|
assertTrue(promise.cancel(false));
|
||||||
|
ChannelFuture future = pipeline.close(promise);
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelDeregister() throws Exception {
|
||||||
|
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||||
|
|
||||||
|
ChannelPromise promise = pipeline.channel().newPromise();
|
||||||
|
assertTrue(promise.cancel(false));
|
||||||
|
ChannelFuture future = pipeline.deregister(promise);
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelWrite() throws Exception {
|
||||||
|
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||||
|
ChannelPromise promise = pipeline.channel().newPromise();
|
||||||
|
assertTrue(promise.cancel(false));
|
||||||
|
ByteBuf buffer = Unpooled.buffer();
|
||||||
|
assertEquals(1, buffer.refCnt());
|
||||||
|
ChannelFuture future = pipeline.write(buffer, promise);
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
assertEquals(0, buffer.refCnt());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCancelWriteAndFlush() throws Exception {
|
||||||
|
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||||
|
ChannelPromise promise = pipeline.channel().newPromise();
|
||||||
|
assertTrue(promise.cancel(false));
|
||||||
|
ByteBuf buffer = Unpooled.buffer();
|
||||||
|
assertEquals(1, buffer.refCnt());
|
||||||
|
ChannelFuture future = pipeline.writeAndFlush(buffer, promise);
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
assertEquals(0, buffer.refCnt());
|
||||||
|
}
|
||||||
|
|
||||||
private static int next(DefaultChannelHandlerContext ctx) {
|
private static int next(DefaultChannelHandlerContext ctx) {
|
||||||
DefaultChannelHandlerContext next = ctx.next;
|
DefaultChannelHandlerContext next = ctx.next;
|
||||||
if (next == null) {
|
if (next == null) {
|
||||||
|
Loading…
Reference in New Issue
Block a user