From 924f0df93ff1bd1f794cc0af4d797d2e6e4f2502 Mon Sep 17 00:00:00 2001 From: norman Date: Fri, 4 May 2012 14:41:54 +0200 Subject: [PATCH] MemoryAwareThreadPoolExecutor needs to notify ChannelFuture's of the not-executed ChannelEventRunnable on shutdownNow(). See #309 --- .../MemoryAwareThreadPoolExecutor.java | 87 ++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java index b589180f7f..311449ce18 100644 --- a/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java +++ b/src/main/java/org/jboss/netty/handler/execution/MemoryAwareThreadPoolExecutor.java @@ -15,7 +15,11 @@ */ package org.jboss.netty.handler.execution; +import java.io.IOException; import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -32,6 +36,7 @@ import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.WriteCompletionEvent; import org.jboss.netty.logging.InternalLogger; @@ -144,6 +149,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { new ConcurrentIdentityHashMap(); private final Limiter totalLimiter; + private volatile boolean notifyOnShutdown; + /** * Creates a new instance. * @@ -253,13 +260,68 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { // Misuse check misuseDetector.increase(); } - + @Override protected void terminated() { super.terminated(); misuseDetector.decrease(); } + /** + * This will call {@link #shutdownNow(boolean)} with the value of {@link #getNotifyChannelFuturesOnShutdown()}. + */ + @Override + public List shutdownNow() { + return shutdownNow(notifyOnShutdown); + } + + /** + * See {@link ThreadPoolExecutor#shutdownNow()} for how it handles the shutdown. If true is given to this method it also notifies all {@link ChannelFuture}'s + * of the not executed {@link ChannelEventRunnable}'s. + * + *

+ * Be aware that if you call this with false you will need to handle the notification of the {@link ChannelFuture}'s by your self. So only use this if you + * really have a use-case for it. + *

+ * + */ + public List shutdownNow(boolean notify) { + if (!notify) { + return super.shutdownNow(); + } + Throwable cause = null; + Set channels = null; + + List tasks = super.shutdownNow(); + + // loop over all tasks and cancel the ChannelFuture of the ChannelEventRunable's + for (Runnable task: tasks) { + if (task instanceof ChannelEventRunnable) { + if (cause == null) { + cause = new IOException("Unable to process queued event"); + } + ChannelEvent event = ((ChannelEventRunnable) task).getEvent(); + event.getFuture().setFailure(cause); + + if (channels == null) { + channels = new HashSet(); + } + + + // store the Channel of the event for later notification of the exceptionCaught event + channels.add(event.getChannel()); + } + } + + // loop over all channels and fire an exceptionCaught event + if (channels != null) { + for (Channel channel: channels) { + Channels.fireExceptionCaughtLater(channel, cause); + } + } + return tasks; + } + /** * Returns the {@link ObjectSizeEstimator} of this pool. */ @@ -314,6 +376,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { return totalLimiter.limit; } + /** * @deprecated maxTotalMemorySize is not modifiable anymore. */ @@ -330,6 +393,28 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor { } } + /** + * If set to false no queued {@link ChannelEventRunnable}'s {@link ChannelFuture} will get notified once {@link #shutdownNow()} is called. + * If set to true every queued {@link ChannelEventRunnable} will get marked as failed via {@link ChannelFuture#setFailure(Throwable)}. + * + *

+ * Please only set this to false if you want to handle the notification by yourself and know what you are doing. Default is true. + *

+ */ + public void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown) { + this.notifyOnShutdown = notifyOnShutdown; + } + + /** + * Returns if the {@link ChannelFuture}'s of the {@link ChannelEventRunnable}'s should be notified about the shutdown of this {@link MemoryAwareThreadPoolExecutor}. + * + */ + public boolean getNotifyChannelFuturesOnShutdown() { + return notifyOnShutdown; + } + + + @Override public void execute(Runnable command) { if (command instanceof ChannelDownstreamEventRunnable) {