diff --git a/src/main/java/org/jboss/netty/channel/ChannelPipeline.java b/src/main/java/org/jboss/netty/channel/ChannelPipeline.java index 7b8d599b24..d03ab0413b 100644 --- a/src/main/java/org/jboss/netty/channel/ChannelPipeline.java +++ b/src/main/java/org/jboss/netty/channel/ChannelPipeline.java @@ -59,40 +59,40 @@ import org.jboss.netty.handler.ssl.SslHandler; * *
  *
- *                                         I/O Request
- *                                       via {@link Channel} or
- *                                   {@link ChannelHandlerContext}
- *                                             |
- *   +-----------------------------------------+----------------+
- *   |                     ChannelPipeline     |                |
- *   |                                        \|/               |
- *   |   +----------------------+  +-----------+------------+   |
- *   |   | Upstream Handler  N  |  | Downstream Handler  1  |   |
- *   |   +----------+-----------+  +-----------+------------+   |
- *   |             /|\                         |                |
- *   |              |                         \|/               |
- *   |   +----------+-----------+  +-----------+------------+   |
- *   |   | Upstream Handler N-1 |  | Downstream Handler  2  |   |
- *   |   +----------+-----------+  +-----------+------------+   |
- *   |             /|\                         .                |
- *   |              .                          .                |
- *   |      [ Going Upstream ]        [ Going Downstream ]      |
- *   |              .                          .                |
- *   |              .                         \|/               |
- *   |   +----------+-----------+  +-----------+------------+   |
- *   |   | Upstream Handler  2  |  | Downstream Handler M-1 |   |
- *   |   +----------+-----------+  +-----------+------------+   |
- *   |             /|\                         |                |
- *   |              |                         \|/               |
- *   |   +----------+-----------+  +-----------+------------+   |
- *   |   | Upstream Handler  1  |  | Downstream Handler  M  |   |
- *   |   +----------+-----------+  +-----------+------------+   |
- *   |             /|\                         |                |
- *   +--------------+--------------------------+----------------+
- *                  |                         \|/
- *   +--------------+--------------------------+----------------+
- *   |         I/O Threads (Transport Implementation)           |
- *   +----------------------------------------------------------+
+ *                                             I/O Request
+ *                                           via {@link Channel} or
+ *                                       {@link ChannelHandlerContext}
+ *                                                 |
+ *   +---------------------------------------------+----------------+
+ *   |                       ChannelPipeline       |                |
+ *   |                                            \|/               |
+ *   |       +----------------------+  +-----------+------------+   |
+ *   |  LAST | Upstream Handler  N  |  | Downstream Handler  M  |   |
+ *   |   .   +----------+-----------+  +-----------+------------+   |
+ *   |   .             /|\                         |                |
+ *   |   .              |                         \|/               |
+ *   |   .   +----------+-----------+  +-----------+------------+   |
+ *   |   .   | Upstream Handler N-1 |  | Downstream Handler M-1 |   |
+ *   |   .   +----------+-----------+  +-----------+------------+   |
+ *   |   .             /|\                         .                |
+ *   |   .              .                          .                |
+ *   |   .      [ Going UPSTREAM ]        [ Going DOWNSTREAM ]      |
+ *   |   .              .                          .                |
+ *   |   .              .                         \|/               |
+ *   |   .   +----------+-----------+  +-----------+------------+   |
+ *   |   .   | Upstream Handler  2  |  | Downstream Handler  2  |   |
+ *   |   .   +----------+-----------+  +-----------+------------+   |
+ *   |   .             /|\                         |                |
+ *   |   .              |                         \|/               |
+ *   |   .   +----------+-----------+  +-----------+------------+   |
+ *   | FIRST | Upstream Handler  1  |  | Downstream Handler  1  |   |
+ *   |       +----------+-----------+  +-----------+------------+   |
+ *   |                 /|\                         |                |
+ *   +------------------+--------------------------+----------------+
+ *                      |                         \|/
+ *   +------------------+--------------------------+----------------+
+ *   |             I/O Threads (Transport Implementation)           |
+ *   +--------------------------------------------------------------+
  * 
