From b0823761f45c1141e3f155d7369ce0ba09d25e5e Mon Sep 17 00:00:00 2001 From: Alexey Kachayev Date: Wed, 14 Mar 2018 20:08:24 +0200 Subject: [PATCH] PendingWriteQueue to handle write operations with void future Motivation: Right now PendingWriteQueue.removeAndWriteAll collects all promises to PromiseCombiner instance which sets listener to each given promise throwing IllegalStateException on VoidChannelPromise which breaks while loop and "reports" operation as failed (when in fact part of writes might be actually written). Modifications: Check if the promise is not void before adding it to the PromiseCombiner instance. Result: PendingWriteQueue.removeAndWriteAll succesfully writes all pendings even in case void promise was used. --- .../io/netty/channel/PendingWriteQueue.java | 4 +++- .../netty/channel/PendingWriteQueueTest.java | 24 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java index 2dcf3efd8e..16ae47bd24 100644 --- a/transport/src/main/java/io/netty/channel/PendingWriteQueue.java +++ b/transport/src/main/java/io/netty/channel/PendingWriteQueue.java @@ -144,7 +144,9 @@ public final class PendingWriteQueue { Object msg = write.msg; ChannelPromise promise = write.promise; recycle(write, false); - combiner.add(promise); + if (!(promise instanceof VoidChannelPromise)) { + combiner.add(promise); + } ctx.write(msg, promise); write = next; } diff --git a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java index f2770281ea..467e681cd7 100644 --- a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java +++ b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java @@ -264,6 +264,30 @@ public class PendingWriteQueueTest { assertEquals(3L, channel.readOutbound()); } + @Test + public void testRemoveAndWriteAllWithVoidPromise() { + EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() { + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + // Convert to writeAndFlush(...) so the promise will be notified by the transport. + ctx.writeAndFlush(msg, promise); + } + }, new ChannelOutboundHandlerAdapter()); + + final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().lastContext()); + + ChannelPromise promise = channel.newPromise(); + queue.add(1L, promise); + queue.add(2L, channel.voidPromise()); + queue.removeAndWriteAll(); + + assertTrue(channel.finish()); + assertTrue(promise.isDone()); + assertTrue(promise.isSuccess()); + assertEquals(1L, channel.readOutbound()); + assertEquals(2L, channel.readOutbound()); + } + @Test public void testRemoveAndFailAllReentrantWrite() { final List failOrder = Collections.synchronizedList(new ArrayList());