Add FlushConsolidationHandler which consolidates flush operations as these are expensive
Motivation: Calling flush() and writeAndFlush(...) are expensive operations in the sense as both will produce a write(...) or writev(...) system call if there are any pending writes in the ChannelOutboundBuffer. Often we can consolidate multiple flush operations into one if currently a read loop is active for a Channel, as we can just flush when channelReadComplete is triggered. Consolidating flushes can give a huge performance win depending on how often is flush is called. The only "downside" may be a bit higher latency in the case of where only one flush is triggered by the user. Modifications: Add a FlushConsolidationHandler which will consolidate flushes and so improve the throughput. Result: Better performance (throughput). This is especially true for protocols that use some sort of PIPELINING.
This commit is contained in:
parent
c393374cf5
commit
e3c8a92499
@ -0,0 +1,140 @@
|
||||
/*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.flush;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelHandler;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelOutboundHandler;
|
||||
import io.netty.channel.ChannelOutboundInvoker;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.util.internal.ObjectUtil;
|
||||
|
||||
/**
|
||||
* {@link ChannelDuplexHandler} which consolidate {@link ChannelOutboundInvoker#flush()} operations (which also includes
|
||||
* {@link ChannelOutboundInvoker#writeAndFlush(Object)} and
|
||||
* {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}).
|
||||
* <p>
|
||||
* Flush operations are general speaking expensive as these may trigger a syscall on the transport level. Thus it is
|
||||
* in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
|
||||
* as much as possible.
|
||||
* <p>
|
||||
* When {@link #flush(ChannelHandlerContext)} is called it will only pass it on to the next
|
||||
* {@link ChannelOutboundHandler} in the {@link ChannelPipeline} if no read loop is currently ongoing
|
||||
* as it will pick up any pending flushes when {@link #channelReadComplete(ChannelHandlerContext)} is trigged.
|
||||
* If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well.
|
||||
* <p>
|
||||
* If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
|
||||
* <p>
|
||||
* The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
|
||||
* {@link ChannelPipeline} to have the best effect.
|
||||
*/
|
||||
public class FlushConsolidationHandler extends ChannelDuplexHandler {
|
||||
private final int explicitFlushAfterFlushes;
|
||||
private int flushPendingCount;
|
||||
private boolean readInprogess;
|
||||
|
||||
/**
|
||||
* Create new instance which explicit flush after 256 pending flush operations latest.
|
||||
*/
|
||||
public FlushConsolidationHandler() {
|
||||
this(256);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create new instance.
|
||||
*
|
||||
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
|
||||
*/
|
||||
public FlushConsolidationHandler(int explicitFlushAfterFlushes) {
|
||||
this.explicitFlushAfterFlushes = ObjectUtil.checkPositive(explicitFlushAfterFlushes,
|
||||
"explicitFlushAfterFlushes");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||
if (readInprogess) {
|
||||
// If there is still a read in compress we are sure we will see a channelReadComplete(...) call. Thus
|
||||
// we only need to flush if we reach the explicitFlushAfterFlushes limit.
|
||||
if (++flushPendingCount == explicitFlushAfterFlushes) {
|
||||
flushPendingCount = 0;
|
||||
ctx.flush();
|
||||
}
|
||||
return;
|
||||
}
|
||||
ctx.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
// This may be the last event in the read loop, so flush now!
|
||||
flushIfNeeded(ctx, true);
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
readInprogess = true;
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
// To ensure we not miss to flush anything, do it now.
|
||||
flushIfNeeded(ctx, true);
|
||||
ctx.fireExceptionCaught(cause);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
// Try to flush one last time if flushes are pending before disconnect the channel.
|
||||
flushIfNeeded(ctx, true);
|
||||
ctx.disconnect(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
|
||||
// Try to flush one last time if flushes are pending before close the channel.
|
||||
flushIfNeeded(ctx, true);
|
||||
ctx.close(promise);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!ctx.channel().isWritable()) {
|
||||
// The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
|
||||
flushIfNeeded(ctx, false);
|
||||
}
|
||||
ctx.fireChannelWritabilityChanged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
flushIfNeeded(ctx, false);
|
||||
}
|
||||
|
||||
private void flushIfNeeded(ChannelHandlerContext ctx, boolean resetReadInProgress) {
|
||||
if (resetReadInProgress) {
|
||||
readInprogess = false;
|
||||
}
|
||||
if (flushPendingCount > 0) {
|
||||
flushPendingCount = 0;
|
||||
ctx.flush();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Package to control flush behavior.
|
||||
*/
|
||||
package io.netty.handler.flush;
|
@ -0,0 +1,129 @@
|
||||
/*
|
||||
* Copyright 2016 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.netty.handler.flush;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||
import io.netty.channel.embedded.EmbeddedChannel;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
public class FlushConsolidationHandlerTest {
|
||||
|
||||
@Test
|
||||
public void testFlushViaReadComplete() {
|
||||
final AtomicInteger flushCount = new AtomicInteger();
|
||||
EmbeddedChannel channel = newChannel(flushCount);
|
||||
// Flush should go through as there is no read loop in progress.
|
||||
channel.flush();
|
||||
assertEquals(1, flushCount.get());
|
||||
|
||||
// Simulate read loop;
|
||||
channel.pipeline().fireChannelRead(1L);
|
||||
assertEquals(1, flushCount.get());
|
||||
channel.pipeline().fireChannelRead(2L);
|
||||
assertEquals(1, flushCount.get());
|
||||
assertNull(channel.readOutbound());
|
||||
channel.pipeline().fireChannelReadComplete();
|
||||
assertEquals(2, flushCount.get());
|
||||
// Now flush again as the read loop is complete.
|
||||
channel.flush();
|
||||
assertEquals(3, flushCount.get());
|
||||
assertEquals(1L, channel.readOutbound());
|
||||
assertEquals(2L, channel.readOutbound());
|
||||
assertNull(channel.readOutbound());
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushViaClose() {
|
||||
final AtomicInteger flushCount = new AtomicInteger();
|
||||
EmbeddedChannel channel = newChannel(flushCount);
|
||||
// Simulate read loop;
|
||||
channel.pipeline().fireChannelRead(1L);
|
||||
assertEquals(0, flushCount.get());
|
||||
assertNull(channel.readOutbound());
|
||||
channel.close();
|
||||
assertEquals(1, flushCount.get());
|
||||
assertEquals(1L, channel.readOutbound());
|
||||
assertNull(channel.readOutbound());
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushViaDisconnect() {
|
||||
final AtomicInteger flushCount = new AtomicInteger();
|
||||
EmbeddedChannel channel = newChannel(flushCount);
|
||||
// Simulate read loop;
|
||||
channel.pipeline().fireChannelRead(1L);
|
||||
assertEquals(0, flushCount.get());
|
||||
assertNull(channel.readOutbound());
|
||||
channel.disconnect();
|
||||
assertEquals(1, flushCount.get());
|
||||
assertEquals(1L, channel.readOutbound());
|
||||
assertNull(channel.readOutbound());
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalStateException.class)
|
||||
public void testFlushViaException() {
|
||||
final AtomicInteger flushCount = new AtomicInteger();
|
||||
EmbeddedChannel channel = newChannel(flushCount);
|
||||
// Simulate read loop;
|
||||
channel.pipeline().fireChannelRead(1L);
|
||||
assertEquals(0, flushCount.get());
|
||||
assertNull(channel.readOutbound());
|
||||
channel.pipeline().fireExceptionCaught(new IllegalStateException());
|
||||
assertEquals(1, flushCount.get());
|
||||
assertEquals(1L, channel.readOutbound());
|
||||
assertNull(channel.readOutbound());
|
||||
channel.finish();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFlushViaRemoval() {
|
||||
final AtomicInteger flushCount = new AtomicInteger();
|
||||
EmbeddedChannel channel = newChannel(flushCount);
|
||||
// Simulate read loop;
|
||||
channel.pipeline().fireChannelRead(1L);
|
||||
assertEquals(0, flushCount.get());
|
||||
assertNull(channel.readOutbound());
|
||||
channel.pipeline().remove(FlushConsolidationHandler.class);
|
||||
assertEquals(1, flushCount.get());
|
||||
assertEquals(1L, channel.readOutbound());
|
||||
assertNull(channel.readOutbound());
|
||||
assertFalse(channel.finish());
|
||||
}
|
||||
|
||||
private static EmbeddedChannel newChannel(final AtomicInteger flushCount) {
|
||||
return new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
|
||||
@Override
|
||||
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||
flushCount.incrementAndGet();
|
||||
ctx.flush();
|
||||
}
|
||||
}, new FlushConsolidationHandler(), new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
||||
ctx.writeAndFlush(msg);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user