* *

Building a pipeline

diff --git a/src/main/java/org/jboss/netty/channel/Channels.java b/src/main/java/org/jboss/netty/channel/Channels.java index d48f87d0df..d77453f5ed 100644 --- a/src/main/java/org/jboss/netty/channel/Channels.java +++ b/src/main/java/org/jboss/netty/channel/Channels.java @@ -179,9 +179,10 @@ public class Channels { // event emission methods /** - * Fires a {@link ChannelUpstreamHandler channelOpen} event to the specified - * {@link Channel}. If the specified channel has a parent, a - * {@link ChannelUpstreamHandler childChannelOpen} event will be fired too. + * Fires a {@code "channelOpen"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. If the specified channel has a parent, + * a {@code "childChannelOpen"} event will be fired, too. */ public static void fireChannelOpen(Channel channel) { if (channel.getParent() != null) { @@ -194,9 +195,10 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelOpen} event to the specified - * {@link ChannelHandlerContext}. Please note that this method doesn't - * fire a {@link ChannelUpstreamHandler childChannelOpen} event unlike + * Fires a {@code "channelOpen"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. Please note that + * this method doesn't fire a {@code "childChannelOpen"} event unlike * {@link #fireChannelOpen(Channel)} method. */ public static void fireChannelOpen( @@ -208,8 +210,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelBound} event to the specified - * {@link Channel}. + * Fires a {@code "channelBound"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. * * @param localAddress * the local address where the specified channel is bound @@ -222,8 +225,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelBound} event to the specified - * {@link Channel}. + * Fires a {@code "channelBound"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. * * @param localAddress * the local address where the specified channel is bound @@ -237,8 +241,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelConnected} event to the - * specified {@link Channel}. + * Fires a {@code "channelConnected"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. * * @param remoteAddress * the remote address where the specified channel is connected @@ -251,8 +256,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelConnected} event to the - * specified {@link ChannelHandlerContext}. + * Fires a {@code "channelConnected"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. * * @param remoteAddress * the remote address where the specified channel is connected @@ -266,8 +272,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler messageReceived} event to the - * specified {@link Channel}. + * Fires a {@code "messageReceived"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. * * @param message the received message */ @@ -276,8 +283,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler messageReceived} event to the - * specified {@link Channel}. + * Fires a {@code "messageReceived"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel} belongs. * * @param message the received message * @param remoteAddress the remote address where the received message @@ -291,8 +299,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler messageReceived} event to the - * specified {@link ChannelHandlerContext}. + * Fires a {@code "messageReceived"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. * * @param message the received message */ @@ -303,8 +312,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler messageReceived} event to the - * specified {@link ChannelHandlerContext}. + * Fires a {@code "messageReceived"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. * * @param message the received message * @param remoteAddress the remote address where the received message @@ -318,8 +328,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelInterestChanged} event to the - * specified {@link Channel}. + * Fires a {@code "channelInterestChanged"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. * * @param interestOps the new interestOps */ @@ -332,8 +343,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelInterestChanged} event to the - * specified {@link ChannelHandlerContext}. + * Fires a {@code "channelInterestChanged"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. * * @param interestOps the new interestOps */ @@ -348,8 +360,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelDisconnected} event to the - * specified {@link Channel}. + * Fires a {@code "channelDisconnected"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. */ public static void fireChannelDisconnected(Channel channel) { channel.getPipeline().sendUpstream( @@ -359,8 +372,9 @@ public class Channels { } /** - * Fires a {@link ChannelUpstreamHandler channelDisconnected} event to the - * specified {@link ChannelHandlerContext}. + * Fires a {@code "channelDisconnected"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. */ public static void fireChannelDisconnected( ChannelHandlerContext ctx, Channel channel) { @@ -369,11 +383,21 @@ public class Channels { ChannelState.CONNECTED, null)); } + /** + * Fires a {@code "channelUnbound"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + */ public static void fireChannelUnbound(Channel channel) { channel.getPipeline().sendUpstream(new DefaultChannelStateEvent( channel, succeededFuture(channel), ChannelState.BOUND, null)); } + /** + * Fires a {@code "channelUnbound"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + */ public static void fireChannelUnbound( ChannelHandlerContext ctx, Channel channel) { @@ -381,6 +405,11 @@ public class Channels { channel, succeededFuture(channel), ChannelState.BOUND, null)); } + /** + * Fires a {@code "channelClosed"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + */ public static void fireChannelClosed(Channel channel) { channel.getPipeline().sendUpstream( new DefaultChannelStateEvent( @@ -391,6 +420,11 @@ public class Channels { } } + /** + * Fires a {@code "channelClosed"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + */ public static void fireChannelClosed( ChannelHandlerContext ctx, Channel channel) { ctx.sendUpstream( @@ -399,12 +433,22 @@ public class Channels { ChannelState.OPEN, Boolean.FALSE)); } + /** + * Fires a {@code "exceptionCaught"} event to the first + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + */ public static void fireExceptionCaught(Channel channel, Throwable cause) { channel.getPipeline().sendUpstream( new DefaultExceptionEvent( channel, succeededFuture(channel), cause)); } + /** + * Fires a {@code "exceptionCaught"} event to the next + * {@link ChannelUpstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + */ public static void fireExceptionCaught( ChannelHandlerContext ctx, Channel channel, Throwable cause) { ctx.sendUpstream(new DefaultExceptionEvent( @@ -418,6 +462,17 @@ public class Channels { channel, succeededFuture(channel), childChannel)); } + /** + * Sends a {@code "bind"} request to the last + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + * + * @param channel the channel to bind + * @param localAddress the local address to bind to + * + * @return the {@link ChannelFuture} which will be notified when the + * bind operation is done + */ public static ChannelFuture bind(Channel channel, SocketAddress localAddress) { if (localAddress == null) { throw new NullPointerException("localAddress"); @@ -428,6 +483,17 @@ public class Channels { return future; } + /** + * Sends a {@code "bind"} request to the previous + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + * + * @param ctx the context + * @param channel the channel to bind + * @param future the future which will be notified when the bind + * operation is done + * @param localAddress the local address to bind to + */ public static void bind( ChannelHandlerContext ctx, Channel channel, ChannelFuture future, SocketAddress localAddress) { @@ -438,6 +504,17 @@ public class Channels { channel, future, ChannelState.BOUND, localAddress)); } + /** + * Sends a {@code "connect"} request to the last + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + * + * @param channel the channel to attempt a connection + * @param remoteAddress the remote address to connect to + * + * @return the {@link ChannelFuture} which will be notified when the + * connection attempt is done + */ public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); @@ -448,6 +525,17 @@ public class Channels { return future; } + /** + * Sends a {@code "connect"} request to the previous + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + * + * @param ctx the context + * @param channel the channel to attempt a connection + * @param future the future which will be notified when the connection + * attempt is done + * @param remoteAddress the remote address to connect to + */ public static void connect( ChannelHandlerContext ctx, Channel channel, ChannelFuture future, SocketAddress remoteAddress) { @@ -458,16 +546,50 @@ public class Channels { channel, future, ChannelState.CONNECTED, remoteAddress)); } + /** + * Sends a {@code "write"} request to the last + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + * + * @param channel the channel to write a message + * @param message the message to write to the channel + * + * @return the {@link ChannelFuture} which will be notified when the + * write operation is done + */ public static ChannelFuture write(Channel channel, Object message) { return write(channel, message, null); } + /** + * Sends a {@code "write"} request to the previous + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + * + * @param ctx the context + * @param channel the channel to write a message + * @param future the future which will be notified when the write + * operation is done + */ public static void write( ChannelHandlerContext ctx, Channel channel, ChannelFuture future, Object message) { write(ctx, channel, future, message, null); } + /** + * Sends a {@code "write"} request to the last + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + * + * @param channel the channel to write a message + * @param message the message to write to the channel + * @param remoteAddress the destination of the message. + * {@code null} to use the default remote address + * + * @return the {@link ChannelFuture} which will be notified when the + * write operation is done + */ public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) { ChannelFuture future = future(channel); channel.getPipeline().sendDownstream( @@ -475,6 +597,19 @@ public class Channels { return future; } + /** + * Sends a {@code "write"} request to the previous + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + * + * @param ctx the context + * @param channel the channel to write a message + * @param future the future which will be notified when the write + * operation is done + * @param message the message to write to the channel + * @param remoteAddress the destination of the message. + * {@code null} to use the default remote address. + */ public static void write( ChannelHandlerContext ctx, Channel channel, ChannelFuture future, Object message, SocketAddress remoteAddress) { @@ -482,6 +617,17 @@ public class Channels { new DefaultMessageEvent(channel, future, message, remoteAddress)); } + /** + * Sends a {@code "setInterestOps"} request to the last + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + * + * @param channel the channel to change its interestOps + * @param interestOps the new interestOps + * + * @return the {@link ChannelFuture} which will be notified when the + * interestOps is changed + */ public static ChannelFuture setInterestOps(Channel channel, int interestOps) { validateInterestOps(interestOps); validateDownstreamInterestOps(channel, interestOps); @@ -492,6 +638,16 @@ public class Channels { return future; } + /** + * Sends a {@code "setInterestOps"} request to the previous + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + * + * @param ctx the context + * @param channel the channel to change the interestOps + * @param future the future which will be notified when the interestOps is + * changed. + */ public static void setInterestOps( ChannelHandlerContext ctx, Channel channel, ChannelFuture future, int interestOps) { @@ -504,6 +660,15 @@ public class Channels { Integer.valueOf(interestOps))); } + /** + * Sends a {@code "disconnect"} request to the last + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. + * + * @param channel the channel to disconnect + * + * @return the {@link ChannelFuture} which will be notified on disconnection + */ public static ChannelFuture disconnect(Channel channel) { ChannelFuture future = future(channel); channel.getPipeline().sendDownstream(new DefaultChannelStateEvent( @@ -511,6 +676,15 @@ public class Channels { return future; } + /** + * Sends a {@code "disconnect"} request to the previous + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. + * + * @param ctx the context + * @param channel the channel to disconnect + * @param future the future which will be notified on disconnection + */ public static void disconnect( ChannelHandlerContext ctx, Channel channel, ChannelFuture future) { ctx.sendDownstream(new DefaultChannelStateEvent( @@ -518,11 +692,13 @@ public class Channels { } /** - * Sends a close request to the specified {@link Channel}. + * Sends a {@code "close"} request to the last + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of + * the specified {@link Channel}. * - * @param channel the channel to close + * @param channel the channel to close * - * @return the future which will be notified on closure + * @return the {@link ChannelFuture} which will be notified on closure */ public static ChannelFuture close(Channel channel) { ChannelFuture future = future(channel); @@ -532,12 +708,9 @@ public class Channels { } /** - * Sends a close request to the specified {@link ChannelHandlerContext}. - * This method is a shortcut to the following code: - *
-     * ctx.sendDownstream(new DefaultChannelStateEvent(
-     *         channel, future, ChannelState.OPEN, Boolean.FALSE));
-     * 
+ * Sends a {@code "close"} request to the previous + * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} where + * the specified {@link ChannelHandlerContext} belongs. * * @param ctx the context * @param channel the channel to close