[#3127] Allow to write with VoidPromise to Channels in ChannelGroup

Motivation:

Users sometimes want to use Channel.voidPromise() when write to a Channel to reduce GC-pressure. This should be also possible when write via a ChannelGroup.

Modifications:

Add new write(...) and writeAndFlush(...) overloads which allow to signale that a VoidPromise should be used to write to the Channel

Result:

Users can write with VoidPromise when using ChannelGroup
This commit is contained in:
Norman Maurer 2016-05-12 11:03:42 +02:00
parent 27a392b877
commit c249926784
3 changed files with 240 additions and 20 deletions

View File

@ -121,7 +121,7 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
/**
* Writes the specified {@code message} to all {@link Channel}s in this
* group that match the given {@link ChannelMatcher}. If the specified {@code message} is an instance of
* group that are matched by the given {@link ChannelMatcher}. If the specified {@code message} is an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
* condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as
@ -132,6 +132,22 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/
ChannelGroupFuture write(Object message, ChannelMatcher matcher);
/**
* Writes the specified {@code message} to all {@link Channel}s in this
* group that are matched by the given {@link ChannelMatcher}. If the specified {@code message} is an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
* condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as
* {@link Channel#write(Object)} is.
*
* If {@code voidPromise} is {@code true} {@link Channel#voidPromise()} is used for the writes and so the same
* restrictions to the returned {@link ChannelGroupFuture} apply as to a void promise.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise);
/**
* Flush all {@link Channel}s in this
* group. If the specified {@code messages} are an instance of
@ -146,7 +162,7 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
ChannelGroup flush();
/**
* Flush all {@link Channel}s in this group that match the given {@link ChannelMatcher}.
* Flush all {@link Channel}s in this group that are matched by the given {@link ChannelMatcher}.
* If the specified {@code messages} are an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
@ -171,10 +187,16 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
/**
* Shortcut for calling {@link #write(Object)} and {@link #flush()} and only act on
* {@link Channel}s that match the {@link ChannelMatcher}.
* {@link Channel}s that are matched by the {@link ChannelMatcher}.
*/
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher);
/**
* Shortcut for calling {@link #write(Object, ChannelMatcher, boolean)} and {@link #flush()} and only act on
* {@link Channel}s that are matched by the {@link ChannelMatcher}.
*/
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise);
/**
* @deprecated Use {@link #writeAndFlush(Object, ChannelMatcher)} instead.
*/
@ -191,7 +213,7 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
/**
* Disconnects all {@link Channel}s in this group from their remote peers,
* that match the given {@link ChannelMatcher}.
* that are matched by the given {@link ChannelMatcher}.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
@ -209,7 +231,7 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
ChannelGroupFuture close();
/**
* Closes all {@link Channel}s in this group that match the given {@link ChannelMatcher}.
* Closes all {@link Channel}s in this group that are matched by the given {@link ChannelMatcher}.
* If the {@link Channel} is connected to a remote peer or bound to a local address, it is
* automatically disconnected and unbound.
*
@ -233,7 +255,7 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
/**
* @deprecated This method will be removed in the next major feature release.
*
* Deregister all {@link Channel}s in this group from their {@link EventLoop} that match the given
* Deregister all {@link Channel}s in this group from their {@link EventLoop} that are matched by the given
* {@link ChannelMatcher}. Please note that this operation is asynchronous as {@link Channel#deregister()} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when

View File

@ -52,6 +52,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
remove(future.channel());
}
};
private final VoidChannelGroupFuture voidFuture = new VoidChannelGroupFuture(this);
private final boolean stayClosed;
private volatile boolean closed;
@ -254,6 +255,11 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
@Override
public ChannelGroupFuture write(Object message, ChannelMatcher matcher) {
return write(message, matcher, false);
}
@Override
public ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise) {
if (message == null) {
throw new NullPointerException("message");
}
@ -261,15 +267,25 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
throw new NullPointerException("matcher");
}
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
futures.put(c, c.write(safeDuplicate(message)));
final ChannelGroupFuture future;
if (voidPromise) {
for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
c.write(safeDuplicate(message), c.voidPromise());
}
}
future = voidFuture;
} else {
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
futures.put(c, c.write(safeDuplicate(message)));
}
}
future = new DefaultChannelGroupFuture(this, futures, executor);
}
ReferenceCountUtil.release(message);
return new DefaultChannelGroupFuture(this, futures, executor);
return future;
}
@Override
@ -383,21 +399,34 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
@Override
public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher) {
return writeAndFlush(message, matcher, false);
}
@Override
public ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise) {
if (message == null) {
throw new NullPointerException("message");
}
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
futures.put(c, c.writeAndFlush(safeDuplicate(message)));
final ChannelGroupFuture future;
if (voidPromise) {
for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
c.writeAndFlush(safeDuplicate(message), c.voidPromise());
}
}
future = voidFuture;
} else {
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels.values()) {
if (matcher.matches(c)) {
futures.put(c, c.writeAndFlush(safeDuplicate(message)));
}
}
future = new DefaultChannelGroupFuture(this, futures, executor);
}
ReferenceCountUtil.release(message);
return new DefaultChannelGroupFuture(this, futures, executor);
return future;
}
@Override

View File

@ -0,0 +1,169 @@
/*
* Copyright 2016 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.group;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
final class VoidChannelGroupFuture implements ChannelGroupFuture {
private static final Iterator<ChannelFuture> EMPTY = Collections.<ChannelFuture>emptyList().iterator();
private final ChannelGroup group;
VoidChannelGroupFuture(ChannelGroup group) {
this.group = group;
}
@Override
public ChannelGroup group() {
return group;
}
@Override
public ChannelFuture find(Channel channel) {
return null;
}
@Override
public boolean isSuccess() {
return false;
}
@Override
public ChannelGroupException cause() {
return null;
}
@Override
public boolean isPartialSuccess() {
return false;
}
@Override
public boolean isPartialFailure() {
return false;
}
@Override
public ChannelGroupFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
throw reject();
}
@Override
public ChannelGroupFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
throw reject();
}
@Override
public ChannelGroupFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
throw reject();
}
@Override
public ChannelGroupFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners) {
throw reject();
}
@Override
public ChannelGroupFuture await() {
throw reject();
}
@Override
public ChannelGroupFuture awaitUninterruptibly() {
throw reject();
}
@Override
public ChannelGroupFuture syncUninterruptibly() {
throw reject();
}
@Override
public ChannelGroupFuture sync() {
throw reject();
}
@Override
public Iterator<ChannelFuture> iterator() {
return EMPTY;
}
@Override
public boolean isCancellable() {
return false;
}
@Override
public boolean await(long timeout, TimeUnit unit) {
throw reject();
}
@Override
public boolean await(long timeoutMillis) {
throw reject();
}
@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
throw reject();
}
@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
throw reject();
}
@Override
public Void getNow() {
return null;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public Void get() {
throw reject();
}
@Override
public Void get(long timeout, TimeUnit unit) {
throw reject();
}
private static RuntimeException reject() {
return new IllegalStateException("void future");
}
}