Call ctx.flush() at least once in ChunkedWriteHandler.flush()

Related: #3219

Motivation:

ChunkedWriteHandler.flush() does not call ctx.flush() when channel is
not writable. This can be a problem when other handler / non-Netty
thread writes messages simultaneously, because
ChunkedWriteHandler.flush() might have no chance to observe
channel.isWritable() returns true and thus the channel is never flushed.

Modifications:

- Ensure that ChunkedWriteHandler.flush() calls ctx.flush() at least
  once.

Result:

A stall connection issue, that occurs when certain combination of
handlers exist in a pipeline, has been fixed. (e.g. SslHandler and
ChunkedWriteHandler)
This commit is contained in:
Trustin Lee 2014-12-09 18:16:55 +09:00
parent a8cee4d649
commit 5edb7083b6

View File

@ -15,8 +15,6 @@
*/ */
package io.netty.handler.stream; package io.netty.handler.stream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
@ -136,16 +134,16 @@ public class ChunkedWriteHandler
@Override @Override
public void flush(ChannelHandlerContext ctx) throws Exception { public void flush(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel(); if (!doFlush(ctx)) {
if (channel.isWritable() || !channel.isActive()) { // Make sure to flush at least once.
doFlush(ctx); ctx.flush();
} }
} }
@Override @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
doFlush(ctx); doFlush(ctx);
super.channelInactive(ctx); ctx.fireChannelInactive();
} }
@Override @Override
@ -197,12 +195,14 @@ public class ChunkedWriteHandler
} }
} }
private void doFlush(final ChannelHandlerContext ctx) throws Exception { private boolean doFlush(final ChannelHandlerContext ctx) throws Exception {
final Channel channel = ctx.channel(); final Channel channel = ctx.channel();
if (!channel.isActive()) { if (!channel.isActive()) {
discard(null); discard(null);
return; return false;
} }
boolean flushed = false;
while (channel.isWritable()) { while (channel.isWritable()) {
if (currentWrite == null) { if (currentWrite == null) {
currentWrite = queue.poll(); currentWrite = queue.poll();
@ -307,12 +307,15 @@ public class ChunkedWriteHandler
// Always need to flush // Always need to flush
ctx.flush(); ctx.flush();
flushed = true;
if (!channel.isActive()) { if (!channel.isActive()) {
discard(new ClosedChannelException()); discard(new ClosedChannelException());
return; break;
} }
} }
return flushed;
} }
static void closeInput(ChunkedInput<?> chunks) { static void closeInput(ChunkedInput<?> chunks) {