diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 6290e8d9a5..3a5f528d4f 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -54,7 +54,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha private volatile boolean registered; private ClosedChannelException closedChannelException; - private boolean inFlushNow; + private boolean inFlush0; /** Cache for the string representation of this channel */ private boolean strValActive; @@ -592,25 +592,18 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } @Override - public void flush(boolean force) { + public void flush() { outboundBuffer.addFlush(); - flush0(force); + flush0(); } - private void flush0(boolean force) { - if (inFlushNow) { + protected void flush0() { + if (inFlush0) { // Avoid re-entrance return; } - // Flush immediately only when there's no pending flush. - // If there's a pending flush operation, event loop will call flushNow() later, - // and thus there's no need to call it now. - if (!force && isFlushPending()) { - return; - } - - inFlushNow = true; + inFlush0 = true; final ChannelOutboundBuffer outboundBuffer = AbstractChannel.this.outboundBuffer; @@ -621,7 +614,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } else { outboundBuffer.fail(new ClosedChannelException()); } - inFlushNow = false; + inFlush0 = false; return; } @@ -667,7 +660,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha close(voidPromise()); } } finally { - inFlushNow = false; + inFlush0 = false; } } @@ -803,11 +796,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return 0; } - /** - * Return {@code true} if a flush to the {@link Channel} is currently pending. - */ - protected abstract boolean isFlushPending(); - final class CloseFuture extends DefaultChannelPromise { CloseFuture(AbstractChannel ch) { diff --git a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java index 3fd52a5445..14593da040 100644 --- a/transport/src/main/java/io/netty/channel/AbstractServerChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractServerChannel.java @@ -61,11 +61,6 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S throw new UnsupportedOperationException(); } - @Override - protected boolean isFlushPending() { - return false; - } - @Override protected AbstractUnsafe newUnsafe() { return new DefaultServerUnsafe(); @@ -84,7 +79,7 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S } @Override - public void flush(boolean force) { + public void flush() { // ignore } diff --git a/transport/src/main/java/io/netty/channel/Channel.java b/transport/src/main/java/io/netty/channel/Channel.java index 8a88a8d6aa..41a73d3e21 100644 --- a/transport/src/main/java/io/netty/channel/Channel.java +++ b/transport/src/main/java/io/netty/channel/Channel.java @@ -242,7 +242,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr /** * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}. */ - void flush(boolean force); + void flush(); /** * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}. diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index ac68845509..f1045f0bf8 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -1030,7 +1030,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { @Override public void flush(ChannelHandlerContext ctx) throws Exception { - unsafe.flush(false); + unsafe.flush(); } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 29adf2db77..d60839797b 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -301,11 +301,6 @@ public class EmbeddedChannel extends AbstractChannel { return new DefaultUnsafe(); } - @Override - protected boolean isFlushPending() { - return false; - } - @Override protected int doWrite(Object[] msgs, int msgsLength, int startIndex) throws Exception { for (int i = startIndex; i < msgsLength; i ++) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 13d690c824..63e045e44c 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -315,11 +315,6 @@ public class LocalChannel extends AbstractChannel { } } - @Override - protected boolean isFlushPending() { - return false; - } - private class LocalUnsafe extends AbstractUnsafe { @Override diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index b424ea1796..8e01f83e4f 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -145,6 +145,8 @@ public abstract class AbstractNioChannel extends AbstractChannel { * Read from underlying {@link SelectableChannel} */ void read(); + + void forceFlush(); } protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe { @@ -245,6 +247,23 @@ public abstract class AbstractNioChannel extends AbstractChannel { connectPromise = null; } } + + @Override + protected void flush0() { + // Flush immediately only when there's no pending flush. + // If there's a pending flush operation, event loop will call forceFlush() later, + // and thus there's no need to call it now. + if (isFlushPending()) { + return; + } + super.flush0(); + } + + @Override + public void forceFlush() { + // directly call super.flush0() to force a flush now + super.flush0(); + } } @Override @@ -252,8 +271,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { return loop instanceof NioEventLoop; } - @Override - protected boolean isFlushPending() { + private boolean isFlushPending() { SelectionKey selectionKey = this.selectionKey; return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0; } diff --git a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java index f04ec166fa..78dc9876e9 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java +++ b/transport/src/main/java/io/netty/channel/nio/NioEventLoop.java @@ -528,8 +528,8 @@ public final class NioEventLoop extends SingleThreadEventLoop { processSelectedKey(ch.selectionKey(), task); } - // Call flushNow which will also take care of clear the OP_WRITE once there is nothing left to write - ch.unsafe().flush(true); + // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write + ch.unsafe().forceFlush(); } private static void unregisterWritableTasks(AbstractNioChannel ch) { diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java index 9ae9554e7d..68ac919f35 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioChannel.java @@ -91,11 +91,6 @@ public abstract class AbstractOioChannel extends AbstractChannel { return loop instanceof ThreadPerChannelEventLoop; } - @Override - protected boolean isFlushPending() { - return false; - } - /** * Connect to the remote peer using the given localAddress if one is specified or {@code null} otherwise. */