Correctly propagate write failures from ChunkedWriteHandler (#8716)

Motivation:

ChunkedWriteHandler should report write operation as failed
in case *any* chunked was not written. Right now this is not
true for the last chunk.

Modifications:

* Check if the appropriate write operation was succesfull when
  reporting the last chunk

* Skip writing chunks if the write operation was already marked
  as "done"

* Test cases to cover write failures when dealing with chunked input

Result:

Fix https://github.com/netty/netty/issues/8700
This commit is contained in:
Oleksii Kachaiev 2019-01-16 12:07:59 +02:00 committed by Norman Maurer
parent 67cb8cce5b
commit f004b72662
2 changed files with 297 additions and 9 deletions

View File

@ -209,6 +209,21 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
if (currentWrite == null) {
break;
}
if (currentWrite.promise.isDone()) {
// This might happen e.g. in the case when a write operation
// failed, but there're still unconsumed chunks left.
// Most chunked input sources would stop generating chunks
// and report end of input, but this doesn't work with any
// source wrapped in HttpChunkedInput.
// Note, that we're not trying to release the message/chunks
// as this had to be done already by someone who resolved the
// promise (using ChunkedInput.close method).
// See https://github.com/netty/netty/issues/8700.
this.currentWrite = null;
continue;
}
final PendingWrite currentWrite = this.currentWrite;
final Object pendingMessage = currentWrite.msg;
@ -264,9 +279,13 @@ public class ChunkedWriteHandler extends ChannelDuplexHandler {
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
currentWrite.progress(chunks.progress(), chunks.length());
currentWrite.success(chunks.length());
closeInput(chunks);
if (!future.isSuccess()) {
closeInput(chunks);
currentWrite.fail(future.cause());
} else {
currentWrite.progress(chunks.progress(), chunks.length());
currentWrite.success(chunks.length());
}
}
});
} else if (channel.isWritable()) {

View File

@ -21,8 +21,11 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@ -31,10 +34,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
public class ChunkedWriteHandlerTest {
private static final byte[] BYTES = new byte[1024 * 64];
@ -162,8 +164,7 @@ public class ChunkedWriteHandlerTest {
EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());
ch.writeAndFlush(input).addListener(listener).syncUninterruptibly();
ch.checkException();
ch.finish();
assertTrue(ch.finish());
// the listener should have been notified
assertTrue(listenerNotified.get());
@ -220,13 +221,218 @@ public class ChunkedWriteHandlerTest {
EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());
ch.writeAndFlush(input).syncUninterruptibly();
ch.checkException();
assertTrue(ch.finish());
assertEquals(0, ch.readOutbound());
assertNull(ch.readOutbound());
}
@Test
public void testWriteFailureChunkedStream() throws IOException {
checkFirstFailed(new ChunkedStream(new ByteArrayInputStream(BYTES)));
}
@Test
public void testWriteFailureChunkedNioStream() throws IOException {
checkFirstFailed(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))));
}
@Test
public void testWriteFailureChunkedFile() throws IOException {
checkFirstFailed(new ChunkedFile(TMP));
}
@Test
public void testWriteFailureChunkedNioFile() throws IOException {
checkFirstFailed(new ChunkedNioFile(TMP));
}
@Test
public void testWriteFailureUnchunkedData() throws IOException {
checkFirstFailed(Unpooled.wrappedBuffer(BYTES));
}
@Test
public void testSkipAfterFailedChunkedStream() throws IOException {
checkSkipFailed(new ChunkedStream(new ByteArrayInputStream(BYTES)),
new ChunkedStream(new ByteArrayInputStream(BYTES)));
}
@Test
public void testSkipAfterFailedChunkedNioStream() throws IOException {
checkSkipFailed(new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))),
new ChunkedNioStream(Channels.newChannel(new ByteArrayInputStream(BYTES))));
}
@Test
public void testSkipAfterFailedChunkedFile() throws IOException {
checkSkipFailed(new ChunkedFile(TMP), new ChunkedFile(TMP));
}
@Test
public void testSkipAfterFailedChunkedNioFile() throws IOException {
checkSkipFailed(new ChunkedNioFile(TMP), new ChunkedFile(TMP));
}
// See https://github.com/netty/netty/issues/8700.
@Test
public void testFailureWhenLastChunkFailed() throws IOException {
ChannelOutboundHandlerAdapter failLast = new ChannelOutboundHandlerAdapter() {
private int passedWrites;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (++this.passedWrites < 4) {
ctx.write(msg, promise);
} else {
ReferenceCountUtil.release(msg);
promise.tryFailure(new RuntimeException());
}
}
};
EmbeddedChannel ch = new EmbeddedChannel(failLast, new ChunkedWriteHandler());
ChannelFuture r = ch.writeAndFlush(new ChunkedFile(TMP, 1024 * 16)); // 4 chunks
assertTrue(ch.finish());
assertFalse(r.isSuccess());
assertTrue(r.cause() instanceof RuntimeException);
// 3 out of 4 chunks were already written
int read = 0;
for (;;) {
ByteBuf buffer = ch.readOutbound();
if (buffer == null) {
break;
}
read += buffer.readableBytes();
buffer.release();
}
assertEquals(1024 * 16 * 3, read);
}
@Test
public void testDiscardPendingWritesOnInactive() throws IOException {
final AtomicBoolean closeWasCalled = new AtomicBoolean(false);
ChunkedInput<ByteBuf> notifiableInput = new ChunkedInput<ByteBuf>() {
private boolean done;
private final ByteBuf buffer = Unpooled.copiedBuffer("Test", CharsetUtil.ISO_8859_1);
@Override
public boolean isEndOfInput() throws Exception {
return done;
}
@Override
public void close() throws Exception {
buffer.release();
closeWasCalled.set(true);
}
@Deprecated
@Override
public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
return readChunk(ctx.alloc());
}
@Override
public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
if (done) {
return null;
}
done = true;
return buffer.retainedDuplicate();
}
@Override
public long length() {
return -1;
}
@Override
public long progress() {
return 1;
}
};
EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());
// Write 3 messages and close channel before flushing
ChannelFuture r1 = ch.write(new ChunkedFile(TMP));
ChannelFuture r2 = ch.write(new ChunkedNioFile(TMP));
ch.write(notifiableInput);
// Should be `false` as we do not expect any messages to be written
assertFalse(ch.finish());
assertFalse(r1.isSuccess());
assertFalse(r2.isSuccess());
assertTrue(closeWasCalled.get());
}
// See https://github.com/netty/netty/issues/8700.
@Test
public void testStopConsumingChunksWhenFailed() {
final ByteBuf buffer = Unpooled.copiedBuffer("Test", CharsetUtil.ISO_8859_1);
final AtomicInteger chunks = new AtomicInteger(0);
ChunkedInput<ByteBuf> nonClosableInput = new ChunkedInput<ByteBuf>() {
@Override
public boolean isEndOfInput() throws Exception {
return chunks.get() >= 5;
}
@Override
public void close() throws Exception {
// no-op
}
@Deprecated
@Override
public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
return readChunk(ctx.alloc());
}
@Override
public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
chunks.incrementAndGet();
return buffer.retainedDuplicate();
}
@Override
public long length() {
return -1;
}
@Override
public long progress() {
return 1;
}
};
ChannelOutboundHandlerAdapter noOpWrites = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
promise.tryFailure(new RuntimeException());
}
};
EmbeddedChannel ch = new EmbeddedChannel(noOpWrites, new ChunkedWriteHandler());
ch.writeAndFlush(nonClosableInput).awaitUninterruptibly();
// Should be `false` as we do not expect any messages to be written
assertFalse(ch.finish());
buffer.release();
// We should expect only single chunked being read from the input.
// It's possible to get a race condition here between resolving a promise and
// allocating a new chunk, but should be fine when working with embedded channels.
assertEquals(1, chunks.get());
}
private static void check(Object... inputs) {
EmbeddedChannel ch = new EmbeddedChannel(new ChunkedWriteHandler());
@ -255,4 +461,67 @@ public class ChunkedWriteHandlerTest {
assertEquals(BYTES.length * inputs.length, read);
}
private static void checkFirstFailed(Object input) {
ChannelOutboundHandlerAdapter noOpWrites = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ReferenceCountUtil.release(msg);
promise.tryFailure(new RuntimeException());
}
};
EmbeddedChannel ch = new EmbeddedChannel(noOpWrites, new ChunkedWriteHandler());
ChannelFuture r = ch.writeAndFlush(input);
// Should be `false` as we do not expect any messages to be written
assertFalse(ch.finish());
assertTrue(r.cause() instanceof RuntimeException);
}
private static void checkSkipFailed(Object input1, Object input2) {
ChannelOutboundHandlerAdapter failFirst = new ChannelOutboundHandlerAdapter() {
private boolean alreadyFailed;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (alreadyFailed) {
ctx.write(msg, promise);
} else {
this.alreadyFailed = true;
ReferenceCountUtil.release(msg);
promise.tryFailure(new RuntimeException());
}
}
};
EmbeddedChannel ch = new EmbeddedChannel(failFirst, new ChunkedWriteHandler());
ChannelFuture r1 = ch.write(input1);
ChannelFuture r2 = ch.writeAndFlush(input2).awaitUninterruptibly();
assertTrue(ch.finish());
assertTrue(r1.cause() instanceof RuntimeException);
assertTrue(r2.isSuccess());
// note, that after we've "skipped" the first write,
// we expect to see the second message, chunk by chunk
int i = 0;
int read = 0;
for (;;) {
ByteBuf buffer = ch.readOutbound();
if (buffer == null) {
break;
}
while (buffer.isReadable()) {
assertEquals(BYTES[i++], buffer.readByte());
read++;
if (i == BYTES.length) {
i = 0;
}
}
buffer.release();
}
assertEquals(BYTES.length, read);
}
}