From a0b75a705e4011a5c983e4ded8fe94378d23cb69 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 4 Sep 2008 07:24:42 +0000 Subject: [PATCH] Added JavaDoc to the handler.execution package --- .../execution/ChannelEventRunnable.java | 20 ++++ .../execution/DefaultObjectSizeEstimator.java | 4 + .../handler/execution/ExecutionHandler.java | 26 ++++ .../MemoryAwareThreadPoolExecutor.java | 102 ++++++++++++++++ .../execution/ObjectSizeEstimator.java | 8 ++ .../OrderedMemoryAwareThreadPoolExecutor.java | 111 ++++++++++++++---- 6 files changed, 250 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java b/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java index 0bea314484..7a3431d915 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java +++ b/src/main/java/org/jboss/netty/handler/execution/ChannelEventRunnable.java @@ -22,10 +22,15 @@ */ package org.jboss.netty.handler.execution; +import java.util.concurrent.Executor; + import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; /** + * a {@link Runnable} which sends the specified {@link ChannelEvent} upstream. + * Most users will not see this type at all because it's used by + * {@link Executor} implementors only * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) @@ -34,22 +39,37 @@ import org.jboss.netty.channel.ChannelHandlerContext; * */ public class ChannelEventRunnable implements Runnable { + private final ChannelHandlerContext ctx; private final ChannelEvent e; + /** + * Creates a {@link Runnable} which sends the specified {@link ChannelEvent} + * upstream via the specified {@link ChannelHandlerContext}. + */ public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) { this.ctx = ctx; this.e = e; } + /** + * Returns the {@link ChannelHandlerContext} which will be used to + * send the {@link ChannelEvent} upstream. + */ public ChannelHandlerContext getContext() { return ctx; } + /** + * Returns the {@link ChannelEvent} which will be sent upstream. + */ public ChannelEvent getEvent() { return e; } + /** + * Sends the event upstream. + */ public void run() { ctx.sendUpstream(e); } diff --git a/src/main/java/org/jboss/netty/handler/execution/DefaultObjectSizeEstimator.java b/src/main/java/org/jboss/netty/handler/execution/DefaultObjectSizeEstimator.java index 1762274983..60409d8e40 100644 --- a/src/main/java/org/jboss/netty/handler/execution/DefaultObjectSizeEstimator.java +++ b/src/main/java/org/jboss/netty/handler/execution/DefaultObjectSizeEstimator.java @@ -34,6 +34,7 @@ import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.MessageEvent; /** + * The default {@link ObjectSizeEstimator} implementation for general purpose. * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) @@ -46,6 +47,9 @@ public class DefaultObjectSizeEstimator implements ObjectSizeEstimator { private final ConcurrentMap, Integer> class2size = new ConcurrentHashMap, Integer>(); + /** + * Creates a new instance. + */ public DefaultObjectSizeEstimator() { class2size.put(boolean.class, 4); // Probably an integer. class2size.put(byte.class, 1); diff --git a/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java b/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java index da7380b71f..0cd0dc0e66 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java +++ b/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java @@ -26,10 +26,30 @@ import java.util.concurrent.Executor; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineCoverage; import org.jboss.netty.channel.ChannelUpstreamHandler; /** + * Forwards a upstream {@link ChannelEvent} to an {@link Executor}. + *

+ * You can implement various thread model by adding this handler to a + * {@link ChannelPipeline}. The most common use case of this handler is to + * add a {@link ExecutionHandler} which was specified with + * {@link OrderedMemoryAwareThreadPoolExecutor}: + *

+ * ChannelPipeline pipeline = ...;
+ * pipeline.addLast("decoder", new MyProtocolDecoder());
+ * pipeline.addLast("encoder", new MyProtocolEncoder());
+ *
+ * // HERE
+ * pipeline.addLast("executor", new {@link ExecutionHandler}(new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576)));
+ *
+ * pipeline.addLast("handler", new MyBusinessLogicHandler());
+ * 
+ * to utilize more processors to handle {@link ChannelEvent}s. You can also + * use other {@link Executor} implementation than the recommended + * {@link OrderedMemoryAwareThreadPoolExecutor}. * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) @@ -44,6 +64,9 @@ public class ExecutionHandler implements ChannelUpstreamHandler { private final Executor executor; + /** + * Creates a new instance with the specified {@link Executor}. + */ public ExecutionHandler(Executor executor) { if (executor == null) { throw new NullPointerException("executor"); @@ -51,6 +74,9 @@ public class ExecutionHandler implements ChannelUpstreamHandler { this.executor = executor; } + /** + * Returns the {@link Executor} which was specified with the constructor. + */ public final Executor getExecutor() { return executor; } diff --git a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java index f13e4943bf..77022a27c3 100644 --- a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java @@ -25,6 +25,7 @@ package org.jboss.netty.handler.execution; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; @@ -34,10 +35,41 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; /** + * A {@link ThreadPoolExecutor} which blocks the task submission when there's + * too many tasks in the queue. + *

+ * Both per-{@link Channel} and per-{@link Executor} limitation can be applied. + * If the total size of the unprocessed tasks (i.e. {@link Runnable}s) exceeds + * either per-{@link Channel} or per-{@link Executor} threshold, any further + * {@link #execute(Runnable)} call will block until the tasks in the queue + * are processed so that the total size goes under the threshold. + *

+ * {@link ObjectSizeEstimator} is used to calculate the size of each task. + *

+ * Please note that this executor does not maintain the order of the + * {@link ChannelEvent}s for the same {@link Channel}. For example, + * you can even receive a {@code "channelClosed"} event before a + * {@code "messageReceived"} event, as depicted by the following diagram. + * + * For example, the events can be processed as depicted below: + * + *

+ *           --------------------------------> Timeline --------------------------------->
+ *
+ * Thread X: --- Channel A (Event 2) --- Channel A (Event 1) --------------------------->
+ *
+ * Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->
+ *
+ * Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->
+ * 
+ * + * To maintain the event order, you must use {@link OrderedMemoryAwareThreadPoolExecutor}. + * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) * @@ -59,20 +91,66 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { private final Semaphore semaphore = new Semaphore(0); + /** + * Creates a new instance. + * + * @param corePoolSize the maximum number of active threads + * @param maxChannelMemorySize the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + * @param maxTotalMemorySize the maximum total size of the queued events for this pool + * Specify {@code 0} to disable. + */ public MemoryAwareThreadPoolExecutor( int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize) { this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS); } + /** + * Creates a new instance. + * + * @param corePoolSize the maximum number of active threads + * @param maxChannelMemorySize the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + * @param maxTotalMemorySize the maximum total size of the queued events for this pool + * Specify {@code 0} to disable. + * @param keepAliveTime the amount of time for an inactive thread to shut itself down + * @param unit the {@link TimeUnit} of {@code keepAliveTime} + */ public MemoryAwareThreadPoolExecutor( int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit) { this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, Executors.defaultThreadFactory()); } + /** + * Creates a new instance. + * + * @param corePoolSize the maximum number of active threads + * @param maxChannelMemorySize the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + * @param maxTotalMemorySize the maximum total size of the queued events for this pool + * Specify {@code 0} to disable. + * @param keepAliveTime the amount of time for an inactive thread to shut itself down + * @param unit the {@link TimeUnit} of {@code keepAliveTime} + * @param threadFactory the {@link ThreadFactory} of this pool + */ public MemoryAwareThreadPoolExecutor( int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, new DefaultObjectSizeEstimator(), threadFactory); } + + /** + * Creates a new instance. + * + * @param corePoolSize the maximum number of active threads + * @param maxChannelMemorySize the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + * @param maxTotalMemorySize the maximum total size of the queued events for this pool + * Specify {@code 0} to disable. + * @param keepAliveTime the amount of time for an inactive thread to shut itself down + * @param unit the {@link TimeUnit} of {@code keepAliveTime} + * @param threadFactory the {@link ThreadFactory} of this pool + * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool + */ public MemoryAwareThreadPoolExecutor( int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { super(corePoolSize, corePoolSize, keepAliveTime, unit, new LinkedBlockingQueue(), threadFactory); @@ -95,14 +173,24 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { setMaxTotalMemorySize(maxTotalMemorySize); } + /** + * Returns the {@link ObjectSizeEstimator} of this pool. + */ public ObjectSizeEstimator getObjectSizeEstimator() { return objectSizeEstimator; } + /** + * Returns the maximum total size of the queued events per channel. + */ public int getMaxChannelMemorySize() { return maxChannelMemorySize; } + /** + * Sets the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + */ public void setMaxChannelMemorySize(int maxChannelMemorySize) { if (maxChannelMemorySize < 0) { throw new IllegalArgumentException( @@ -111,10 +199,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { this.maxChannelMemorySize = maxChannelMemorySize; } + /** + * Returns the maximum total size of the queued events for this pool. + */ public int getMaxTotalMemorySize() { return maxTotalMemorySize; } + /** + * Sets the maximum total size of the queued events for this pool. + * Specify {@code 0} to disable. + */ public void setMaxTotalMemorySize(int maxTotalMemorySize) { if (maxTotalMemorySize < 0) { throw new IllegalArgumentException( @@ -139,10 +234,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { } } + /** + * Put the actual execution logic here. The default implementation simply + * calls {@link #doUnorderedExecute(Runnable)}. + */ protected void doExecute(Runnable task) { doUnorderedExecute(task); } + /** + * Executes the specified task without maintaining the event order. + */ protected final void doUnorderedExecute(Runnable task) { super.execute(task); } diff --git a/src/main/java/org/jboss/netty/handler/execution/ObjectSizeEstimator.java b/src/main/java/org/jboss/netty/handler/execution/ObjectSizeEstimator.java index 97e52608bc..adb0a23360 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ObjectSizeEstimator.java +++ b/src/main/java/org/jboss/netty/handler/execution/ObjectSizeEstimator.java @@ -23,6 +23,7 @@ package org.jboss.netty.handler.execution; /** + * Estimates the size of an object in byte unit. * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) @@ -31,5 +32,12 @@ package org.jboss.netty.handler.execution; * */ public interface ObjectSizeEstimator { + + /** + * Returns the estimated size of the specified object in byte unit. + * This method must be implemented to return the same value for the same + * object. {@link MemoryAwareThreadPoolExecutor} and + * {@link OrderedMemoryAwareThreadPoolExecutor} will malfunction otherwise. + */ int estimateSize(Object o); } diff --git a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java index e7cdc0513b..a5ca8f67b8 100644 --- a/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.java @@ -33,8 +33,28 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelEvent; /** + * A {@link MemoryAwareThreadPoolExecutor} which maintains the + * {@link ChannelEvent} order for the same {@link Channel}. + *

+ * Although {@link OrderedMemoryAwareThreadPoolExecutor} guarantees the order + * of {@link ChannelEvent}s. It does not guarantee that the invocation will be + * made by the same thread for the same channel, but it does guarantee that + * the invocation will be made sequentially for the events of the same channel. + * For example, the events can be processed as depicted below: + * + *

+ *           -----------------------------------> Timeline ----------------------------------->
+ *
+ * Thread X: --- Channel A (Event 1) --.   .-- Channel B (Event 2) --- Channel B (Event 3) --->
+ *                                      \ /
+ *                                       X
+ *                                      / \
+ * Thread Y: --- Channel B (Event 1) --'   '-- Channel A (Event 2) --- Channel A (Event 3) --->
+ * 
+ * * @author The Netty Project (netty-dev@lists.jboss.org) * @author Trustin Lee (tlee@redhat.com) * @author David M. Lloyd (david.lloyd@redhat.com) @@ -48,33 +68,82 @@ public class OrderedMemoryAwareThreadPoolExecutor extends private final ConcurrentMap childExecutors = new ConcurrentHashMap(); - public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, - int maxChannelMemorySize, int maxTotalMemorySize, + /** + * Creates a new instance. + * + * @param corePoolSize the maximum number of active threads + * @param maxChannelMemorySize the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + * @param maxTotalMemorySize the maximum total size of the queued events for this pool + * Specify {@code 0} to disable. + */ + public OrderedMemoryAwareThreadPoolExecutor( + int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize) { + super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); + } + + /** + * Creates a new instance. + * + * @param corePoolSize the maximum number of active threads + * @param maxChannelMemorySize the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + * @param maxTotalMemorySize the maximum total size of the queued events for this pool + * Specify {@code 0} to disable. + * @param keepAliveTime the amount of time for an inactive thread to shut itself down + * @param unit the {@link TimeUnit} of {@code keepAliveTime} + */ + public OrderedMemoryAwareThreadPoolExecutor( + int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, + long keepAliveTime, TimeUnit unit) { + super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, + keepAliveTime, unit); + } + + /** + * Creates a new instance. + * + * @param corePoolSize the maximum number of active threads + * @param maxChannelMemorySize the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + * @param maxTotalMemorySize the maximum total size of the queued events for this pool + * Specify {@code 0} to disable. + * @param keepAliveTime the amount of time for an inactive thread to shut itself down + * @param unit the {@link TimeUnit} of {@code keepAliveTime} + * @param threadFactory the {@link ThreadFactory} of this pool + */ + public OrderedMemoryAwareThreadPoolExecutor( + int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, + long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { + super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, + keepAliveTime, unit, threadFactory); + } + + /** + * Creates a new instance. + * + * @param corePoolSize the maximum number of active threads + * @param maxChannelMemorySize the maximum total size of the queued events per channel. + * Specify {@code 0} to disable. + * @param maxTotalMemorySize the maximum total size of the queued events for this pool + * Specify {@code 0} to disable. + * @param keepAliveTime the amount of time for an inactive thread to shut itself down + * @param unit the {@link TimeUnit} of {@code keepAliveTime} + * @param threadFactory the {@link ThreadFactory} of this pool + * @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool + */ + public OrderedMemoryAwareThreadPoolExecutor( + int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory); } - public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, - int maxChannelMemorySize, int maxTotalMemorySize, - long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { - super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, - keepAliveTime, unit, threadFactory); - } - - public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, - int maxChannelMemorySize, int maxTotalMemorySize, - long keepAliveTime, TimeUnit unit) { - super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, - keepAliveTime, unit); - } - - public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, - int maxChannelMemorySize, int maxTotalMemorySize) { - super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); - } - + /** + * Executes the specified task concurrently while maintaining the event + * order. + */ @Override protected void doExecute(Runnable task) { if (!(task instanceof ChannelEventRunnable)) {