diff --git a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java index 16eed68901..6b47b6f8a2 100644 --- a/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java +++ b/codec/src/main/java/io/netty/handler/codec/MessageToMessageEncoder.java @@ -22,6 +22,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; +import io.netty.util.concurrent.PromiseCombiner; import io.netty.util.internal.StringUtil; import io.netty.util.internal.TypeParameterMatcher; @@ -108,28 +109,36 @@ public abstract class MessageToMessageEncoder extends ChannelOutboundHandlerA if (out != null) { final int sizeMinusOne = out.size() - 1; if (sizeMinusOne == 0) { - ctx.write(out.get(0), promise); + ctx.write(out.getUnsafe(0), promise); } else if (sizeMinusOne > 0) { // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure // See https://github.com/netty/netty/issues/2525 - ChannelPromise voidPromise = ctx.voidPromise(); - boolean isVoidPromise = promise == voidPromise; - for (int i = 0; i < sizeMinusOne; i ++) { - ChannelPromise p; - if (isVoidPromise) { - p = voidPromise; - } else { - p = ctx.newPromise(); - } - ctx.write(out.getUnsafe(i), p); + if (promise == ctx.voidPromise()) { + writeVoidPromise(ctx, out); + } else { + writePromiseCombiner(ctx, out, promise); } - ctx.write(out.getUnsafe(sizeMinusOne), promise); } out.recycle(); } } } + private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) { + final ChannelPromise voidPromise = ctx.voidPromise(); + for (int i = 0; i < out.size(); i++) { + ctx.write(out.getUnsafe(i), voidPromise); + } + } + + private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) { + final PromiseCombiner combiner = new PromiseCombiner(); + for (int i = 0; i < out.size(); i++) { + combiner.add(ctx.write(out.getUnsafe(i))); + } + combiner.finish(promise); + } + /** * Encode from one message to an other. This method will be called for each written message that can be handled * by this encoder. diff --git a/codec/src/test/java/io/netty/handler/codec/MessageToMessageEncoderTest.java b/codec/src/test/java/io/netty/handler/codec/MessageToMessageEncoderTest.java index 322631adf6..2d09725af0 100644 --- a/codec/src/test/java/io/netty/handler/codec/MessageToMessageEncoderTest.java +++ b/codec/src/test/java/io/netty/handler/codec/MessageToMessageEncoderTest.java @@ -15,9 +15,14 @@ */ package io.netty.handler.codec; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; import org.junit.Test; +import static org.junit.Assert.*; import java.util.List; @@ -37,4 +42,37 @@ public class MessageToMessageEncoderTest { }); channel.writeOutbound(new Object()); } + + @Test + public void testIntermediateWriteFailures() { + ChannelHandler encoder = new MessageToMessageEncoder() { + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, List out) { + out.add(new Object()); + out.add(msg); + } + }; + + final Exception firstWriteException = new Exception(); + + ChannelHandler writeThrower = new ChannelOutboundHandlerAdapter() { + private boolean firstWritten; + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (firstWritten) { + ctx.write(msg, promise); + } else { + firstWritten = true; + promise.setFailure(firstWriteException); + } + } + }; + + EmbeddedChannel channel = new EmbeddedChannel(writeThrower, encoder); + Object msg = new Object(); + ChannelFuture write = channel.writeAndFlush(msg); + assertSame(firstWriteException, write.cause()); + assertSame(msg, channel.readOutbound()); + assertFalse(channel.finish()); + } }