Introduce ChannelGroupMatcher which can be used to only apply operations of a ChannelGroup on matching Channels.

This is often useful if you for example use a ChannelGroup to hold all connected Channels and want to broadcast a message too all of them
except one Channel.
This commit is contained in:
Norman Maurer 2013-07-08 07:42:19 +02:00
parent 12ea35fd5f
commit d23c3b3382
4 changed files with 391 additions and 61 deletions

View File

@ -17,6 +17,7 @@ package io.netty.channel.group;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@ -100,28 +101,63 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
* group. If the specified {@code message} is an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
* condition. Please note that this operation is asynchronous as
* condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as
* {@link Channel#write(Object)} is.
*
* @return itself
*/
ChannelGroupFuture write(Object message);
/**
* Writes the specified {@code message} to all {@link Channel}s in this
* group that match the given {@link ChannelGroupMatcher}. If the specified {@code message} is an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
* condition. The same is true for {@link ByteBufHolder}. Please note that this operation is asynchronous as
* {@link Channel#write(Object)} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture write(Object message, ChannelGroupMatcher matcher);
/**
* Flush all {@link Channel}s in this
* group. Please note that this operation is asynchronous as
* {@link Channel#flush()} is.
* group. If the specified {@code messages} are an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
* condition. Please note that this operation is asynchronous as
* {@link Channel#write(Object)} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroup flush();
/**
* Flush all {@link Channel}s in this group that match the given {@link ChannelGroupMatcher}.
* If the specified {@code messages} are an instance of
* {@link ByteBuf}, it is automatically
* {@linkplain ByteBuf#duplicate() duplicated} to avoid a race
* condition. Please note that this operation is asynchronous as
* {@link Channel#write(Object)} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroup flush(ChannelGroupMatcher matcher);
/**
* Shortcut for calling {@link #write(Object)} and {@link #flush()}.
*/
ChannelGroupFuture flushAndWrite(Object message);
/**
* Shortcut for calling {@link #write(Object)} and {@link #flush()} and only act on
* {@link Channel}s that match the {@link ChannelGroupMatcher}.
*/
ChannelGroupFuture flushAndWrite(Object message, ChannelGroupMatcher matcher);
/**
* Disconnects all {@link Channel}s in this group from their remote peers.
*
@ -130,6 +166,15 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/
ChannelGroupFuture disconnect();
/**
* Disconnects all {@link Channel}s in this group from their remote peers,
* that match the given {@link ChannelGroupMatcher}.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture disconnect(ChannelGroupMatcher matcher);
/**
* Closes all {@link Channel}s in this group. If the {@link Channel} is
* connected to a remote peer or bound to a local address, it is
@ -140,6 +185,16 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
*/
ChannelGroupFuture close();
/**
* Closes all {@link Channel}s in this group that match the given {@link ChannelGroupMatcher}.
* If the {@link Channel} is connected to a remote peer or bound to a local address, it is
* automatically disconnected and unbound.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture close(ChannelGroupMatcher matcher);
/**
* Deregister all {@link Channel}s in this group from their {@link EventLoop}.
* Please note that this operation is asynchronous as {@link Channel#deregister()} is.
@ -148,4 +203,13 @@ public interface ChannelGroup extends Set<Channel>, Comparable<ChannelGroup> {
* the operation is done for all channels
*/
ChannelGroupFuture deregister();
/**
* Deregister all {@link Channel}s in this group from their {@link EventLoop} that match the given
* {@link ChannelGroupMatcher}. Please note that this operation is asynchronous as {@link Channel#deregister()} is.
*
* @return the {@link ChannelGroupFuture} instance that notifies when
* the operation is done for all channels
*/
ChannelGroupFuture deregister(ChannelGroupMatcher matcher);
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.group;
import io.netty.channel.Channel;
/**
* Allows to only match some {@link Channel}'s for operations in {@link ChannelGroup}.
*
* {@link ChannelGroupMatchers} provide you with helper methods for usual needed implementations.
*/
public interface ChannelGroupMatcher {
/**
* Returns {@code true} if the operation should be also executed on the given {@link Channel}.
*/
boolean matches(Channel channel);
}

View File

@ -0,0 +1,169 @@
/*
* Copyright 2013 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.group;
import io.netty.channel.Channel;
import io.netty.channel.ServerChannel;
/**
* Helper class which provides often used {@link ChannelGroupMatcher} implementations.
*/
public final class ChannelGroupMatchers {
private static final ChannelGroupMatcher ALL_MATCHER = new ChannelGroupMatcher() {
@Override
public boolean matches(Channel channel) {
return true;
}
};
private static final ChannelGroupMatcher SERVER_CHANNEL_MATCHER = isInstanceOf(ServerChannel.class);
private static final ChannelGroupMatcher NON_SERVER_CHANNEL_MATCHER = isNotInstanceOf(ServerChannel.class);
private ChannelGroupMatchers() {
// static methods only
}
/**
* Returns a {@link ChannelGroupMatcher} that matches all {@link Channel}s.
*/
public static ChannelGroupMatcher all() {
return ALL_MATCHER;
}
/**
* Returns a {@link ChannelGroupMatcher} that matches all {@link Channel}s except the given.
*/
public static ChannelGroupMatcher isNot(Channel channel) {
return invert(is(channel));
}
/**
* Returns a {@link ChannelGroupMatcher} that matches the given {@link Channel}.
*/
public static ChannelGroupMatcher is(Channel channel) {
return new InstanceMatcher(channel);
}
/**
* Returns a {@link ChannelGroupMatcher} that matches all {@link Channel}s that are an instance of sub-type of
* the given class.
*/
public static ChannelGroupMatcher isInstanceOf(Class<? extends Channel> clazz) {
return new ClassMatcher(clazz);
}
/**
* Returns a {@link ChannelGroupMatcher} that matches all {@link Channel}s that are <strong>not</strong> an
* instance of sub-type of the given class.
*/
public static ChannelGroupMatcher isNotInstanceOf(Class<? extends Channel> clazz) {
return invert(isInstanceOf(clazz));
}
/**
* Returns a {@link ChannelGroupMatcher} that matches all {@link Channel}s that are of type {@link ServerChannel}.
*/
public static ChannelGroupMatcher isServerChannel() {
return SERVER_CHANNEL_MATCHER;
}
/**
* Returns a {@link ChannelGroupMatcher} that matches all {@link Channel}s that are <strong>not</strong> of type
* {@link ServerChannel}.
*/
public static ChannelGroupMatcher isNonServerChannel() {
return NON_SERVER_CHANNEL_MATCHER;
}
/**
* Invert the given {@link ChannelGroupMatcher}.
*/
public static ChannelGroupMatcher invert(ChannelGroupMatcher matcher) {
return new InvertMatcher(matcher);
}
/**
* Return a composite of the given {@link ChannelGroupMatcher}s. This means all {@link ChannelGroupMatcher} must
* return {@code true} to match.
*/
public static ChannelGroupMatcher compose(ChannelGroupMatcher... matchers) {
if (matchers.length < 1) {
throw new IllegalArgumentException("matchers must at least contain one element");
}
if (matchers.length == 1) {
return matchers[0];
}
return new CompositeMatcher(matchers);
}
private static final class CompositeMatcher implements ChannelGroupMatcher {
private final ChannelGroupMatcher[] matchers;
CompositeMatcher(ChannelGroupMatcher... matchers) {
this.matchers = matchers;
}
@Override
public boolean matches(Channel channel) {
for (int i = 0; i < matchers.length; i++) {
if (!matchers[i].matches(channel)) {
return true;
}
}
return false;
}
}
private static final class InvertMatcher implements ChannelGroupMatcher {
private final ChannelGroupMatcher matcher;
InvertMatcher(ChannelGroupMatcher matcher) {
this.matcher = matcher;
}
@Override
public boolean matches(Channel channel) {
return !matcher.matches(channel);
}
}
private static final class InstanceMatcher implements ChannelGroupMatcher {
private final Channel channel;
InstanceMatcher(Channel channel) {
this.channel = channel;
}
@Override
public boolean matches(Channel ch) {
return channel == ch;
}
}
private static final class ClassMatcher implements ChannelGroupMatcher {
private final Class<? extends Channel> clazz;
ClassMatcher(Class<? extends Channel> clazz) {
this.clazz = clazz;
}
@Override
public boolean matches(Channel ch) {
return clazz.isInstance(ch);
}
}
}

View File

@ -163,69 +163,21 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
@Override
public ChannelGroupFuture close() {
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels) {
futures.put(c, c.close());
}
for (Channel c: nonServerChannels) {
futures.put(c, c.close());
}
return new DefaultChannelGroupFuture(this, futures, executor);
return close(ChannelGroupMatchers.all());
}
@Override
public ChannelGroupFuture disconnect() {
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels) {
futures.put(c, c.disconnect());
}
for (Channel c: nonServerChannels) {
futures.put(c, c.disconnect());
}
return new DefaultChannelGroupFuture(this, futures, executor);
return disconnect(ChannelGroupMatchers.all());
}
@Override
public ChannelGroupFuture deregister() {
return deregister(ChannelGroupMatchers.all());
}
@Override
public ChannelGroupFuture write(Object message) {
if (message == null) {
throw new NullPointerException("message");
}
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels) {
futures.put(c, c.write(safeDuplicate(message)));
}
ReferenceCountUtil.release(message);
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroup flush() {
for (Channel c: nonServerChannels) {
c.flush();
}
return this;
}
@Override
public ChannelGroupFuture flushAndWrite(Object message) {
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels) {
futures.put(c, c.writeAndFlush(safeDuplicate(message)));
}
ReferenceCountUtil.release(message);
return new DefaultChannelGroupFuture(this, futures, executor);
return write(message, ChannelGroupMatchers.all());
}
// Create a safe duplicate of the message to write it to a channel but not affect other writes.
@ -241,20 +193,133 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
}
@Override
public ChannelGroupFuture deregister() {
public ChannelGroupFuture write(Object message, ChannelGroupMatcher matcher) {
if (message == null) {
throw new NullPointerException("message");
}
if (matcher == null) {
throw new NullPointerException("matcher");
}
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels) {
if (matcher.matches(c)) {
futures.put(c, c.write(safeDuplicate(message)));
}
}
ReferenceCountUtil.release(message);
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroup flush() {
return flush(ChannelGroupMatchers.all());
}
@Override
public ChannelGroupFuture flushAndWrite(Object message) {
return flushAndWrite(message, ChannelGroupMatchers.all());
}
@Override
public ChannelGroupFuture disconnect(ChannelGroupMatcher matcher) {
if (matcher == null) {
throw new NullPointerException("matcher");
}
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels) {
futures.put(c, c.deregister());
if (matcher.matches(c)) {
futures.put(c, c.disconnect());
}
}
for (Channel c: nonServerChannels) {
futures.put(c, c.deregister());
if (matcher.matches(c)) {
futures.put(c, c.disconnect());
}
}
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroupFuture close(ChannelGroupMatcher matcher) {
if (matcher == null) {
throw new NullPointerException("matcher");
}
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels) {
if (matcher.matches(c)) {
futures.put(c, c.close());
}
}
for (Channel c: nonServerChannels) {
if (matcher.matches(c)) {
futures.put(c, c.close());
}
}
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroupFuture deregister(ChannelGroupMatcher matcher) {
if (matcher == null) {
throw new NullPointerException("matcher");
}
Map<Channel, ChannelFuture> futures =
new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: serverChannels) {
if (matcher.matches(c)) {
futures.put(c, c.deregister());
}
}
for (Channel c: nonServerChannels) {
if (matcher.matches(c)) {
futures.put(c, c.deregister());
}
}
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public ChannelGroup flush(ChannelGroupMatcher matcher) {
for (Channel c: nonServerChannels) {
if (matcher.matches(c)) {
c.flush();
}
}
return this;
}
@Override
public ChannelGroupFuture flushAndWrite(Object message, ChannelGroupMatcher matcher) {
if (message == null) {
throw new NullPointerException("message");
}
Map<Channel, ChannelFuture> futures = new LinkedHashMap<Channel, ChannelFuture>(size());
for (Channel c: nonServerChannels) {
if (matcher.matches(c)) {
futures.put(c, c.writeAndFlush(safeDuplicate(message)));
}
}
ReferenceCountUtil.release(message);
return new DefaultChannelGroupFuture(this, futures, executor);
}
@Override
public int hashCode() {
return System.identityHashCode(this);