diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 7674b5dfe4..3fdbe8282a 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -35,6 +35,7 @@ import java.nio.channels.SelectionKey; * {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes. */ public abstract class AbstractNioByteChannel extends AbstractNioChannel { + private Runnable flushTask; /** * Create a new instance @@ -159,6 +160,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { in.remove(); continue; } + if (!buf.isDirect()) { ByteBufAllocator alloc = alloc(); if (alloc.isDirectBufferPooled()) { @@ -169,6 +171,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { in.current(buf); } } + + boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { @@ -177,6 +181,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { + setOpWrite = true; break; } @@ -192,12 +197,12 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (done) { in.remove(); } else { - // Did not write completely. - setOpWrite(); + incompleteWrite(setOpWrite); break; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; + boolean setOpWrite = false; boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { @@ -206,6 +211,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { for (int i = writeSpinCount - 1; i >= 0; i --) { long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0) { + setOpWrite = true; break; } @@ -221,8 +227,7 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (done) { in.remove(); } else { - // Did not write completely. - setOpWrite(); + incompleteWrite(setOpWrite); break; } } else { @@ -231,6 +236,25 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { } } + protected final void incompleteWrite(boolean setOpWrite) { + // Did not write completely. + if (setOpWrite) { + setOpWrite(); + } else { + // Schedule flush again later so other tasks can be picked up in the meantime + Runnable flushTask = this.flushTask; + if (flushTask == null) { + flushTask = this.flushTask = new Runnable() { + @Override + public void run() { + flush(); + } + }; + } + eventLoop().execute(flushTask); + } + } + /** * Write a {@link FileRegion} * diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index 8cbcd9e908..665ce2945c 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -245,9 +245,11 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty final SocketChannel ch = javaChannel(); long writtenBytes = 0; boolean done = false; + boolean setOpWrite = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { + setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; @@ -293,7 +295,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty } } - setOpWrite(); + incompleteWrite(setOpWrite); break; } }