Some cleanup and javadocs for the SEDA implementation. Also fixes a bug

in the ExecutionHandler which I introduces while working on SEDA. See
#111
This commit is contained in:
norman 2011-12-12 13:18:27 +01:00
parent 75a6414639
commit a4bfc4011d
6 changed files with 65 additions and 8 deletions

View File

@ -147,8 +147,10 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
@Override @Override
public void handleDownstream( public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception { ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
handleReadSuspend(ctx, e); // check if the read was suspend
ctx.sendDownstream(e); if (!handleReadSuspend(ctx, e)) {
ctx.sendDownstream(e);
}
} }
/** /**
@ -157,7 +159,7 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
* @param ctx * @param ctx
* @param e * @param e
*/ */
protected void handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) { protected boolean handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof ChannelStateEvent) { if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e; ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS && if (cse.getState() == ChannelState.INTEREST_OPS &&
@ -169,9 +171,10 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
// Drop the request silently if MemoryAwareThreadPool has // Drop the request silently if MemoryAwareThreadPool has
// set the flag. // set the flag.
e.getFuture().setSuccess(); e.getFuture().setSuccess();
return; return true;
} }
} }
} }
return false;
} }
} }

View File

@ -39,10 +39,22 @@ import io.netty.handler.execution.ChannelEventRunnable;
*/ */
public abstract class FineGrainedSedaExecutor extends SimpleSedaExecutor{ public abstract class FineGrainedSedaExecutor extends SimpleSedaExecutor{
/**
* Create a new {@link FineGrainedSedaExecutor} which use the two given {@link Executor}'s as default. One is used for upstream events and one for downstream events.
*
* @param upstreamExecutor use the given {@link Executor} as default for downstream events
* @param downstreamExecutor use the given {@link Executor} as default for upstream events
*/
public FineGrainedSedaExecutor(Executor upstreamExecutor, Executor downstreamExecutor) { public FineGrainedSedaExecutor(Executor upstreamExecutor, Executor downstreamExecutor) {
super(upstreamExecutor, downstreamExecutor); super(upstreamExecutor, downstreamExecutor);
} }
/**
* Create a new {@link FineGrainedSedaExecutor} which used the given {@link Executor} as default for upstream and downstream events
*
* @param executor use the given {@link Executor} as default for upstream and downstream events
*
*/
public FineGrainedSedaExecutor(Executor executor) { public FineGrainedSedaExecutor(Executor executor) {
super(executor); super(executor);
} }

View File

@ -29,14 +29,25 @@ import io.netty.handler.execution.ExecutionHandler;
*/ */
public class SedaHandler extends ExecutionHandler { public class SedaHandler extends ExecutionHandler {
/**
* Create a new {@link SedaHandler} which uses the given {@link SedaExecutor}
*
* @param executor the {@link SedaExecutor} to hand off tasks
*/
public SedaHandler(SedaExecutor executor) { public SedaHandler(SedaExecutor executor) {
super(executor); super(executor);
} }
/**
* Hand the event to the {@link Executor}
*/
@Override @Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
handleReadSuspend(ctx, e);
getExecutor().execute(new ChannelDownstreamEventRunnable(ctx, e)); // check if the read was suspend
if (!handleReadSuspend(ctx, e)) {
getExecutor().execute(new ChannelDownstreamEventRunnable(ctx, e));
}
} }
@Override @Override

View File

@ -31,18 +31,33 @@ import io.netty.util.ObjectSizeEstimator;
*/ */
public class SedaMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor{ public class SedaMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor{
/**
*
* @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ObjectSizeEstimator, ThreadFactory)
*/
public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory);
} }
/**
* @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ThreadFactory)
*/
public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, threadFactory); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, threadFactory);
} }
/**
*
* @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit)
*/
public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) { public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit);
} }
/**
*
* @see MemoryAwareThreadPoolExecutor#MemoryAwareThreadPoolExecutor(int, long, long)
*/
public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) { public SedaMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
} }

View File

@ -30,18 +30,34 @@ import io.netty.util.ObjectSizeEstimator;
*/ */
public class SedaOrderedMemoryAwareThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor{ public class SedaOrderedMemoryAwareThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor{
/**
*
* @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ObjectSizeEstimator, ThreadFactory)
*/
public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) { public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, objectSizeEstimator, threadFactory);
} }
/**
*
* @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit, ThreadFactory)
*/
public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, threadFactory); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit, threadFactory);
} }
/**
*
* @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long, long, TimeUnit)
*/
public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) { public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize, keepAliveTime, unit);
} }
/**
*
* @see OrderedMemoryAwareThreadPoolExecutor#OrderedMemoryAwareThreadPoolExecutor(int, long, long)
*/
public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) { public SedaOrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize) {
super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize); super(corePoolSize, maxChannelMemorySize, maxTotalMemorySize);
} }

View File

@ -23,7 +23,7 @@ import io.netty.util.internal.ExecutorUtil;
/** /**
* {@link SedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events. * {@link SedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events.
* *
* You should use an {@link SedaOrderedMemoryAwareThreadPoolExecutor} if you care about the order of thread-execution. In most cases this should be the case * You should use a {@link SedaOrderedMemoryAwareThreadPoolExecutor} if you care about the order of thread-execution. In most cases this should be the case
* *
* *
* *
@ -34,7 +34,7 @@ public class SimpleSedaExecutor extends SedaExecutor{
private final Executor downstreamExecutor; private final Executor downstreamExecutor;
/** /**
* Constrct an {@link SimpleSedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events. * Construct an {@link SimpleSedaExecutor} which use two different {@link Executor}'s. One is used for upstream events and one for downstream events.
* *
* @param upstreamExecutor the {@link Executor} which is used for upstream events * @param upstreamExecutor the {@link Executor} which is used for upstream events
* @param downstreamExecutor the {@link Executor} which is used for downstream events * @param downstreamExecutor the {@link Executor} which is used for downstream events