From c9d01b2fb59c6ef7905ed0209962a34e3b4ba853 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 20 Jun 2013 16:20:23 +0200 Subject: [PATCH] [#1461] Correctly handle DefaultChannelGroup.write(..) of ByteBuf and ByteBufHolder --- .../channel/group/DefaultChannelGroup.java | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index 3113a57f55..8e83e8bca4 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -15,6 +15,8 @@ */ package io.netty.channel.group; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -216,8 +218,7 @@ public class DefaultChannelGroup extends AbstractSet implements Channel Map futures = new LinkedHashMap(size()); for (Channel c: nonServerChannels.values()) { - ReferenceCountUtil.retain(message); - futures.put(c.id(), c.write(message)); + futures.put(c.id(), c.write(safeDuplicate(message))); } ReferenceCountUtil.release(message); @@ -232,14 +233,30 @@ public class DefaultChannelGroup extends AbstractSet implements Channel Map futures = new LinkedHashMap(size()); for (Channel c: nonServerChannels.values()) { - MessageList messagesCopy = messages.retainAll().copy(); - futures.put(c.id(), c.write(messagesCopy)); + int size = messages.size(); + MessageList messageCopy = MessageList.newInstance(size); + for (int i = 0 ; i < size; i++) { + messageCopy.add(safeDuplicate(messages.get(i))); + } + futures.put(c.id(), c.write(messageCopy)); } messages.releaseAllAndRecycle(); return new DefaultChannelGroupFuture(this, futures, executor); } + // Create a safe duplicate of the message to write it to a channel but not affect other writes. + // See https://github.com/netty/netty/issues/1461 + private static Object safeDuplicate(Object message) { + if (message instanceof ByteBuf) { + return ((ByteBuf) message).duplicate().retain(); + } else if (message instanceof ByteBufHolder) { + return ((ByteBufHolder) message).copy(); + } else { + return ReferenceCountUtil.retain(message); + } + } + @Override public ChannelGroupFuture deregister() { Map futures =