[#1461] Correctly handle DefaultChannelGroup.write(..) of ByteBuf and ByteBufHolder

This commit is contained in:
Norman Maurer 2013-06-20 16:20:23 +02:00 committed by Trustin Lee
parent e06fcdbc6a
commit c9d01b2fb5

View File

@ -15,6 +15,8 @@
*/ */
package io.netty.channel.group; package io.netty.channel.group;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -216,8 +218,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
Map<Integer, ChannelFuture> futures = new LinkedHashMap<Integer, ChannelFuture>(size()); Map<Integer, ChannelFuture> futures = new LinkedHashMap<Integer, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) { for (Channel c: nonServerChannels.values()) {
ReferenceCountUtil.retain(message); futures.put(c.id(), c.write(safeDuplicate(message)));
futures.put(c.id(), c.write(message));
} }
ReferenceCountUtil.release(message); ReferenceCountUtil.release(message);
@ -232,14 +233,30 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
Map<Integer, ChannelFuture> futures = new LinkedHashMap<Integer, ChannelFuture>(size()); Map<Integer, ChannelFuture> futures = new LinkedHashMap<Integer, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) { for (Channel c: nonServerChannels.values()) {
MessageList<Object> messagesCopy = messages.retainAll().copy(); int size = messages.size();
futures.put(c.id(), c.write(messagesCopy)); MessageList<Object> messageCopy = MessageList.newInstance(size);
for (int i = 0 ; i < size; i++) {
messageCopy.add(safeDuplicate(messages.get(i)));
}
futures.put(c.id(), c.write(messageCopy));
} }
messages.releaseAllAndRecycle(); messages.releaseAllAndRecycle();
return new DefaultChannelGroupFuture(this, futures, executor); return new DefaultChannelGroupFuture(this, futures, executor);
} }
// Create a safe duplicate of the message to write it to a channel but not affect other writes.
// See https://github.com/netty/netty/issues/1461
private static Object safeDuplicate(Object message) {
if (message instanceof ByteBuf) {
return ((ByteBuf) message).duplicate().retain();
} else if (message instanceof ByteBufHolder) {
return ((ByteBufHolder) message).copy();
} else {
return ReferenceCountUtil.retain(message);
}
}
@Override @Override
public ChannelGroupFuture deregister() { public ChannelGroupFuture deregister() {
Map<Integer, ChannelFuture> futures = Map<Integer, ChannelFuture> futures =