diff --git a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java index b4805d98ef..f39328dff7 100644 --- a/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java +++ b/handler/src/main/java/io/netty/handler/stream/ChunkedWriteHandler.java @@ -209,6 +209,21 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { if (currentWrite == null) { break; } + + if (currentWrite.promise.isDone()) { + // This might happen e.g. in the case when a write operation + // failed, but there're still unconsumed chunks left. + // Most chunked input sources would stop generating chunks + // and report end of input, but this doesn't work with any + // source wrapped in HttpChunkedInput. + // Note, that we're not trying to release the message/chunks + // as this had to be done already by someone who resolved the + // promise (using ChunkedInput.close method). + // See https://github.com/netty/netty/issues/8700. + this.currentWrite = null; + continue; + } + final PendingWrite currentWrite = this.currentWrite; final Object pendingMessage = currentWrite.msg; @@ -264,9 +279,13 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - currentWrite.progress(chunks.progress(), chunks.length()); - currentWrite.success(chunks.length()); - closeInput(chunks); + if (!future.isSuccess()) { + closeInput(chunks); + currentWrite.fail(future.cause()); + } else { + currentWrite.progress(chunks.progress(), chunks.length()); + currentWrite.success(chunks.length()); + } } }); } else if (channel.isWritable()) { diff --git a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java index 66b69516fc..5b03048ba6 100644 --- a/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java +++ b/handler/src/test/java/io/netty/handler/stream/ChunkedWriteHandlerTest.java @@ -21,8 +21,11 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -31,10 +34,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.Channels; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class ChunkedWriteHandlerTest { private static final byte[] BYTES = new byte[1024 * 64]; @@ -162,8 +164,7 @@ public class ChunkedWriteHandlerTest { EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler()); ch.writeAndFlush(input).addListener(listener).syncUninterruptibly(); - ch.checkException(); - ch.finish(); + assertTrue(ch.finish()); // the listener should have been notified assertTrue(listenerNotified.get()); @@ -220,13 +221,218 @@ public class ChunkedWriteHandlerTest { EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler()); ch.writeAndFlush(input).syncUninterruptibly(); - ch.checkException(); assertTrue(ch.finish()); assertEquals(0, ch.readOutbound()); assertNull(ch.readOutbound()); } + @Test + public void testWriteFailureChunkedStream() throws IOException { + checkFirstFailed(new ChunkedStream(new ByteArrayInputStream(BYTES))); + } + + @Test + public void testWriteFailureChunkedNioStream() throws IOException { + checkFirstFailed(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES)))); + } + + @Test + public void testWriteFailureChunkedFile() throws IOException { + checkFirstFailed(new ChunkedFile(TMP)); + } + + @Test + public void testWriteFailureChunkedNioFile() throws IOException { + checkFirstFailed(new ChunkedNioFile(TMP)); + } + + @Test + public void testWriteFailureUnchunkedData() throws IOException { + checkFirstFailed(Unpooled.wrappedBuffer(BYTES)); + } + + @Test + public void testSkipAfterFailedChunkedStream() throws IOException { + checkSkipFailed(new ChunkedStream(new ByteArrayInputStream(BYTES)), + new ChunkedStream(new ByteArrayInputStream(BYTES))); + } + + @Test + public void testSkipAfterFailedChunkedNioStream() throws IOException { + checkSkipFailed(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))), + new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES)))); + } + + @Test + public void testSkipAfterFailedChunkedFile() throws IOException { + checkSkipFailed(new ChunkedFile(TMP), new ChunkedFile(TMP)); + } + + @Test + public void testSkipAfterFailedChunkedNioFile() throws IOException { + checkSkipFailed(new ChunkedNioFile(TMP), new ChunkedFile(TMP)); + } + + // See https://github.com/netty/netty/issues/8700. + @Test + public void testFailureWhenLastChunkFailed() throws IOException { + ChannelOutboundHandlerAdapter failLast = new ChannelOutboundHandlerAdapter() { + private int passedWrites; + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (++this.passedWrites < 4) { + ctx.write(msg, promise); + } else { + ReferenceCountUtil.release(msg); + promise.tryFailure(new RuntimeException()); + } + } + }; + + EmbeddedChannel ch = new EmbeddedChannel(failLast, new ChunkedWriteHandler()); + ChannelFuture r = ch.writeAndFlush(new ChunkedFile(TMP, 1024 * 16)); // 4 chunks + assertTrue(ch.finish()); + + assertFalse(r.isSuccess()); + assertTrue(r.cause() instanceof RuntimeException); + + // 3 out of 4 chunks were already written + int read = 0; + for (;;) { + ByteBuf buffer = ch.readOutbound(); + if (buffer == null) { + break; + } + read += buffer.readableBytes(); + buffer.release(); + } + + assertEquals(1024 * 16 * 3, read); + } + + @Test + public void testDiscardPendingWritesOnInactive() throws IOException { + + final AtomicBoolean closeWasCalled = new AtomicBoolean(false); + + ChunkedInput notifiableInput = new ChunkedInput() { + private boolean done; + private final ByteBuf buffer = Unpooled.copiedBuffer("Test", CharsetUtil.ISO_8859_1); + + @Override + public boolean isEndOfInput() throws Exception { + return done; + } + + @Override + public void close() throws Exception { + buffer.release(); + closeWasCalled.set(true); + } + + @Deprecated + @Override + public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { + if (done) { + return null; + } + done = true; + return buffer.retainedDuplicate(); + } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return 1; + } + }; + + EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler()); + + // Write 3 messages and close channel before flushing + ChannelFuture r1 = ch.write(new ChunkedFile(TMP)); + ChannelFuture r2 = ch.write(new ChunkedNioFile(TMP)); + ch.write(notifiableInput); + + // Should be `false` as we do not expect any messages to be written + assertFalse(ch.finish()); + + assertFalse(r1.isSuccess()); + assertFalse(r2.isSuccess()); + assertTrue(closeWasCalled.get()); + } + + // See https://github.com/netty/netty/issues/8700. + @Test + public void testStopConsumingChunksWhenFailed() { + final ByteBuf buffer = Unpooled.copiedBuffer("Test", CharsetUtil.ISO_8859_1); + final AtomicInteger chunks = new AtomicInteger(0); + + ChunkedInput nonClosableInput = new ChunkedInput() { + @Override + public boolean isEndOfInput() throws Exception { + return chunks.get() >= 5; + } + + @Override + public void close() throws Exception { + // no-op + } + + @Deprecated + @Override + public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { + return readChunk(ctx.alloc()); + } + + @Override + public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception { + chunks.incrementAndGet(); + return buffer.retainedDuplicate(); + } + + @Override + public long length() { + return -1; + } + + @Override + public long progress() { + return 1; + } + }; + + ChannelOutboundHandlerAdapter noOpWrites = new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + ReferenceCountUtil.release(msg); + promise.tryFailure(new RuntimeException()); + } + }; + + EmbeddedChannel ch = new EmbeddedChannel(noOpWrites, new ChunkedWriteHandler()); + ch.writeAndFlush(nonClosableInput).awaitUninterruptibly(); + // Should be `false` as we do not expect any messages to be written + assertFalse(ch.finish()); + buffer.release(); + + // We should expect only single chunked being read from the input. + // It's possible to get a race condition here between resolving a promise and + // allocating a new chunk, but should be fine when working with embedded channels. + assertEquals(1, chunks.get()); + } + private static void check(Object... inputs) { EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler()); @@ -255,4 +461,67 @@ public class ChunkedWriteHandlerTest { assertEquals(BYTES.length * inputs.length, read); } + + private static void checkFirstFailed(Object input) { + ChannelOutboundHandlerAdapter noOpWrites = new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + ReferenceCountUtil.release(msg); + promise.tryFailure(new RuntimeException()); + } + }; + + EmbeddedChannel ch = new EmbeddedChannel(noOpWrites, new ChunkedWriteHandler()); + ChannelFuture r = ch.writeAndFlush(input); + + // Should be `false` as we do not expect any messages to be written + assertFalse(ch.finish()); + assertTrue(r.cause() instanceof RuntimeException); + } + + private static void checkSkipFailed(Object input1, Object input2) { + ChannelOutboundHandlerAdapter failFirst = new ChannelOutboundHandlerAdapter() { + private boolean alreadyFailed; + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (alreadyFailed) { + ctx.write(msg, promise); + } else { + this.alreadyFailed = true; + ReferenceCountUtil.release(msg); + promise.tryFailure(new RuntimeException()); + } + } + }; + + EmbeddedChannel ch = new EmbeddedChannel(failFirst, new ChunkedWriteHandler()); + ChannelFuture r1 = ch.write(input1); + ChannelFuture r2 = ch.writeAndFlush(input2).awaitUninterruptibly(); + assertTrue(ch.finish()); + + assertTrue(r1.cause() instanceof RuntimeException); + assertTrue(r2.isSuccess()); + + // note, that after we've "skipped" the first write, + // we expect to see the second message, chunk by chunk + int i = 0; + int read = 0; + for (;;) { + ByteBuf buffer = ch.readOutbound(); + if (buffer == null) { + break; + } + while (buffer.isReadable()) { + assertEquals(BYTES[i++], buffer.readByte()); + read++; + if (i == BYTES.length) { + i = 0; + } + } + buffer.release(); + } + + assertEquals(BYTES.length, read); + } }