From f88cd62354464a3445c68976d6f3a535d775180a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 28 Jul 2014 04:12:59 -0700 Subject: [PATCH] [#2692] Allows notify ChannelFutureProgressListener on complete writes Motivation: We have some inconsistency when handling writes. Sometimes we call ChannelOutboundBuffer.progress(...) also for complete writes and sometimes not. We should call it always. Modifications: Correctly call ChannelOuboundBuffer.progress(...) for complete and incomplete writes. Result: Consistent behavior --- .../io/netty/channel/epoll/EpollSocketChannel.java | 4 ++++ .../io/netty/channel/oio/AbstractOioByteChannel.java | 11 +++++++++-- .../io/netty/channel/socket/nio/NioSocketChannel.java | 4 +++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 1d2f824259..49c5179ba4 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -253,6 +253,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { + final ByteBuf buf = (ByteBuf) in.current(); + in.progress(buf.readableBytes()); in.remove(); } in.progress(writtenBytes); @@ -268,6 +270,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes < writtenBytes) { + in.progress(readableBytes); in.remove(); writtenBytes -= readableBytes; } else if (readableBytes > writtenBytes) { @@ -275,6 +278,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So in.progress(writtenBytes); break; } else { // readable == writtenBytes + in.progress(readableBytes); in.remove(); break; } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 4215ad2771..c161505ba3 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -193,12 +193,19 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; - while (buf.isReadable()) { + int readableBytes = buf.readableBytes(); + while (readableBytes > 0) { doWriteBytes(buf); + int newReadableBytes = buf.readableBytes(); + in.progress(readableBytes - newReadableBytes); + readableBytes = newReadableBytes; } in.remove(); } else if (msg instanceof FileRegion) { - doWriteFileRegion((FileRegion) msg); + FileRegion region = (FileRegion) msg; + long transfered = region.transfered(); + doWriteFileRegion(region); + in.progress(region.transfered() - transfered); in.remove(); } else { in.remove(new UnsupportedOperationException( diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index fabcdefbb3..151b575376 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -279,7 +279,9 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { - nioIn.remove(); + final ByteBuf buf = (ByteBuf) in.current(); + in.progress(buf.readableBytes()); + in.remove(); } // Finish the write loop if no new messages were flushed by in.remove().