From e3c8a924993018e5d8e7d724fc89cc1f9d691387 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 5 Jul 2016 14:15:37 +0200 Subject: [PATCH] Add FlushConsolidationHandler which consolidates flush operations as these are expensive Motivation: Calling flush() and writeAndFlush(...) are expensive operations in the sense as both will produce a write(...) or writev(...) system call if there are any pending writes in the ChannelOutboundBuffer. Often we can consolidate multiple flush operations into one if currently a read loop is active for a Channel, as we can just flush when channelReadComplete is triggered. Consolidating flushes can give a huge performance win depending on how often is flush is called. The only "downside" may be a bit higher latency in the case of where only one flush is triggered by the user. Modifications: Add a FlushConsolidationHandler which will consolidate flushes and so improve the throughput. Result: Better performance (throughput). This is especially true for protocols that use some sort of PIPELINING. --- .../flush/FlushConsolidationHandler.java | 140 ++++++++++++++++++ .../io/netty/handler/flush/package-info.java | 20 +++ .../flush/FlushConsolidationHandlerTest.java | 129 ++++++++++++++++ 3 files changed, 289 insertions(+) create mode 100644 handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java create mode 100644 handler/src/main/java/io/netty/handler/flush/package-info.java create mode 100644 handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java diff --git a/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java new file mode 100644 index 0000000000..4a8de756cb --- /dev/null +++ b/handler/src/main/java/io/netty/handler/flush/FlushConsolidationHandler.java @@ -0,0 +1,140 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.flush; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandler; +import io.netty.channel.ChannelOutboundInvoker; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.util.internal.ObjectUtil; + +/** + * {@link ChannelDuplexHandler} which consolidate {@link ChannelOutboundInvoker#flush()} operations (which also includes + * {@link ChannelOutboundInvoker#writeAndFlush(Object)} and + * {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}). + *

+ * Flush operations are general speaking expensive as these may trigger a syscall on the transport level. Thus it is + * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations + * as much as possible. + *

+ * When {@link #flush(ChannelHandlerContext)} is called it will only pass it on to the next + * {@link ChannelOutboundHandler} in the {@link ChannelPipeline} if no read loop is currently ongoing + * as it will pick up any pending flushes when {@link #channelReadComplete(ChannelHandlerContext)} is trigged. + * If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well. + *

+ * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations. + *

+ * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the + * {@link ChannelPipeline} to have the best effect. + */ +public class FlushConsolidationHandler extends ChannelDuplexHandler { + private final int explicitFlushAfterFlushes; + private int flushPendingCount; + private boolean readInprogess; + + /** + * Create new instance which explicit flush after 256 pending flush operations latest. + */ + public FlushConsolidationHandler() { + this(256); + } + + /** + * Create new instance. + * + * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done. + */ + public FlushConsolidationHandler(int explicitFlushAfterFlushes) { + this.explicitFlushAfterFlushes = ObjectUtil.checkPositive(explicitFlushAfterFlushes, + "explicitFlushAfterFlushes"); + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + if (readInprogess) { + // If there is still a read in compress we are sure we will see a channelReadComplete(...) call. Thus + // we only need to flush if we reach the explicitFlushAfterFlushes limit. + if (++flushPendingCount == explicitFlushAfterFlushes) { + flushPendingCount = 0; + ctx.flush(); + } + return; + } + ctx.flush(); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // This may be the last event in the read loop, so flush now! + flushIfNeeded(ctx, true); + ctx.fireChannelReadComplete(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + readInprogess = true; + ctx.fireChannelRead(msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + // To ensure we not miss to flush anything, do it now. + flushIfNeeded(ctx, true); + ctx.fireExceptionCaught(cause); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + // Try to flush one last time if flushes are pending before disconnect the channel. + flushIfNeeded(ctx, true); + ctx.disconnect(promise); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + // Try to flush one last time if flushes are pending before close the channel. + flushIfNeeded(ctx, true); + ctx.close(promise); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + if (!ctx.channel().isWritable()) { + // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory. + flushIfNeeded(ctx, false); + } + ctx.fireChannelWritabilityChanged(); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + flushIfNeeded(ctx, false); + } + + private void flushIfNeeded(ChannelHandlerContext ctx, boolean resetReadInProgress) { + if (resetReadInProgress) { + readInprogess = false; + } + if (flushPendingCount > 0) { + flushPendingCount = 0; + ctx.flush(); + } + } +} diff --git a/handler/src/main/java/io/netty/handler/flush/package-info.java b/handler/src/main/java/io/netty/handler/flush/package-info.java new file mode 100644 index 0000000000..b5c3c24b4c --- /dev/null +++ b/handler/src/main/java/io/netty/handler/flush/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Package to control flush behavior. + */ +package io.netty.handler.flush; diff --git a/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java b/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java new file mode 100644 index 0000000000..557b305e79 --- /dev/null +++ b/handler/src/test/java/io/netty/handler/flush/FlushConsolidationHandlerTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.flush; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +public class FlushConsolidationHandlerTest { + + @Test + public void testFlushViaReadComplete() { + final AtomicInteger flushCount = new AtomicInteger(); + EmbeddedChannel channel = newChannel(flushCount); + // Flush should go through as there is no read loop in progress. + channel.flush(); + assertEquals(1, flushCount.get()); + + // Simulate read loop; + channel.pipeline().fireChannelRead(1L); + assertEquals(1, flushCount.get()); + channel.pipeline().fireChannelRead(2L); + assertEquals(1, flushCount.get()); + assertNull(channel.readOutbound()); + channel.pipeline().fireChannelReadComplete(); + assertEquals(2, flushCount.get()); + // Now flush again as the read loop is complete. + channel.flush(); + assertEquals(3, flushCount.get()); + assertEquals(1L, channel.readOutbound()); + assertEquals(2L, channel.readOutbound()); + assertNull(channel.readOutbound()); + assertFalse(channel.finish()); + } + + @Test + public void testFlushViaClose() { + final AtomicInteger flushCount = new AtomicInteger(); + EmbeddedChannel channel = newChannel(flushCount); + // Simulate read loop; + channel.pipeline().fireChannelRead(1L); + assertEquals(0, flushCount.get()); + assertNull(channel.readOutbound()); + channel.close(); + assertEquals(1, flushCount.get()); + assertEquals(1L, channel.readOutbound()); + assertNull(channel.readOutbound()); + assertFalse(channel.finish()); + } + + @Test + public void testFlushViaDisconnect() { + final AtomicInteger flushCount = new AtomicInteger(); + EmbeddedChannel channel = newChannel(flushCount); + // Simulate read loop; + channel.pipeline().fireChannelRead(1L); + assertEquals(0, flushCount.get()); + assertNull(channel.readOutbound()); + channel.disconnect(); + assertEquals(1, flushCount.get()); + assertEquals(1L, channel.readOutbound()); + assertNull(channel.readOutbound()); + assertFalse(channel.finish()); + } + + @Test(expected = IllegalStateException.class) + public void testFlushViaException() { + final AtomicInteger flushCount = new AtomicInteger(); + EmbeddedChannel channel = newChannel(flushCount); + // Simulate read loop; + channel.pipeline().fireChannelRead(1L); + assertEquals(0, flushCount.get()); + assertNull(channel.readOutbound()); + channel.pipeline().fireExceptionCaught(new IllegalStateException()); + assertEquals(1, flushCount.get()); + assertEquals(1L, channel.readOutbound()); + assertNull(channel.readOutbound()); + channel.finish(); + } + + @Test + public void testFlushViaRemoval() { + final AtomicInteger flushCount = new AtomicInteger(); + EmbeddedChannel channel = newChannel(flushCount); + // Simulate read loop; + channel.pipeline().fireChannelRead(1L); + assertEquals(0, flushCount.get()); + assertNull(channel.readOutbound()); + channel.pipeline().remove(FlushConsolidationHandler.class); + assertEquals(1, flushCount.get()); + assertEquals(1L, channel.readOutbound()); + assertNull(channel.readOutbound()); + assertFalse(channel.finish()); + } + + private static EmbeddedChannel newChannel(final AtomicInteger flushCount) { + return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + flushCount.incrementAndGet(); + ctx.flush(); + } + }, new FlushConsolidationHandler(), new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + ctx.writeAndFlush(msg); + } + }); + } +}