From 3a8b8c9219901bb3fe6dd8b8b87b41eec3e275fc Mon Sep 17 00:00:00 2001 From: olim7t Date: Tue, 30 Aug 2016 09:46:37 -0700 Subject: [PATCH] Consolidate flushes even when no read in progress Motivation: Currently FlushConsolidationHandler only consolidates if a read loop is active for a Channel, otherwise each writeAndFlush(...) call will still be flushed individually. When these calls are close enough, it can be beneficial to consolidate them even outside of a read loop. Modifications: When we allow a flush to "go through", don't perform it immediately, but submit it on the channel's executor. Under high pressure, this gives other writes a chance to enqueue before the task gets executed, and so we flush multiple writes at once. Result: Lower CPU usage and less context switching. --- .../flush/FlushConsolidationHandler.java | 119 ++++++++++++++---- .../flush/FlushConsolidationHandlerTest.java | 79 +++++++++--- 2 files changed, 157 insertions(+), 41 deletions(-) diff --git a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java index e99ea3a0db..12d31627f6 100644 --- a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java +++ b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java @@ -23,21 +23,32 @@ import io.netty.channel.ChannelOutboundHandler; import io.netty.channel.ChannelOutboundInvoker; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; -import io.netty.util.internal.ObjectUtil; + +import java.util.concurrent.Future; /** - * {@link ChannelDuplexHandler} which consolidate {@link ChannelOutboundInvoker#flush()} operations (which also includes - * {@link ChannelOutboundInvoker#writeAndFlush(Object)} and + * {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()} + * operations (which also includes + * {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and + * {@link ChannelOutboundInvoker#writeAndFlush(Object)} / * {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}). *

- * Flush operations are general speaking expensive as these may trigger a syscall on the transport level. Thus it is + * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations * as much as possible. *

- * When {@link #flush(ChannelHandlerContext)} is called it will only pass it on to the next - * {@link ChannelOutboundHandler} in the {@link ChannelPipeline} if no read loop is currently ongoing - * as it will pick up any pending flushes when {@link #channelReadComplete(ChannelHandlerContext)} is trigged. - * If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well. + * If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next + * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when + * {@link #channelReadComplete(ChannelHandlerContext)} is triggered. + * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument: + *

+ * If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well (whether while in a read + * loop, or while batching outside of a read loop). *

* If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations. *

@@ -46,38 +57,81 @@ import io.netty.util.internal.ObjectUtil; */ public class FlushConsolidationHandler extends ChannelDuplexHandler { private final int explicitFlushAfterFlushes; + private final boolean consolidateWhenNoReadInProgress; + private final Runnable flushTask; private int flushPendingCount; - private boolean readInprogess; + private boolean readInProgress; + private ChannelHandlerContext ctx; + private Future nextScheduledFlush; /** * Create new instance which explicit flush after 256 pending flush operations latest. */ public FlushConsolidationHandler() { - this(256); + this(256, false); + } + + /** + * Create new instance which doesn't consolidate flushes when no read is in progress. + * + * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done. + */ + public FlushConsolidationHandler(int explicitFlushAfterFlushes) { + this(explicitFlushAfterFlushes, false); } /** * Create new instance. * * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done. + * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently + * ongoing. */ - public FlushConsolidationHandler(int explicitFlushAfterFlushes) { - this.explicitFlushAfterFlushes = ObjectUtil.checkPositive(explicitFlushAfterFlushes, - "explicitFlushAfterFlushes"); + public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) { + if (explicitFlushAfterFlushes <= 0) { + throw new IllegalArgumentException("explicitFlushAfterFlushes: " + + explicitFlushAfterFlushes + " (expected: > 0)"); + } + this.explicitFlushAfterFlushes = explicitFlushAfterFlushes; + this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress; + flushTask = consolidateWhenNoReadInProgress ? + new Runnable() { + @Override + public void run() { + if (flushPendingCount > 0 && !readInProgress) { + flushPendingCount = 0; + ctx.flush(); + nextScheduledFlush = null; + } // else we'll flush when the read completes + } + } + : null; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + this.ctx = ctx; } @Override public void flush(ChannelHandlerContext ctx) throws Exception { - if (readInprogess) { - // If there is still a read in compress we are sure we will see a channelReadComplete(...) call. Thus + if (readInProgress) { + // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus // we only need to flush if we reach the explicitFlushAfterFlushes limit. if (++flushPendingCount == explicitFlushAfterFlushes) { - flushPendingCount = 0; - ctx.flush(); + flushNow(ctx); } - return; + } else if (consolidateWhenNoReadInProgress) { + // Flush immediately if we reach the threshold, otherwise schedule + if (++flushPendingCount == explicitFlushAfterFlushes) { + flushNow(ctx); + } else { + scheduleFlush(ctx); + } + } else { + // Always flush directly + flushNow(ctx); } - ctx.flush(); } @Override @@ -89,7 +143,7 @@ public class FlushConsolidationHandler extends ChannelDuplexHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - readInprogess = true; + readInProgress = true; ctx.fireChannelRead(msg); } @@ -129,14 +183,33 @@ public class FlushConsolidationHandler extends ChannelDuplexHandler { } private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) { - readInprogess = false; + readInProgress = false; flushIfNeeded(ctx); } private void flushIfNeeded(ChannelHandlerContext ctx) { if (flushPendingCount > 0) { - flushPendingCount = 0; - ctx.flush(); + flushNow(ctx); + } + } + + private void flushNow(ChannelHandlerContext ctx) { + cancelScheduledFlush(); + flushPendingCount = 0; + ctx.flush(); + } + + private void scheduleFlush(final ChannelHandlerContext ctx) { + if (nextScheduledFlush == null) { + // Run as soon as possible, but still yield to give a chance for additional writes to enqueue. + nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask); + } + } + + private void cancelScheduledFlush() { + if (nextScheduledFlush != null) { + nextScheduledFlush.cancel(false); + nextScheduledFlush = null; } } } diff --git a/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java b/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java index 557b305e79..05eb80cb82 100644 --- a/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java @@ -27,12 +27,51 @@ import static org.junit.Assert.*; public class FlushConsolidationHandlerTest { + private static final int EXPLICIT_FLUSH_AFTER_FLUSHES = 3; + + @Test + public void testFlushViaScheduledTask() { + final AtomicInteger flushCount = new AtomicInteger(); + EmbeddedChannel channel = newChannel(flushCount, true); + // Flushes should not go through immediately, as they're scheduled as an async task + channel.flush(); + assertEquals(0, flushCount.get()); + channel.flush(); + assertEquals(0, flushCount.get()); + // Trigger the execution of the async task + channel.runPendingTasks(); + assertEquals(1, flushCount.get()); + assertFalse(channel.finish()); + } + + @Test + public void testFlushViaThresholdOutsideOfReadLoop() { + final AtomicInteger flushCount = new AtomicInteger(); + EmbeddedChannel channel = newChannel(flushCount, true); + // After a given threshold, the async task should be bypassed and a flush should be triggered immediately + for (int i = 0; i < EXPLICIT_FLUSH_AFTER_FLUSHES; i++) { + channel.flush(); + } + assertEquals(1, flushCount.get()); + assertFalse(channel.finish()); + } + + @Test + public void testImmediateFlushOutsideOfReadLoop() { + final AtomicInteger flushCount = new AtomicInteger(); + EmbeddedChannel channel = newChannel(flushCount, false); + channel.flush(); + assertEquals(1, flushCount.get()); + assertFalse(channel.finish()); + } + @Test public void testFlushViaReadComplete() { final AtomicInteger flushCount = new AtomicInteger(); - EmbeddedChannel channel = newChannel(flushCount); + EmbeddedChannel channel = newChannel(flushCount, false); // Flush should go through as there is no read loop in progress. channel.flush(); + channel.runPendingTasks(); assertEquals(1, flushCount.get()); // Simulate read loop; @@ -45,6 +84,7 @@ public class FlushConsolidationHandlerTest { assertEquals(2, flushCount.get()); // Now flush again as the read loop is complete. channel.flush(); + channel.runPendingTasks(); assertEquals(3, flushCount.get()); assertEquals(1L, channel.readOutbound()); assertEquals(2L, channel.readOutbound()); @@ -55,7 +95,7 @@ public class FlushConsolidationHandlerTest { @Test public void testFlushViaClose() { final AtomicInteger flushCount = new AtomicInteger(); - EmbeddedChannel channel = newChannel(flushCount); + EmbeddedChannel channel = newChannel(flushCount, false); // Simulate read loop; channel.pipeline().fireChannelRead(1L); assertEquals(0, flushCount.get()); @@ -70,7 +110,7 @@ public class FlushConsolidationHandlerTest { @Test public void testFlushViaDisconnect() { final AtomicInteger flushCount = new AtomicInteger(); - EmbeddedChannel channel = newChannel(flushCount); + EmbeddedChannel channel = newChannel(flushCount, false); // Simulate read loop; channel.pipeline().fireChannelRead(1L); assertEquals(0, flushCount.get()); @@ -85,7 +125,7 @@ public class FlushConsolidationHandlerTest { @Test(expected = IllegalStateException.class) public void testFlushViaException() { final AtomicInteger flushCount = new AtomicInteger(); - EmbeddedChannel channel = newChannel(flushCount); + EmbeddedChannel channel = newChannel(flushCount, false); // Simulate read loop; channel.pipeline().fireChannelRead(1L); assertEquals(0, flushCount.get()); @@ -100,7 +140,7 @@ public class FlushConsolidationHandlerTest { @Test public void testFlushViaRemoval() { final AtomicInteger flushCount = new AtomicInteger(); - EmbeddedChannel channel = newChannel(flushCount); + EmbeddedChannel channel = newChannel(flushCount, false); // Simulate read loop; channel.pipeline().fireChannelRead(1L); assertEquals(0, flushCount.get()); @@ -112,18 +152,21 @@ public class FlushConsolidationHandlerTest { assertFalse(channel.finish()); } - private static EmbeddedChannel newChannel(final AtomicInteger flushCount) { - return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { - @Override - public void flush(ChannelHandlerContext ctx) throws Exception { - flushCount.incrementAndGet(); - ctx.flush(); - } - }, new FlushConsolidationHandler(), new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ctx.writeAndFlush(msg); - } - }); + private static EmbeddedChannel newChannel(final AtomicInteger flushCount, boolean consolidateWhenNoReadInProgress) { + return new EmbeddedChannel( + new ChannelOutboundHandlerAdapter() { + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + flushCount.incrementAndGet(); + ctx.flush(); + } + }, + new FlushConsolidationHandler(EXPLICIT_FLUSH_AFTER_FLUSHES, consolidateWhenNoReadInProgress), + new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ctx.writeAndFlush(msg); + } + }); } }