Added JavaDoc to the handler.execution package

This commit is contained in:
Trustin Lee 2008-09-04 07:24:42 +00:00
parent 35678cdf73
commit a0b75a705e
6 changed files with 250 additions and 21 deletions

View File

@ -22,10 +22,15 @@
*/
package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
/**
* a {@link Runnable} which sends the specified {@link ChannelEvent} upstream.
* Most users will not see this type at all because it's used by
* {@link Executor} implementors only
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
@ -34,22 +39,37 @@ import org.jboss.netty.channel.ChannelHandlerContext;
*
*/
public class ChannelEventRunnable implements Runnable {
private final ChannelHandlerContext ctx;
private final ChannelEvent e;
/**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}.
*/
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) {
this.ctx = ctx;
this.e = e;
}
/**
* Returns the {@link ChannelHandlerContext} which will be used to
* send the {@link ChannelEvent} upstream.
*/
public ChannelHandlerContext getContext() {
return ctx;
}
/**
* Returns the {@link ChannelEvent} which will be sent upstream.
*/
public ChannelEvent getEvent() {
return e;
}
/**
* Sends the event upstream.
*/
public void run() {
ctx.sendUpstream(e);
}

View File

@ -34,6 +34,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.MessageEvent;
/**
* The default {@link ObjectSizeEstimator} implementation for general purpose.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
@ -46,6 +47,9 @@ public class DefaultObjectSizeEstimator implements ObjectSizeEstimator {
private final ConcurrentMap<Class<?>, Integer> class2size =
new ConcurrentHashMap<Class<?>, Integer>();
/**
* Creates a new instance.
*/
public DefaultObjectSizeEstimator() {
class2size.put(boolean.class, 4); // Probably an integer.
class2size.put(byte.class, 1);

View File

@ -26,10 +26,30 @@ import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelUpstreamHandler;
/**
* Forwards a upstream {@link ChannelEvent} to an {@link Executor}.
* <p>
* You can implement various thread model by adding this handler to a
* {@link ChannelPipeline}. The most common use case of this handler is to
* add a {@link ExecutionHandler} which was specified with
* {@link OrderedMemoryAwareThreadPoolExecutor}:
* <pre>
* ChannelPipeline pipeline = ...;
* pipeline.addLast("decoder", new MyProtocolDecoder());
* pipeline.addLast("encoder", new MyProtocolEncoder());
*
* // HERE
* <strong>pipeline.addLast("executor", new {@link ExecutionHandler}(new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576)));</strong>
*
* pipeline.addLast("handler", new MyBusinessLogicHandler());
* </pre>
* to utilize more processors to handle {@link ChannelEvent}s. You can also
* use other {@link Executor} implementation than the recommended
* {@link OrderedMemoryAwareThreadPoolExecutor}.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
@ -44,6 +64,9 @@ public class ExecutionHandler implements ChannelUpstreamHandler {
private final Executor executor;
/**
* Creates a new instance with the specified {@link Executor}.
*/
public ExecutionHandler(Executor executor) {
if (executor == null) {
throw new NullPointerException("executor");
@ -51,6 +74,9 @@ public class ExecutionHandler implements ChannelUpstreamHandler {
this.executor = executor;
}
/**
* Returns the {@link Executor} which was specified with the constructor.
*/
public final Executor getExecutor() {
return executor;
}

View File

@ -25,6 +25,7 @@ package org.jboss.netty.handler.execution;
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
@ -34,10 +35,41 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
/**
* A {@link ThreadPoolExecutor} which blocks the task submission when there's
* too many tasks in the queue.
* <p>
* Both per-{@link Channel} and per-{@link Executor} limitation can be applied.
* If the total size of the unprocessed tasks (i.e. {@link Runnable}s) exceeds
* either per-{@link Channel} or per-{@link Executor} threshold, any further
* {@link #execute(Runnable)} call will block until the tasks in the queue
* are processed so that the total size goes under the threshold.
* <p>
* {@link ObjectSizeEstimator} is used to calculate the size of each task.
* <p>
* Please note that this executor does not maintain the order of the
* {@link ChannelEvent}s for the same {@link Channel}. For example,
* you can even receive a {@code "channelClosed"} event before a
* {@code "messageReceived"} event, as depicted by the following diagram.
*
* For example, the events can be processed as depicted below:
*
* <pre>
* --------------------------------&gt; Timeline ---------------------------------&gt;
*
* Thread X: --- Channel A (Event 2) --- Channel A (Event 1) ---------------------------&gt;
*
* Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) ---&gt;
*
* Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) ---&gt;
* </pre>
*
* To maintain the event order, you must use {@link OrderedMemoryAwareThreadPoolExecutor}.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
*
@ -59,20 +91,66 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
private final Semaphore semaphore = new Semaphore(0);
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
*/
public MemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, 30, TimeUnit.SECONDS);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
*/
public MemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, Executors.defaultThreadFactory());
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
*/
public MemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, new DefaultObjectSizeEstimator(), threadFactory);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
* @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
*/
public MemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
super(corePoolSize, corePoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
@ -95,14 +173,24 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
setMaxTotalMemorySize(maxTotalMemorySize);
}
/**
* Returns the {@link ObjectSizeEstimator} of this pool.
*/
public ObjectSizeEstimator getObjectSizeEstimator() {
return objectSizeEstimator;
}
/**
* Returns the maximum total size of the queued events per channel.
*/
public int getMaxChannelMemorySize() {
return maxChannelMemorySize;
}
/**
* Sets the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
*/
public void setMaxChannelMemorySize(int maxChannelMemorySize) {
if (maxChannelMemorySize < 0) {
throw new IllegalArgumentException(
@ -111,10 +199,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
this.maxChannelMemorySize = maxChannelMemorySize;
}
/**
* Returns the maximum total size of the queued events for this pool.
*/
public int getMaxTotalMemorySize() {
return maxTotalMemorySize;
}
/**
* Sets the maximum total size of the queued events for this pool.
* Specify {@code 0} to disable.
*/
public void setMaxTotalMemorySize(int maxTotalMemorySize) {
if (maxTotalMemorySize < 0) {
throw new IllegalArgumentException(
@ -139,10 +234,17 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
}
}
/**
* Put the actual execution logic here. The default implementation simply
* calls {@link #doUnorderedExecute(Runnable)}.
*/
protected void doExecute(Runnable task) {
doUnorderedExecute(task);
}
/**
* Executes the specified task without maintaining the event order.
*/
protected final void doUnorderedExecute(Runnable task) {
super.execute(task);
}

View File

@ -23,6 +23,7 @@
package org.jboss.netty.handler.execution;
/**
* Estimates the size of an object in byte unit.
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
@ -31,5 +32,12 @@ package org.jboss.netty.handler.execution;
*
*/
public interface ObjectSizeEstimator {
/**
* Returns the estimated size of the specified object in byte unit.
* This method must be implemented to return the same value for the same
* object. {@link MemoryAwareThreadPoolExecutor} and
* {@link OrderedMemoryAwareThreadPoolExecutor} will malfunction otherwise.
*/
int estimateSize(Object o);
}

View File

@ -33,8 +33,28 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
/**
* A {@link MemoryAwareThreadPoolExecutor} which maintains the
* {@link ChannelEvent} order for the same {@link Channel}.
* <p>
* Although {@link OrderedMemoryAwareThreadPoolExecutor} guarantees the order
* of {@link ChannelEvent}s. It does not guarantee that the invocation will be
* made by the same thread for the same channel, but it does guarantee that
* the invocation will be made sequentially for the events of the same channel.
* For example, the events can be processed as depicted below:
*
* <pre>
* -----------------------------------&gt; Timeline -----------------------------------&gt;
*
* Thread X: --- Channel A (Event 1) --. .-- Channel B (Event 2) --- Channel B (Event 3) ---&gt;
* \ /
* X
* / \
* Thread Y: --- Channel B (Event 1) --' '-- Channel A (Event 2) --- Channel A (Event 3) ---&gt;
* </pre>
*
* @author The Netty Project (netty-dev@lists.jboss.org)
* @author Trustin Lee (tlee@redhat.com)
* @author David M. Lloyd (david.lloyd@redhat.com)
@ -48,33 +68,82 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
private final ConcurrentMap<Channel, Executor> childExecutors =
new ConcurrentHashMap<Channel, Executor>();
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
int maxChannelMemorySize, int maxTotalMemorySize,
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
*/
public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
*/
public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize,
long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
*/
public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit, threadFactory);
}
/**
* Creates a new instance.
*
* @param corePoolSize the maximum number of active threads
* @param maxChannelMemorySize the maximum total size of the queued events per channel.
* Specify {@code 0} to disable.
* @param maxTotalMemorySize the maximum total size of the queued events for this pool
* Specify {@code 0} to disable.
* @param keepAliveTime the amount of time for an inactive thread to shut itself down
* @param unit the {@link TimeUnit} of {@code keepAliveTime}
* @param threadFactory the {@link ThreadFactory} of this pool
* @param objectSizeEstimator the {@link ObjectSizeEstimator} of this pool
*/
public OrderedMemoryAwareThreadPoolExecutor(
int corePoolSize, int maxChannelMemorySize, int maxTotalMemorySize,
long keepAliveTime, TimeUnit unit,
ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit, objectSizeEstimator, threadFactory);
}
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
int maxChannelMemorySize, int maxTotalMemorySize,
long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit, threadFactory);
}
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
int maxChannelMemorySize, int maxTotalMemorySize,
long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize,
keepAliveTime, unit);
}
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize,
int maxChannelMemorySize, int maxTotalMemorySize) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
}
/**
* Executes the specified task concurrently while maintaining the event
* order.
*/
@Override
protected void doExecute(Runnable task) {
if (!(task instanceof ChannelEventRunnable)) {