Overall clean-up in EpollSocketChannel

- Extract writev part from doWrite() for simplicity
- Clearer comments
This commit is contained in:
Trustin Lee 2014-02-17 05:21:10 -08:00
parent 8f3c09ba6b
commit 91da8e228b

View File

@ -128,6 +128,45 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
return localFlushedAmount;
}
private void writeBytesMultiple(
ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException {
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes < expectedWrittenBytes) {
setEpollOut();
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < localWrittenBytes) {
in.remove();
localWrittenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) {
buf.readerIndex(readerIndex + (int) localWrittenBytes);
in.progress(localWrittenBytes);
break;
} else { // readable == writtenBytes
in.remove();
break;
}
}
} else {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
}
}
/**
* Write a {@link DefaultFileRegion}
*
@ -148,51 +187,24 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
clearEpollOut();
break;
}
// Do non-gathering write for a single buffer case.
// Do gathering write if:
// * the outbound buffer contains more than one messages and
// * they are all buffers rather than a file region.
if (msgCount > 1) {
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
if (nioBuffers != null) {
writeBytesMultiple(in, msgCount, nioBuffers);
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes < expectedWrittenBytes) {
setEpollOut();
// Did not write all buffers completely.
// Release the fully written buffers and update the indexes of the partially written buffer.
for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < localWrittenBytes) {
in.remove();
localWrittenBytes -= readableBytes;
} else if (readableBytes > localWrittenBytes) {
buf.readerIndex(readerIndex + (int) localWrittenBytes);
in.progress(localWrittenBytes);
break;
} else { // readable == writtenBytes
in.remove();
break;
}
}
} else {
// Release all buffers
for (int i = msgCount; i > 0; i --) {
in.remove();
}
}
// try again as a ChannelFuture may be notified in the meantime and triggered another flush
// We do not break the loop here even if the outbound buffer was flushed completely,
// because a user might have triggered another write and flush when we notify his or her
// listeners.
continue;
}
}
// The outbound buffer contains only one message or it contains a file region.
Object msg = in.current();
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;