diff --git a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java index 7fcda017ea..68ddbe5136 100644 --- a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java @@ -27,11 +27,13 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.WriteCompletionEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; @@ -43,16 +45,70 @@ import org.jboss.netty.util.internal.SharedResourceMisuseDetector; /** * A {@link ThreadPoolExecutor} which blocks the task submission when there's - * too many tasks in the queue. + * too many tasks in the queue. Both per-{@link Channel} and per-{@link Executor} + * limitation can be applied. *
- * Both per-{@link Channel} and per-{@link Executor} limitation can be applied. - * If the total size of the unprocessed tasks (i.e. {@link Runnable}s) exceeds - * either per-{@link Channel} or per-{@link Executor} threshold, any further - * {@link #execute(Runnable)} call will block until the tasks in the queue - * are processed so that the total size goes under the threshold. - *
- * {@link ObjectSizeEstimator} is used to calculate the size of each task. + * When a task (i.e. {@link Runnable}) is submitted, + * {@link MemoryAwareThreadPoolExecutor} calls {@link ObjectSizeEstimator#estimateSize(Object)} + * to get the estimated size of the task in bytes to calculate the amount of + * memory occupied by the unprocessed tasks. *
+ * If the total size of the unprocessed tasks exceeds either per-{@link Channel} + * or per-{@link Executor} threshold, any further {@link #execute(Runnable)} + * call will block until the tasks in the queue are processed so that the total + * size goes under the threshold. + * + *
+ * public class MyRunnable implements {@link Runnable} { + * + * private final byte[] data; + * + * public MyRunnable(byte[] data) { + * this.data = data; + * } + * + * public void run() { + * // Process 'data' .. + * } + * } + * + * public class MyObjectSizeEstimator extends {@link DefaultObjectSizeEstimator} { + * + * {@literal @Override} + * public int estimateSize(Object o) { + * if (o instanceof MyRunnable) { + * return ((MyRunnable) o).data.length + 8; + * } + * return super.estimateSize(o); + * } + * } + * + * {@link ThreadPoolExecutor} pool = new {@link MemoryAwareThreadPoolExecutor}( + * 16, 65536, 1048576, 30, {@link TimeUnit}.SECONDS, + * new MyObjectSizeEstimator(), + * {@link Executors}.defaultThreadFactory()); + * + * pool.execute(new MyRunnable(data)); + *+ * + *
+ * NOTE: This thread pool inherits most characteristics of its super + * type, so please make sure to refer to {@link MemoryAwareThreadPoolExecutor} + * to understand how it works basically. + * + *