Respect ChannelConfig.getWriteSpinCount() when using epoll transport

Motivation:

The writeSpinCount was ignored in the epoll transport, which simply kept trying to write. This could cause unnecessary CPU spinning if a slow remote peer was reading the data very slowly.

Modification:

- Correctly take writeSpinCount into account when writing.

Result:

Less CPU spinning when writing to a slow remote peer.
This commit is contained in:
Norman Maurer 2015-02-06 16:29:15 +01:00
parent 10a5635acb
commit 4d0eeb0dcc
3 changed files with 40 additions and 35 deletions

View File

@ -236,14 +236,14 @@ abstract class AbstractEpollChannel extends AbstractChannel {
return localReadAmount; return localReadAmount;
} }
protected final int doWriteBytes(ByteBuf buf) throws Exception { protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception {
int readableBytes = buf.readableBytes(); int readableBytes = buf.readableBytes();
int writtenBytes = 0; int writtenBytes = 0;
if (buf.hasMemoryAddress()) { if (buf.hasMemoryAddress()) {
long memoryAddress = buf.memoryAddress(); long memoryAddress = buf.memoryAddress();
int readerIndex = buf.readerIndex(); int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex(); int writerIndex = buf.writerIndex();
for (;;) { for (int i = writeSpinCount - 1; i >= 0; i--) {
int localFlushedAmount = Native.writeAddress( int localFlushedAmount = Native.writeAddress(
fileDescriptor.intValue(), memoryAddress, readerIndex, writerIndex); fileDescriptor.intValue(), memoryAddress, readerIndex, writerIndex);
if (localFlushedAmount > 0) { if (localFlushedAmount > 0) {
@ -253,9 +253,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
} }
readerIndex += localFlushedAmount; readerIndex += localFlushedAmount;
} else { } else {
// Returned EAGAIN need to set EPOLLOUT break;
setFlag(Native.EPOLLOUT);
return writtenBytes;
} }
} }
} else { } else {
@ -265,7 +263,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
} else { } else {
nioBuf = buf.nioBuffer(); nioBuf = buf.nioBuffer();
} }
for (;;) { for (int i = writeSpinCount - 1; i >= 0; i--) {
int pos = nioBuf.position(); int pos = nioBuf.position();
int limit = nioBuf.limit(); int limit = nioBuf.limit();
int localFlushedAmount = Native.write(fileDescriptor.intValue(), nioBuf, pos, limit); int localFlushedAmount = Native.write(fileDescriptor.intValue(), nioBuf, pos, limit);
@ -276,13 +274,15 @@ abstract class AbstractEpollChannel extends AbstractChannel {
return writtenBytes; return writtenBytes;
} }
} else { } else {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
break; break;
} }
} }
return writtenBytes;
} }
if (writtenBytes < readableBytes) {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
}
return writtenBytes;
} }
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe { protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {

View File

@ -70,7 +70,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written * @param buf the {@link ByteBuf} from which the bytes should be written
*/ */
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception { private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception {
int readableBytes = buf.readableBytes(); int readableBytes = buf.readableBytes();
if (readableBytes == 0) { if (readableBytes == 0) {
in.remove(); in.remove();
@ -78,16 +78,17 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) { if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
int writtenBytes = doWriteBytes(buf); int writtenBytes = doWriteBytes(buf, writeSpinCount);
in.removeBytes(writtenBytes); in.removeBytes(writtenBytes);
return writtenBytes == readableBytes; return writtenBytes == readableBytes;
} else { } else {
ByteBuffer[] nioBuffers = buf.nioBuffers(); ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes); return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount);
} }
} }
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException { private boolean writeBytesMultiple(
ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException {
long expectedWrittenBytes = array.size(); long expectedWrittenBytes = array.size();
final long initialExpectedWrittenBytes = expectedWrittenBytes; final long initialExpectedWrittenBytes = expectedWrittenBytes;
@ -100,11 +101,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
boolean done = false; boolean done = false;
int offset = 0; int offset = 0;
int end = offset + cnt; int end = offset + cnt;
for (;;) { for (int i = writeSpinCount - 1; i >= 0; i--) {
long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt); long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) { if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
break; break;
} }
expectedWrittenBytes -= localWrittenBytes; expectedWrittenBytes -= localWrittenBytes;
@ -127,14 +126,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
} while (offset < end && localWrittenBytes > 0); } while (offset < end && localWrittenBytes > 0);
} }
if (!done) {
setFlag(Native.EPOLLOUT);
}
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
return done; return done;
} }
private boolean writeBytesMultiple( private boolean writeBytesMultiple(
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException { int nioBufferCnt, long expectedWrittenBytes, int writeSpinCount) throws IOException {
assert expectedWrittenBytes != 0; assert expectedWrittenBytes != 0;
final long initialExpectedWrittenBytes = expectedWrittenBytes; final long initialExpectedWrittenBytes = expectedWrittenBytes;
@ -142,11 +143,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
boolean done = false; boolean done = false;
int offset = 0; int offset = 0;
int end = offset + nioBufferCnt; int end = offset + nioBufferCnt;
for (;;) { for (int i = writeSpinCount - 1; i >= 0; i--) {
long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt); long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) { if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
break; break;
} }
expectedWrittenBytes -= localWrittenBytes; expectedWrittenBytes -= localWrittenBytes;
@ -173,6 +172,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
} }
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes); in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
if (!done) {
setFlag(Native.EPOLLOUT);
}
return done; return done;
} }
@ -182,7 +184,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
* @param region the {@link DefaultFileRegion} from which the bytes should be written * @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return amount the amount of written bytes * @return amount the amount of written bytes
*/ */
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception { private boolean writeFileRegion(
ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
final long regionCount = region.count(); final long regionCount = region.count();
if (region.transfered() >= regionCount) { if (region.transfered() >= regionCount) {
in.remove(); in.remove();
@ -193,13 +196,11 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
boolean done = false; boolean done = false;
long flushedAmount = 0; long flushedAmount = 0;
for (;;) { for (int i = writeSpinCount - 1; i >= 0; i--) {
final long offset = region.transfered(); final long offset = region.transfered();
final long localFlushedAmount = final long localFlushedAmount =
Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset); Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) { if (localFlushedAmount == 0) {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
break; break;
} }
@ -216,12 +217,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
if (done) { if (done) {
in.remove(); in.remove();
} else {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
} }
return done; return done;
} }
@Override @Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception { protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
for (;;) { for (;;) {
final int msgCount = in.size(); final int msgCount = in.size();
@ -233,7 +238,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf. // Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) { if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in)) { if (!doWriteMultiple(in, writeSpinCount)) {
break; break;
} }
@ -241,26 +246,26 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// because a user might have triggered another write and flush when we notify his or her // because a user might have triggered another write and flush when we notify his or her
// listeners. // listeners.
} else { // msgCount == 1 } else { // msgCount == 1
if (!doWriteSingle(in)) { if (!doWriteSingle(in, writeSpinCount)) {
break; break;
} }
} }
} }
} }
protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
// The outbound buffer contains only one message or it contains a file region. // The outbound buffer contains only one message or it contains a file region.
Object msg = in.current(); Object msg = in.current();
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
if (!writeBytes(in, buf)) { if (!writeBytes(in, buf, writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once // was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes. // the network stack can handle more writes.
return false; return false;
} }
} else if (msg instanceof DefaultFileRegion) { } else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg; DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region)) { if (!writeFileRegion(in, region, writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once // was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes. // the network stack can handle more writes.
return false; return false;
@ -273,14 +278,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
return true; return true;
} }
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception { private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
if (PlatformDependent.hasUnsafe()) { if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly. // this means we can cast to IovArray and write the IovArray directly.
IovArray array = IovArrayThreadLocal.get(in); IovArray array = IovArrayThreadLocal.get(in);
int cnt = array.count(); int cnt = array.count();
if (cnt >= 1) { if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially. // TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, array)) { if (!writeBytesMultiple(in, array, writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once // was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes. // the network stack can handle more writes.
return false; return false;
@ -293,7 +298,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
int cnt = in.nioBufferCount(); int cnt = in.nioBufferCount();
if (cnt >= 1) { if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially. // TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) { if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once // was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes. // the network stack can handle more writes.
return false; return false;

View File

@ -88,14 +88,14 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
} }
@Override @Override
protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception { protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
Object msg = in.current(); Object msg = in.current();
if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) { if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) {
// File descriptor was written, so remove it. // File descriptor was written, so remove it.
in.remove(); in.remove();
return true; return true;
} }
return super.doWriteSingle(in); return super.doWriteSingle(in, writeSpinCount);
} }
@Override @Override