diff --git a/src/main/java/org/jboss/netty/channel/group/ChannelGroup.java b/src/main/java/org/jboss/netty/channel/group/ChannelGroup.java index f0ba53424c..a153757d9c 100644 --- a/src/main/java/org/jboss/netty/channel/group/ChannelGroup.java +++ b/src/main/java/org/jboss/netty/channel/group/ChannelGroup.java @@ -25,9 +25,54 @@ package org.jboss.netty.channel.group; import java.net.SocketAddress; import java.util.Set; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ServerChannel; /** + * A thread-safe {@link Set} that contains open {@link Channel}s and provides + * various bulk operations on them. Using {@link ChannelGroup}, you can + * categorize {@link Channel}s into a meaningful group (e.g. on a per-service + * or per-state basis.) A closed {@link Channel} is automatically removed from + * the collection, so that you don't need to worry about the life cycle of the + * added {@link Channel}. A {@link Channel} can belong to more than one + * {@link ChannelGroup}. + * + *

Simplify shutdown process with {@link ChannelGroup}

+ *

+ * If both {@link ServerChannel}s and non-{@link ServerChannel}s exists in the + * same {@link ChannelGroup}, any requested I/O operations on the group are + * performed for the {@link ServerChannel}s first and then for the others. + *

+ * This rule is very useful when you shut down a server in one shot: + * + *

+ * ChannelGroup allChannels = new DefaultChannelGroup();
+ *
+ * public static void main(String[] args) throws Exception {
+ *     ServerBootstrap b = new ServerBootstrap(..);
+ *     ...
+ *
+ *     // Start the server
+ *     b.getPipeline().addLast("handler", new MyHandler());
+ *     Channel serverChannel = b.bind(..);
+ *
+ *     ... Wait until the shutdown signal reception ...
+ *
+ *     // Close the serverChannel and then all accepted connections.
+ *     allChannels.close().awaitUninterruptibly();
+ *     b.releaseExternalResources();
+ * }
+ *
+ * public class MyHandler extends SimpleChannelUpstreamHandler {
+ *     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ *         // Add all open channels to the global group so that they are
+ *         // closed on shutdown.
+ *         allChannels.add(e.getChannel());
+ *     }
+ * }
+ * 
+ * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) * @version $Rev$, $Date$ @@ -36,13 +81,89 @@ import org.jboss.netty.channel.Channel; * @apiviz.has org.jboss.netty.channel.group.ChannelGroupFuture oneway - - returns */ public interface ChannelGroup extends Set, Comparable { + + /** + * Returns the name of this group. A group name is purely for helping + * you to distinguish one group from others. + */ String getName(); + + /** + * Returns the {@link Channel} whose ID matches the specified integer. + * + * @return the matching {@link Channel} if found. {@code null} otherwise. + */ Channel find(Integer id); + + /** + * Calls {@link Channel#setInterestOps(int)} for all {@link Channel}s in + * this group with the specified {@code interestOps}. Please note that + * this operation is asynchronous as {@link Channel#setInterestOps(int)} is. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ ChannelGroupFuture setInterestOps(int interestOps); + + /** + * Calls {@link Channel#setReadable(boolean)} for all {@link Channel}s in + * this group with the specified boolean flag. Please note that this + * operation is asynchronous as {@link Channel#setReadable(boolean)} is. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ ChannelGroupFuture setReadable(boolean readable); + + /** + * Writes the specified {@code message} to all {@link Channel}s in this + * group. If the specified {@code message} is an instance of + * {@link ChannelBuffer}, it is automatically + * {@linkplain ChannelBuffer#duplicate() duplicated} to avoid a race + * condition. Please note that this operation is asynchronous as + * {@link Channel#write(Object)} is. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ ChannelGroupFuture write(Object message); + + /** + * Writes the specified {@code message} with the specified + * {@code remoteAddress} to all {@link Channel}s in this group. If the + * specified {@code message} is an instance of {@link ChannelBuffer}, it is + * automatically {@linkplain ChannelBuffer#duplicate() duplicated} to avoid + * a race condition. Please note that this operation is asynchronous as + * {@link Channel#write(Object, SocketAddress)} is. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ ChannelGroupFuture write(Object message, SocketAddress remoteAddress); + + /** + * Disconnects all {@link Channel}s in this group from their remote peers. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ ChannelGroupFuture disconnect(); + + /** + * Unbinds all {@link Channel}s in this group from their local address. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ ChannelGroupFuture unbind(); + + /** + * Closes all {@link Channel}s in this group. If the {@link Channel} is + * connected to a remote peer or bound to a local address, it is + * automatically disconnected and unbound. + * + * @return the {@link ChannelGroupFuture} instance that notifies when + * the operation is done for all channels + */ ChannelGroupFuture close(); } diff --git a/src/main/java/org/jboss/netty/channel/group/ChannelGroupFuture.java b/src/main/java/org/jboss/netty/channel/group/ChannelGroupFuture.java index e660d637d8..cf25aeaac8 100644 --- a/src/main/java/org/jboss/netty/channel/group/ChannelGroupFuture.java +++ b/src/main/java/org/jboss/netty/channel/group/ChannelGroupFuture.java @@ -22,12 +22,87 @@ */ package org.jboss.netty.channel.group; +import java.util.Iterator; import java.util.concurrent.TimeUnit; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelHandler; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.handler.execution.ExecutionHandler; /** + * The result of an asynchronous {@link ChannelGroup} operation. + * + *

+ * All I/O operations in {@link ChannelGroup} are asynchronous. It means any + * I/O calls will return immediately with no guarantee that the requested I/O + * operations have been completed at the end of the call. Instead, you will be + * returned with a {@link ChannelGroupFuture} instance which tells you when the + * requested I/O operations have succeeded, failed, or cancelled. + *

+ * Various methods are provided to let you check if the I/O operations has been + * completed, wait for the completion, and retrieve the result of the I/O + * operation. It also allows you to add more than one + * {@link ChannelGroupFutureListener} so you can get notified when the I/O + * operation have been completed. + * + *

Prefer {@link #addListener(ChannelGroupFutureListener)} to {@link #await()}

+ * + * It is recommended to prefer {@link #addListener(ChannelGroupFutureListener)} to + * {@link #await()} wherever possible to get notified when I/O operations are + * done and to do any follow-up tasks. + *

+ * {@link #addListener(ChannelGroupFutureListener)} is non-blocking. It simply + * adds the specified {@link ChannelGroupFutureListener} to the + * {@link ChannelGroupFuture}, and I/O thread will notify the listeners when + * the I/O operations associated with the future is done. + * {@link ChannelGroupFutureListener} yields the best performance and resource + * utilization because it does not block at all, but it could be tricky to + * implement a sequential logic if you are not used to event-driven programming. + *

+ * By contrast, {@link #await()} is a blocking operation. Once called, the + * caller thread blocks until all I/O operations are done. It is easier to + * implement a sequential logic with {@link #await()}, but the caller thread + * blocks unnecessarily until all I/O operations are done and there's relatively + * expensive cost of inter-thread notification. Moreover, there's a chance of + * dead lock in a particular circumstance, which is described below. + * + *

Do not call {@link #await()} inside {@link ChannelHandler}

+ *

+ * The event handler methods in {@link ChannelHandler} is often called by + * an I/O thread unless an {@link ExecutionHandler} is in the + * {@link ChannelPipeline}. If {@link #await()} is called by an event handler + * method, which is called by the I/O thread, the I/O operation it is waiting + * for might never be complete because {@link #await()} can block the I/O + * operation it is waiting for, which is a dead lock. + *

+ * // BAD - NEVER DO THIS
+ * public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ *     if (e.getMessage() instanceof ShutdownMessage) {
+ *         ChannelGroup allChannels = MyServer.getAllChannels();
+ *         ChannelGroupFuture future = allChannels.close();
+ *         future.awaitUninterruptibly();
+ *         // Perform post-shutdown operation
+ *         // ...
+ *     }
+ * }
+ *
+ * // GOOD
+ * public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+ *     if (e.getMessage() instanceof ShutdownMessage) {
+ *         ChannelGroup allChannels = MyServer.getAllChannels();
+ *         ChannelGroupFuture future = allChannels.close();
+ *         future.addListener(new ChannelGroupFutureListener() {
+ *             public void operationComplete(ChannelGroupFuture future) {
+ *                 // Perform post-closure operation
+ *                 // ...
+ *             }
+ *         });
+ *     }
+ * }
+ * 
+ * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) * @version $Rev$, $Date$ @@ -36,9 +111,28 @@ import org.jboss.netty.channel.ChannelFuture; */ public interface ChannelGroupFuture extends Iterable{ + /** + * Returns the {@link ChannelGroup} which is associated with this future. + */ ChannelGroup getGroup(); + /** + * Returns the {@link ChannelFuture} of the I/O operation which is + * associated with the {@link Channel} whose ID matches the specified + * integer. + * + * @return the matching {@link ChannelFuture} if found. + * {@code null} otherwise. + */ ChannelFuture find(Integer channelId); + + /** + * Returns the {@link ChannelFuture} of the I/O operation which is + * associated with the specified {@link Channel}. + * + * @return the matching {@link ChannelFuture} if found. + * {@code null} otherwise. + */ ChannelFuture find(Channel channel); /** @@ -48,12 +142,28 @@ public interface ChannelGroupFuture extends Iterable{ */ boolean isDone(); + /** + * Returns {@code true} if and only if all I/O operations associated with + * this future were successful without any failure. + */ boolean isCompleteSuccess(); + /** + * Returns {@code true} if and only if the I/O operations associated with + * this future were partially successful with some failure. + */ boolean isPartialSuccess(); + /** + * Returns {@code true} if and only if all I/O operations associated with + * this future have failed without any success. + */ boolean isCompleteFailure(); + /** + * Returns {@code true} if and only if the I/O operations associated with + * this future have failed partially with some success. + */ boolean isPartialFailure(); /** @@ -131,4 +241,12 @@ public interface ChannelGroupFuture extends Iterable{ * the specified time limit */ boolean awaitUninterruptibly(long timeoutMillis); + + /** + * Returns the {@link Iterator} that enumerates all {@link ChannelFuture}s + * which are associated with this future. Please note that the returned + * {@link Iterator} is is unmodifiable, which means a {@link ChannelFuture} + * cannot be removed from this future. + */ + Iterator iterator(); } diff --git a/src/main/java/org/jboss/netty/channel/group/ChannelGroupFutureListener.java b/src/main/java/org/jboss/netty/channel/group/ChannelGroupFutureListener.java index f288a151a8..680b26ce60 100644 --- a/src/main/java/org/jboss/netty/channel/group/ChannelGroupFutureListener.java +++ b/src/main/java/org/jboss/netty/channel/group/ChannelGroupFutureListener.java @@ -26,8 +26,9 @@ import java.util.EventListener; /** * Listens to the result of a {@link ChannelGroupFuture}. The result of the - * asynchronous {@link ChannelGroup} I/O operation is notified once this - * listener is added by calling {@link ChannelGroupFuture#addListener(ChannelGroupFutureListener)}. + * asynchronous {@link ChannelGroup} I/O operations is notified once this + * listener is added by calling {@link ChannelGroupFuture#addListener(ChannelGroupFutureListener)} + * and all I/O operations are complete. * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) @@ -37,8 +38,8 @@ import java.util.EventListener; public interface ChannelGroupFutureListener extends EventListener { /** - * Invoked when the I/O operation associated with the - * {@link ChannelGroupFuture} has been completed. + * Invoked when all I/O operations associated with the + * {@link ChannelGroupFuture} have been completed. * * @param future The source {@link ChannelGroupFuture} which called this * callback. diff --git a/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroup.java b/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroup.java index bca34883a4..a50f241c7d 100644 --- a/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroup.java +++ b/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroup.java @@ -41,6 +41,8 @@ import org.jboss.netty.util.internal.CombinedIterator; import org.jboss.netty.util.internal.ConcurrentHashMap; /** + * The default {@link ChannelGroup} implementation. + * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) * @version $Rev$, $Date$ @@ -58,10 +60,18 @@ public class DefaultChannelGroup extends AbstractSet implements Channel } }; + /** + * Creates a new group with a generated name. + */ public DefaultChannelGroup() { this("group-0x" + Integer.toHexString(nextId.incrementAndGet())); } + /** + * Creates a new group with the specified {@code name}. Please note that + * different groups can have the same name, which means no duplicate check + * is done against group names. + */ public DefaultChannelGroup(String name) { if (name == null) { throw new NullPointerException("name"); diff --git a/src/main/java/org/jboss/netty/channel/group/package-info.java b/src/main/java/org/jboss/netty/channel/group/package-info.java index dd8ed0469d..b8eda4399d 100644 --- a/src/main/java/org/jboss/netty/channel/group/package-info.java +++ b/src/main/java/org/jboss/netty/channel/group/package-info.java @@ -22,8 +22,8 @@ */ /** - * A global channel registry which helps a user maintain the list of open - * channels and perform bulk operations on them. + * A channel registry which helps a user maintain the list of open + * {@link org.jboss.netty.channel.Channel}s and perform bulk operations on them. * * @apiviz.exclude ^java * @apiviz.exclude \.(Abstract|Default).*$