[#2664] Support write-spinning also in native transport

Motivation:

In our NIO implementation we use write-spinning to maximize throughput, but the native implementation does not use it.

Modification:

Respect writeSpinCount in native transport.

Result:

Better throughput
This commit is contained in:
Norman Maurer 2014-07-17 13:07:47 +02:00
parent ed7240b597
commit 89261fd8df

View File

@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
public final class EpollSocketChannel extends AbstractEpollChannel implements SocketChannel {
private final EpollSocketChannelConfig config;
private Runnable flushTask;
/**
* The future of the current connection attempt. If not null, subsequent
@ -105,16 +106,15 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
/**
* Write bytes from the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
* @param buf the {@link ByteBuf} from which the bytes should be written
* @return the amount of written bytes
*/
private int doWriteBytes(ByteBuf buf, int readable) throws Exception {
private int doWriteBytes(ByteBuf buf) throws Exception {
int readerIndex = buf.readerIndex();
int localFlushedAmount;
if (buf.nioBufferCount() == 1) {
if (buf.hasMemoryAddress()) {
localFlushedAmount = Native.writeAddress(fd, buf.memoryAddress(), readerIndex, buf.writerIndex());
} else {
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, readable);
ByteBuffer nioBuf = buf.internalNioBuffer(readerIndex, buf.readableBytes());
localFlushedAmount = Native.write(fd, nioBuf, nioBuf.position(), nioBuf.limit());
}
} else {
@ -171,11 +171,36 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
boolean done = false;
boolean setEpollOut = false;
long writtenBytes = 0;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setEpollOut = true;
break;
}
expectedWrittenBytes -= localWrittenBytes;
writtenBytes += localWrittenBytes;
if (expectedWrittenBytes == 0) {
done = true;
break;
}
}
if (done) {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
if (localWrittenBytes < expectedWrittenBytes) {
setEpollOut();
// Finish the write loop if no new messages were flushed by in.remove().
if (in.isEmpty()) {
clearEpollOut();
}
} else {
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
@ -184,25 +209,39 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < localWrittenBytes) {
if (readableBytes < writtenBytes) {
in.remove();
localWrittenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) {
buf.readerIndex(readerIndex + (int) localWrittenBytes);
in.progress(localWrittenBytes);
writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) {
buf.readerIndex(readerIndex + (int) writtenBytes);
in.progress(writtenBytes);
break;
} else { // readable == writtenBytes
in.remove();
break;
}
}
} else {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
incompleteWrite(setEpollOut);
}
}
private void incompleteWrite(boolean setEpollOut) {
// Did not write completely.
if (setEpollOut) {
setEpollOut();
} else {
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
}
/**
@ -267,18 +306,32 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.remove();
continue;
}
int expected = buf.readableBytes();
int localFlushedAmount = doWriteBytes(buf, expected);
in.progress(localFlushedAmount);
if (localFlushedAmount < expected) {
setEpollOut();
boolean setEpollOut = false;
boolean done = false;
long flushedAmount = 0;
int writeSpinCount = config().getWriteSpinCount();
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setEpollOut = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
in.remove();
done = true;
break;
}
}
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
incompleteWrite(setEpollOut);
break;
}
} else if (msg instanceof DefaultFileRegion) {
DefaultFileRegion region = (DefaultFileRegion) msg;