MemoryAwareThreadPoolExecutor needs to notify ChannelFuture's of the not-executed ChannelEventRunnable on shutdownNow(). See #309

This commit is contained in:
norman 2012-05-04 14:41:54 +02:00
parent b5706d54f7
commit 924f0df93f

View File

@ -15,7 +15,11 @@
*/
package org.jboss.netty.handler.execution;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@ -32,6 +36,7 @@ import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.WriteCompletionEvent;
import org.jboss.netty.logging.InternalLogger;
@ -144,6 +149,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
new ConcurrentIdentityHashMap<Channel, AtomicLong>();
private final Limiter totalLimiter;
private volatile boolean notifyOnShutdown;
/**
* Creates a new instance.
*
@ -260,6 +267,61 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
misuseDetector.decrease();
}
/**
* This will call {@link #shutdownNow(boolean)} with the value of {@link #getNotifyChannelFuturesOnShutdown()}.
*/
@Override
public List<Runnable> shutdownNow() {
return shutdownNow(notifyOnShutdown);
}
/**
* See {@link ThreadPoolExecutor#shutdownNow()} for how it handles the shutdown. If <code>true</code> is given to this method it also notifies all {@link ChannelFuture}'s
* of the not executed {@link ChannelEventRunnable}'s.
*
* <p>
* Be aware that if you call this with <code>false</code> you will need to handle the notification of the {@link ChannelFuture}'s by your self. So only use this if you
* really have a use-case for it.
* </p>
*
*/
public List<Runnable> shutdownNow(boolean notify) {
if (!notify) {
return super.shutdownNow();
}
Throwable cause = null;
Set<Channel> channels = null;
List<Runnable> tasks = super.shutdownNow();
// loop over all tasks and cancel the ChannelFuture of the ChannelEventRunable's
for (Runnable task: tasks) {
if (task instanceof ChannelEventRunnable) {
if (cause == null) {
cause = new IOException("Unable to process queued event");
}
ChannelEvent event = ((ChannelEventRunnable) task).getEvent();
event.getFuture().setFailure(cause);
if (channels == null) {
channels = new HashSet<Channel>();
}
// store the Channel of the event for later notification of the exceptionCaught event
channels.add(event.getChannel());
}
}
// loop over all channels and fire an exceptionCaught event
if (channels != null) {
for (Channel channel: channels) {
Channels.fireExceptionCaughtLater(channel, cause);
}
}
return tasks;
}
/**
* Returns the {@link ObjectSizeEstimator} of this pool.
*/
@ -314,6 +376,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
return totalLimiter.limit;
}
/**
* @deprecated <tt>maxTotalMemorySize</tt> is not modifiable anymore.
*/
@ -330,6 +393,28 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
}
}
/**
* If set to <code>false</code> no queued {@link ChannelEventRunnable}'s {@link ChannelFuture} will get notified once {@link #shutdownNow()} is called.
* If set to <code>true</code> every queued {@link ChannelEventRunnable} will get marked as failed via {@link ChannelFuture#setFailure(Throwable)}.
*
* <p>
* Please only set this to <code>false</code> if you want to handle the notification by yourself and know what you are doing. Default is <code>true</code>.
* </p>
*/
public void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown) {
this.notifyOnShutdown = notifyOnShutdown;
}
/**
* Returns if the {@link ChannelFuture}'s of the {@link ChannelEventRunnable}'s should be notified about the shutdown of this {@link MemoryAwareThreadPoolExecutor}.
*
*/
public boolean getNotifyChannelFuturesOnShutdown() {
return notifyOnShutdown;
}
@Override
public void execute(Runnable command) {
if (command instanceof ChannelDownstreamEventRunnable) {