[#1743] Fix IllegalStateException by remove usage of PendingWrite in ChunkedWriteHandler. This needs more thoughts before re-introduce it

This commit is contained in:
Norman Maurer 2013-08-16 08:11:19 +02:00
parent d1f592575a
commit 9e7529b2f5

View File

@ -26,8 +26,6 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise; import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PendingWrite;
import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory;
@ -78,8 +76,6 @@ public class ChunkedWriteHandler
private volatile ChannelHandlerContext ctx; private volatile ChannelHandlerContext ctx;
private final AtomicInteger pendingWrites = new AtomicInteger(); private final AtomicInteger pendingWrites = new AtomicInteger();
private PendingWrite currentWrite; private PendingWrite currentWrite;
private int progress;
public ChunkedWriteHandler() { public ChunkedWriteHandler() {
this(4); this(4);
} }
@ -142,7 +138,7 @@ public class ChunkedWriteHandler
@Override @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
queue.add(PendingWrite.newInstance(msg, promise)); queue.add(new PendingWrite(msg, promise));
} }
@Override @Override
@ -171,7 +167,7 @@ public class ChunkedWriteHandler
if (currentWrite == null) { if (currentWrite == null) {
break; break;
} }
Object message = currentWrite.msg(); Object message = currentWrite.msg;
if (message instanceof ChunkedInput) { if (message instanceof ChunkedInput) {
ChunkedInput<?> in = (ChunkedInput<?>) message; ChunkedInput<?> in = (ChunkedInput<?>) message;
try { try {
@ -179,13 +175,13 @@ public class ChunkedWriteHandler
if (cause == null) { if (cause == null) {
cause = new ClosedChannelException(); cause = new ClosedChannelException();
} }
currentWrite.failAndRecycle(cause); currentWrite.fail(cause);
} else { } else {
currentWrite.successAndRecycle(); currentWrite.promise.setSuccess();
} }
closeInput(in); closeInput(in);
} catch (Exception e) { } catch (Exception e) {
currentWrite.failAndRecycle(e); currentWrite.fail(e);
logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e); logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
closeInput(in); closeInput(in);
} }
@ -193,7 +189,7 @@ public class ChunkedWriteHandler
if (cause == null) { if (cause == null) {
cause = new ClosedChannelException(); cause = new ClosedChannelException();
} }
currentWrite.failAndRecycle(cause); currentWrite.fail(cause);
} }
} }
} }
@ -215,8 +211,7 @@ public class ChunkedWriteHandler
} }
needsFlush = true; needsFlush = true;
final PendingWrite currentWrite = this.currentWrite; final PendingWrite currentWrite = this.currentWrite;
final Promise<Void> currentPromise = this.currentWrite.promise(); final Object pendingMessage = currentWrite.msg;
final Object pendingMessage = currentWrite.msg();
if (pendingMessage instanceof ChunkedInput) { if (pendingMessage instanceof ChunkedInput) {
final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage; final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
@ -240,7 +235,18 @@ public class ChunkedWriteHandler
ReferenceCountUtil.release(message); ReferenceCountUtil.release(message);
} }
currentWrite.failAndRecycle(t); currentWrite.fail(t);
if (ctx.executor().inEventLoop()) {
ctx.fireExceptionCaught(t);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.fireExceptionCaught(t);
}
});
}
closeInput(chunks); closeInput(chunks);
break; break;
} }
@ -272,16 +278,7 @@ public class ChunkedWriteHandler
@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
pendingWrites.decrementAndGet(); pendingWrites.decrementAndGet();
if (future.isSuccess()) { currentWrite.promise.setSuccess();
if (!currentPromise.isDone()) {
currentWrite.successAndRecycle();
}
} else {
if (!currentPromise.isDone()) {
currentWrite.failAndRecycle(future.cause());
}
}
closeInput(chunks); closeInput(chunks);
} }
}); });
@ -292,12 +289,9 @@ public class ChunkedWriteHandler
pendingWrites.decrementAndGet(); pendingWrites.decrementAndGet();
if (!future.isSuccess()) { if (!future.isSuccess()) {
closeInput((ChunkedInput<?>) pendingMessage); closeInput((ChunkedInput<?>) pendingMessage);
if (!currentPromise.isDone()) { currentWrite.fail(future.cause());
// only recycle if not done before
currentWrite.failAndRecycle(future.cause());
}
} else { } else {
progress((ChannelPromise) currentWrite.promise()); currentWrite.progress();
} }
} }
}); });
@ -308,12 +302,9 @@ public class ChunkedWriteHandler
pendingWrites.decrementAndGet(); pendingWrites.decrementAndGet();
if (!future.isSuccess()) { if (!future.isSuccess()) {
closeInput((ChunkedInput<?>) pendingMessage); closeInput((ChunkedInput<?>) pendingMessage);
if (!currentPromise.isDone()) { currentWrite.fail(future.cause());
// only recycle if not done before
currentWrite.failAndRecycle(future.cause());
}
} else { } else {
progress((ChannelPromise) currentWrite.promise()); currentWrite.progress();
if (isWritable()) { if (isWritable()) {
resumeTransfer(); resumeTransfer();
} }
@ -322,7 +313,7 @@ public class ChunkedWriteHandler
}); });
} }
} else { } else {
ctx.write(pendingMessage, (ChannelPromise) currentWrite.recycleAndGet()); ctx.write(pendingMessage, currentWrite.promise);
this.currentWrite = null; this.currentWrite = null;
} }
@ -336,7 +327,7 @@ public class ChunkedWriteHandler
} }
} }
void closeInput(ChunkedInput<?> chunks) { static void closeInput(ChunkedInput<?> chunks) {
try { try {
chunks.close(); chunks.close();
} catch (Throwable t) { } catch (Throwable t) {
@ -344,13 +335,30 @@ public class ChunkedWriteHandler
logger.warn("Failed to close a chunked input.", t); logger.warn("Failed to close a chunked input.", t);
} }
} }
progress = 0;
} }
void progress(ChannelPromise promise) { private static final class PendingWrite {
progress ++; final Object msg;
if (promise instanceof ChannelProgressivePromise) { final ChannelPromise promise;
((ChannelProgressivePromise) promise).tryProgress(progress, -1); private long progress;
PendingWrite(Object msg, ChannelPromise promise) {
this.msg = msg;
this.promise = promise;
}
void fail(Throwable cause) {
ReferenceCountUtil.release(msg);
if (promise != null) {
promise.setFailure(cause);
}
}
void progress() {
progress ++;
if (promise instanceof ChannelProgressivePromise) {
((ChannelProgressivePromise) promise).tryProgress(progress, -1);
}
} }
} }
} }