[#2692] Allows notify ChannelFutureProgressListener on complete writes

Motivation:

We have some inconsistency when handling writes. Sometimes we call ChannelOutboundBuffer.progress(...) also for complete writes and sometimes not. We should call it always.

Modifications:

Correctly call ChannelOuboundBuffer.progress(...) for complete and incomplete writes.

Result:

Consistent behavior
This commit is contained in:
Norman Maurer 2014-07-28 04:12:59 -07:00
parent 30295138fe
commit 2131edadf2
3 changed files with 16 additions and 3 deletions

View File

@ -253,6 +253,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
if (done) { if (done) {
// Release all buffers // Release all buffers
for (int i = msgCount; i > 0; i --) { for (int i = msgCount; i > 0; i --) {
final ByteBuf buf = (ByteBuf) in.current();
in.progress(buf.readableBytes());
in.remove(); in.remove();
} }
in.progress(writtenBytes); in.progress(writtenBytes);
@ -268,6 +270,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
final int readableBytes = buf.writerIndex() - readerIndex; final int readableBytes = buf.writerIndex() - readerIndex;
if (readableBytes < writtenBytes) { if (readableBytes < writtenBytes) {
in.progress(readableBytes);
in.remove(); in.remove();
writtenBytes -= readableBytes; writtenBytes -= readableBytes;
} else if (readableBytes > writtenBytes) { } else if (readableBytes > writtenBytes) {
@ -275,6 +278,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
in.progress(writtenBytes); in.progress(writtenBytes);
break; break;
} else { // readable == writtenBytes } else { // readable == writtenBytes
in.progress(readableBytes);
in.remove(); in.remove();
break; break;
} }

View File

@ -193,12 +193,19 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
} }
if (msg instanceof ByteBuf) { if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
while (buf.isReadable()) { int readableBytes = buf.readableBytes();
while (readableBytes > 0) {
doWriteBytes(buf); doWriteBytes(buf);
int newReadableBytes = buf.readableBytes();
in.progress(readableBytes - newReadableBytes);
readableBytes = newReadableBytes;
} }
in.remove(); in.remove();
} else if (msg instanceof FileRegion) { } else if (msg instanceof FileRegion) {
doWriteFileRegion((FileRegion) msg); FileRegion region = (FileRegion) msg;
long transfered = region.transfered();
doWriteFileRegion(region);
in.progress(region.transfered() - transfered);
in.remove(); in.remove();
} else { } else {
in.remove(new UnsupportedOperationException( in.remove(new UnsupportedOperationException(

View File

@ -278,7 +278,9 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
if (done) { if (done) {
// Release all buffers // Release all buffers
for (int i = msgCount; i > 0; i --) { for (int i = msgCount; i > 0; i --) {
nioIn.remove(); final ByteBuf buf = (ByteBuf) in.current();
in.progress(buf.readableBytes());
in.remove();
} }
// Finish the write loop if no new messages were flushed by in.remove(). // Finish the write loop if no new messages were flushed by in.remove().