[#2349] Correctly handle cancelled ChannelPromise in DefaultChannelHandlerContext
Motivation: At the moment an IllegalArgumentException will be thrown if a ChannelPromise is cancelled while propagate through the ChannelPipeline. This is not correct, we should just stop to propagate it as it is valid to cancel at any time. Modifications: Stop propagate the operation through the ChannelPipeline once a ChannelPromise is cancelled. Result: No more IllegalArgumentException when cancel a ChannelPromise while moving through the ChannelPipeline.
This commit is contained in:
parent
4fc9afa102
commit
772a9d2610
@ -205,7 +205,10 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
if (localAddress == null) {
|
||||
throw new NullPointerException("localAddress");
|
||||
}
|
||||
validatePromise(ctx, promise, false);
|
||||
if (!validatePromise(ctx, promise, false)) {
|
||||
// promise cancelled
|
||||
return;
|
||||
}
|
||||
|
||||
if (executor.inEventLoop()) {
|
||||
invokeBindNow(ctx, localAddress, promise);
|
||||
@ -226,7 +229,10 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
if (remoteAddress == null) {
|
||||
throw new NullPointerException("remoteAddress");
|
||||
}
|
||||
validatePromise(ctx, promise, false);
|
||||
if (!validatePromise(ctx, promise, false)) {
|
||||
// promise cancelled
|
||||
return;
|
||||
}
|
||||
|
||||
if (executor.inEventLoop()) {
|
||||
invokeConnectNow(ctx, remoteAddress, localAddress, promise);
|
||||
@ -242,7 +248,10 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
|
||||
@Override
|
||||
public void invokeDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
|
||||
validatePromise(ctx, promise, false);
|
||||
if (!validatePromise(ctx, promise, false)) {
|
||||
// promise cancelled
|
||||
return;
|
||||
}
|
||||
|
||||
if (executor.inEventLoop()) {
|
||||
invokeDisconnectNow(ctx, promise);
|
||||
@ -258,7 +267,10 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
|
||||
@Override
|
||||
public void invokeClose(final ChannelHandlerContext ctx, final ChannelPromise promise) {
|
||||
validatePromise(ctx, promise, false);
|
||||
if (!validatePromise(ctx, promise, false)) {
|
||||
// promise cancelled
|
||||
return;
|
||||
}
|
||||
|
||||
if (executor.inEventLoop()) {
|
||||
invokeCloseNow(ctx, promise);
|
||||
@ -274,7 +286,10 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
|
||||
@Override
|
||||
public void invokeDeregister(final ChannelHandlerContext ctx, final ChannelPromise promise) {
|
||||
validatePromise(ctx, promise, false);
|
||||
if (!validatePromise(ctx, promise, false)) {
|
||||
// promise cancelled
|
||||
return;
|
||||
}
|
||||
|
||||
if (executor.inEventLoop()) {
|
||||
invokeDeregisterNow(ctx, promise);
|
||||
@ -313,7 +328,12 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
throw new NullPointerException("msg");
|
||||
}
|
||||
|
||||
validatePromise(ctx, promise, true);
|
||||
if (!validatePromise(ctx, promise, true)) {
|
||||
// promise cancelled
|
||||
ReferenceCountUtil.release(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
invokeWrite(ctx, msg, false, promise);
|
||||
}
|
||||
|
||||
@ -368,12 +388,17 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
throw new NullPointerException("msg");
|
||||
}
|
||||
|
||||
validatePromise(ctx, promise, true);
|
||||
if (!validatePromise(ctx, promise, true)) {
|
||||
// promise cancelled
|
||||
ReferenceCountUtil.release(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
invokeWrite(ctx, msg, true, promise);
|
||||
}
|
||||
|
||||
private static void validatePromise(ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) {
|
||||
private static boolean validatePromise(
|
||||
ChannelHandlerContext ctx, ChannelPromise promise, boolean allowVoidPromise) {
|
||||
if (ctx == null) {
|
||||
throw new NullPointerException("ctx");
|
||||
}
|
||||
@ -383,6 +408,9 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
}
|
||||
|
||||
if (promise.isDone()) {
|
||||
if (promise.isCancelled()) {
|
||||
return false;
|
||||
}
|
||||
throw new IllegalArgumentException("promise already done: " + promise);
|
||||
}
|
||||
|
||||
@ -392,7 +420,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
}
|
||||
|
||||
if (promise.getClass() == DefaultChannelPromise.class) {
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
|
||||
@ -404,6 +432,7 @@ public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
|
||||
throw new IllegalArgumentException(
|
||||
StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private void safeExecuteInbound(Runnable task, Object msg) {
|
||||
|
@ -18,6 +18,8 @@ package io.netty.channel;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
@ -438,6 +440,90 @@ public class DefaultChannelPipelineTest {
|
||||
}).sync();
|
||||
}
|
||||
|
||||
// Tests for https://github.com/netty/netty/issues/2349
|
||||
@Test
|
||||
public void testCancelBind() throws Exception {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel());
|
||||
|
||||
ChannelPromise promise = pipeline.channel().newPromise();
|
||||
assertTrue(promise.cancel(false));
|
||||
ChannelFuture future = pipeline.bind(new LocalAddress("test"), promise);
|
||||
assertTrue(future.isCancelled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelConnect() throws Exception {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel());
|
||||
|
||||
ChannelPromise promise = pipeline.channel().newPromise();
|
||||
assertTrue(promise.cancel(false));
|
||||
ChannelFuture future = pipeline.connect(new LocalAddress("test"), promise);
|
||||
assertTrue(future.isCancelled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelDisconnect() throws Exception {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel());
|
||||
|
||||
ChannelPromise promise = pipeline.channel().newPromise();
|
||||
assertTrue(promise.cancel(false));
|
||||
ChannelFuture future = pipeline.disconnect(promise);
|
||||
assertTrue(future.isCancelled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelClose() throws Exception {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel());
|
||||
|
||||
ChannelPromise promise = pipeline.channel().newPromise();
|
||||
assertTrue(promise.cancel(false));
|
||||
ChannelFuture future = pipeline.close(promise);
|
||||
assertTrue(future.isCancelled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelDeregister() throws Exception {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel());
|
||||
|
||||
ChannelPromise promise = pipeline.channel().newPromise();
|
||||
assertTrue(promise.cancel(false));
|
||||
ChannelFuture future = pipeline.deregister(promise);
|
||||
assertTrue(future.isCancelled());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelWrite() throws Exception {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel());
|
||||
|
||||
ChannelPromise promise = pipeline.channel().newPromise();
|
||||
assertTrue(promise.cancel(false));
|
||||
ByteBuf buffer = Unpooled.buffer();
|
||||
assertEquals(1, buffer.refCnt());
|
||||
ChannelFuture future = pipeline.write(buffer, promise);
|
||||
assertTrue(future.isCancelled());
|
||||
assertEquals(0, buffer.refCnt());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCancelWriteAndFlush() throws Exception {
|
||||
ChannelPipeline pipeline = new LocalChannel().pipeline();
|
||||
group.register(pipeline.channel());
|
||||
|
||||
ChannelPromise promise = pipeline.channel().newPromise();
|
||||
assertTrue(promise.cancel(false));
|
||||
ByteBuf buffer = Unpooled.buffer();
|
||||
assertEquals(1, buffer.refCnt());
|
||||
ChannelFuture future = pipeline.writeAndFlush(buffer, promise);
|
||||
assertTrue(future.isCancelled());
|
||||
assertEquals(0, buffer.refCnt());
|
||||
}
|
||||
|
||||
private static int next(DefaultChannelHandlerContext ctx) {
|
||||
DefaultChannelHandlerContext next = ctx.next;
|
||||
if (next == null) {
|
||||
|
Loading…
Reference in New Issue
Block a user