[#2664] Support write-spinning also in native transport

Motivation:

In our nio implementation we use write-spinning for maximize throughput, but in the native implementation this is not used.

Modification:

Respect writeSpinCount in native transport.

Result:

Better throughput
This commit is contained in:
Norman Maurer 2014-07-17 13:07:47 +02:00
parent 401a6db84e
commit e258b48f8d

View File

@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel { public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
private final EpollSocketChannelConfig config; private final EpollSocketChannelConfig config;
private Runnable flushTask;
/** /**
* The future of the current connection attempt. If not null, subsequent * The future of the current connection attempt. If not null, subsequent
@ -105,16 +106,15 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
/** /**
* Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}. * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written * @param buf the {@link ByteBuf} from which the bytes should be written
* @return amount the amount of written bytes
*/ */
private int doWriteBytes(ByteBuf buf, int readable) throws Exception { private int doWriteBytes(ByteBuf buf) throws Exception {
int readerIndex = buf.readerIndex(); int readerIndex = buf.readerIndex();
int localFlushedAmount; int localFlushedAmount;
if (buf.nioBufferCount() == 1) { if (buf.nioBufferCount() == 1) {
if (buf.hasMemoryAddress()) { if (buf.hasMemoryAddress()) {
localFlushedAmount = Native.writeAddress(fd, buf.memoryAddress(), readerIndex, buf.writerIndex()); localFlushedAmount = Native.writeAddress(fd, buf.memoryAddress(), readerIndex, buf.writerIndex());
} else { } else {
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, readable); ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes());
localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit()); localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit());
} }
} else { } else {
@ -171,11 +171,36 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
int nioBufferCnt = in.nioBufferCount(); int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize(); long expectedWrittenBytes = in.nioBufferSize();
boolean done = false;
boolean setEpollOut = false;
long writtenBytes = 0;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setEpollOut = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
if (localWrittenBytes < expectedWrittenBytes) { // Finish the write loop if no new messages were flushed by in.remove().
setEpollOut(); if (in.isEmpty()) {
clearEpollOut();
}
} else {
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
// Did not write all buffers completely. // Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer. // Release the fully written buffers and update the indexes of the partially written buffer.
@ -184,25 +209,39 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
final int readerIndex = buf.readerIndex(); final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex; final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < localWrittenBytes) { if (readableBytes < writtenBytes) {
in.remove(); in.remove();
localWrittenBytes -= readableBytes; writtenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) { } else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
buf.readerIndex(readerIndex + (int) localWrittenBytes); in.progress(writtenBytes);
in.progress(localWrittenBytes);
break; break;
} else { // readable == writtenBytes } else { // readable == writtenBytes
in.remove(); in.remove();
break; break;
} }
} }
} else { incompleteWrite(setEpollOut);
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
} }
} }
private void incompleteWrite(boolean setEpollOut) {
// Did not write completely.
if (setEpollOut) {
setEpollOut();
} else {
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
} }
/** /**
@ -267,18 +306,32 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.remove(); in.remove();
continue; continue;
} }
boolean setEpollOut = false;
int expected = buf.readableBytes(); boolean done = false;
int localFlushedAmount = doWriteBytes(buf, expected); long flushedAmount = 0;
in.progress(localFlushedAmount); int writeSpinCount = config().getWriteSpinCount();
if (localFlushedAmount < expected) { for (int i = writeSpinCount - 1; i >= 0; i --) {
setEpollOut(); int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setEpollOut = true;
break; break;
} }
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) { if (!buf.isReadable()) {
in.remove(); done = true;
break;
}
} }
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
incompleteWrite(setEpollOut);
break;
}
} else if (msg instanceof DefaultFileRegion) { } else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg; DefaultFileRegion region = (DefaultFileRegion) msg;