Correctly handle automatic suspend/resume in ChunkedWriteHandler. Related to [#1861]

The old implementation was broken and could lead to pending messages never being picked up again until the user explicitly called flush() or resumeTransfer().
Norman Maurer 2013-09-24 13:57:51 +02:00
parent cd5f9a2212
commit f35ba4f80f
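
For context, this is the usual way the handler is wired up; a minimal sketch against the Netty 4 API, where the initializer class name and the file name are placeholders rather than anything from this commit:

import java.io.File;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.stream.ChunkedFile;
import io.netty.handler.stream.ChunkedWriteHandler;

public class LargeFileInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // ChunkedWriteHandler pulls chunks from a ChunkedInput while the channel
        // stays writable; with this fix it resumes on its own from
        // channelWritabilityChanged() once the outbound buffer drains, instead of
        // waiting for an explicit flush() or resumeTransfer() call.
        ch.pipeline().addLast(new ChunkedWriteHandler());
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                // File name is a placeholder for a large payload.
                ctx.writeAndFlush(new ChunkedFile(new File("big-download.bin")));
            }
        });
    }
}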


@@ -34,7 +34,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayDeque;
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * A {@link ChannelHandler} that adds support for writing a large data stream
@@ -74,20 +73,21 @@ public class ChunkedWriteHandler
             InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
 
     private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
-    private final int maxPendingWrites;
     private volatile ChannelHandlerContext ctx;
-    private final AtomicInteger pendingWrites = new AtomicInteger();
     private PendingWrite currentWrite;
 
     public ChunkedWriteHandler() {
-        this(4);
     }
 
+    /**
+     * @deprecated use {@link #ChunkedWriteHandler()}
+     */
+    @Deprecated
     public ChunkedWriteHandler(int maxPendingWrites) {
         if (maxPendingWrites <= 0) {
             throw new IllegalArgumentException(
                     "maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
         }
-        this.maxPendingWrites = maxPendingWrites;
     }
 
     @Override
@@ -95,10 +95,6 @@ public class ChunkedWriteHandler
         this.ctx = ctx;
     }
 
-    private boolean isWritable() {
-        return pendingWrites.get() < maxPendingWrites;
-    }
-
     /**
      * Continues to fetch the chunks from the input.
      */
@@ -145,7 +141,8 @@ public class ChunkedWriteHandler
 
     @Override
     public void flush(ChannelHandlerContext ctx) throws Exception {
-        if (isWritable() || !ctx.channel().isActive()) {
+        Channel channel = ctx.channel();
+        if (channel.isWritable() || !channel.isActive()) {
             doFlush(ctx);
         }
     }
@@ -156,6 +153,14 @@ public class ChunkedWriteHandler
         super.channelInactive(ctx);
     }
 
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+        if (ctx.channel().isWritable()) {
+            // channel is writable again try to continue flushing
+            doFlush(ctx);
+        }
+    }
+
     private void discard(Throwable cause) {
         for (;;) {
             PendingWrite currentWrite = this.currentWrite;
@@ -197,13 +202,13 @@ public class ChunkedWriteHandler
     }
 
     private void doFlush(final ChannelHandlerContext ctx) throws Exception {
-        Channel channel = ctx.channel();
+        final Channel channel = ctx.channel();
         if (!channel.isActive()) {
             discard(null);
             return;
         }
         boolean needsFlush;
-        while (isWritable()) {
+        while (channel.isWritable()) {
             if (currentWrite == null) {
                 currentWrite = queue.poll();
             }
@@ -267,7 +272,6 @@ public class ChunkedWriteHandler
             }
 
             final int amount = amount(message);
-            pendingWrites.incrementAndGet();
             ChannelFuture f = ctx.write(message);
             if (endOfInput) {
                 this.currentWrite = null;
@@ -280,17 +284,15 @@ public class ChunkedWriteHandler
                 f.addListener(new ChannelFutureListener() {
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {
-                        pendingWrites.decrementAndGet();
                         currentWrite.progress(amount);
                         currentWrite.success();
                         closeInput(chunks);
                     }
                 });
-            } else if (isWritable()) {
+            } else if (channel.isWritable()) {
                 f.addListener(new ChannelFutureListener() {
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {
-                        pendingWrites.decrementAndGet();
                         if (!future.isSuccess()) {
                             closeInput((ChunkedInput<?>) pendingMessage);
                             currentWrite.fail(future.cause());
@@ -303,13 +305,12 @@ public class ChunkedWriteHandler
                 f.addListener(new ChannelFutureListener() {
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {
-                        pendingWrites.decrementAndGet();
                         if (!future.isSuccess()) {
                             closeInput((ChunkedInput<?>) pendingMessage);
                             currentWrite.fail(future.cause());
                         } else {
                             currentWrite.progress(amount);
-                            if (isWritable()) {
+                            if (channel.isWritable()) {
                                 resumeTransfer();
                             }
                         }
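
Since suspend/resume now keys off Channel.isWritable(), the behaviour is ultimately governed by the channel's write-buffer water marks; a hedged sketch of tuning them, where the class name and byte values are illustrative only and not part of this change:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;

public final class WriteBufferTuning {

    // isWritable() flips to false once the pending outbound bytes exceed the high
    // water mark and back to true when they fall below the low water mark; that
    // transition fires channelWritabilityChanged(), which now drives the
    // handler's suspend/resume.
    static ServerBootstrap applyWaterMarks(ServerBootstrap b) {
        return b.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 64 * 1024)
                .childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 32 * 1024);
    }

    private WriteBufferTuning() { }
}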