Correctly run pending tasks before flush and also remove incorrect assert.

Motivation:

We need to ensure we run all pending tasks before doing any flush in writeOutbound(...) to ensure all pending tasks are run first. Also we should remove the assert of the future and just add a listener to it so it is processed later if needed. This is true as a user may schedule a write for later execution.

Modifications:

- Remove assert of future in writeOutbound(...)
- Correctly run pending tasks before doing the flush and also before doing the close of the channel.
- Add unit tests to proof the defect is fixed.

Result:

Correclty handle the situation of delayed writes.
This commit is contained in:
Norman Maurer 2016-03-24 13:44:07 +01:00
parent 900db0acff
commit 6a4a9f7493
2 changed files with 73 additions and 5 deletions

View File

@ -19,6 +19,7 @@ import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@ -52,6 +53,13 @@ public class EmbeddedChannel extends AbstractChannel {
private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
recordException(future);
}
};
private final ChannelMetadata metadata;
private final ChannelConfig config;
private final SocketAddress localAddress = new EmbeddedSocketAddress();
@ -217,19 +225,22 @@ public class EmbeddedChannel extends AbstractChannel {
}
futures.add(write(m));
}
// We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to
// delay the write on the next eventloop run.
runPendingTasks();
flush();
int size = futures.size();
for (int i = 0; i < size; i++) {
ChannelFuture future = (ChannelFuture) futures.get(i);
assert future.isDone();
if (future.cause() != null) {
recordException(future.cause());
if (future.isDone()) {
recordException(future);
} else {
// The write may be delayed to run later by runPendingTasks()
future.addListener(recordExceptionListener);
}
}
runPendingTasks();
checkException();
return isNotEmpty(outboundMessages);
} finally {
@ -325,7 +336,12 @@ public class EmbeddedChannel extends AbstractChannel {
@Override
public final ChannelFuture close(ChannelPromise promise) {
// We need to call runPendingTasks() before calling super.close() as there may be something in the queue
// that needs to be run before the actual close takes place.
runPendingTasks();
ChannelFuture future = super.close(promise);
// Now finish everything else and cancel all scheduled tasks that were not ready set.
finishPendingTasks(true);
return future;
}
@ -377,6 +393,12 @@ public class EmbeddedChannel extends AbstractChannel {
}
}
private void recordException(ChannelFuture future) {
if (!future.isSuccess()) {
recordException(future.cause());
}
}
private void recordException(Throwable cause) {
if (lastException == null) {
lastException = cause;

View File

@ -308,6 +308,52 @@ public class EmbeddedChannelTest {
}
}
@Test
public void testWriteLater() {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.write(msg, promise);
}
});
}
});
Object msg = new Object();
assertTrue(channel.writeOutbound(msg));
assertTrue(channel.finish());
assertSame(msg, channel.readOutbound());
assertNull(channel.readOutbound());
}
@Test
public void testWriteScheduled() throws InterruptedException {
final int delay = 500;
EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.writeAndFlush(msg, promise);
}
}, delay, TimeUnit.MILLISECONDS);
}
});
Object msg = new Object();
assertFalse(channel.writeOutbound(msg));
Thread.sleep(delay * 2);
assertTrue(channel.finish());
assertSame(msg, channel.readOutbound());
assertNull(channel.readOutbound());
}
private static void release(ByteBuf... buffers) {
for (ByteBuf buffer : buffers) {
if (buffer.refCnt() > 0) {