Don't swallow intermediate write failures in MessageToMessageEncoder (#8454)

Motivation:

If the encoder needs to flush more than one outbound message it will
create a new ChannelPromise for all but the last write which will
swallow failures.

Modification:

Use a PromiseCombiner in the case of multiple messages and the parent
promise isn't the `VoidPromise`.

Result:

Intermediate failures are propagated to the original ChannelPromise.
This commit is contained in:
Bryce Anderson 2018-11-03 03:36:26 -06:00 committed by Norman Maurer
parent 9f6ebab514
commit 6563f23a9b
2 changed files with 59 additions and 12 deletions

View File

@ -22,6 +22,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.PromiseCombiner;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.TypeParameterMatcher;
@ -108,28 +109,36 @@ public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerA
if (out != null) {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.get(0), promise);
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
// Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
// See https://github.com/netty/netty/issues/2525
ChannelPromise voidPromise = ctx.voidPromise();
boolean isVoidPromise = promise == voidPromise;
for (int i = 0; i < sizeMinusOne; i ++) {
ChannelPromise p;
if (isVoidPromise) {
p = voidPromise;
} else {
p = ctx.newPromise();
}
ctx.write(out.getUnsafe(i), p);
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
ctx.write(out.getUnsafe(sizeMinusOne), promise);
}
out.recycle();
}
}
}
private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
final ChannelPromise voidPromise = ctx.voidPromise();
for (int i = 0; i < out.size(); i++) {
ctx.write(out.getUnsafe(i), voidPromise);
}
}
private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
final PromiseCombiner combiner = new PromiseCombiner();
for (int i = 0; i < out.size(); i++) {
combiner.add(ctx.write(out.getUnsafe(i)));
}
combiner.finish(promise);
}
/**
* Encode from one message to an other. This method will be called for each written message that can be handled
* by this encoder.

View File

@ -15,9 +15,14 @@
*/
package io.netty.handler.codec;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.List;
@ -37,4 +42,37 @@ public class MessageToMessageEncoderTest {
});
channel.writeOutbound(new Object());
}
@Test
public void testIntermediateWriteFailures() {
ChannelHandler encoder = new MessageToMessageEncoder<Object>() {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) {
out.add(new Object());
out.add(msg);
}
};
final Exception firstWriteException = new Exception();
ChannelHandler writeThrower = new ChannelOutboundHandlerAdapter() {
private boolean firstWritten;
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (firstWritten) {
ctx.write(msg, promise);
} else {
firstWritten = true;
promise.setFailure(firstWriteException);
}
}
};
EmbeddedChannel channel = new EmbeddedChannel(writeThrower, encoder);
Object msg = new Object();
ChannelFuture write = channel.writeAndFlush(msg);
assertSame(firstWriteException, write.cause());
assertSame(msg, channel.readOutbound());
assertFalse(channel.finish());
}
}