* Deprecated ChannelPipelineCoverage (Will be replaced with something I will introduce later)
* Revised JavaDoc so that a user can learn how to manage handler states * Revised JavaDoc about using ExecutionHandler
This commit is contained in:
parent
7e3358f0f3
commit
901b1f099a
@ -29,7 +29,6 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelHandler;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
import org.jboss.netty.util.ExternalResourceReleasable;
|
||||
|
||||
@ -116,12 +115,10 @@ public class Bootstrap implements ExternalResourceReleasable {
|
||||
* <p>
|
||||
* Please note that this method is a convenience method that works only
|
||||
* when <b>1)</b> you create only one channel from this bootstrap (e.g.
|
||||
* one-time client-side or connectionless channel) or <b>2)</b> the
|
||||
* {@link ChannelPipelineCoverage} of all handlers in the pipeline is
|
||||
* {@code "all"}. You have to use
|
||||
* one-time client-side or connectionless channel) or <b>2)</b> all handlers
|
||||
* in the pipeline is stateless. You have to use
|
||||
* {@link #setPipelineFactory(ChannelPipelineFactory)} if <b>1)</b> your
|
||||
* pipeline contains a {@link ChannelHandler} whose
|
||||
* {@link ChannelPipelineCoverage} is {@code "one"} and <b>2)</b> one or
|
||||
* pipeline contains a stateful {@link ChannelHandler} and <b>2)</b> one or
|
||||
* more channels are going to be created by this bootstrap (e.g. server-side
|
||||
* channels).
|
||||
*
|
||||
@ -152,12 +149,10 @@ public class Bootstrap implements ExternalResourceReleasable {
|
||||
* <p>
|
||||
* Please note that this method is a convenience method that works only
|
||||
* when <b>1)</b> you create only one channel from this bootstrap (e.g.
|
||||
* one-time client-side or connectionless channel) or <b>2)</b> the
|
||||
* {@link ChannelPipelineCoverage} of all handlers in the pipeline is
|
||||
* {@code "all"}. You have to use
|
||||
* one-time client-side or connectionless channel) or <b>2)</b> all handlers
|
||||
* in the pipeline is stateless. You have to use
|
||||
* {@link #setPipelineFactory(ChannelPipelineFactory)} if <b>1)</b> your
|
||||
* pipeline contains a {@link ChannelHandler} whose
|
||||
* {@link ChannelPipelineCoverage} is {@code "one"} and <b>2)</b> one or
|
||||
* pipeline contains a stateful {@link ChannelHandler} and <b>2)</b> one or
|
||||
* more channels are going to be created by this bootstrap (e.g. server-side
|
||||
* channels).
|
||||
*/
|
||||
@ -176,12 +171,10 @@ public class Bootstrap implements ExternalResourceReleasable {
|
||||
* <p>
|
||||
* Please note that this method is a convenience method that works only
|
||||
* when <b>1)</b> you create only one channel from this bootstrap (e.g.
|
||||
* one-time client-side or connectionless channel) or <b>2)</b> the
|
||||
* {@link ChannelPipelineCoverage} of all handlers in the pipeline is
|
||||
* {@code "all"}. You have to use
|
||||
* one-time client-side or connectionless channel) or <b>2)</b> all handlers
|
||||
* in the pipeline is stateless. You have to use
|
||||
* {@link #setPipelineFactory(ChannelPipelineFactory)} if <b>1)</b> your
|
||||
* pipeline contains a {@link ChannelHandler} whose
|
||||
* {@link ChannelPipelineCoverage} is {@code "one"} and <b>2)</b> one or
|
||||
* pipeline contains a stateful {@link ChannelHandler} and <b>2)</b> one or
|
||||
* more channels are going to be created by this bootstrap (e.g. server-side
|
||||
* channels).
|
||||
*
|
||||
@ -204,12 +197,10 @@ public class Bootstrap implements ExternalResourceReleasable {
|
||||
* <p>
|
||||
* Please note that this method is a convenience method that works only
|
||||
* when <b>1)</b> you create only one channel from this bootstrap (e.g.
|
||||
* one-time client-side or connectionless channel) or <b>2)</b> the
|
||||
* {@link ChannelPipelineCoverage} of all handlers in the pipeline is
|
||||
* {@code "all"}. You have to use
|
||||
* one-time client-side or connectionless channel) or <b>2)</b> all handlers
|
||||
* in the pipeline is stateless. You have to use
|
||||
* {@link #setPipelineFactory(ChannelPipelineFactory)} if <b>1)</b> your
|
||||
* pipeline contains a {@link ChannelHandler} whose
|
||||
* {@link ChannelPipelineCoverage} is {@code "one"} and <b>2)</b> one or
|
||||
* pipeline contains a stateful {@link ChannelHandler} and <b>2)</b> one or
|
||||
* more channels are going to be created by this bootstrap (e.g. server-side
|
||||
* channels).
|
||||
*
|
||||
|
@ -34,7 +34,6 @@ import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandler;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
@ -310,7 +309,6 @@ public class ServerBootstrap extends Bootstrap {
|
||||
return channel;
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
private final class Binder extends SimpleChannelUpstreamHandler {
|
||||
|
||||
private final SocketAddress localAddress;
|
||||
|
@ -57,15 +57,16 @@ package org.jboss.netty.channel;
|
||||
* You will also find various helper methods in {@link Channels} to be useful
|
||||
* to generate and send an artificial or manipulated event.
|
||||
*
|
||||
* <h3>State management</h3>
|
||||
*
|
||||
* Please refer to {@link ChannelHandler}.
|
||||
*
|
||||
* <h3>Thread safety</h3>
|
||||
* <p>
|
||||
* {@link #handleDownstream(ChannelHandlerContext, ChannelEvent) handleDownstream}
|
||||
* may be invoked by more than one thread simultaneously. If the handler
|
||||
* accesses a shared resource or stores stateful information, you might need
|
||||
* proper synchronization in the handler implementation.
|
||||
* <p>
|
||||
* Also, please refer to the {@link ChannelPipelineCoverage} annotation to
|
||||
* understand the relationship between a handler and its stateful properties.
|
||||
*
|
||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||
|
@ -15,6 +15,8 @@
|
||||
*/
|
||||
package org.jboss.netty.channel;
|
||||
|
||||
import org.jboss.netty.bootstrap.Bootstrap;
|
||||
|
||||
/**
|
||||
* Handles or intercepts a {@link ChannelEvent}, and sends a
|
||||
* {@link ChannelEvent} to the next handler in a {@link ChannelPipeline}.
|
||||
@ -43,6 +45,124 @@ package org.jboss.netty.channel;
|
||||
* downstream, modify the behavior of the pipeline, or store the information
|
||||
* (attachment) which is specific to the handler.
|
||||
*
|
||||
* <h3>State management</h3>
|
||||
*
|
||||
* A {@link ChannelHandler} often needs to store some stateful information.
|
||||
* The simplest and recommended approach is to use member variables:
|
||||
* <pre>
|
||||
* public class DataServerHandler extends SimpleChannelHandler {
|
||||
*
|
||||
* <b>private boolean loggedIn;</b>
|
||||
*
|
||||
* public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
||||
* Channel ch = e.getChannel();
|
||||
* Object o = e.getMessage();
|
||||
* if (o instanceof LoginMessage) {
|
||||
* authenticate((LoginMessage) o);
|
||||
* <b>loggedIn = true;</b>
|
||||
* } else (o instanceof GetDataMessage) {
|
||||
* if (<b>loggedIn</b>) {
|
||||
* ch.write(fetchSecret((GetDataMessage) o));
|
||||
* } else {
|
||||
* fail();
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* ...
|
||||
* }
|
||||
* </pre>
|
||||
* Because the handler instance has a state variable which is dedicated to
|
||||
* one connection, you have to create a new handler instance for each new
|
||||
* channel to avoid a race condition where a unauthenticated client can get
|
||||
* the confidential information:
|
||||
* <pre>
|
||||
* // Create a new handler instance per channel.
|
||||
* // See {@link Bootstrap#setPipelineFactory(ChannelPipelineFactory)}.
|
||||
* public class DataServerPipelineFactory implements ChannelPipelineFactory {
|
||||
* public ChannelPipeline getPipeline() {
|
||||
* return Channels.pipeline(<b>new DataServerHandler()</b>);
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <h4>Using an attachment</h4>
|
||||
*
|
||||
* Although it's recommended to use member variables to store the state of a
|
||||
* handler, for some reason you might not want to create many handler instances.
|
||||
* In such a case, you can use an <em>attachment</em> which is provided by
|
||||
* {@link ChannelHandlerContext}:
|
||||
* <pre>
|
||||
* public class DataServerHandler extends SimpleChannelHandler {
|
||||
*
|
||||
* public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
||||
* Channel ch = e.getChannel();
|
||||
* Object o = e.getMessage();
|
||||
* if (o instanceof LoginMessage) {
|
||||
* authenticate((LoginMessage) o);
|
||||
* <b>ctx.setAttachment(true)</b>;
|
||||
* } else (o instanceof GetDataMessage) {
|
||||
* if (<b>Boolean.TRUE.equals(ctx.getAttachment())</b>) {
|
||||
* ch.write(fetchSecret((GetDataMessage) o));
|
||||
* } else {
|
||||
* fail();
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* ...
|
||||
* }
|
||||
* </pre>
|
||||
* Now that the state of the handler is stored as an attachment, you can add the
|
||||
* same handler instance to different pipelines:
|
||||
* <pre>
|
||||
* public class DataServerPipelineFactory implements ChannelPipelineFactory {
|
||||
* private static final StatefulHandler SINGLETON = new DataServerHandler();
|
||||
* public ChannelPipeline getPipeline() {
|
||||
* return Channels.pipeline(<b>SINGLETON</b>);
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <h4>Using a {@link ChannelLocal}</h4>
|
||||
*
|
||||
* If you have a state variable which needs to be accessed either from other
|
||||
* handlers or outside handlers, you can use {@link ChannelLocal}:
|
||||
* <pre>
|
||||
* public final class DataServerState {
|
||||
*
|
||||
* <b>public static final ChannelLocal<Boolean> loggedIn = new ChannelLocal<Boolean>() {
|
||||
* protected Boolean initialValue(Channel channel) {
|
||||
* return false;
|
||||
* }
|
||||
* }</b>
|
||||
* ...
|
||||
* }
|
||||
*
|
||||
* public class DataServerHandler extends SimpleChannelHandler {
|
||||
* public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
|
||||
* Channel ch = e.getChannel();
|
||||
* Object o = e.getMessage();
|
||||
* if (o instanceof LoginMessage) {
|
||||
* authenticate((LoginMessage) o);
|
||||
* <b>DataServerState.loggedIn.set(ch, true);</b>
|
||||
* } else (o instanceof GetDataMessage) {
|
||||
* if (<b>DataServerState.loggedIn.get(ch)</b>) {
|
||||
* ctx.getChannel().write(fetchSecret((GetDataMessage) o));
|
||||
* } else {
|
||||
* fail();
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
* ...
|
||||
* }
|
||||
*
|
||||
* // Print the remote addresses of the authenticated clients:
|
||||
* for (Channel ch: allClientChannels) {
|
||||
* if (<b>DataServerState.loggedIn.get(ch)</b>) {
|
||||
* System.out.println(ch.getRemoteAddress());
|
||||
* }
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
* <h3>Additional resources worth reading</h3>
|
||||
* <p>
|
||||
* Please refer to the {@link ChannelEvent} and {@link ChannelPipeline} to find
|
||||
|
@ -15,7 +15,6 @@
|
||||
*/
|
||||
package org.jboss.netty.channel;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
/**
|
||||
* Provides the properties and operations which are specific to a
|
||||
@ -175,10 +174,6 @@ public interface ChannelHandlerContext {
|
||||
/**
|
||||
* Retrieves an object which is {@link #setAttachment(Object) attached} to
|
||||
* this context.
|
||||
* <p>
|
||||
* As an alternative, you might want to use a {@link ChannelLocal} variable
|
||||
* or a {@link ConcurrentMap} whose key is {@link ChannelHandlerContext}.
|
||||
* Please refer to {@link ChannelPipelineCoverage} for the detailed examples.
|
||||
*
|
||||
* @return {@code null} if no object was attached or
|
||||
* {@code null} was attached
|
||||
@ -189,10 +184,6 @@ public interface ChannelHandlerContext {
|
||||
* Attaches an object to this context to store a stateful information
|
||||
* specific to the {@link ChannelHandler} which is associated with this
|
||||
* context.
|
||||
* <p>
|
||||
* As an alternative, you might want to use a {@link ChannelLocal} variable
|
||||
* or a {@link ConcurrentMap} whose key is {@link ChannelHandlerContext}.
|
||||
* Please refer to {@link ChannelPipelineCoverage} for the detailed examples.
|
||||
*/
|
||||
void setAttachment(Object attachment);
|
||||
}
|
||||
|
@ -212,6 +212,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||
@Documented
|
||||
@Target(ElementType.TYPE)
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Deprecated
|
||||
public @interface ChannelPipelineCoverage {
|
||||
|
||||
/**
|
||||
|
@ -18,7 +18,6 @@ package org.jboss.netty.channel;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.jboss.netty.handler.execution.ExecutionHandler;
|
||||
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
|
||||
|
||||
|
||||
/**
|
||||
@ -64,39 +63,26 @@ import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
|
||||
* You will also find various helper methods in {@link Channels} to be useful
|
||||
* to generate and send an artificial or manipulated event.
|
||||
*
|
||||
* <h3>State management</h3>
|
||||
*
|
||||
* Please refer to {@link ChannelHandler}.
|
||||
*
|
||||
* <h3>Thread safety</h3>
|
||||
* <p>
|
||||
* If there's no {@link ExecutionHandler} in the {@link ChannelPipeline},
|
||||
* {@link #handleUpstream(ChannelHandlerContext, ChannelEvent) handleUpstream}
|
||||
* will be invoked sequentially by the same thread (i.e. an I/O thread).
|
||||
* Please note that this does not necessarily mean that there's a dedicated
|
||||
* thread per {@link Channel}; the I/O thread of some transport can serve more
|
||||
* than one {@link Channel} (e.g. NIO transport), while the I/O thread of
|
||||
* other transports can serve only one (e.g. OIO transport).
|
||||
* will be invoked sequentially by the same thread (i.e. an I/O thread) and
|
||||
* therefore a handler does not need to worry about being invoked with a new
|
||||
* upstream event before the previous upstream event is finished.
|
||||
* <p>
|
||||
* If an {@link ExecutionHandler} is added in the {@link ChannelPipeline},
|
||||
* {@link #handleUpstream(ChannelHandlerContext, ChannelEvent) handleUpstream}
|
||||
* may be invoked by different threads at the same time, depending on what
|
||||
* {@link Executor} implementation is used with the {@link ExecutionHandler}.
|
||||
* This does not necessarily mean that there's a dedicated thread per
|
||||
* {@link Channel}; the I/O thread of some transport can serve more than one
|
||||
* {@link Channel} (e.g. NIO transport), while the I/O thread of other
|
||||
* transports can serve only one (e.g. OIO transport).
|
||||
* <p>
|
||||
* {@link OrderedMemoryAwareThreadPoolExecutor} is provided to guarantee the
|
||||
* order of {@link ChannelEvent}s. It does not guarantee that the invocation
|
||||
* will be made by the same thread for the same channel, but it does guarantee
|
||||
* that the invocation will be made sequentially for the events of the same
|
||||
* channel. For example, the events can be processed as depicted below:
|
||||
*
|
||||
* <pre>
|
||||
* -----------------------------------> Timeline ----------------------------------->
|
||||
*
|
||||
* Thread X: --- Channel A (Event 1) --. .-- Channel B (Event 2) --- Channel B (Event 3) --->
|
||||
* \ /
|
||||
* X
|
||||
* / \
|
||||
* Thread Y: --- Channel B (Event 1) --' '-- Channel A (Event 2) --- Channel A (Event 3) --->
|
||||
* </pre>
|
||||
* <p>
|
||||
* Also, please refer to the {@link ChannelPipelineCoverage} annotation to
|
||||
* understand the relationship between a handler and its stateful properties.
|
||||
* However, if you add an {@link ExecutionHandler} to a {@link ChannelPipeline},
|
||||
* this behavior changes depending on what {@link Executor} was employed to
|
||||
* dispatch the events. Please refer to {@link ExecutionHandler} for more
|
||||
* information.
|
||||
*
|
||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||
|
@ -15,9 +15,6 @@
|
||||
*/
|
||||
package org.jboss.netty.channel;
|
||||
|
||||
import static org.jboss.netty.channel.ChannelPipelineCoverage.*;
|
||||
|
||||
import java.lang.annotation.AnnotationFormatError;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
@ -700,35 +697,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
ChannelDownstreamHandler.class.getName() + '.');
|
||||
}
|
||||
|
||||
|
||||
ChannelPipelineCoverage coverage = handler.getClass().getAnnotation(ChannelPipelineCoverage.class);
|
||||
if (coverage == null) {
|
||||
logger.warn(
|
||||
"Handler '" + handler.getClass().getName() +
|
||||
"' does not have a '" +
|
||||
ChannelPipelineCoverage.class.getSimpleName() +
|
||||
"' annotation with its class declaration. " +
|
||||
"It is strongly recommended to add the annotation " +
|
||||
"for a documentation purpose to tell if a single " +
|
||||
"handler instance can handle more than one pipeline " +
|
||||
"(\"" + ALL + "\") or not (\"" + ONE + "\")");
|
||||
} else {
|
||||
String coverageValue = coverage.value();
|
||||
if (coverageValue == null) {
|
||||
throw new AnnotationFormatError(
|
||||
ChannelPipelineCoverage.class.getSimpleName() +
|
||||
" annotation value is undefined for type: " +
|
||||
handler.getClass().getName());
|
||||
}
|
||||
|
||||
if (!coverageValue.equals(ALL) && !coverageValue.equals(ONE)) {
|
||||
throw new AnnotationFormatError(
|
||||
ChannelPipelineCoverage.class.getSimpleName() +
|
||||
" annotation value: " + coverageValue +
|
||||
" (must be either \"" + ALL + "\" or \"" + ONE + ")");
|
||||
}
|
||||
}
|
||||
|
||||
this.prev = prev;
|
||||
this.next = next;
|
||||
this.name = name;
|
||||
|
@ -15,9 +15,6 @@
|
||||
*/
|
||||
package org.jboss.netty.channel;
|
||||
|
||||
import static org.jboss.netty.channel.ChannelPipelineCoverage.*;
|
||||
|
||||
import java.lang.annotation.AnnotationFormatError;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
@ -448,35 +445,6 @@ public class StaticChannelPipeline implements ChannelPipeline {
|
||||
ChannelDownstreamHandler.class.getName() + '.');
|
||||
}
|
||||
|
||||
|
||||
ChannelPipelineCoverage coverage = handler.getClass().getAnnotation(ChannelPipelineCoverage.class);
|
||||
if (coverage == null) {
|
||||
logger.warn(
|
||||
"Handler '" + handler.getClass().getName() +
|
||||
"' does not have a '" +
|
||||
ChannelPipelineCoverage.class.getSimpleName() +
|
||||
"' annotation with its class declaration. " +
|
||||
"It is strongly recommended to add the annotation " +
|
||||
"for a documentation purpose to tell if a single " +
|
||||
"handler instance can handle more than one pipeline " +
|
||||
"(\"" + ALL + "\") or not (\"" + ONE + "\")");
|
||||
} else {
|
||||
String coverageValue = coverage.value();
|
||||
if (coverageValue == null) {
|
||||
throw new AnnotationFormatError(
|
||||
ChannelPipelineCoverage.class.getSimpleName() +
|
||||
" annotation value is undefined for type: " +
|
||||
handler.getClass().getName());
|
||||
}
|
||||
|
||||
if (!coverageValue.equals(ALL) && !coverageValue.equals(ONE)) {
|
||||
throw new AnnotationFormatError(
|
||||
ChannelPipelineCoverage.class.getSimpleName() +
|
||||
" annotation value: " + coverageValue +
|
||||
" (must be either \"" + ALL + "\" or \"" + ONE + ")");
|
||||
}
|
||||
}
|
||||
|
||||
this.index = index;
|
||||
this.name = name;
|
||||
this.handler = handler;
|
||||
|
@ -33,7 +33,6 @@ import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelSink;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.DefaultChannelPipeline;
|
||||
@ -321,7 +320,6 @@ class HttpTunnelingClientSocketChannel extends AbstractChannel
|
||||
});
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
final class ServletChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
private volatile boolean readingChunks;
|
||||
|
@ -35,7 +35,6 @@ import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
@ -221,7 +220,6 @@ public class HttpTunnelingServlet extends HttpServlet {
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
private static final class OutboundConnectionHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
private final ServletOutputStream out;
|
||||
|
@ -33,12 +33,10 @@ import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
|
||||
/**
|
||||
* Handler for a client-side channel. Please note that this handler's
|
||||
* {@link ChannelPipelineCoverage} annotation value is "one". It means
|
||||
* this handler maintains some stateful information which is specific to
|
||||
* a certain channel. Therefore, an instance of this handler can
|
||||
* cover only one ChannelPipeline and Channel pair. You have to create
|
||||
* a new handler instance whenever you create a new channel and insert
|
||||
* Handler for a client-side channel. This handler maintains stateful
|
||||
* information which is specific to a certain channel using member variables.
|
||||
* Therefore, an instance of this handler can cover only one channel. You have
|
||||
* to create a new handler instance whenever you create a new channel and insert
|
||||
* this handler to avoid a race condition.
|
||||
*
|
||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||
|
@ -29,12 +29,10 @@ import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
|
||||
/**
|
||||
* Handler for a server-side channel. Please note that this handler's
|
||||
* {@link ChannelPipelineCoverage} annotation value is "one". It means
|
||||
* this handler maintains some stateful information which is specific to
|
||||
* a certain channel. Therefore, an instance of this handler can
|
||||
* cover only one ChannelPipeline and Channel pair. You have to create
|
||||
* a new handler instance whenever you create a new channel and insert
|
||||
* Handler for a server-side channel. This handler maintains stateful
|
||||
* information which is specific to a certain channel using member variables.
|
||||
* Therefore, an instance of this handler can cover only one channel. You have
|
||||
* to create a new handler instance whenever you create a new channel and insert
|
||||
* this handler to avoid a race condition.
|
||||
*
|
||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||
|
@ -20,8 +20,8 @@ import java.util.concurrent.Executor;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelDownstreamHandler;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
import org.jboss.netty.channel.ChannelHandler;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelState;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
@ -32,23 +32,71 @@ import org.jboss.netty.util.internal.ExecutorUtil;
|
||||
/**
|
||||
* Forwards an upstream {@link ChannelEvent} to an {@link Executor}.
|
||||
* <p>
|
||||
* You can implement various thread model by adding this handler to a
|
||||
* {@link ChannelPipeline}. The most common use case of this handler is to
|
||||
* add a {@link ExecutionHandler} which was specified with
|
||||
* {@link OrderedMemoryAwareThreadPoolExecutor}:
|
||||
* {@link ExecutionHandler} is often used when your {@link ChannelHandler}
|
||||
* performs a blocking operation that takes long time or accesses a resource
|
||||
* which is not CPU-bound. Running such operations in a pipeline without an
|
||||
* {@link ExecutionHandler} will result in unwanted hiccup during I/O because
|
||||
* an I/O thread cannot perform I/O until your handler returns the control to
|
||||
* the I/O thread.
|
||||
* <p>
|
||||
* In most cases, an {@link ExecutionHandler} is coupled with an
|
||||
* {@link OrderedMemoryAwareThreadPoolExecutor} because it guarantees the
|
||||
* correct event execution order and prevents an {@link OutOfMemoryError}
|
||||
* under load:
|
||||
* <pre>
|
||||
* ChannelPipeline pipeline = ...;
|
||||
* pipeline.addLast("decoder", new MyProtocolDecoder());
|
||||
* pipeline.addLast("encoder", new MyProtocolEncoder());
|
||||
* public class DatabaseGatewayPipelineFactory implements ChannelPipelineFactory {
|
||||
*
|
||||
* // HERE
|
||||
* <strong>pipeline.addLast("executor", new {@link ExecutionHandler}(new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576)));</strong>
|
||||
* <b>private final ExecutionHandler executionHandler;</b>
|
||||
*
|
||||
* pipeline.addLast("handler", new MyBusinessLogicHandler());
|
||||
* public DatabaseGatewayPipelineFactory(ExecutionHandler executionHandler) {
|
||||
* this.executionHandler = executionHandler;
|
||||
* }
|
||||
*
|
||||
* public ChannelPipeline getPipeline() {
|
||||
* return Channels.pipeline(
|
||||
* new DatabaseGatewayProtocolEncoder(),
|
||||
* new DatabaseGatewayProtocolDecoder(),
|
||||
* <b>executionHandler, // Must be shared</b>
|
||||
* new DatabaseQueryingHandler());
|
||||
* }
|
||||
* }
|
||||
* ...
|
||||
*
|
||||
* public static void main(String[] args) {
|
||||
* ServerBootstrap bootstrap = ...;
|
||||
* ...
|
||||
* <b>ExecutionHandler executionHandler = new ExecutionHandler(
|
||||
* new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576))
|
||||
* bootstrap.setPipelineFactory(
|
||||
* new DatabaseGatewayPipelineFactory(executionHandler));</b>
|
||||
* ...
|
||||
* bootstrap.bind(...);
|
||||
* ...
|
||||
*
|
||||
* while (!isServerReadyToShutDown()) {
|
||||
* // ... wait ...
|
||||
* }
|
||||
*
|
||||
* bootstrap.releaseExternalResources();
|
||||
* <b>executionHandler.releaseEXternalResources();</b>
|
||||
* }
|
||||
* </pre>
|
||||
* to utilize more processors to handle {@link ChannelEvent}s. You can also
|
||||
* use other {@link Executor} implementation than the recommended
|
||||
* {@link OrderedMemoryAwareThreadPoolExecutor}.
|
||||
*
|
||||
* Please refer to {@link OrderedMemoryAwareThreadPoolExecutor} for the
|
||||
* detailed information about how the event order is guaranteed.
|
||||
*
|
||||
* <h3>SEDA (Staged Event-Driven Architecture)</h3>
|
||||
* You can implement an alternative thread model such as
|
||||
* <a href="http://en.wikipedia.org/wiki/Staged_event-driven_architecture">SEDA</a>
|
||||
* by adding more than one {@link ExecutionHandler} to the pipeline.
|
||||
*
|
||||
* <h3>Using other {@link Executor} implementation</h3>
|
||||
*
|
||||
* Although it's recommended to use {@link OrderedMemoryAwareThreadPoolExecutor},
|
||||
* you can use other {@link Executor} implementations. However, you must note
|
||||
* that other {@link Executor} implementation might break your application
|
||||
* because they often do not maintain event execution order nor interact with
|
||||
* I/O threads to control the incoming traffic and avoid {@link OutOfMemoryError}.
|
||||
*
|
||||
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
|
||||
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
|
||||
|
@ -32,37 +32,34 @@ import org.jboss.netty.util.ObjectSizeEstimator;
|
||||
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
|
||||
|
||||
/**
|
||||
* A {@link MemoryAwareThreadPoolExecutor} which maintains the
|
||||
* {@link ChannelEvent} order for the same {@link Channel}.
|
||||
* A {@link MemoryAwareThreadPoolExecutor} which makes sure the events from the
|
||||
* same {@link Channel} are executed sequentially.
|
||||
* <p>
|
||||
* {@link OrderedMemoryAwareThreadPoolExecutor} executes the queued task in the
|
||||
* same thread if an existing thread is running a task associated with the same
|
||||
* {@link Channel}. This behavior is to make sure the events from the same
|
||||
* {@link Channel} are handled in a correct order. A different thread might be
|
||||
* chosen only when the task queue of the {@link Channel} is empty.
|
||||
* <p>
|
||||
* Although {@link OrderedMemoryAwareThreadPoolExecutor} guarantees the order
|
||||
* of {@link ChannelEvent}s. It does not guarantee that the invocation will be
|
||||
* made by the same thread for the same channel, but it does guarantee that
|
||||
* the invocation will be made sequentially for the events of the same channel.
|
||||
* For example, the events can be processed as depicted below:
|
||||
*
|
||||
* For example, let's say there are two executor threads that handle the events
|
||||
* from the two channels:
|
||||
* <pre>
|
||||
* -----------------------------------> Timeline ----------------------------------->
|
||||
* -------------------------------------> Timeline ------------------------------------>
|
||||
*
|
||||
* Thread X: --- Channel A (Event 1) --. .-- Channel B (Event 2) --- Channel B (Event 3) --->
|
||||
* Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) --->
|
||||
* \ /
|
||||
* X
|
||||
* / \
|
||||
* Thread Y: --- Channel B (Event 1) --' '-- Channel A (Event 2) --- Channel A (Event 3) --->
|
||||
* Thread Y: --- Channel B (Event B1) --' '-- Channel A (Event A2) --- Channel A (Event A3) --->
|
||||
* </pre>
|
||||
*
|
||||
* Please note that the events of different channels are independent from each
|
||||
* As you see, the events from different channels are independent from each
|
||||
* other. That is, an event of Channel B will not be blocked by an event of
|
||||
* Channel A and vice versa, unless the thread pool is exhausted.
|
||||
* <p>
|
||||
* If you want the events associated with the same channel to be executed
|
||||
* simultaneously, please use {@link MemoryAwareThreadPoolExecutor} instead.
|
||||
* Also, it is guaranteed that the invocation will be made sequentially for the
|
||||
* events from the same channel. For example, the event A2 is never executed
|
||||
* before the event A1 is finished. (Although not recommended, if you want the
|
||||
* events from the same channel to be executed simultaneously, please use
|
||||
* {@link MemoryAwareThreadPoolExecutor} instead.)
|
||||
* <p>
|
||||
* However, it is not guaranteed that the invocation will be made by the same
|
||||
* thread for the same channel. The events from the same channel can be
|
||||
* executed by different threads. For example, the Event A2 is executed by the
|
||||
* thread Y while the event A1 was executed by the thread X.
|
||||
*
|
||||
* <h3>Using a different key other than {@link Channel} to maintain event order</h3>
|
||||
* <p>
|
||||
|
@ -30,7 +30,6 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelException;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelPipelineException;
|
||||
import org.jboss.netty.channel.ChannelPipelineFactory;
|
||||
import org.jboss.netty.channel.ChildChannelStateEvent;
|
||||
@ -202,7 +201,6 @@ public abstract class AbstractSocketServerBootstrapTest {
|
||||
new ServerBootstrap(createMock(ServerChannelFactory.class)).bind(null);
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("all")
|
||||
private static class ParentChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
volatile Channel child;
|
||||
|
@ -33,7 +33,6 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
@ -148,7 +147,6 @@ public abstract class AbstractSocketEchoTest {
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
private class EchoHandler extends SimpleChannelUpstreamHandler {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
@ -25,7 +25,6 @@ import java.util.concurrent.Executors;
|
||||
import org.jboss.netty.bootstrap.ServerBootstrap;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
|
||||
@ -93,7 +92,6 @@ public class NioServerSocketShutdownTimeTest {
|
||||
assertTrue("Shutdown takes too long: " + shutdownTime + " ms", shutdownTime < 500);
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("all")
|
||||
private static class DummyHandler extends SimpleChannelUpstreamHandler {
|
||||
volatile boolean connected;
|
||||
volatile boolean closed;
|
||||
|
@ -17,7 +17,6 @@ package org.jboss.netty.channel.socket.nio;
|
||||
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelHandler;
|
||||
|
||||
@ -27,7 +26,6 @@ import org.jboss.netty.channel.SimpleChannelHandler;
|
||||
* @author Daniel Bevenius (dbevenius@jboss.com)
|
||||
* @version $Rev$, $Date$
|
||||
*/
|
||||
@ChannelPipelineCoverage("all")
|
||||
public class SimpleHandler extends SimpleChannelHandler {
|
||||
|
||||
@Override
|
||||
|
@ -33,7 +33,6 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
@ -150,7 +149,6 @@ public abstract class AbstractSocketFixedLengthEchoTest {
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
private class EchoHandler extends SimpleChannelUpstreamHandler {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
@ -31,7 +31,6 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
@ -158,7 +157,6 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest {
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
private class EchoHandler extends SimpleChannelUpstreamHandler {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
@ -31,7 +31,6 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
@ -157,7 +156,6 @@ public abstract class AbstractSocketObjectStreamEchoTest {
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
private class EchoHandler extends SimpleChannelUpstreamHandler {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
@ -31,7 +31,6 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
@ -163,7 +162,6 @@ public abstract class AbstractSocketStringEchoTest {
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
private class EchoHandler extends SimpleChannelUpstreamHandler {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
@ -35,7 +35,6 @@ import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelFactory;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelStateEvent;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
@ -195,7 +194,6 @@ public abstract class AbstractSocketSslEchoTest {
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelPipelineCoverage("one")
|
||||
private class EchoHandler extends SimpleChannelUpstreamHandler {
|
||||
volatile Channel channel;
|
||||
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
|
||||
|
@ -18,7 +18,6 @@ package org.jboss.netty.util;
|
||||
import org.jboss.netty.channel.ChannelDownstreamHandler;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||
|
||||
/**
|
||||
@ -29,7 +28,6 @@ import org.jboss.netty.channel.ChannelUpstreamHandler;
|
||||
*
|
||||
* @version $Rev$, $Date$
|
||||
*/
|
||||
@ChannelPipelineCoverage("all")
|
||||
public class DummyHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler {
|
||||
|
||||
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
|
||||
|
Loading…
x
Reference in New Issue
Block a user