From 91da8e228b9d9d43fd8e98b8d72faf575f670abc Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 17 Feb 2014 05:21:10 -0800 Subject: [PATCH] Overall clean-up in EpollSocketChannel - Extract writev part from doWrite() for simplicity - Clearer comments --- .../channel/epoll/EpollSocketChannel.java | 84 +++++++++++-------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 74c9acc2f2..1eddf40ed9 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -128,6 +128,45 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So return localFlushedAmount; } + private void writeBytesMultiple( + ChannelOutboundBuffer in, int msgCount, ByteBuffer[] nioBuffers) throws IOException { + + int nioBufferCnt = in.nioBufferCount(); + long expectedWrittenBytes = in.nioBufferSize(); + + long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); + + if (localWrittenBytes < expectedWrittenBytes) { + setEpollOut(); + + // Did not write all buffers completely. + // Release the fully written buffers and update the indexes of the partially written buffer. + for (int i = msgCount; i > 0; i --) { + final ByteBuf buf = (ByteBuf) in.current(); + final int readerIndex = buf.readerIndex(); + final int readableBytes = buf.writerIndex() - readerIndex; + + if (readableBytes < localWrittenBytes) { + in.remove(); + localWrittenBytes -= readableBytes; + } else if (readableBytes > localWrittenBytes) { + + buf.readerIndex(readerIndex + (int) localWrittenBytes); + in.progress(localWrittenBytes); + break; + } else { // readable == writtenBytes + in.remove(); + break; + } + } + } else { + // Release all buffers + for (int i = msgCount; i > 0; i --) { + in.remove(); + } + } + } + /** * Write a {@link DefaultFileRegion} * @@ -148,51 +187,24 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So clearEpollOut(); break; } - // Do non-gathering write for a single buffer case. + + // Do gathering write if: + // * the outbound buffer contains more than one messages and + // * they are all buffers rather than a file region. if (msgCount > 1) { // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); if (nioBuffers != null) { + writeBytesMultiple(in, msgCount, nioBuffers); - int nioBufferCnt = in.nioBufferCount(); - long expectedWrittenBytes = in.nioBufferSize(); - - long localWrittenBytes = Native.writev(fd, nioBuffers, 0, nioBufferCnt); - - if (localWrittenBytes < expectedWrittenBytes) { - setEpollOut(); - - // Did not write all buffers completely. - // Release the fully written buffers and update the indexes of the partially written buffer. - for (int i = msgCount; i > 0; i --) { - final ByteBuf buf = (ByteBuf) in.current(); - final int readerIndex = buf.readerIndex(); - final int readableBytes = buf.writerIndex() - readerIndex; - - if (readableBytes < localWrittenBytes) { - in.remove(); - localWrittenBytes -= readableBytes; - } else if (readableBytes > localWrittenBytes) { - - buf.readerIndex(readerIndex + (int) localWrittenBytes); - in.progress(localWrittenBytes); - break; - } else { // readable == writtenBytes - in.remove(); - break; - } - } - } else { - // Release all buffers - for (int i = msgCount; i > 0; i --) { - in.remove(); - } - } - // try again as a ChannelFuture may be notified in the meantime and triggered another flush + // We do not break the loop here even if the outbound buffer was flushed completely, + // because a user might have triggered another write and flush when we notify his or her + // listeners. continue; } } + // The outbound buffer contains only one message or it contains a file region. Object msg = in.current(); if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg;