FlushConsolidationHandler may suppress flushes by mistake (#9931)

Motivation:

When `consolidatedWhenNoReadInProgress` is true, `channel.writeAndFlush (data) .addListener (f-> channel.writeAndFlush (data2))` Will cause data2 to never be flushed.

Because the flush operation will synchronously execute the `channel.writeAndFlush (data2))` in the `listener`, and at this time, since the current execution thread is still an `eventloop`(`executor.inEventLoop()` was true), all handlers will be executed synchronously. At this time, since `nextScheduledFlush` is still not null, the `flush` operation of `data2` will be ignored in `FlushConsolidationHandler#scheduleFlush`.

Modification:

 - reset `nextScheduledFlush` before `ctx.flush`
 - use `ObjectUtil` to polish code

Result:

Fixes https://github.com/netty/netty/issues/9923
This commit is contained in:
时无两丶 2020-01-09 21:51:12 +08:00 committed by Norman Maurer
parent 961362f43f
commit 044b6b0661
2 changed files with 26 additions and 6 deletions

View File

@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.util.internal.ObjectUtil;
import java.util.concurrent.Future;
@ -93,18 +94,15 @@ public class FlushConsolidationHandler implements ChannelHandler {
* ongoing.
*/
public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
if (explicitFlushAfterFlushes <= 0) {
throw new IllegalArgumentException("explicitFlushAfterFlushes: "
+ explicitFlushAfterFlushes + " (expected: > 0)");
}
this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
this.explicitFlushAfterFlushes =
ObjectUtil.checkPositive(explicitFlushAfterFlushes, "explicitFlushAfterFlushes");
this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
flushTask = consolidateWhenNoReadInProgress ?
() -> {
if (flushPendingCount > 0 && !readInProgress) {
flushPendingCount = 0;
ctx.flush();
nextScheduledFlush = null;
ctx.flush();
} // else we'll flush when the read completes
}
: null;

View File

@ -18,6 +18,8 @@ package io.netty.handler.flush;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
@ -154,6 +156,26 @@ public class FlushConsolidationHandlerTest {
assertFalse(channel.finish());
}
/**
* See https://github.com/netty/netty/issues/9923
*/
@Test
public void testResend() throws Exception {
final AtomicInteger flushCount = new AtomicInteger();
final EmbeddedChannel channel = newChannel(flushCount, true);
channel.writeAndFlush(1L).addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
channel.writeAndFlush(1L);
}
});
channel.flushOutbound();
assertEquals(1L, ((Long) channel.readOutbound()).longValue());
assertEquals(1L, ((Long) channel.readOutbound()).longValue());
assertNull(channel.readOutbound());
assertFalse(channel.finish());
}
private static EmbeddedChannel newChannel(final AtomicInteger flushCount, boolean consolidateWhenNoReadInProgress) {
return new EmbeddedChannel(
new ChannelHandler() {