Make sure calling ExecutionHandler.releaseExternalResource() does not lead to a dead-lock when calling from a ChannelEventRunnable. See #200

This commit is contained in:
norman 2012-06-01 09:25:59 +02:00
parent fca7f89371
commit 7800187433
6 changed files with 49 additions and 10 deletions

View File

@ -16,6 +16,8 @@
package org.jboss.netty.handler.execution; package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
@ -24,14 +26,15 @@ import org.jboss.netty.channel.ChannelHandlerContext;
*/ */
public class ChannelDownstreamEventRunnable extends ChannelEventRunnable { public class ChannelDownstreamEventRunnable extends ChannelEventRunnable {
public ChannelDownstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) { public ChannelDownstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) {
super(ctx, e); super(ctx, e, executor);
} }
/** /**
* Send the {@link ChannelEvent} downstream * Send the {@link ChannelEvent} downstream
*/ */
public void run() { @Override
public void runTask() {
ctx.sendDownstream(e); ctx.sendDownstream(e);
} }
} }

View File

@ -15,23 +15,28 @@
*/ */
package org.jboss.netty.handler.execution; package org.jboss.netty.handler.execution;
import java.util.concurrent.Executor;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.util.EstimatableObjectWrapper; import org.jboss.netty.util.EstimatableObjectWrapper;
import org.jboss.netty.util.internal.DeadLockProofWorker;
public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper { public abstract class ChannelEventRunnable implements Runnable, EstimatableObjectWrapper {
protected final ChannelHandlerContext ctx; protected final ChannelHandlerContext ctx;
protected final ChannelEvent e; protected final ChannelEvent e;
int estimatedSize; int estimatedSize;
private final Executor executor;
/** /**
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent} * Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}. * upstream via the specified {@link ChannelHandlerContext}.
*/ */
public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) { public ChannelEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) {
this.ctx = ctx; this.ctx = ctx;
this.e = e; this.e = e;
this.executor = executor;
} }
/** /**
@ -52,4 +57,19 @@ public abstract class ChannelEventRunnable implements Runnable, EstimatableObjec
public Object unwrap() { public Object unwrap() {
return e; return e;
} }
public final void run() {
try {
DeadLockProofWorker.PARENT.set(executor);
runTask();
} finally {
DeadLockProofWorker.PARENT.remove();
}
}
/**
* Run the task
*/
protected abstract void runTask();
} }

View File

@ -32,15 +32,16 @@ public class ChannelUpstreamEventRunnable extends ChannelEventRunnable {
* Creates a {@link Runnable} which sends the specified {@link ChannelEvent} * Creates a {@link Runnable} which sends the specified {@link ChannelEvent}
* upstream via the specified {@link ChannelHandlerContext}. * upstream via the specified {@link ChannelHandlerContext}.
*/ */
public ChannelUpstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e) { public ChannelUpstreamEventRunnable(ChannelHandlerContext ctx, ChannelEvent e, Executor executor) {
super(ctx, e); super(ctx, e, executor);
} }
/** /**
* Sends the event upstream. * Sends the event upstream.
*/ */
public void run() { @Override
protected void runTask() {
ctx.sendUpstream(e); ctx.sendUpstream(e);
} }
} }

View File

@ -169,7 +169,12 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
public void handleUpstream( public void handleUpstream(
ChannelHandlerContext context, ChannelEvent e) throws Exception { ChannelHandlerContext context, ChannelEvent e) throws Exception {
if (handleUpstream) { if (handleUpstream) {
executor.execute(new ChannelUpstreamEventRunnable(context, e)); try {
executor.execute(new ChannelUpstreamEventRunnable(context, e, executor));
} finally {
}
} else { } else {
context.sendUpstream(e); context.sendUpstream(e);
} }
@ -180,7 +185,7 @@ public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstre
// check if the read was suspend // check if the read was suspend
if (!handleReadSuspend(ctx, e)) { if (!handleReadSuspend(ctx, e)) {
if (handleDownstream) { if (handleDownstream) {
executor.execute(new ChannelDownstreamEventRunnable(ctx, e)); executor.execute(new ChannelDownstreamEventRunnable(ctx, e, executor));
} else { } else {
ctx.sendDownstream(e); ctx.sendDownstream(e);
} }

View File

@ -45,6 +45,7 @@ import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.DefaultObjectSizeEstimator; import org.jboss.netty.util.DefaultObjectSizeEstimator;
import org.jboss.netty.util.ObjectSizeEstimator; import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.internal.ConcurrentIdentityHashMap; import org.jboss.netty.util.internal.ConcurrentIdentityHashMap;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.QueueFactory;
import org.jboss.netty.util.internal.SharedResourceMisuseDetector; import org.jboss.netty.util.internal.SharedResourceMisuseDetector;
@ -441,7 +442,12 @@ public class MemoryAwareThreadPoolExecutor extends ThreadPoolExecutor {
* Executes the specified task without maintaining the event order. * Executes the specified task without maintaining the event order.
*/ */
protected final void doUnorderedExecute(Runnable task) { protected final void doUnorderedExecute(Runnable task) {
super.execute(task); try {
DeadLockProofWorker.PARENT.set(this);
super.execute(task);
} finally {
DeadLockProofWorker.PARENT.remove();
}
} }
@Override @Override

View File

@ -31,6 +31,7 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.util.ObjectSizeEstimator; import org.jboss.netty.util.ObjectSizeEstimator;
import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap; import org.jboss.netty.util.internal.ConcurrentIdentityWeakKeyHashMap;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.QueueFactory;
/** /**
@ -302,6 +303,7 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
acquired = true; acquired = true;
try { try {
Thread thread = Thread.currentThread(); Thread thread = Thread.currentThread();
DeadLockProofWorker.PARENT.set(OrderedMemoryAwareThreadPoolExecutor.this);
for (;;) { for (;;) {
final Runnable task = tasks.poll(); final Runnable task = tasks.poll();
// if the task is null we should exit the loop // if the task is null we should exit the loop
@ -323,6 +325,8 @@ public class OrderedMemoryAwareThreadPoolExecutor extends
} }
} }
} finally { } finally {
DeadLockProofWorker.PARENT.remove();
// set it back to not running // set it back to not running
isRunning.set(false); isRunning.set(false);
} }