Call ctx.flush() at least once in ChunkedWriteHandler.flush()
Related: #3219 Motivation: ChunkedWriteHandler.flush() does not call ctx.flush() when channel is not writable. This can be a problem when other handler / non-Netty thread writes messages simultaneously, because ChunkedWriteHandler.flush() might have no chance to observe channel.isWritable() returns true and thus the channel is never flushed. Modifications: - Ensure that ChunkedWriteHandler.flush() calls ctx.flush() at least once. Result: A stall connection issue, that occurs when certain combination of handlers exist in a pipeline, has been fixed. (e.g. SslHandler and ChunkedWriteHandler)
This commit is contained in:
parent
2a426b3d44
commit
9ff234abed
@ -15,8 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package io.netty.handler.stream;
|
package io.netty.handler.stream;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
|
||||||
import io.netty.buffer.ByteBufHolder;
|
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.channel.ChannelDuplexHandler;
|
import io.netty.channel.ChannelDuplexHandler;
|
||||||
@ -135,16 +133,16 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flush(ChannelHandlerContext ctx) throws Exception {
|
public void flush(ChannelHandlerContext ctx) throws Exception {
|
||||||
Channel channel = ctx.channel();
|
if (!doFlush(ctx)) {
|
||||||
if (channel.isWritable() || !channel.isActive()) {
|
// Make sure to flush at least once.
|
||||||
doFlush(ctx);
|
ctx.flush();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||||
doFlush(ctx);
|
doFlush(ctx);
|
||||||
super.channelInactive(ctx);
|
ctx.fireChannelInactive();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -196,12 +194,14 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doFlush(final ChannelHandlerContext ctx) throws Exception {
|
private boolean doFlush(final ChannelHandlerContext ctx) throws Exception {
|
||||||
final Channel channel = ctx.channel();
|
final Channel channel = ctx.channel();
|
||||||
if (!channel.isActive()) {
|
if (!channel.isActive()) {
|
||||||
discard(null);
|
discard(null);
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean flushed = false;
|
||||||
while (channel.isWritable()) {
|
while (channel.isWritable()) {
|
||||||
if (currentWrite == null) {
|
if (currentWrite == null) {
|
||||||
currentWrite = queue.poll();
|
currentWrite = queue.poll();
|
||||||
@ -305,12 +305,15 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
|
|||||||
|
|
||||||
// Always need to flush
|
// Always need to flush
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
|
flushed = true;
|
||||||
|
|
||||||
if (!channel.isActive()) {
|
if (!channel.isActive()) {
|
||||||
discard(new ClosedChannelException());
|
discard(new ClosedChannelException());
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return flushed;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void closeInput(ChunkedInput<?> chunks) {
|
static void closeInput(ChunkedInput<?> chunks) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user