Fixed issue: NETTY-380 releaseExternalResources() hang indefinitely

when called from a handler

* Replaced IoWorkerRunnable with DeadLockProofWorker
* ExecutorUtil now checks dead lock
This commit is contained in:
Trustin Lee 2011-02-01 10:56:59 +09:00
parent 8eb2d8eb43
commit dfe960855f
12 changed files with 102 additions and 74 deletions

View File

@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
/** /**
* The default {@link ChannelFuture} implementation. It is recommended to * The default {@link ChannelFuture} implementation. It is recommended to
@ -305,7 +305,7 @@ public class DefaultChannelFuture implements ChannelFuture {
} }
private void checkDeadLock() { private void checkDeadLock() {
if (isUseDeadLockChecker() && IoWorkerRunnable.IN_IO_THREAD.get()) { if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) {
throw new IllegalStateException( throw new IllegalStateException(
"await*() in I/O thread causes a dead lock or " + "await*() in I/O thread causes a dead lock or " +
"sudden performance drop. Use addListener() instead or " + "sudden performance drop. Use addListener() instead or " +

View File

@ -31,7 +31,7 @@ import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
/** /**
* The default {@link ChannelGroupFuture} implementation. * The default {@link ChannelGroupFuture} implementation.
@ -338,7 +338,7 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture {
} }
private void checkDeadLock() { private void checkDeadLock() {
if (IoWorkerRunnable.IN_IO_THREAD.get()) { if (DeadLockProofWorker.PARENT.get() != null) {
throw new IllegalStateException( throw new IllegalStateException(
"await*() in I/O thread causes a dead lock or " + "await*() in I/O thread causes a dead lock or " +
"sudden performance drop. Use addListener() instead or " + "sudden performance drop. Use addListener() instead or " +

View File

@ -43,7 +43,7 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.LinkedTransferQueue;
/** /**
@ -196,9 +196,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
// Start the worker thread with the new Selector. // Start the worker thread with the new Selector.
boolean success = false; boolean success = false;
try { try {
bossExecutor.execute( DeadLockProofWorker.start(
new IoWorkerRunnable(new ThreadRenamingRunnable( bossExecutor,
this, "NewIO", "ClientBoss", null, String.valueOf(id), null))); new ThreadRenamingRunnable(
this, "NewIO", "ClientBoss", null, String.valueOf(id), null));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -40,7 +40,7 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
/** /**
* *
@ -154,11 +154,12 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
Executor bossExecutor = Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
bossExecutor.execute( DeadLockProofWorker.start(
new IoWorkerRunnable(new ThreadRenamingRunnable( bossExecutor,
new ThreadRenamingRunnable(
new Boss(channel), new Boss(channel),
"NewIO", "ServerBoss", null, String.valueOf(id), "NewIO", "ServerBoss", null, String.valueOf(id),
channel.toString()))); channel.toString()));
bossStarted = true; bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);

View File

@ -47,7 +47,7 @@ import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.LinkedTransferQueue;
/** /**
@ -108,11 +108,12 @@ class NioWorker implements Runnable {
// Start the worker thread with the new Selector. // Start the worker thread with the new Selector.
boolean success = false; boolean success = false;
try { try {
executor.execute( DeadLockProofWorker.start(
new IoWorkerRunnable(new ThreadRenamingRunnable( executor,
new ThreadRenamingRunnable(
this, "NewIO", this, "NewIO",
server? "ServerWorker" : "ClientWorker", server? "ServerWorker" : "ClientWorker",
String.valueOf(bossId), String.valueOf(id), null))); String.valueOf(bossId), String.valueOf(id), null));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -31,7 +31,7 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
/** /**
* *
@ -132,13 +132,13 @@ class OioClientSocketPipelineSink extends AbstractChannelSink {
fireChannelConnected(channel, channel.getRemoteAddress()); fireChannelConnected(channel, channel.getRemoteAddress());
// Start the business. // Start the business.
workerExecutor.execute( DeadLockProofWorker.start(
new IoWorkerRunnable( workerExecutor,
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new OioWorker(channel), new OioWorker(channel),
"OldIO", "ClientWorker", "OldIO", "ClientWorker",
String.valueOf(id), String.valueOf(channel.getId()), String.valueOf(id), String.valueOf(channel.getId()),
channel.toString()))); channel.toString()));
workerStarted = true; workerStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);

View File

@ -29,7 +29,7 @@ import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
/** /**
* *
@ -103,14 +103,13 @@ class OioDatagramPipelineSink extends AbstractChannelSink {
fireChannelBound(channel, channel.getLocalAddress()); fireChannelBound(channel, channel.getLocalAddress());
// Start the business. // Start the business.
workerExecutor.execute( DeadLockProofWorker.start(
new IoWorkerRunnable( workerExecutor,
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new OioDatagramWorker(channel), new OioDatagramWorker(channel),
"OldIO", "OldIO", "DatagramWorker",
"DatagramWorker", String.valueOf(id), String.valueOf(channel.getId()),
String.valueOf(id), String.valueOf(channel.getId()), channel.toString()));
channel.toString())));
workerStarted = true; workerStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -153,9 +152,12 @@ class OioDatagramPipelineSink extends AbstractChannelSink {
if (!bound) { if (!bound) {
// Start the business. // Start the business.
workerExecutor.execute(new IoWorkerRunnable(new ThreadRenamingRunnable( DeadLockProofWorker.start(
new OioDatagramWorker(channel), workerExecutor,
service, category, String.valueOf(id), String.valueOf(channel.getId()), comment))); new ThreadRenamingRunnable(
new OioDatagramWorker(channel),
service, category, String.valueOf(id),
String.valueOf(channel.getId()), comment));
} else { } else {
// Worker started by bind() - just rename. // Worker started by bind() - just rename.
Thread workerThread = channel.workerThread; Thread workerThread = channel.workerThread;

View File

@ -35,7 +35,7 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
/** /**
* *
@ -148,12 +148,11 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
Executor bossExecutor = Executor bossExecutor =
((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor; ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
bossExecutor.execute( DeadLockProofWorker.start(
new IoWorkerRunnable( bossExecutor,
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new Boss(channel), new Boss(channel), "OldIO", "ServerBoss", null,
"OldIO", "ServerBoss", null, String.valueOf(id), channel.toString()));
String.valueOf(id), channel.toString())));
bossStarted = true; bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -217,14 +216,14 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
pipeline, pipeline,
OioServerSocketPipelineSink.this, OioServerSocketPipelineSink.this,
acceptedSocket); acceptedSocket);
workerExecutor.execute( DeadLockProofWorker.start(
new IoWorkerRunnable( workerExecutor,
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new OioWorker(acceptedChannel), new OioWorker(acceptedChannel),
"OldIO", "ServerWorker", "OldIO", "ServerWorker",
String.valueOf(id), String.valueOf(id),
String.valueOf(acceptedChannel.getId()), String.valueOf(acceptedChannel.getId()),
acceptedChannel.toString()))); acceptedChannel.toString()));
} catch (Exception e) { } catch (Exception e) {
logger.warn( logger.warn(
"Failed to initialize an accepted socket.", e); "Failed to initialize an accepted socket.", e);

View File

@ -28,7 +28,7 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.LinkedTransferQueue;
/** /**
@ -244,7 +244,7 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
} }
private void detectDeadLock() { private void detectDeadLock() {
if (IoWorkerRunnable.IN_IO_THREAD.get()) { if (DeadLockProofWorker.PARENT.get() != null) {
throw new IllegalStateException( throw new IllegalStateException(
"read*(...) in I/O thread causes a dead lock or " + "read*(...) in I/O thread causes a dead lock or " +
"sudden performance drop. Implement a state machine or " + "sudden performance drop. Implement a state machine or " +

View File

@ -15,38 +15,42 @@
*/ */
package org.jboss.netty.util.internal; package org.jboss.netty.util.internal;
import org.jboss.netty.channel.ChannelFuture; import java.util.concurrent.Executor;
/** /**
* @author <a href="http://gleamynode.net/">Trustin Lee</a> * @author <a href="http://gleamynode.net/">Trustin Lee</a>
* @version $Rev$, $Date$ * @version $Rev$, $Date$
*/ */
public class IoWorkerRunnable implements Runnable { public final class DeadLockProofWorker {
/** /**
* An <em>internal use only</em> thread-local variable that determines if * An <em>internal use only</em> thread-local variable that tells the
* the caller is running on an I/O worker thread, which is the case where * {@link Executor} that this worker acquired a worker thread from.
* the caller enters a dead lock if the caller calls
* {@link ChannelFuture#await()} or {@link ChannelFuture#awaitUninterruptibly()}.
*/ */
public static final ThreadLocal<Boolean> IN_IO_THREAD = new ThreadLocalBoolean(); public static final ThreadLocal<Executor> PARENT = new ThreadLocal<Executor>();
private final Runnable runnable; public static void start(final Executor parent, final Runnable runnable) {
if (parent == null) {
public IoWorkerRunnable(Runnable runnable) { throw new NullPointerException("parent");
}
if (runnable == null) { if (runnable == null) {
throw new NullPointerException("runnable"); throw new NullPointerException("runnable");
} }
this.runnable = runnable;
parent.execute(new Runnable() {
@Override
public void run() {
PARENT.set(parent);
try {
runnable.run();
} finally {
PARENT.remove();
}
}
});
} }
@Override private DeadLockProofWorker() {
public void run() { super();
IN_IO_THREAD.set(Boolean.TRUE);
try {
runnable.run();
} finally {
IN_IO_THREAD.remove();
}
} }
} }

