From a4bfc4011db5d1a2d4488db0355dffec9b9d2c13 Mon Sep 17 00:00:00 2001 From: norman Date: Mon, 12 Dec 2011 13:18:27 +0100 Subject: [PATCH] Some cleanup and javadocs for the SEDA implementation. Also fixes a bug in the ExecutionHandler which I introduces while working on SEDA. See #111 --- .../handler/execution/ExecutionHandler.java | 11 +++++++---- .../execution/seda/FineGrainedSedaExecutor.java | 12 ++++++++++++ .../handler/execution/seda/SedaHandler.java | 15 +++++++++++++-- .../seda/SedaMemoryAwareThreadPoolExecutor.java | 15 +++++++++++++++ ...SedaOrderedMemoryAwareThreadPoolExecutor.java | 16 ++++++++++++++++ .../execution/seda/SimpleSedaExecutor.java | 4 ++-- 6 files changed, 65 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/netty/handler/execution/ExecutionHandler.java b/src/main/java/io/netty/handler/execution/ExecutionHandler.java index 6583a00300..95c519bae3 100644 --- a/src/main/java/io/netty/handler/execution/ExecutionHandler.java +++ b/src/main/java/io/netty/handler/execution/ExecutionHandler.java @@ -147,8 +147,10 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre @Override public void handleDownstream( ChannelHandlerContext ctx, ChannelEvent e) throws Exception { - handleReadSuspend(ctx, e); - ctx.sendDownstream(e); + // check if the read was suspend + if (!handleReadSuspend(ctx, e)) { + ctx.sendDownstream(e); + } } /** @@ -157,7 +159,7 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre * @param ctx * @param e */ - protected void handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) { + protected boolean handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) { if (e instanceof ChannelStateEvent) { ChannelStateEvent cse = (ChannelStateEvent) e; if (cse.getState() == ChannelState.INTEREST_OPS && @@ -169,9 +171,10 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre // Drop the request silently if MemoryAwareThreadPool has // set the flag. e.getFuture().setSuccess(); - return; + return true; } } } + return false; } } diff --git a/src/main/java/io/netty/handler/execution/seda/FineGrainedSedaExecutor.java b/src/main/java/io/netty/handler/execution/seda/FineGrainedSedaExecutor.java index 6740834429..d53448ea42 100644 --- a/src/main/java/io/netty/handler/execution/seda/FineGrainedSedaExecutor.java +++ b/src/main/java/io/netty/handler/execution/seda/FineGrainedSedaExecutor.java @@ -39,10 +39,22 @@ import io.netty.handler.execution.ChannelEventRunnable; */ public abstract class FineGrainedSedaExecutor extends SimpleSedaExecutor{ + /** + * Create a new {@link FineGrainedSedaExecutor} which use the two given {@link Executor}'s as default. One is used for upstream events and one for downstream events. + * + * @param upstreamExecutor use the given {@link Executor} as default for downstream events + * @param downstreamExecutor use the given {@link Executor} as default for upstream events + */ public FineGrainedSedaExecutor(Executor upstreamExecutor, Executor downstreamExecutor) { super(upstreamExecutor, downstreamExecutor); } + /** + * Create a new {@link FineGrainedSedaExecutor} which used the given {@link Executor} as default for upstream and downstream events + * + * @param executor use the given {@link Executor} as default for upstream and downstream events + * + */ public FineGrainedSedaExecutor(Executor executor) { super(executor); } diff --git a/src/main/java/io/netty/handler/execution/seda/SedaHandler.java b/src/main/java/io/netty/handler/execution/seda/SedaHandler.java index 71ac43f3c1..fb58f61a12 100644 --- a/src/main/java/io/netty/handler/execution/seda/SedaHandler.java +++ b/src/main/java/io/netty/handler/execution/seda/SedaHandler.java @@ -29,14 +29,25 @@ import io.netty.handler.execution.ExecutionHandler; */ public class SedaHandler extends ExecutionHandler { + /** + * Create a new {@link SedaHandler} which uses the given {@link SedaExecutor} + * + * @param executor the {@link SedaExecutor} to hand off tasks + */ public SedaHandler(SedaExecutor executor) { super(executor); } + /** + * Hand the event to the {@link Executor} + */ @Override public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { - handleReadSuspend(ctx, e); - getExecutor().execute(new ChannelDownstreamEventRunnable(ctx, e)); + + // check if the read was suspend + if (!handleReadSuspend(ctx, e)) { + getExecutor().execute(new ChannelDownstreamEventRunnable(ctx, e)); + } } @Override diff --git a/src/main/java/io/netty/handler/execution/seda/SedaMemoryAwareThreadPoolExecutor.java b/src/main/java/io/netty/handler/execution/seda/SedaMemoryAwareThreadPoolExecutor.java index 1d32e2afb9..83ac0bfc8d 100644 --- a/src/main/java/io/netty/handler/execution/seda/SedaMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/io/netty/handler/execution/seda/SedaMemoryAwareThreadPoolExecutor.java @@ -31,18 +31,33 @@ import io.netty.util.ObjectSizeEstimator; */ public class SedaMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor{ + /** + * + * @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ObjectSizeEstimator, ThreadFactory) + */ public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory); } + /** + * @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ThreadFactory) + */ public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, threadFactory); } + /** + * + * @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit) + */ public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit); } + /** + * + * @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long) + */ public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); } diff --git a/src/main/java/io/netty/handler/execution/seda/SedaOrderedMemoryAwareThreadPoolExecutor.java b/src/main/java/io/netty/handler/execution/seda/SedaOrderedMemoryAwareThreadPoolExecutor.java index c1d1ea4b8f..1c68aacaaf 100644 --- a/src/main/java/io/netty/handler/execution/seda/SedaOrderedMemoryAwareThreadPoolExecutor.java +++ b/src/main/java/io/netty/handler/execution/seda/SedaOrderedMemoryAwareThreadPoolExecutor.java @@ -30,18 +30,34 @@ import io.netty.util.ObjectSizeEstimator; */ public class SedaOrderedMemoryAwareThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor{ + /** + * + * @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ObjectSizeEstimator, ThreadFactory) + */ public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory); } + /** + * + * @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ThreadFactory) + */ public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, threadFactory); } + /** + * + * @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit) + */ public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit); } + /** + * + * @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long) + */ public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) { super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); } diff --git a/src/main/java/io/netty/handler/execution/seda/SimpleSedaExecutor.java b/src/main/java/io/netty/handler/execution/seda/SimpleSedaExecutor.java index 66b44dd93e..6e634b06e6 100644 --- a/src/main/java/io/netty/handler/execution/seda/SimpleSedaExecutor.java +++ b/src/main/java/io/netty/handler/execution/seda/SimpleSedaExecutor.java @@ -23,7 +23,7 @@ import io.netty.util.internal.ExecutorUtil; /** * {@link SedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events. * - * You should use an {@link SedaOrderedMemoryAwareThreadPoolExecutor} if you care about the order of thread-execution. In most cases this should be the case + * You should use a {@link SedaOrderedMemoryAwareThreadPoolExecutor} if you care about the order of thread-execution. In most cases this should be the case * * * @@ -34,7 +34,7 @@ public class SimpleSedaExecutor extends SedaExecutor{ private final Executor downstreamExecutor; /** - * Constrct an {@link SimpleSedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events. + * Construct an {@link SimpleSedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events. * * @param upstreamExecutor the {@link Executor} which is used for upstream events * @param downstreamExecutor the {@link Executor} which is used for downstream events