Consolidate flushes even when no read in progress

Motivation:

Currently FlushConsolidationHandler only consolidates if a read loop is
active for a Channel, otherwise each writeAndFlush(...) call will still
be flushed individually. When these calls are close enough, it can be
beneficial to consolidate them even outside of a read loop.

Modifications:

When we allow a flush to "go through", don't perform it immediately, but
submit it on the channel's executor. Under high pressure, this gives
other writes a chance to enqueue before the task gets executed, and so
we flush multiple writes at once.

Result:

Lower CPU usage and less context switching.
This commit is contained in:
olim7t 2016-08-30 09:46:37 -07:00 committed by Norman Maurer
parent 5986c229c4
commit 3a8b8c9219
2 changed files with 157 additions and 41 deletions

View File

@ -23,21 +23,32 @@ import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundInvoker; import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.internal.ObjectUtil;
import java.util.concurrent.Future;
/** /**
* {@link ChannelDuplexHandler} which consolidate {@link ChannelOutboundInvoker#flush()} operations (which also includes * {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
* {@link ChannelOutboundInvoker#writeAndFlush(Object)} and * operations (which also includes
* {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
* {@link ChannelOutboundInvoker#writeAndFlush(Object)} /
* {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}). * {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}).
* <p> * <p>
* Flush operations are general speaking expensive as these may trigger a syscall on the transport level. Thus it is * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
* in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
* as much as possible. * as much as possible.
* <p> * <p>
* When {@link #flush(ChannelHandlerContext)} is called it will only pass it on to the next * If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next
* {@link ChannelOutboundHandler} in the {@link ChannelPipeline} if no read loop is currently ongoing * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when
* as it will pick up any pending flushes when {@link #channelReadComplete(ChannelHandlerContext)} is trigged. * {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
* If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well. * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
* <ul>
* <li>if {@code false}, flushes are passed on to the next handler directly;</li>
* <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
* high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
* batching multiple flushes into one.</li>
* </ul>
* If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well (whether while in a read
* loop, or while batching outside of a read loop).
* <p> * <p>
* If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations. * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
* <p> * <p>
@ -46,38 +57,81 @@ import io.netty.util.internal.ObjectUtil;
*/ */
public class FlushConsolidationHandler extends ChannelDuplexHandler { public class FlushConsolidationHandler extends ChannelDuplexHandler {
private final int explicitFlushAfterFlushes; private final int explicitFlushAfterFlushes;
private final boolean consolidateWhenNoReadInProgress;
private final Runnable flushTask;
private int flushPendingCount; private int flushPendingCount;
private boolean readInprogess; private boolean readInProgress;
private ChannelHandlerContext ctx;
private Future<?> nextScheduledFlush;
/** /**
* Create new instance which explicit flush after 256 pending flush operations latest. * Create new instance which explicit flush after 256 pending flush operations latest.
*/ */
public FlushConsolidationHandler() { public FlushConsolidationHandler() {
this(256); this(256, false);
}
/**
* Create new instance which doesn't consolidate flushes when no read is in progress.
*
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
*/
public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
this(explicitFlushAfterFlushes, false);
} }
/** /**
* Create new instance. * Create new instance.
* *
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done. * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
* @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
* ongoing.
*/ */
public FlushConsolidationHandler(int explicitFlushAfterFlushes) { public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
this.explicitFlushAfterFlushes = ObjectUtil.checkPositive(explicitFlushAfterFlushes, if (explicitFlushAfterFlushes <= 0) {
"explicitFlushAfterFlushes"); throw new IllegalArgumentException("explicitFlushAfterFlushes: "
+ explicitFlushAfterFlushes + " (expected: > 0)");
}
this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
flushTask = consolidateWhenNoReadInProgress ?
new Runnable() {
@Override
public void run() {
if (flushPendingCount > 0 && !readInProgress) {
flushPendingCount = 0;
ctx.flush();
nextScheduledFlush = null;
} // else we'll flush when the read completes
}
}
: null;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
} }
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) throws Exception {
if (readInprogess) { if (readInProgress) {
// If there is still a read in compress we are sure we will see a channelReadComplete(...) call. Thus // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
// we only need to flush if we reach the explicitFlushAfterFlushes limit. // we only need to flush if we reach the explicitFlushAfterFlushes limit.
if (++flushPendingCount == explicitFlushAfterFlushes) { if (++flushPendingCount == explicitFlushAfterFlushes) {
flushPendingCount = 0; flushNow(ctx);
ctx.flush();
} }
return; } else if (consolidateWhenNoReadInProgress) {
// Flush immediately if we reach the threshold, otherwise schedule
if (++flushPendingCount == explicitFlushAfterFlushes) {
flushNow(ctx);
} else {
scheduleFlush(ctx);
}
} else {
// Always flush directly
flushNow(ctx);
} }
ctx.flush();
} }
@Override @Override
@ -89,7 +143,7 @@ public class FlushConsolidationHandler extends ChannelDuplexHandler {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
readInprogess = true; readInProgress = true;
ctx.fireChannelRead(msg); ctx.fireChannelRead(msg);
} }
@ -129,14 +183,33 @@ public class FlushConsolidationHandler extends ChannelDuplexHandler {
} }
private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) { private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) {
readInprogess = false; readInProgress = false;
flushIfNeeded(ctx); flushIfNeeded(ctx);
} }
private void flushIfNeeded(ChannelHandlerContext ctx) { private void flushIfNeeded(ChannelHandlerContext ctx) {
if (flushPendingCount > 0) { if (flushPendingCount > 0) {
flushPendingCount = 0; flushNow(ctx);
ctx.flush(); }
}
private void flushNow(ChannelHandlerContext ctx) {
cancelScheduledFlush();
flushPendingCount = 0;
ctx.flush();
}
private void scheduleFlush(final ChannelHandlerContext ctx) {
if (nextScheduledFlush == null) {
// Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
}
}
private void cancelScheduledFlush() {
if (nextScheduledFlush != null) {
nextScheduledFlush.cancel(false);
nextScheduledFlush = null;
} }
} }
} }