View File

@ -50,6 +50,11 @@ public class ExecutorUtil {
* Shuts down the specified executors. * Shuts down the specified executors.
*/ */
public static void terminate(Executor... executors) { public static void terminate(Executor... executors) {
// Check nulls.
if (executors == null) {
throw new NullPointerException("executors");
}
Executor[] executorsCopy = new Executor[executors.length]; Executor[] executorsCopy = new Executor[executors.length];
for (int i = 0; i < executors.length; i ++) { for (int i = 0; i < executors.length; i ++) {
if (executors[i] == null) { if (executors[i] == null) {
@ -58,6 +63,21 @@ public class ExecutorUtil {
executorsCopy[i] = executors[i]; executorsCopy[i] = executors[i];
} }
// Check dead lock.
final Executor currentParent = DeadLockProofWorker.PARENT.get();
if (currentParent != null) {
for (Executor e: executorsCopy) {
if (e == currentParent) {
throw new IllegalStateException(
"An Executor cannot be shut down from the thread " +
"acquired from itself. Please make sure you are " +
"not calling releaseExternalResources() from an " +
"I/O worker thread.");
}
}
}
// Shut down all executors.
boolean interrupted = false; boolean interrupted = false;
for (Executor e: executorsCopy) { for (Executor e: executorsCopy) {
if (!(e instanceof ExecutorService)) { if (!(e instanceof ExecutorService)) {

View File

@ -42,8 +42,8 @@ public class StackTraceSimplifier {
private static final Pattern EXCLUDED_STACK_TRACE = private static final Pattern EXCLUDED_STACK_TRACE =
Pattern.compile( Pattern.compile(
"^org\\.jboss\\.netty\\." + "^org\\.jboss\\.netty\\." +
"(util\\.(ThreadRenamingRunnable)" + "(util\\.(ThreadRenamingRunnable|internal\\.DeadLockProofWorker)" +
"|channel\\.(SimpleChannel(Upstream|Downstream)?Handler|(Default|Static)ChannelPipeline.*))$"); "|channel\\.(SimpleChannel(Upstream|Downstream)?Handler|(Default|Static)ChannelPipeline.*))(\\$.*)?$");
/** /**
* Removes unnecessary {@link StackTraceElement}s from the specified * Removes unnecessary {@link StackTraceElement}s from the specified