Cleanup / Modify MATPE to reject a downstream event

This commit is contained in:
Trustin Lee 2012-01-13 20:35:46 +09:00
parent 958ffa50e3
commit fde6789f41
9 changed files with 27 additions and 37 deletions

View File

@ -16,16 +16,14 @@
package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
/**
* A special {@link Executor} which allows to <code>chain</code> {@link Executor}'s. This allows build some complex logic which can help to
* build up SEDA in an easy fashion
*
*
* A special {@link Executor} which allows to chain a series of
* {@link Executor}s and {@link ChannelEventRunnableFilter}.
*/
public class ChainedExecutor implements Executor, ExternalResourceReleasable {
@ -42,19 +40,19 @@ public class ChainedExecutor implements Executor, ExternalResourceReleasable {
* @param next the {@link Executor} to use if the {@link ChannelEventRunnableFilter} does not match
*/
public ChainedExecutor(ChannelEventRunnableFilter filter, Executor cur, Executor next) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (cur == null) {
throw new NullPointerException("cur");
}
if (next == null) {
throw new NullPointerException("next");
}
if (filter == null) {
throw new NullPointerException("filter");
}
this.filter = filter;
this.cur = cur;
this.next = next;
this.filter = filter;
}
/**
@ -84,5 +82,4 @@ public class ChainedExecutor implements Executor, ExternalResourceReleasable {
((ExternalResourceReleasable) executor).releaseExternalResources();
}
}
}

View File

@ -35,5 +35,4 @@ public class ChannelDownstreamEventRunnable extends ChannelEventRunnable {
public void run() {
ctx.sendDownstream(e);
}
}

View File

@ -25,5 +25,4 @@ public class ChannelDownstreamEventRunnableFilter implements ChannelEventRunnabl
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -17,8 +17,6 @@ package io.netty.handler.execution;
import java.util.concurrent.Executor;
import io.netty.handler.execution.ChannelEventRunnable;
public interface ChannelEventRunnableFilter {
/**

View File

@ -17,14 +17,10 @@ package io.netty.handler.execution;
/**
* {@link ChannelEventRunnableFilter} which matches {@link ChannelDownstreamEventRunnable}
*
*
*/
public class ChannelUpstreamEventRunnableFilter implements ChannelEventRunnableFilter {
@Override
public boolean filter(ChannelEventRunnable event) {
return event instanceof ChannelDownstreamEventRunnable;
}
}

View File

@ -22,6 +22,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelDownstreamHandler;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory;
@ -29,7 +30,6 @@ import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ChannelUpstreamHandler;
import io.netty.channel.Channels;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.internal.ExecutorUtil;
@ -93,7 +93,6 @@ import io.netty.util.internal.ExecutorUtil;
* You can implement an alternative thread model such as
* <a href="http://en.wikipedia.org/wiki/Staged_event-driven_architecture">SEDA</a>
* by adding more than one {@link ExecutionHandler} to the pipeline.
* Alternative you may want to have a look at {@link SedaExecutor}.
*
* <h3>Using other {@link Executor} implementation</h3>
*
@ -102,6 +101,7 @@ import io.netty.util.internal.ExecutorUtil;
* that other {@link Executor} implementation might break your application
* because they often do not maintain event execution order nor interact with
* I/O threads to control the incoming traffic and avoid {@link OutOfMemoryError}.
*
* @apiviz.landmark
* @apiviz.has java.util.concurrent.ThreadPoolExecutor
*/
@ -119,6 +119,10 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
this(executor, false);
}
/**
* Creates a new instance with the specified {@link Executor}.
* Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
*/
public ExecutionHandler(Executor executor, boolean handleDownstream) {
if (executor == null) {
throw new NullPointerException("executor");
@ -140,6 +144,7 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
*/
@Override
public void releaseExternalResources() {
Executor executor = getExecutor();
ExecutorUtil.terminate(executor);
if (executor instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) executor).releaseExternalResources();
@ -167,9 +172,6 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
/**
* Handle suspended reads
*
* @param ctx
* @param e
*/
protected boolean handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) {
if (e instanceof ChannelStateEvent) {
@ -187,6 +189,7 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
}
}
}
return false;
}
}

View File

@ -62,8 +62,8 @@ import io.netty.util.internal.SharedResourceMisuseDetector;
* <ul>
* <li>you are using {@link MemoryAwareThreadPoolExecutor} independently from
* {@link ExecutionHandler},</li>
* <li>you are submitting a task whose type is not {@link ChannelUpstreamEventRunnable}, or</li>
* <li>the message type of the {@link MessageEvent} in the {@link ChannelUpstreamEventRunnable}
* <li>you are submitting a task whose type is not {@link ChannelEventRunnable}, or</li>
* <li>the message type of the {@link MessageEvent} in the {@link ChannelEventRunnable}
* is not {@link ChannelBuffer}.</li>
* </ul>
* Here is an example that demonstrates how to implement an {@link ObjectSizeEstimator}
@ -313,7 +313,10 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
@Override
public void execute(Runnable command) {
if (!(command instanceof ChannelUpstreamEventRunnable)) {
if (command instanceof ChannelDownstreamEventRunnable) {
throw new RejectedExecutionException("command must be enclosed with an upstream event.");
}
if (!(command instanceof ChannelEventRunnable)) {
command = new MemoryAwareRunnable(command);
}
@ -396,8 +399,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
long maxChannelMemorySize = settings.maxChannelMemorySize;
int increment;
if (task instanceof ChannelUpstreamEventRunnable) {
increment = ((ChannelUpstreamEventRunnable) task).estimatedSize;
if (task instanceof ChannelEventRunnable) {
increment = ((ChannelEventRunnable) task).estimatedSize;
} else {
increment = ((MemoryAwareRunnable) task).estimatedSize;
}
@ -475,9 +478,6 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
}
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
NewThreadRunsPolicy() {
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {

View File

@ -127,6 +127,7 @@ import io.netty.util.internal.QueueFactory;
* use a weak key map such as <a href="http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/src/jsr166y/ConcurrentWeakHashMap.java?view=markup">ConcurrentWeakHashMap</a>
* or synchronized {@link WeakHashMap} instead of managing the life cycle of the
* keys by yourself.
*
* @apiviz.landmark
*/
public class OrderedMemoryAwareThreadPoolExecutor extends
@ -281,9 +282,6 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
private final Queue<Runnable> tasks = QueueFactory.createQueue(Runnable.class);
private final AtomicBoolean isRunning = new AtomicBoolean();
ChildExecutor() {
}
@Override
public void execute(Runnable command) {
// TODO: What todo if the add return false ?

View File

@ -21,6 +21,6 @@
* @apiviz.exclude ^java\.lang\.
* @apiviz.exclude \.netty\.channel\.
* @apiviz.exclude \.ExternalResourceReleasable$
* @apiviz.exclude \.ChannelEventRunnable$
* @apiviz.exclude \.Channel[A-Za-z]*EventRunnable[A-Za-z]*$
*/
package io.netty.handler.execution;