From 4d0eeb0dccc1fad54fef213c489a3fb52eb774f8 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 6 Feb 2015 16:29:15 +0100 Subject: [PATCH] Respect ChannelConfig.getWriteSpinCount() when using epoll transport Motivation: The writeSpinCount was ignored in the epoll transport and it just kept on trying writing. This could cause unnessary cpu spinning if a slow remote peer was reading the data very very slow. Modification: - Correctly take writeSpinCount into account when writing. Result: Less cpu spinning when writing to a slow remote peer. --- .../channel/epoll/AbstractEpollChannel.java | 18 +++---- .../epoll/AbstractEpollStreamChannel.java | 53 ++++++++++--------- .../epoll/EpollDomainSocketChannel.java | 4 +- 3 files changed, 40 insertions(+), 35 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index e58cc5fe2c..4cc965c084 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -236,14 +236,14 @@ abstract class AbstractEpollChannel extends AbstractChannel { return localReadAmount; } - protected final int doWriteBytes(ByteBuf buf) throws Exception { + protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception { int readableBytes = buf.readableBytes(); int writtenBytes = 0; if (buf.hasMemoryAddress()) { long memoryAddress = buf.memoryAddress(); int readerIndex = buf.readerIndex(); int writerIndex = buf.writerIndex(); - for (;;) { + for (int i = writeSpinCount - 1; i >= 0; i--) { int localFlushedAmount = Native.writeAddress( fileDescriptor.intValue(), memoryAddress, readerIndex, writerIndex); if (localFlushedAmount > 0) { @@ -253,9 +253,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { } readerIndex += localFlushedAmount; } else { - // Returned EAGAIN need to set EPOLLOUT - setFlag(Native.EPOLLOUT); - return writtenBytes; + break; } } } else { @@ -265,7 +263,7 @@ abstract class AbstractEpollChannel extends AbstractChannel { } else { nioBuf = buf.nioBuffer(); } - for (;;) { + for (int i = writeSpinCount - 1; i >= 0; i--) { int pos = nioBuf.position(); int limit = nioBuf.limit(); int localFlushedAmount = Native.write(fileDescriptor.intValue(), nioBuf, pos, limit); @@ -276,13 +274,15 @@ abstract class AbstractEpollChannel extends AbstractChannel { return writtenBytes; } } else { - // Returned EAGAIN need to set EPOLLOUT - setFlag(Native.EPOLLOUT); break; } } - return writtenBytes; } + if (writtenBytes < readableBytes) { + // Returned EAGAIN need to set EPOLLOUT + setFlag(Native.EPOLLOUT); + } + return writtenBytes; } protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 49dc5acefc..3fb0bbd25e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -70,7 +70,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * @param buf the {@link ByteBuf} from which the bytes should be written */ - private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception { + private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception { int readableBytes = buf.readableBytes(); if (readableBytes == 0) { in.remove(); @@ -78,16 +78,17 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) { - int writtenBytes = doWriteBytes(buf); + int writtenBytes = doWriteBytes(buf, writeSpinCount); in.removeBytes(writtenBytes); return writtenBytes == readableBytes; } else { ByteBuffer[] nioBuffers = buf.nioBuffers(); - return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes); + return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount); } } - private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException { + private boolean writeBytesMultiple( + ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException { long expectedWrittenBytes = array.size(); final long initialExpectedWrittenBytes = expectedWrittenBytes; @@ -100,11 +101,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { boolean done = false; int offset = 0; int end = offset + cnt; - for (;;) { + for (int i = writeSpinCount - 1; i >= 0; i--) { long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt); if (localWrittenBytes == 0) { - // Returned EAGAIN need to set EPOLLOUT - setFlag(Native.EPOLLOUT); break; } expectedWrittenBytes -= localWrittenBytes; @@ -127,14 +126,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } } while (offset < end && localWrittenBytes > 0); } - + if (!done) { + setFlag(Native.EPOLLOUT); + } in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); return done; } private boolean writeBytesMultiple( ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, - int nioBufferCnt, long expectedWrittenBytes) throws IOException { + int nioBufferCnt, long expectedWrittenBytes, int writeSpinCount) throws IOException { assert expectedWrittenBytes != 0; final long initialExpectedWrittenBytes = expectedWrittenBytes; @@ -142,11 +143,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { boolean done = false; int offset = 0; int end = offset + nioBufferCnt; - for (;;) { + for (int i = writeSpinCount - 1; i >= 0; i--) { long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt); if (localWrittenBytes == 0) { - // Returned EAGAIN need to set EPOLLOUT - setFlag(Native.EPOLLOUT); break; } expectedWrittenBytes -= localWrittenBytes; @@ -173,6 +172,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { } in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); + if (!done) { + setFlag(Native.EPOLLOUT); + } return done; } @@ -182,7 +184,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { * @param region the {@link DefaultFileRegion} from which the bytes should be written * @return amount the amount of written bytes */ - private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { + private boolean writeFileRegion( + ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception { final long regionCount = region.count(); if (region.transfered() >= regionCount) { in.remove(); @@ -193,13 +196,11 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { boolean done = false; long flushedAmount = 0; - for (;;) { + for (int i = writeSpinCount - 1; i >= 0; i--) { final long offset = region.transfered(); final long localFlushedAmount = Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset); if (localFlushedAmount == 0) { - // Returned EAGAIN need to set EPOLLOUT - setFlag(Native.EPOLLOUT); break; } @@ -216,12 +217,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { if (done) { in.remove(); + } else { + // Returned EAGAIN need to set EPOLLOUT + setFlag(Native.EPOLLOUT); } return done; } @Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { + int writeSpinCount = config().getWriteSpinCount(); for (;;) { final int msgCount = in.size(); @@ -233,7 +238,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { // Do gathering write if the outbounf buffer entries start with more than one ByteBuf. if (msgCount > 1 && in.current() instanceof ByteBuf) { - if (!doWriteMultiple(in)) { + if (!doWriteMultiple(in, writeSpinCount)) { break; } @@ -241,26 +246,26 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { // because a user might have triggered another write and flush when we notify his or her // listeners. } else { // msgCount == 1 - if (!doWriteSingle(in)) { + if (!doWriteSingle(in, writeSpinCount)) { break; } } } } - protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { + protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { // The outbound buffer contains only one message or it contains a file region. Object msg = in.current(); if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; - if (!writeBytes(in, buf)) { + if (!writeBytes(in, buf, writeSpinCount)) { // was not able to write everything so break here we will get notified later again once // the network stack can handle more writes. return false; } } else if (msg instanceof DefaultFileRegion) { DefaultFileRegion region = (DefaultFileRegion) msg; - if (!writeFileRegion(in, region)) { + if (!writeFileRegion(in, region, writeSpinCount)) { // was not able to write everything so break here we will get notified later again once // the network stack can handle more writes. return false; @@ -273,14 +278,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { return true; } - private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception { + private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { if (PlatformDependent.hasUnsafe()) { // this means we can cast to IovArray and write the IovArray directly. IovArray array = IovArrayThreadLocal.get(in); int cnt = array.count(); if (cnt >= 1) { // TODO: Handle the case where cnt == 1 specially. - if (!writeBytesMultiple(in, array)) { + if (!writeBytesMultiple(in, array, writeSpinCount)) { // was not able to write everything so break here we will get notified later again once // the network stack can handle more writes. return false; @@ -293,7 +298,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel { int cnt = in.nioBufferCount(); if (cnt >= 1) { // TODO: Handle the case where cnt == 1 specially. - if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) { + if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), writeSpinCount)) { // was not able to write everything so break here we will get notified later again once // the network stack can handle more writes. return false; diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index e0fba2ffa0..5a507b13df 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -88,14 +88,14 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel { } @Override - protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { + protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception { Object msg = in.current(); if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) { // File descriptor was written, so remove it. in.remove(); return true; } - return super.doWriteSingle(in); + return super.doWriteSingle(in, writeSpinCount); } @Override