diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 20eb586b31..d9e63de16c 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { private final EpollSocketChannelConfig config; + private Runnable flushTask; /** * The future of the current connection attempt. If not null, subsequent @@ -105,16 +106,15 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So /** * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * @param buf the {@link ByteBuf} from which the bytes should be written - * @return amount the amount of written bytes */ - private int doWriteBytes(ByteBuf buf, int readable) throws Exception { + private int doWriteBytes(ByteBuf buf) throws Exception { int readerIndex = buf.readerIndex(); int localFlushedAmount; if (buf.nioBufferCount() == 1) { if (buf.hasMemoryAddress()) { localFlushedAmount = Native.writeAddress(fd, buf.memoryAddress(), readerIndex, buf.writerIndex()); } else { - ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, readable); + ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes()); localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit()); } } else { @@ -171,11 +171,36 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); + boolean done = false; + boolean setEpollOut = false; + long writtenBytes = 0; - long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); + for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { + long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); + if (localWrittenBytes == 0) { + setEpollOut = true; + break; + } + expectedWrittenBytes -= localWrittenBytes; + writtenBytes += localWrittenBytes; + if (expectedWrittenBytes == 0) { + done = true; + break; + } + } + if (done) { + // Release all buffers + for (int i = msgCount; i > 0; i --) { + in.remove(); + } - if (localWrittenBytes < expectedWrittenBytes) { - setEpollOut(); + // Finish the write loop if no new messages were flushed by in.remove(). + if (in.isEmpty()) { + clearEpollOut(); + } + } else { + // Did not write all buffers completely. + // Release the fully written buffers and update the indexes of the partially written buffer. // Did not write all buffers completely. // Release the fully written buffers and update the indexes of the partially written buffer. @@ -184,24 +209,38 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So final int readerIndex = buf.readerIndex(); final int readableBytes = buf.writerIndex() - readerIndex; - if (readableBytes < localWrittenBytes) { + if (readableBytes < writtenBytes) { in.remove(); - localWrittenBytes -= readableBytes; - } else if (readableBytes > localWrittenBytes) { - - buf.readerIndex(readerIndex + (int) localWrittenBytes); - in.progress(localWrittenBytes); + writtenBytes -= readableBytes; + } else if (readableBytes > writtenBytes) { + buf.readerIndex(readerIndex + (int) writtenBytes); + in.progress(writtenBytes); break; } else { // readable == writtenBytes in.remove(); break; } } + incompleteWrite(setEpollOut); + } + } + + private void incompleteWrite(boolean setEpollOut) { + // Did not write completely. + if (setEpollOut) { + setEpollOut(); } else { - // Release all buffers - for (int i = msgCount; i > 0; i --) { - in.remove(); + // Schedule flush again later so other tasks can be picked up in the meantime + Runnable flushTask = this.flushTask; + if (flushTask == null) { + flushTask = this.flushTask = new Runnable() { + @Override + public void run() { + flush(); + } + }; } + eventLoop().execute(flushTask); } } @@ -267,18 +306,32 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So in.remove(); continue; } + boolean setEpollOut = false; + boolean done = false; + long flushedAmount = 0; + int writeSpinCount = config().getWriteSpinCount(); + for (int i = writeSpinCount - 1; i >= 0; i --) { + int localFlushedAmount = doWriteBytes(buf); + if (localFlushedAmount == 0) { + setEpollOut = true; + break; + } - int expected = buf.readableBytes(); - int localFlushedAmount = doWriteBytes(buf, expected); - in.progress(localFlushedAmount); - if (localFlushedAmount < expected) { - setEpollOut(); + flushedAmount += localFlushedAmount; + if (!buf.isReadable()) { + done = true; + break; + } + } + + in.progress(flushedAmount); + + if (done) { + in.remove(); + } else { + incompleteWrite(setEpollOut); break; } - if (!buf.isReadable()) { - in.remove(); - } - } else if (msg instanceof DefaultFileRegion) { DefaultFileRegion region = (DefaultFileRegion) msg;