MemoryAwareThreadPoolExecutor needs to notify ChannelFuture's of the queued ChannelEventRunnable on shutdownNow(). See #309

This commit is contained in:
norman 2012-05-04 14:36:30 +02:00
parent 21a61ce632
commit c24eafed48

View File

@ -15,6 +15,10 @@
*/
package io.netty.handler.execution;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@ -28,9 +32,11 @@ import java.util.concurrent.atomic.AtomicLong;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelState;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels;
import io.netty.channel.MessageEvent;
import io.netty.channel.WriteCompletionEvent;
import io.netty.util.internal.ConcurrentIdentityHashMap;
@ -135,6 +141,8 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
new ConcurrentIdentityHashMap<Channel, AtomicLong>();
private final Limiter totalLimiter;
private volatile boolean notifyOnShutdown;
/**
* Creates a new instance.
*
@ -234,13 +242,68 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
// Misuse check
misuseDetector.increase();
}
@Override
protected void terminated() {
super.terminated();
misuseDetector.decrease();
}
/**
* This will call {@link #shutdownNow(boolean)} with the value of {@link #getNotifyChannelFuturesOnShutdown()}.
*/
@Override
public List<Runnable> shutdownNow() {
return shutdownNow(notifyOnShutdown);
}
/**
* See {@link ThreadPoolExecutor#shutdownNow()} for how it handles the shutdown. If <code>true</code> is given to this method it also notifies all {@link ChannelFuture}'s
* of the not executed {@link ChannelEventRunnable}'s.
*
* <p>
* Be aware that if you call this with <code>false</code> you will need to handle the notification of the {@link ChannelFuture}'s by your self. So only use this if you
* really have a use-case for it.
* </p>
*
*/
public List<Runnable> shutdownNow(boolean notify) {
if (!notify) {
return super.shutdownNow();
}
Throwable cause = null;
Set<Channel> channels = null;
List<Runnable> tasks = super.shutdownNow();
// loop over all tasks and cancel the ChannelFuture of the ChannelEventRunable's
for (Runnable task: tasks) {
if (task instanceof ChannelEventRunnable) {
if (cause == null) {
cause = new IOException("Unable to process queued event");
}
ChannelEvent event = ((ChannelEventRunnable) task).getEvent();
event.getFuture().setFailure(cause);
if (channels == null) {
channels = new HashSet<Channel>();
}
// store the Channel of the event for later notification of the exceptionCaught event
channels.add(event.getChannel());
}
}
// loop over all channels and fire an exceptionCaught event
if (channels != null) {
for (Channel channel: channels) {
Channels.fireExceptionCaughtLater(channel, cause);
}
}
return tasks;
}
/**
* Returns the {@link ObjectSizeEstimator} of this pool.
*/
@ -295,6 +358,7 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
return totalLimiter.limit;
}
/**
* @deprecated <tt>maxTotalMemorySize</tt> is not modifiable anymore.
*/
@ -311,6 +375,28 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
}
}
/**
* If set to <code>false</code> no queued {@link ChannelEventRunnable}'s {@link ChannelFuture} will get notified once {@link #shutdownNow()} is called.
* If set to <code>true</code> every queued {@link ChannelEventRunnable} will get marked as failed via {@link ChannelFuture#setFailure(Throwable)}.
*
* <p>
* Please only set this to <code>false</code> if you want to handle the notification by yourself and know what you are doing. Default is <code>true</code>.
* </p>
*/
public void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown) {
this.notifyOnShutdown = notifyOnShutdown;
}
/**
* Returns if the {@link ChannelFuture}'s of the {@link ChannelEventRunnable}'s should be notified about the shutdown of this {@link MemoryAwareThreadPoolExecutor}.
*
*/
public boolean getNotifyChannelFuturesOnShutdown() {
return notifyOnShutdown;
}
@Override
public void execute(Runnable command) {
if (command instanceof ChannelDownstreamEventRunnable) {