From 6a4a9f7493165a8a96b6dd30b0eed0e4e52f0f66 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 24 Mar 2016 13:44:07 +0100 Subject: [PATCH] Correctly run pending tasks before flush and also remove incorrect assert. Motivation: We need to ensure we run all pending tasks before doing any flush in writeOutbound(...) to ensure all pending tasks are run first. Also we should remove the assert of the future and just add a listener to it so it is processed later if needed. This is true as a user may schedule a write for later execution. Modifications: - Remove assert of future in writeOutbound(...) - Correctly run pending tasks before doing the flush and also before doing the close of the channel. - Add unit tests to proof the defect is fixed. Result: Correclty handle the situation of delayed writes. --- .../channel/embedded/EmbeddedChannel.java | 32 +++++++++++-- .../channel/embedded/EmbeddedChannelTest.java | 46 +++++++++++++++++++ 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 0bca358521..54a0abeae1 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -19,6 +19,7 @@ import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -52,6 +53,13 @@ public class EmbeddedChannel extends AbstractChannel { private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true); private final EmbeddedEventLoop loop = new EmbeddedEventLoop(); + private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + recordException(future); + } + }; + private final ChannelMetadata metadata; private final ChannelConfig config; private final SocketAddress localAddress = new EmbeddedSocketAddress(); @@ -217,19 +225,22 @@ public class EmbeddedChannel extends AbstractChannel { } futures.add(write(m)); } - + // We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to + // delay the write on the next eventloop run. + runPendingTasks(); flush(); int size = futures.size(); for (int i = 0; i < size; i++) { ChannelFuture future = (ChannelFuture) futures.get(i); - assert future.isDone(); - if (future.cause() != null) { - recordException(future.cause()); + if (future.isDone()) { + recordException(future); + } else { + // The write may be delayed to run later by runPendingTasks() + future.addListener(recordExceptionListener); } } - runPendingTasks(); checkException(); return isNotEmpty(outboundMessages); } finally { @@ -325,7 +336,12 @@ public class EmbeddedChannel extends AbstractChannel { @Override public final ChannelFuture close(ChannelPromise promise) { + // We need to call runPendingTasks() before calling super.close() as there may be something in the queue + // that needs to be run before the actual close takes place. + runPendingTasks(); ChannelFuture future = super.close(promise); + + // Now finish everything else and cancel all scheduled tasks that were not ready set. finishPendingTasks(true); return future; } @@ -377,6 +393,12 @@ public class EmbeddedChannel extends AbstractChannel { } } + private void recordException(ChannelFuture future) { + if (!future.isSuccess()) { + recordException(future.cause()); + } + } + private void recordException(Throwable cause) { if (lastException == null) { lastException = cause; diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index 1d0f4071e0..d67120d6c7 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -308,6 +308,52 @@ public class EmbeddedChannelTest { } } + @Test + public void testWriteLater() { + EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { + @Override + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) + throws Exception { + ctx.executor().execute(new Runnable() { + @Override + public void run() { + ctx.write(msg, promise); + } + }); + } + }); + Object msg = new Object(); + + assertTrue(channel.writeOutbound(msg)); + assertTrue(channel.finish()); + assertSame(msg, channel.readOutbound()); + assertNull(channel.readOutbound()); + } + + @Test + public void testWriteScheduled() throws InterruptedException { + final int delay = 500; + EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { + @Override + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) + throws Exception { + ctx.executor().schedule(new Runnable() { + @Override + public void run() { + ctx.writeAndFlush(msg, promise); + } + }, delay, TimeUnit.MILLISECONDS); + } + }); + Object msg = new Object(); + + assertFalse(channel.writeOutbound(msg)); + Thread.sleep(delay * 2); + assertTrue(channel.finish()); + assertSame(msg, channel.readOutbound()); + assertNull(channel.readOutbound()); + } + private static void release(ByteBuf... buffers) { for (ByteBuf buffer : buffers) { if (buffer.refCnt() > 0) {