Respect ChannelConfig.getWriteSpinCount() when using epoll transport

Motivation:

The writeSpinCount was ignored in the epoll transport and it just kept on trying writing. This could cause unnessary cpu spinning if a slow remote peer was reading the data very very slow.

Modification:

- Correctly take writeSpinCount into account when writing.

Result:

Less cpu spinning when writing to a slow remote peer.
This commit is contained in:
Norman Maurer 2015-02-06 16:29:15 +01:00
parent c62e6b676f
commit 111781f38f
3 changed files with 40 additions and 35 deletions

View File

@ -235,14 +235,14 @@ abstract class AbstractEpollChannel extends AbstractChannel {
return localReadAmount;
}
protected final int doWriteBytes(ByteBuf buf) throws Exception {
protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception {
int readableBytes = buf.readableBytes();
int writtenBytes = 0;
if (buf.hasMemoryAddress()) {
long memoryAddress = buf.memoryAddress();
int readerIndex = buf.readerIndex();
int writerIndex = buf.writerIndex();
for (;;) {
for (int i = writeSpinCount - 1; i >= 0; i--) {
int localFlushedAmount = Native.writeAddress(
fileDescriptor.intValue(), memoryAddress, readerIndex, writerIndex);
if (localFlushedAmount > 0) {
@ -252,9 +252,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
}
readerIndex += localFlushedAmount;
} else {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
return writtenBytes;
break;
}
}
} else {
@ -264,7 +262,7 @@ abstract class AbstractEpollChannel extends AbstractChannel {
} else {
nioBuf = buf.nioBuffer();
}
for (;;) {
for (int i = writeSpinCount - 1; i >= 0; i--) {
int pos = nioBuf.position();
int limit = nioBuf.limit();
int localFlushedAmount = Native.write(fileDescriptor.intValue(), nioBuf, pos, limit);
@ -275,13 +273,15 @@ abstract class AbstractEpollChannel extends AbstractChannel {
return writtenBytes;
}
} else {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
break;
}
}
return writtenBytes;
}
if (writtenBytes < readableBytes) {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
}
return writtenBytes;
}
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {

View File

@ -70,7 +70,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written
*/
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
private boolean writeBytes(ChannelOutboundBuffer in, ByteBuf buf, int writeSpinCount) throws Exception {
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
@ -78,16 +78,17 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
int writtenBytes = doWriteBytes(buf);
int writtenBytes = doWriteBytes(buf, writeSpinCount);
in.removeBytes(writtenBytes);
return writtenBytes == readableBytes;
} else {
ByteBuffer[] nioBuffers = buf.nioBuffers();
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes);
return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes, writeSpinCount);
}
}
private boolean writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
private boolean writeBytesMultiple(
ChannelOutboundBuffer in, IovArray array, int writeSpinCount) throws IOException {
long expectedWrittenBytes = array.size();
final long initialExpectedWrittenBytes = expectedWrittenBytes;
@ -100,11 +101,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
boolean done = false;
int offset = 0;
int end = offset + cnt;
for (;;) {
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localWrittenBytes = Native.writevAddresses(fd().intValue(), array.memoryAddress(offset), cnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
break;
}
expectedWrittenBytes -= localWrittenBytes;
@ -127,14 +126,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
} while (offset < end && localWrittenBytes > 0);
}
if (!done) {
setFlag(Native.EPOLLOUT);
}
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
return done;
}
private boolean writeBytesMultiple(
ChannelOutboundBuffer in, ByteBuffer[] nioBuffers,
int nioBufferCnt, long expectedWrittenBytes) throws IOException {
int nioBufferCnt, long expectedWrittenBytes, int writeSpinCount) throws IOException {
assert expectedWrittenBytes != 0;
final long initialExpectedWrittenBytes = expectedWrittenBytes;
@ -142,11 +143,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
boolean done = false;
int offset = 0;
int end = offset + nioBufferCnt;
for (;;) {
for (int i = writeSpinCount - 1; i >= 0; i--) {
long localWrittenBytes = Native.writev(fd().intValue(), nioBuffers, offset, nioBufferCnt);
if (localWrittenBytes == 0) {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
break;
}
expectedWrittenBytes -= localWrittenBytes;
@ -173,6 +172,9 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
}
in.removeBytes(initialExpectedWrittenBytes - expectedWrittenBytes);
if (!done) {
setFlag(Native.EPOLLOUT);
}
return done;
}
@ -182,7 +184,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
* @param region the {@link DefaultFileRegion} from which the bytes should be written
* @return amount the amount of written bytes
*/
private boolean writeFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
private boolean writeFileRegion(
ChannelOutboundBuffer in, DefaultFileRegion region, int writeSpinCount) throws Exception {
final long regionCount = region.count();
if (region.transfered() >= regionCount) {
in.remove();
@ -193,13 +196,11 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
boolean done = false;
long flushedAmount = 0;
for (;;) {
for (int i = writeSpinCount - 1; i >= 0; i--) {
final long offset = region.transfered();
final long localFlushedAmount =
Native.sendfile(fd().intValue(), region, baseOffset, offset, regionCount - offset);
if (localFlushedAmount == 0) {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
break;
}
@ -216,12 +217,16 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
if (done) {
in.remove();
} else {
// Returned EAGAIN need to set EPOLLOUT
setFlag(Native.EPOLLOUT);
}
return done;
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
for (;;) {
final int msgCount = in.size();
@ -233,7 +238,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// Do gathering write if the outbounf buffer entries start with more than one ByteBuf.
if (msgCount > 1 && in.current() instanceof ByteBuf) {
if (!doWriteMultiple(in)) {
if (!doWriteMultiple(in, writeSpinCount)) {
break;
}
@ -241,26 +246,26 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
// because a user might have triggered another write and flush when we notify his or her
// listeners.
} else { // msgCount == 1
if (!doWriteSingle(in)) {
if (!doWriteSingle(in, writeSpinCount)) {
break;
}
}
}
}
protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!writeBytes(in, buf)) {
if (!writeBytes(in, buf, writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;
if (!writeFileRegion(in, region)) {
if (!writeFileRegion(in, region, writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
@ -273,14 +278,14 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
return true;
}
private boolean doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
private boolean doWriteMultiple(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
if (PlatformDependent.hasUnsafe()) {
// this means we can cast to IovArray and write the IovArray directly.
IovArray array = IovArrayThreadLocal.get(in);
int cnt = array.count();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, array)) {
if (!writeBytesMultiple(in, array, writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;
@ -293,7 +298,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel {
int cnt = in.nioBufferCount();
if (cnt >= 1) {
// TODO: Handle the case where cnt == 1 specially.
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize())) {
if (!writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), writeSpinCount)) {
// was not able to write everything so break here we will get notified later again once
// the network stack can handle more writes.
return false;

View File

@ -88,14 +88,14 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel {
}
@Override
protected boolean doWriteSingle(ChannelOutboundBuffer in) throws Exception {
protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) throws Exception {
Object msg = in.current();
if (msg instanceof FileDescriptor && Native.sendFd(fd().intValue(), ((FileDescriptor) msg).intValue()) > 0) {
// File descriptor was written, so remove it.
in.remove();
return true;
}
return super.doWriteSingle(in);
return super.doWriteSingle(in, writeSpinCount);
}
@Override