From 0edc9abb0fa59f20312fae8ea4af246237fa97a9 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 11 Mar 2009 11:22:06 +0000 Subject: [PATCH] Made sure MemoryAwareThreadPoolExecutor and ExecutionHandler get along well with other traffic controlling handlers --- .../handler/execution/ExecutionHandler.java | 27 ++++++++++++++++++- .../MemoryAwareThreadPoolExecutor.java | 14 +++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java b/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java index 7dd2811f05..bae1a23a0d 100644 --- a/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java +++ b/src/main/java/org/jboss/netty/handler/execution/ExecutionHandler.java @@ -24,10 +24,14 @@ package org.jboss.netty.handler.execution; import java.util.concurrent.Executor; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelDownstreamHandler; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.ChannelState; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelUpstreamHandler; import org.jboss.netty.util.ExecutorUtil; import org.jboss.netty.util.ExternalResourceReleasable; @@ -62,7 +66,7 @@ import org.jboss.netty.util.ExternalResourceReleasable; * @apiviz.has java.util.concurrent.ThreadPoolExecutor */ @ChannelPipelineCoverage("all") -public class ExecutionHandler implements ChannelUpstreamHandler, ExternalResourceReleasable { +public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable { private final Executor executor; @@ -95,4 +99,25 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ExternalResourc ChannelHandlerContext context, ChannelEvent e) throws Exception { executor.execute(new ChannelEventRunnable(context, e)); } + + public void handleDownstream( + ChannelHandlerContext ctx, ChannelEvent e) throws Exception { + if (e instanceof ChannelStateEvent) { + ChannelStateEvent cse = (ChannelStateEvent) e; + if (cse.getState() == ChannelState.INTEREST_OPS && + (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) { + + // setReadable(true) requested + boolean readSuspended = ctx.getAttachment() != null; + if (readSuspended) { + // Drop the request silently if MemoryAwareThreadPool has + // set the flag. + e.getFuture().setSuccess(); + return; + } + } + } + + ctx.sendDownstream(e); + } } diff --git a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java index 2a17acddb1..029f28d99a 100644 --- a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; +import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.logging.InternalLogger; @@ -360,6 +361,11 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { if (maxChannelMemorySize != 0 && channelCounter >= maxChannelMemorySize && channel.isOpen()) { if (channel.isReadable()) { //System.out.println("UNREADABLE"); + ChannelHandlerContext ctx = eventTask.getContext(); + if (ctx.getHandler() instanceof ExecutionHandler) { + // readSuspended = true; + ctx.setAttachment(Boolean.TRUE); + } channel.setReadable(false); } } @@ -396,12 +402,18 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { } if (task instanceof ChannelEventRunnable) { - Channel channel = ((ChannelEventRunnable) task).getEvent().getChannel(); + ChannelEventRunnable eventTask = (ChannelEventRunnable) task; + Channel channel = eventTask.getEvent().getChannel(); long channelCounter = getChannelCounter(channel).addAndGet(-increment); //System.out.println("DC: " + channelCounter + ", " + increment); if (maxChannelMemorySize != 0 && channelCounter < maxChannelMemorySize && channel.isOpen()) { if (!channel.isReadable()) { //System.out.println("READABLE"); + ChannelHandlerContext ctx = eventTask.getContext(); + if (ctx.getHandler() instanceof ExecutionHandler) { + // readSuspended = false; + ctx.setAttachment(null); + } channel.setReadable(true); } }