From c249926784a58884690c62a3ca0bf227fc439c2c Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 12 May 2016 11:03:42 +0200 Subject: [PATCH] [#3127] Allow to write with VoidPromise to Channels in ChannelGroup Motivation: Users sometimes want to use Channel.voidPromise() when write to a Channel to reduce GC-pressure. This should be also possible when write via a ChannelGroup. Modifications: Add new write(...) and writeAndFlush(...) overloads which allow to signale that a VoidPromise should be used to write to the Channel Result: Users can write with VoidPromise when using ChannelGroup --- .../io/netty/channel/group/ChannelGroup.java | 34 +++- .../channel/group/DefaultChannelGroup.java | 57 ++++-- .../channel/group/VoidChannelGroupFuture.java | 169 ++++++++++++++++++ 3 files changed, 240 insertions(+), 20 deletions(-) create mode 100644 transport/src/main/java/io/netty/channel/group/VoidChannelGroupFuture.java diff --git a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java index c66022e1d0..8d6944f485 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelGroup.java @@ -121,7 +121,7 @@ public interface ChannelGroup extends Set, Comparable { /** * Writes the specified {@code message} to all {@link Channel}s in this - * group that match the given {@link ChannelMatcher}. If the specified {@code message} is an instance of + * group that are matched by the given {@link ChannelMatcher}. If the specified {@code message} is an instance of * {@link ByteBuf}, it is automatically * {@linkplain ByteBuf#duplicate() duplicated} to avoid a race * condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as @@ -132,6 +132,22 @@ public interface ChannelGroup extends Set, Comparable { */ ChannelGroupFuture write(Object message, ChannelMatcher matcher); + /** + * Writes the specified {@code message} to all {@link Channel}s in this + * group that are matched by the given {@link ChannelMatcher}. If the specified {@code message} is an instance of + * {@link ByteBuf}, it is automatically + * {@linkplain ByteBuf#duplicate() duplicated} to avoid a race + * condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as + * {@link Channel#write(Object)} is. + * + * If {@code voidPromise} is {@code true} {@link Channel#voidPromise()} is used for the writes and so the same + * restrictions to the returned {@link ChannelGroupFuture} apply as to a void promise. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ + ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise); + /** * Flush all {@link Channel}s in this * group. If the specified {@code messages} are an instance of @@ -146,7 +162,7 @@ public interface ChannelGroup extends Set, Comparable { ChannelGroup flush(); /** - * Flush all {@link Channel}s in this group that match the given {@link ChannelMatcher}. + * Flush all {@link Channel}s in this group that are matched by the given {@link ChannelMatcher}. * If the specified {@code messages} are an instance of * {@link ByteBuf}, it is automatically * {@linkplain ByteBuf#duplicate() duplicated} to avoid a race @@ -171,10 +187,16 @@ public interface ChannelGroup extends Set, Comparable { /** * Shortcut for calling {@link #write(Object)} and {@link #flush()} and only act on - * {@link Channel}s that match the {@link ChannelMatcher}. + * {@link Channel}s that are matched by the {@link ChannelMatcher}. */ ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher); + /** + * Shortcut for calling {@link #write(Object, ChannelMatcher, boolean)} and {@link #flush()} and only act on + * {@link Channel}s that are matched by the {@link ChannelMatcher}. + */ + ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise); + /** * @deprecated Use {@link #writeAndFlush(Object, ChannelMatcher)} instead. */ @@ -191,7 +213,7 @@ public interface ChannelGroup extends Set, Comparable { /** * Disconnects all {@link Channel}s in this group from their remote peers, - * that match the given {@link ChannelMatcher}. + * that are matched by the given {@link ChannelMatcher}. * * @return the {@link ChannelGroupFuture} instance that notifies when * the operation is done for all channels @@ -209,7 +231,7 @@ public interface ChannelGroup extends Set, Comparable { ChannelGroupFuture close(); /** - * Closes all {@link Channel}s in this group that match the given {@link ChannelMatcher}. + * Closes all {@link Channel}s in this group that are matched by the given {@link ChannelMatcher}. * If the {@link Channel} is connected to a remote peer or bound to a local address, it is * automatically disconnected and unbound. * @@ -233,7 +255,7 @@ public interface ChannelGroup extends Set, Comparable { /** * @deprecated This method will be removed in the next major feature release. * - * Deregister all {@link Channel}s in this group from their {@link EventLoop} that match the given + * Deregister all {@link Channel}s in this group from their {@link EventLoop} that are matched by the given * {@link ChannelMatcher}. Please note that this operation is asynchronous as {@link Channel#deregister()} is. * * @return the {@link ChannelGroupFuture} instance that notifies when diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index 19d74d46ed..26ef91dfdc 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -52,6 +52,7 @@ public class DefaultChannelGroup extends AbstractSet implements Channel remove(future.channel()); } }; + private final VoidChannelGroupFuture voidFuture = new VoidChannelGroupFuture(this); private final boolean stayClosed; private volatile boolean closed; @@ -254,6 +255,11 @@ public class DefaultChannelGroup extends AbstractSet implements Channel @Override public ChannelGroupFuture write(Object message, ChannelMatcher matcher) { + return write(message, matcher, false); + } + + @Override + public ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise) { if (message == null) { throw new NullPointerException("message"); } @@ -261,15 +267,25 @@ public class DefaultChannelGroup extends AbstractSet implements Channel throw new NullPointerException("matcher"); } - Map futures = new LinkedHashMap(size()); - for (Channel c: nonServerChannels.values()) { - if (matcher.matches(c)) { - futures.put(c, c.write(safeDuplicate(message))); + final ChannelGroupFuture future; + if (voidPromise) { + for (Channel c: nonServerChannels.values()) { + if (matcher.matches(c)) { + c.write(safeDuplicate(message), c.voidPromise()); + } } + future = voidFuture; + } else { + Map futures = new LinkedHashMap(size()); + for (Channel c: nonServerChannels.values()) { + if (matcher.matches(c)) { + futures.put(c, c.write(safeDuplicate(message))); + } + } + future = new DefaultChannelGroupFuture(this, futures, executor); } - ReferenceCountUtil.release(message); - return new DefaultChannelGroupFuture(this, futures, executor); + return future; } @Override @@ -383,21 +399,34 @@ public class DefaultChannelGroup extends AbstractSet implements Channel @Override public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) { + return writeAndFlush(message, matcher, false); + } + + @Override + public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise) { if (message == null) { throw new NullPointerException("message"); } - Map futures = new LinkedHashMap(size()); - - for (Channel c: nonServerChannels.values()) { - if (matcher.matches(c)) { - futures.put(c, c.writeAndFlush(safeDuplicate(message))); + final ChannelGroupFuture future; + if (voidPromise) { + for (Channel c: nonServerChannels.values()) { + if (matcher.matches(c)) { + c.writeAndFlush(safeDuplicate(message), c.voidPromise()); + } } + future = voidFuture; + } else { + Map futures = new LinkedHashMap(size()); + for (Channel c: nonServerChannels.values()) { + if (matcher.matches(c)) { + futures.put(c, c.writeAndFlush(safeDuplicate(message))); + } + } + future = new DefaultChannelGroupFuture(this, futures, executor); } - ReferenceCountUtil.release(message); - - return new DefaultChannelGroupFuture(this, futures, executor); + return future; } @Override diff --git a/transport/src/main/java/io/netty/channel/group/VoidChannelGroupFuture.java b/transport/src/main/java/io/netty/channel/group/VoidChannelGroupFuture.java new file mode 100644 index 0000000000..e8fa84266b --- /dev/null +++ b/transport/src/main/java/io/netty/channel/group/VoidChannelGroupFuture.java @@ -0,0 +1,169 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.group; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; + +final class VoidChannelGroupFuture implements ChannelGroupFuture { + + private static final Iterator EMPTY = Collections.emptyList().iterator(); + private final ChannelGroup group; + + VoidChannelGroupFuture(ChannelGroup group) { + this.group = group; + } + + @Override + public ChannelGroup group() { + return group; + } + + @Override + public ChannelFuture find(Channel channel) { + return null; + } + + @Override + public boolean isSuccess() { + return false; + } + + @Override + public ChannelGroupException cause() { + return null; + } + + @Override + public boolean isPartialSuccess() { + return false; + } + + @Override + public boolean isPartialFailure() { + return false; + } + + @Override + public ChannelGroupFuture addListener(GenericFutureListener> listener) { + throw reject(); + } + + @Override + public ChannelGroupFuture addListeners(GenericFutureListener>... listeners) { + throw reject(); + } + + @Override + public ChannelGroupFuture removeListener(GenericFutureListener> listener) { + throw reject(); + } + + @Override + public ChannelGroupFuture removeListeners(GenericFutureListener>... listeners) { + throw reject(); + } + + @Override + public ChannelGroupFuture await() { + throw reject(); + } + + @Override + public ChannelGroupFuture awaitUninterruptibly() { + throw reject(); + } + + @Override + public ChannelGroupFuture syncUninterruptibly() { + throw reject(); + } + + @Override + public ChannelGroupFuture sync() { + throw reject(); + } + + @Override + public Iterator iterator() { + return EMPTY; + } + + @Override + public boolean isCancellable() { + return false; + } + + @Override + public boolean await(long timeout, TimeUnit unit) { + throw reject(); + } + + @Override + public boolean await(long timeoutMillis) { + throw reject(); + } + + @Override + public boolean awaitUninterruptibly(long timeout, TimeUnit unit) { + throw reject(); + } + + @Override + public boolean awaitUninterruptibly(long timeoutMillis) { + throw reject(); + } + + @Override + public Void getNow() { + return null; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public Void get() { + throw reject(); + } + + @Override + public Void get(long timeout, TimeUnit unit) { + throw reject(); + } + + private static RuntimeException reject() { + return new IllegalStateException("void future"); + } +}