From 845a1a526ad0393a322484b36ebfa34952f78e23 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 6 Oct 2015 11:44:52 +0200 Subject: [PATCH] [#4316] Ensure pending tasks are run when EmbeddedChannel.close(...) or disconnect(...) is called. Motivation: We missed to run all pending tasks when EmbeddedChannel.close(...) or disconnect(...) was called. Because of this channelInactive(...) / channelUnregistered(...) of the handlers were never called. Modifications: Correctly run all pending tasks and cancel all not ready scheduled tasks when close or disconnect was called. Result: Correctly run tasks on close / disconnect and have channelInactive(...) / channelUnregistered(...) called. --- .../channel/embedded/EmbeddedChannel.java | 34 +++++++++- .../netty/channel/PendingWriteQueueTest.java | 3 +- .../channel/embedded/EmbeddedChannelTest.java | 63 +++++++++++++++++++ 3 files changed, 96 insertions(+), 4 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 84188d3754..774e60f3dc 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -229,14 +229,42 @@ public class EmbeddedChannel extends AbstractChannel { */ public boolean finish() { close(); - runPendingTasks(); + checkException(); + return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages); + } + private void finishPendingTasks() { + runPendingTasks(); // Cancel all scheduled tasks that are left. loop.cancelScheduledTasks(); + } - checkException(); + @Override + public final ChannelFuture close() { + ChannelFuture future = super.close(); + finishPendingTasks(); + return future; + } - return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages); + @Override + public final ChannelFuture disconnect() { + ChannelFuture future = super.disconnect(); + finishPendingTasks(); + return future; + } + + @Override + public final ChannelFuture close(ChannelPromise promise) { + ChannelFuture future = super.close(promise); + finishPendingTasks(); + return future; + } + + @Override + public final ChannelFuture disconnect(ChannelPromise promise) { + ChannelFuture future = super.disconnect(promise); + finishPendingTasks(); + return future; } private static boolean isNotEmpty(Queue queue) { diff --git a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java index a28fbba4d6..d243ce647a 100644 --- a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java +++ b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java @@ -248,9 +248,10 @@ public class PendingWriteQueueTest { @Test public void testCloseChannelOnCreation() { EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter()); + ChannelHandlerContext context = channel.pipeline().firstContext(); channel.close().syncUninterruptibly(); - final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); + final PendingWriteQueue queue = new PendingWriteQueue(context); IllegalStateException ex = new IllegalStateException(); ChannelPromise promise = channel.newPromise(); diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index c1e462ad29..12abe5fae0 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -16,6 +16,7 @@ package io.netty.channel.embedded; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; @@ -134,4 +135,66 @@ public class EmbeddedChannelTest { Assert.assertSame(2, channel.readOutbound()); Assert.assertNull(channel.readOutbound()); } + + // See https://github.com/netty/netty/issues/4316. + @Test(timeout = 2000) + public void testFireChannelInactiveAndUnregisteredOnClose() throws InterruptedException { + testFireChannelInactiveAndUnregistered(new Action() { + @Override + public ChannelFuture doRun(Channel channel) { + return channel.close(); + } + }); + testFireChannelInactiveAndUnregistered(new Action() { + @Override + public ChannelFuture doRun(Channel channel) { + return channel.close(channel.newPromise()); + } + }); + } + + @Test(timeout = 2000) + public void testFireChannelInactiveAndUnregisteredOnDisconnect() throws InterruptedException { + testFireChannelInactiveAndUnregistered(new Action() { + @Override + public ChannelFuture doRun(Channel channel) { + return channel.disconnect(); + } + }); + + testFireChannelInactiveAndUnregistered(new Action() { + @Override + public ChannelFuture doRun(Channel channel) { + return channel.disconnect(channel.newPromise()); + } + }); + } + + private static void testFireChannelInactiveAndUnregistered(Action action) throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(3); + EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter() { + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + ctx.executor().execute(new Runnable() { + @Override + public void run() { + // Should be executed. + latch.countDown(); + } + }); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + } + }); + action.doRun(channel).syncUninterruptibly(); + latch.await(); + } + + private interface Action { + ChannelFuture doRun(Channel channel); + } }