[#4316] Ensure pending tasks are run when EmbeddedChannel.close(...) or disconnect(...) is called.

Motivation:

We missed to run all pending tasks when EmbeddedChannel.close(...) or disconnect(...) was called. Because of this channelInactive(...) / channelUnregistered(...) of the handlers were never called.

Modifications:

Correctly run all pending tasks and cancel all not ready scheduled tasks when close or disconnect was called.

Result:

Correctly run tasks on close / disconnect and have channelInactive(...) / channelUnregistered(...) called.
This commit is contained in:
Norman Maurer 2015-10-06 11:44:52 +02:00
parent 5d61ef3fed
commit 845a1a526a
3 changed files with 96 additions and 4 deletions

View File

@ -229,14 +229,42 @@ public class EmbeddedChannel extends AbstractChannel {
*/
public boolean finish() {
close();
runPendingTasks();
checkException();
return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
}
private void finishPendingTasks() {
runPendingTasks();
// Cancel all scheduled tasks that are left.
loop.cancelScheduledTasks();
}
checkException();
@Override
public final ChannelFuture close() {
ChannelFuture future = super.close();
finishPendingTasks();
return future;
}
return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
@Override
public final ChannelFuture disconnect() {
ChannelFuture future = super.disconnect();
finishPendingTasks();
return future;
}
@Override
public final ChannelFuture close(ChannelPromise promise) {
ChannelFuture future = super.close(promise);
finishPendingTasks();
return future;
}
@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
ChannelFuture future = super.disconnect(promise);
finishPendingTasks();
return future;
}
private static boolean isNotEmpty(Queue<Object> queue) {

View File

@ -248,9 +248,10 @@ public class PendingWriteQueueTest {
@Test
public void testCloseChannelOnCreation() {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
ChannelHandlerContext context = channel.pipeline().firstContext();
channel.close().syncUninterruptibly();
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
final PendingWriteQueue queue = new PendingWriteQueue(context);
IllegalStateException ex = new IllegalStateException();
ChannelPromise promise = channel.newPromise();

View File

@ -16,6 +16,7 @@
package io.netty.channel.embedded;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@ -134,4 +135,66 @@ public class EmbeddedChannelTest {
Assert.assertSame(2, channel.readOutbound());
Assert.assertNull(channel.readOutbound());
}
// See https://github.com/netty/netty/issues/4316.
@Test(timeout = 2000)
public void testFireChannelInactiveAndUnregisteredOnClose() throws InterruptedException {
testFireChannelInactiveAndUnregistered(new Action() {
@Override
public ChannelFuture doRun(Channel channel) {
return channel.close();
}
});
testFireChannelInactiveAndUnregistered(new Action() {
@Override
public ChannelFuture doRun(Channel channel) {
return channel.close(channel.newPromise());
}
});
}
@Test(timeout = 2000)
public void testFireChannelInactiveAndUnregisteredOnDisconnect() throws InterruptedException {
testFireChannelInactiveAndUnregistered(new Action() {
@Override
public ChannelFuture doRun(Channel channel) {
return channel.disconnect();
}
});
testFireChannelInactiveAndUnregistered(new Action() {
@Override
public ChannelFuture doRun(Channel channel) {
return channel.disconnect(channel.newPromise());
}
});
}
private static void testFireChannelInactiveAndUnregistered(Action action) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(3);
EmbeddedChannel channel = new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
ctx.executor().execute(new Runnable() {
@Override
public void run() {
// Should be executed.
latch.countDown();
}
});
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
}
});
action.doRun(channel).syncUninterruptibly();
latch.await();
}
private interface Action {
ChannelFuture doRun(Channel channel);
}
}