Made sure MemoryAwareThreadPoolExecutor and ExecutionHandler get along well with other traffic controlling handlers

This commit is contained in:
Trustin Lee 2009-03-11 11:22:06 +00:00
parent d2c4a1143e
commit 0edc9abb0f
2 changed files with 39 additions and 2 deletions

View File

@ -24,10 +24,14 @@ package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.util.ExecutorUtil;
import org.jboss.netty.util.ExternalResourceReleasable;
@ -62,7 +66,7 @@ import org.jboss.netty.util.ExternalResourceReleasable;
* @apiviz.has java.util.concurrent.ThreadPoolExecutor
*/
@ChannelPipelineCoverage("all")
public class ExecutionHandler implements ChannelUpstreamHandler, ExternalResourceReleasable {
public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable {
private final Executor executor;
@ -95,4 +99,25 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ExternalResourc
ChannelHandlerContext context, ChannelEvent e) throws Exception {
executor.execute(new ChannelEventRunnable(context, e));
}
public void handleDownstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent cse = (ChannelStateEvent) e;
if (cse.getState() == ChannelState.INTEREST_OPS &&
(((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
// setReadable(true) requested
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if MemoryAwareThreadPool has
// set the flag.
e.getFuture().setSuccess();
return;
}
}
}
ctx.sendDownstream(e);
}
}

View File

@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.logging.InternalLogger;
@ -360,6 +361,11 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) {
if (channel.isReadable()) {
//System.out.println("UNREADABLE");
ChannelHandlerContext ctx = eventTask.getContext();
if (ctx.getHandler() instanceof ExecutionHandler) {
// readSuspended = true;
ctx.setAttachment(Boolean.TRUE);
}
channel.setReadable(false);
}
}
@ -396,12 +402,18 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
}
if (task instanceof ChannelEventRunnable) {
Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel();
ChannelEventRunnable eventTask = (ChannelEventRunnable) task;
Channel channel = eventTask.getEvent().getChannel();
long channelCounter = getChannelCounter(channel).addAndGet(-increment);
//System.out.println("DC: " + channelCounter + ", " + increment);
if (maxChannelMemorySize != 0 && channelCounter < maxChannelMemorySize && channel.isOpen()) {
if (!channel.isReadable()) {
//System.out.println("READABLE");
ChannelHandlerContext ctx = eventTask.getContext();
if (ctx.getHandler() instanceof ExecutionHandler) {
// readSuspended = false;
ctx.setAttachment(null);
}
channel.setReadable(true);
}
}