Faster MessageList iteration in NioSocketChannel.doWrite()
This commit is contained in:
parent
553cd034b9
commit
a52ac692a9
@ -28,6 +28,7 @@ import io.netty.channel.nio.AbstractNioByteChannel;
|
|||||||
import io.netty.channel.socket.DefaultSocketChannelConfig;
|
import io.netty.channel.socket.DefaultSocketChannelConfig;
|
||||||
import io.netty.channel.socket.ServerSocketChannel;
|
import io.netty.channel.socket.ServerSocketChannel;
|
||||||
import io.netty.channel.socket.SocketChannelConfig;
|
import io.netty.channel.socket.SocketChannelConfig;
|
||||||
|
import io.netty.util.ReferenceCounted;
|
||||||
import io.netty.util.internal.logging.InternalLogger;
|
import io.netty.util.internal.logging.InternalLogger;
|
||||||
import io.netty.util.internal.logging.InternalLoggerFactory;
|
import io.netty.util.internal.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
@ -265,21 +266,22 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected int doWrite(MessageList<Object> msgs, int index) throws Exception {
|
protected int doWrite(MessageList<Object> msgs, final int index) throws Exception {
|
||||||
int size = msgs.size();
|
final int size = msgs.size();
|
||||||
|
|
||||||
// Do non-gathering write for a single buffer case.
|
// Do non-gathering write for a single buffer case.
|
||||||
if (size <= 1 || !msgs.containsOnly(ByteBuf.class)) {
|
if (size <= 1 || !msgs.containsOnly(ByteBuf.class)) {
|
||||||
return super.doWrite(msgs, index);
|
return super.doWrite(msgs, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
MessageList<ByteBuf> bufs = msgs.cast();
|
final Object[] bufs = msgs.array();
|
||||||
|
|
||||||
ByteBuffer[] nioBuffers = getNioBufferArray();
|
ByteBuffer[] nioBuffers = getNioBufferArray();
|
||||||
int nioBufferCnt = 0;
|
int nioBufferCnt = 0;
|
||||||
long expectedWrittenBytes = 0;
|
long expectedWrittenBytes = 0;
|
||||||
for (int i = index; i < size; i++) {
|
for (int i = index; i < size; i++) {
|
||||||
ByteBuf buf = bufs.get(i);
|
ByteBuf buf = (ByteBuf) bufs[i];
|
||||||
|
|
||||||
int readerIndex = buf.readerIndex();
|
int readerIndex = buf.readerIndex();
|
||||||
int readableBytes = buf.readableBytes();
|
int readableBytes = buf.readableBytes();
|
||||||
expectedWrittenBytes += readableBytes;
|
expectedWrittenBytes += readableBytes;
|
||||||
@ -307,7 +309,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
ByteBuf directBuf = alloc().directBuffer(readableBytes);
|
ByteBuf directBuf = alloc().directBuffer(readableBytes);
|
||||||
directBuf.writeBytes(buf, readerIndex, readableBytes);
|
directBuf.writeBytes(buf, readerIndex, readableBytes);
|
||||||
buf.release();
|
buf.release();
|
||||||
bufs.set(i, directBuf);
|
bufs[i] = directBuf;
|
||||||
if (nioBufferCnt == nioBuffers.length) {
|
if (nioBufferCnt == nioBuffers.length) {
|
||||||
nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCnt);
|
nioBuffers = doubleNioBufferArray(nioBuffers, nioBufferCnt);
|
||||||
}
|
}
|
||||||
@ -315,10 +317,11 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final SocketChannel ch = javaChannel();
|
||||||
long writtenBytes = 0;
|
long writtenBytes = 0;
|
||||||
boolean done = false;
|
boolean done = false;
|
||||||
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
|
||||||
final long localWrittenBytes = javaChannel().write(nioBuffers, 0, nioBufferCnt);
|
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
|
||||||
updateOpWrite(expectedWrittenBytes, localWrittenBytes, i == 0);
|
updateOpWrite(expectedWrittenBytes, localWrittenBytes, i == 0);
|
||||||
if (localWrittenBytes == 0) {
|
if (localWrittenBytes == 0) {
|
||||||
break;
|
break;
|
||||||
@ -334,8 +337,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
if (done) {
|
if (done) {
|
||||||
// release buffers
|
// release buffers
|
||||||
for (int i = index; i < size; i++) {
|
for (int i = index; i < size; i++) {
|
||||||
ByteBuf buf = bufs.get(i);
|
((ReferenceCounted) bufs[i]).release();
|
||||||
buf.release();
|
|
||||||
}
|
}
|
||||||
return size - index;
|
return size - index;
|
||||||
} else {
|
} else {
|
||||||
@ -343,14 +345,16 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
// Release the fully written buffers and update the indexes of the partially written buffer.
|
// Release the fully written buffers and update the indexes of the partially written buffer.
|
||||||
int writtenBufs = 0;
|
int writtenBufs = 0;
|
||||||
for (int i = index; i < size; i++) {
|
for (int i = index; i < size; i++) {
|
||||||
ByteBuf buf = bufs.get(i);
|
final ByteBuf buf = (ByteBuf) bufs[i];
|
||||||
int readable = buf.readableBytes();
|
final int readerIndex = buf.readerIndex();
|
||||||
if (readable < writtenBytes) {
|
final int readableBytes = buf.writerIndex() - readerIndex;
|
||||||
|
|
||||||
|
if (readableBytes < writtenBytes) {
|
||||||
writtenBufs ++;
|
writtenBufs ++;
|
||||||
buf.release();
|
buf.release();
|
||||||
writtenBytes -= readable;
|
writtenBytes -= readableBytes;
|
||||||
} else if (readable > writtenBytes) {
|
} else if (readableBytes > writtenBytes) {
|
||||||
buf.readerIndex(buf.readerIndex() + (int) writtenBytes);
|
buf.readerIndex(readerIndex + (int) writtenBytes);
|
||||||
break;
|
break;
|
||||||
} else { // readable == writtenBytes
|
} else { // readable == writtenBytes
|
||||||
writtenBufs ++;
|
writtenBufs ++;
|
||||||
|
Loading…
Reference in New Issue
Block a user