View File

@ -27,12 +27,51 @@ import static org.junit.Assert.*;
public class FlushConsolidationHandlerTest { public class FlushConsolidationHandlerTest {
private static final int EXPLICIT_FLUSH_AFTER_FLUSHES = 3;
@Test
public void testFlushViaScheduledTask() {
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount, true);
// Flushes should not go through immediately, as they're scheduled as an async task
channel.flush();
assertEquals(0, flushCount.get());
channel.flush();
assertEquals(0, flushCount.get());
// Trigger the execution of the async task
channel.runPendingTasks();
assertEquals(1, flushCount.get());
assertFalse(channel.finish());
}
@Test
public void testFlushViaThresholdOutsideOfReadLoop() {
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount, true);
// After a given threshold, the async task should be bypassed and a flush should be triggered immediately
for (int i = 0; i < EXPLICIT_FLUSH_AFTER_FLUSHES; i++) {
channel.flush();
}
assertEquals(1, flushCount.get());
assertFalse(channel.finish());
}
@Test
public void testImmediateFlushOutsideOfReadLoop() {
final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount, false);
channel.flush();
assertEquals(1, flushCount.get());
assertFalse(channel.finish());
}
@Test @Test
public void testFlushViaReadComplete() { public void testFlushViaReadComplete() {
final AtomicInteger flushCount = new AtomicInteger(); final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount); EmbeddedChannel channel = newChannel(flushCount, false);
// Flush should go through as there is no read loop in progress. // Flush should go through as there is no read loop in progress.
channel.flush(); channel.flush();
channel.runPendingTasks();
assertEquals(1, flushCount.get()); assertEquals(1, flushCount.get());
// Simulate read loop; // Simulate read loop;
@ -45,6 +84,7 @@ public class FlushConsolidationHandlerTest {
assertEquals(2, flushCount.get()); assertEquals(2, flushCount.get());
// Now flush again as the read loop is complete. // Now flush again as the read loop is complete.
channel.flush(); channel.flush();
channel.runPendingTasks();
assertEquals(3, flushCount.get()); assertEquals(3, flushCount.get());
assertEquals(1L, channel.readOutbound()); assertEquals(1L, channel.readOutbound());
assertEquals(2L, channel.readOutbound()); assertEquals(2L, channel.readOutbound());
@ -55,7 +95,7 @@ public class FlushConsolidationHandlerTest {
@Test @Test
public void testFlushViaClose() { public void testFlushViaClose() {
final AtomicInteger flushCount = new AtomicInteger(); final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount); EmbeddedChannel channel = newChannel(flushCount, false);
// Simulate read loop; // Simulate read loop;
channel.pipeline().fireChannelRead(1L); channel.pipeline().fireChannelRead(1L);
assertEquals(0, flushCount.get()); assertEquals(0, flushCount.get());
@ -70,7 +110,7 @@ public class FlushConsolidationHandlerTest {
@Test @Test
public void testFlushViaDisconnect() { public void testFlushViaDisconnect() {
final AtomicInteger flushCount = new AtomicInteger(); final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount); EmbeddedChannel channel = newChannel(flushCount, false);
// Simulate read loop; // Simulate read loop;
channel.pipeline().fireChannelRead(1L); channel.pipeline().fireChannelRead(1L);
assertEquals(0, flushCount.get()); assertEquals(0, flushCount.get());
@ -85,7 +125,7 @@ public class FlushConsolidationHandlerTest {
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testFlushViaException() { public void testFlushViaException() {
final AtomicInteger flushCount = new AtomicInteger(); final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount); EmbeddedChannel channel = newChannel(flushCount, false);
// Simulate read loop; // Simulate read loop;
channel.pipeline().fireChannelRead(1L); channel.pipeline().fireChannelRead(1L);
assertEquals(0, flushCount.get()); assertEquals(0, flushCount.get());
@ -100,7 +140,7 @@ public class FlushConsolidationHandlerTest {
@Test @Test
public void testFlushViaRemoval() { public void testFlushViaRemoval() {
final AtomicInteger flushCount = new AtomicInteger(); final AtomicInteger flushCount = new AtomicInteger();
EmbeddedChannel channel = newChannel(flushCount); EmbeddedChannel channel = newChannel(flushCount, false);
// Simulate read loop; // Simulate read loop;
channel.pipeline().fireChannelRead(1L); channel.pipeline().fireChannelRead(1L);
assertEquals(0, flushCount.get()); assertEquals(0, flushCount.get());
@ -112,18 +152,21 @@ public class FlushConsolidationHandlerTest {
assertFalse(channel.finish()); assertFalse(channel.finish());
} }
private static EmbeddedChannel newChannel(final AtomicInteger flushCount) { private static EmbeddedChannel newChannel(final AtomicInteger flushCount, boolean consolidateWhenNoReadInProgress) {
return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { return new EmbeddedChannel(
@Override new ChannelOutboundHandlerAdapter() {
public void flush(ChannelHandlerContext ctx) throws Exception { @Override
flushCount.incrementAndGet(); public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush(); flushCount.incrementAndGet();
} ctx.flush();
}, new FlushConsolidationHandler(), new ChannelInboundHandlerAdapter() { }
@Override },
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { new FlushConsolidationHandler(EXPLICIT_FLUSH_AFTER_FLUSHES, consolidateWhenNoReadInProgress),
ctx.writeAndFlush(msg); new ChannelInboundHandlerAdapter() {
} @Override
}); public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.writeAndFlush(msg);
}
});
} }
} }