diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index eed2c0fb83..1a8e079760 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -632,15 +632,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements if (executor.inEventLoop()) { next.invokeWrite(msg, promise); } else { - final int size = channel.estimatorHandle().size(msg); - if (size > 0) { - ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.incrementPendingOutboundBytes(size); - } - } - executor.execute(WriteTask.newInstance(next, msg, size, false, promise)); + submitWriteTask(next, executor, msg, false, promise); } return promise; @@ -697,20 +689,25 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements next.invokeWrite(msg, promise); next.invokeFlush(); } else { - final int size = channel.estimatorHandle().size(msg); - if (size > 0) { - ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); - // Check for null as it may be set to null if the channel is closed already - if (buffer != null) { - buffer.incrementPendingOutboundBytes(size); - } - } - executor.execute(WriteTask.newInstance(next, msg, size, true, promise)); + submitWriteTask(next, executor, msg, true, promise); } return promise; } + private void submitWriteTask(DefaultChannelHandlerContext next, EventExecutor executor, + Object msg, boolean flush, ChannelPromise promise) { + final int size = channel.estimatorHandle().size(msg); + if (size > 0) { + ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); + // Check for null as it may be set to null if the channel is closed already + if (buffer != null) { + buffer.incrementPendingOutboundBytes(size); + } + } + executor.execute(WriteTask.newInstance(next, msg, size, flush, promise)); + } + @Override public ChannelFuture writeAndFlush(Object msg) { return writeAndFlush(msg, newPromise());