[#2692] Allows notify ChannelFutureProgressListener on complete writes
Motivation: We have some inconsistency when handling writes. Sometimes we call ChannelOutboundBuffer.progress(...) also for complete writes and sometimes not. We should call it always. Modifications: Correctly call ChannelOuboundBuffer.progress(...) for complete and incomplete writes. Result: Consistent behavior
This commit is contained in:
parent
d989b24351
commit
35061a4332
@ -188,6 +188,8 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
if (done) {
|
if (done) {
|
||||||
// Release all buffers
|
// Release all buffers
|
||||||
for (int i = msgCount; i > 0; i --) {
|
for (int i = msgCount; i > 0; i --) {
|
||||||
|
final ByteBuf buf = (ByteBuf) in.current();
|
||||||
|
in.progress(buf.readableBytes());
|
||||||
in.remove();
|
in.remove();
|
||||||
}
|
}
|
||||||
in.progress(writtenBytes);
|
in.progress(writtenBytes);
|
||||||
@ -203,6 +205,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
final int readableBytes = buf.writerIndex() - readerIndex;
|
final int readableBytes = buf.writerIndex() - readerIndex;
|
||||||
|
|
||||||
if (readableBytes < writtenBytes) {
|
if (readableBytes < writtenBytes) {
|
||||||
|
in.progress(readableBytes);
|
||||||
in.remove();
|
in.remove();
|
||||||
writtenBytes -= readableBytes;
|
writtenBytes -= readableBytes;
|
||||||
} else if (readableBytes > writtenBytes) {
|
} else if (readableBytes > writtenBytes) {
|
||||||
@ -210,6 +213,7 @@ public final class EpollSocketChannel extends AbstractEpollChannel implements So
|
|||||||
in.progress(writtenBytes);
|
in.progress(writtenBytes);
|
||||||
break;
|
break;
|
||||||
} else { // readable == writtenBytes
|
} else { // readable == writtenBytes
|
||||||
|
in.progress(readableBytes);
|
||||||
in.remove();
|
in.remove();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -193,12 +193,19 @@ public abstract class AbstractOioByteChannel extends AbstractOioChannel {
|
|||||||
}
|
}
|
||||||
if (msg instanceof ByteBuf) {
|
if (msg instanceof ByteBuf) {
|
||||||
ByteBuf buf = (ByteBuf) msg;
|
ByteBuf buf = (ByteBuf) msg;
|
||||||
while (buf.isReadable()) {
|
int readableBytes = buf.readableBytes();
|
||||||
|
while (readableBytes > 0) {
|
||||||
doWriteBytes(buf);
|
doWriteBytes(buf);
|
||||||
|
int newReadableBytes = buf.readableBytes();
|
||||||
|
in.progress(readableBytes - newReadableBytes);
|
||||||
|
readableBytes = newReadableBytes;
|
||||||
}
|
}
|
||||||
in.remove();
|
in.remove();
|
||||||
} else if (msg instanceof FileRegion) {
|
} else if (msg instanceof FileRegion) {
|
||||||
doWriteFileRegion((FileRegion) msg);
|
FileRegion region = (FileRegion) msg;
|
||||||
|
long transfered = region.transfered();
|
||||||
|
doWriteFileRegion(region);
|
||||||
|
in.progress(region.transfered() - transfered);
|
||||||
in.remove();
|
in.remove();
|
||||||
} else {
|
} else {
|
||||||
in.remove(new UnsupportedOperationException(
|
in.remove(new UnsupportedOperationException(
|
||||||
|
@ -280,6 +280,8 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
|||||||
if (done) {
|
if (done) {
|
||||||
// Release all buffers
|
// Release all buffers
|
||||||
for (int i = msgCount; i > 0; i --) {
|
for (int i = msgCount; i > 0; i --) {
|
||||||
|
final ByteBuf buf = (ByteBuf) in.current();
|
||||||
|
in.progress(buf.readableBytes());
|
||||||
in.remove();
|
in.remove();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user