From 044b6b0661ce9749cc55862968938f11d0698766 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=97=B6=E6=97=A0=E4=B8=A4=E4=B8=B6?= <442367943@qq.com> Date: Thu, 9 Jan 2020 21:51:12 +0800 Subject: [PATCH] FlushConsolidationHandler may suppress flushes by mistake (#9931) Motivation: When `consolidatedWhenNoReadInProgress` is true, `channel.writeAndFlush (data) .addListener (f-> channel.writeAndFlush (data2))` Will cause data2 to never be flushed. Because the flush operation will synchronously execute the `channel.writeAndFlush (data2))` in the `listener`, and at this time, since the current execution thread is still an `eventloop`(`executor.inEventLoop()` was true), all handlers will be executed synchronously. At this time, since `nextScheduledFlush` is still not null, the `flush` operation of `data2` will be ignored in `FlushConsolidationHandler#scheduleFlush`. Modification: - reset `nextScheduledFlush` before `ctx.flush` - use `ObjectUtil` to polish code Result: Fixes https://github.com/netty/netty/issues/9923 --- .../flush/FlushConsolidationHandler.java | 10 ++++----- .../flush/FlushConsolidationHandlerTest.java | 22 +++++++++++++++++++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java index 01e10f149a..a72e8bb99b 100644 --- a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java +++ b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java @@ -21,6 +21,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundInvoker; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; +import io.netty.util.internal.ObjectUtil; import java.util.concurrent.Future; @@ -93,18 +94,15 @@ public class FlushConsolidationHandler implements ChannelHandler { * ongoing. */ public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) { - if (explicitFlushAfterFlushes <= 0) { - throw new IllegalArgumentException("explicitFlushAfterFlushes: " - + explicitFlushAfterFlushes + " (expected: > 0)"); - } - this.explicitFlushAfterFlushes = explicitFlushAfterFlushes; + this.explicitFlushAfterFlushes = + ObjectUtil.checkPositive(explicitFlushAfterFlushes, "explicitFlushAfterFlushes"); this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress; flushTask = consolidateWhenNoReadInProgress ? () -> { if (flushPendingCount > 0 && !readInProgress) { flushPendingCount = 0; - ctx.flush(); nextScheduledFlush = null; + ctx.flush(); } // else we'll flush when the read completes } : null; diff --git a/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java b/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java index a940ef5db0..12ff91f308 100644 --- a/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java @@ -18,6 +18,8 @@ package io.netty.handler.flush; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import org.junit.Test; import java.util.concurrent.atomic.AtomicInteger; @@ -154,6 +156,26 @@ public class FlushConsolidationHandlerTest { assertFalse(channel.finish()); } + /** + * See https://github.com/netty/netty/issues/9923 + */ + @Test + public void testResend() throws Exception { + final AtomicInteger flushCount = new AtomicInteger(); + final EmbeddedChannel channel = newChannel(flushCount, true); + channel.writeAndFlush(1L).addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + channel.writeAndFlush(1L); + } + }); + channel.flushOutbound(); + assertEquals(1L, ((Long) channel.readOutbound()).longValue()); + assertEquals(1L, ((Long) channel.readOutbound()).longValue()); + assertNull(channel.readOutbound()); + assertFalse(channel.finish()); + } + private static EmbeddedChannel newChannel(final AtomicInteger flushCount, boolean consolidateWhenNoReadInProgress) { return new EmbeddedChannel( new ChannelHandler() {