From 35061a4332d806ecfecbf9e8117bc7e3433bb4a9 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 28 Jul 2014 04:12:59 -0700 Subject: [PATCH] [#2692] Allows notify ChannelFutureProgressListener on complete writes Motivation: We have some inconsistency when handling writes. Sometimes we call ChannelOutboundBuffer.progress(...) also for complete writes and sometimes not. We should call it always. Modifications: Correctly call ChannelOuboundBuffer.progress(...) for complete and incomplete writes. Result: Consistent behavior --- .../io/netty/channel/epoll/EpollSocketChannel.java | 4 ++++ .../io/netty/channel/oio/AbstractOioByteChannel.java | 11 +++++++++-- .../io/netty/channel/socket/nio/NioSocketChannel.java | 2 ++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java index 8a35e65779..6d5258599e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollSocketChannel.java @@ -188,6 +188,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { + final ByteBuf buf = (ByteBuf) in.current(); + in.progress(buf.readableBytes()); in.remove(); } in.progress(writtenBytes); @@ -203,6 +205,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So final int readableBytes = buf.writerIndex() - readerIndex; if (readableBytes < writtenBytes) { + in.progress(readableBytes); in.remove(); writtenBytes -= readableBytes; } else if (readableBytes > writtenBytes) { @@ -210,6 +213,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So in.progress(writtenBytes); break; } else { // readable == writtenBytes + in.progress(readableBytes); in.remove(); break; } diff --git a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java index 4215ad2771..c161505ba3 100644 --- a/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/oio/AbstractOioByteChannel.java @@ -193,12 +193,19 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel { } if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; - while (buf.isReadable()) { + int readableBytes = buf.readableBytes(); + while (readableBytes > 0) { doWriteBytes(buf); + int newReadableBytes = buf.readableBytes(); + in.progress(readableBytes - newReadableBytes); + readableBytes = newReadableBytes; } in.remove(); } else if (msg instanceof FileRegion) { - doWriteFileRegion((FileRegion) msg); + FileRegion region = (FileRegion) msg; + long transfered = region.transfered(); + doWriteFileRegion(region); + in.progress(region.transfered() - transfered); in.remove(); } else { in.remove(new UnsupportedOperationException( diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index d0bf55b070..8e35e4e54b 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -280,6 +280,8 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty if (done) { // Release all buffers for (int i = msgCount; i > 0; i --) { + final ByteBuf buf = (ByteBuf) in.current(); + in.progress(buf.readableBytes()); in.remove(); }