From dfe960855fec0fc053194f6da5a9cd912565c292 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 1 Feb 2011 10:56:59 +0900 Subject: [PATCH] Fixed issue: NETTY-380 releaseExternalResources() hang indefinitely when called from a handler * Replaced IoWorkerRunnable with DeadLockProofWorker * ExecutorUtil now checks dead lock --- .../netty/channel/DefaultChannelFuture.java | 4 +- .../group/DefaultChannelGroupFuture.java | 4 +- .../nio/NioClientSocketPipelineSink.java | 9 ++-- .../nio/NioServerSocketPipelineSink.java | 9 ++-- .../netty/channel/socket/nio/NioWorker.java | 9 ++-- .../oio/OioClientSocketPipelineSink.java | 16 +++---- .../socket/oio/OioDatagramPipelineSink.java | 26 ++++++------ .../oio/OioServerSocketPipelineSink.java | 29 +++++++------ .../handler/queue/BlockingReadHandler.java | 4 +- ...Runnable.java => DeadLockProofWorker.java} | 42 ++++++++++--------- .../netty/util/internal/ExecutorUtil.java | 20 +++++++++ .../util/internal/StackTraceSimplifier.java | 4 +- 12 files changed, 102 insertions(+), 74 deletions(-) rename src/main/java/org/jboss/netty/util/internal/{IoWorkerRunnable.java => DeadLockProofWorker.java} (56%) diff --git a/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java b/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java index 5eb5d15e3e..9d80a09388 100644 --- a/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java +++ b/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * The default {@link ChannelFuture} implementation. It is recommended to @@ -305,7 +305,7 @@ public class DefaultChannelFuture implements ChannelFuture { } private void checkDeadLock() { - if (isUseDeadLockChecker() && IoWorkerRunnable.IN_IO_THREAD.get()) { + if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) { throw new IllegalStateException( "await*() in I/O thread causes a dead lock or " + "sudden performance drop. Use addListener() instead or " + diff --git a/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroupFuture.java b/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroupFuture.java index 0850da1bbd..f151e5ee4c 100644 --- a/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroupFuture.java +++ b/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroupFuture.java @@ -31,7 +31,7 @@ import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * The default {@link ChannelGroupFuture} implementation. @@ -338,7 +338,7 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture { } private void checkDeadLock() { - if (IoWorkerRunnable.IN_IO_THREAD.get()) { + if (DeadLockProofWorker.PARENT.get() != null) { throw new IllegalStateException( "await*() in I/O thread causes a dead lock or " + "sudden performance drop. Use addListener() instead or " + diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 5e04535dac..d6b9e6e183 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -43,7 +43,7 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.LinkedTransferQueue; /** @@ -196,9 +196,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { // Start the worker thread with the new Selector. boolean success = false; try { - bossExecutor.execute( - new IoWorkerRunnable(new ThreadRenamingRunnable( - this, "NewIO", "ClientBoss", null, String.valueOf(id), null))); + DeadLockProofWorker.start( + bossExecutor, + new ThreadRenamingRunnable( + this, "NewIO", "ClientBoss", null, String.valueOf(id), null)); success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 918cfc2a0b..dfe823651d 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -40,7 +40,7 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * @@ -154,11 +154,12 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; - bossExecutor.execute( - new IoWorkerRunnable(new ThreadRenamingRunnable( + DeadLockProofWorker.start( + bossExecutor, + new ThreadRenamingRunnable( new Boss(channel), "NewIO", "ServerBoss", null, String.valueOf(id), - channel.toString()))); + channel.toString())); bossStarted = true; } catch (Throwable t) { future.setFailure(t); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 0938072ff5..0f756e4a93 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -47,7 +47,7 @@ import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.LinkedTransferQueue; /** @@ -108,11 +108,12 @@ class NioWorker implements Runnable { // Start the worker thread with the new Selector. boolean success = false; try { - executor.execute( - new IoWorkerRunnable(new ThreadRenamingRunnable( + DeadLockProofWorker.start( + executor, + new ThreadRenamingRunnable( this, "NewIO", server? "ServerWorker" : "ClientWorker", - String.valueOf(bossId), String.valueOf(id), null))); + String.valueOf(bossId), String.valueOf(id), null)); success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java index 7182ecfd3b..b80fcc05d0 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java @@ -31,7 +31,7 @@ import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * @@ -132,13 +132,13 @@ class OioClientSocketPipelineSink extends AbstractChannelSink { fireChannelConnected(channel, channel.getRemoteAddress()); // Start the business. - workerExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new OioWorker(channel), - "OldIO", "ClientWorker", - String.valueOf(id), String.valueOf(channel.getId()), - channel.toString()))); + DeadLockProofWorker.start( + workerExecutor, + new ThreadRenamingRunnable( + new OioWorker(channel), + "OldIO", "ClientWorker", + String.valueOf(id), String.valueOf(channel.getId()), + channel.toString())); workerStarted = true; } catch (Throwable t) { future.setFailure(t); diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java index ee16fe0298..9b35c9e9ab 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java @@ -29,7 +29,7 @@ import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * @@ -103,14 +103,13 @@ class OioDatagramPipelineSink extends AbstractChannelSink { fireChannelBound(channel, channel.getLocalAddress()); // Start the business. - workerExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new OioDatagramWorker(channel), - "OldIO", - "DatagramWorker", - String.valueOf(id), String.valueOf(channel.getId()), - channel.toString()))); + DeadLockProofWorker.start( + workerExecutor, + new ThreadRenamingRunnable( + new OioDatagramWorker(channel), + "OldIO", "DatagramWorker", + String.valueOf(id), String.valueOf(channel.getId()), + channel.toString())); workerStarted = true; } catch (Throwable t) { future.setFailure(t); @@ -153,9 +152,12 @@ class OioDatagramPipelineSink extends AbstractChannelSink { if (!bound) { // Start the business. - workerExecutor.execute(new IoWorkerRunnable(new ThreadRenamingRunnable( - new OioDatagramWorker(channel), - service, category, String.valueOf(id), String.valueOf(channel.getId()), comment))); + DeadLockProofWorker.start( + workerExecutor, + new ThreadRenamingRunnable( + new OioDatagramWorker(channel), + service, category, String.valueOf(id), + String.valueOf(channel.getId()), comment)); } else { // Worker started by bind() - just rename. Thread workerThread = channel.workerThread; diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java index 210dbf9608..b0a94d9517 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java @@ -35,7 +35,7 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * @@ -148,12 +148,11 @@ class OioServerSocketPipelineSink extends AbstractChannelSink { Executor bossExecutor = ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor; - bossExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new Boss(channel), - "OldIO", "ServerBoss", null, - String.valueOf(id), channel.toString()))); + DeadLockProofWorker.start( + bossExecutor, + new ThreadRenamingRunnable( + new Boss(channel), "OldIO", "ServerBoss", null, + String.valueOf(id), channel.toString())); bossStarted = true; } catch (Throwable t) { future.setFailure(t); @@ -217,14 +216,14 @@ class OioServerSocketPipelineSink extends AbstractChannelSink { pipeline, OioServerSocketPipelineSink.this, acceptedSocket); - workerExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new OioWorker(acceptedChannel), - "OldIO", "ServerWorker", - String.valueOf(id), - String.valueOf(acceptedChannel.getId()), - acceptedChannel.toString()))); + DeadLockProofWorker.start( + workerExecutor, + new ThreadRenamingRunnable( + new OioWorker(acceptedChannel), + "OldIO", "ServerWorker", + String.valueOf(id), + String.valueOf(acceptedChannel.getId()), + acceptedChannel.toString())); } catch (Exception e) { logger.warn( "Failed to initialize an accepted socket.", e); diff --git a/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java b/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java index 26a69ffc6d..b5c4f77e7b 100644 --- a/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java +++ b/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java @@ -28,7 +28,7 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.LinkedTransferQueue; /** @@ -244,7 +244,7 @@ public class BlockingReadHandler extends SimpleChannelUpstreamHandler { } private void detectDeadLock() { - if (IoWorkerRunnable.IN_IO_THREAD.get()) { + if (DeadLockProofWorker.PARENT.get() != null) { throw new IllegalStateException( "read*(...) in I/O thread causes a dead lock or " + "sudden performance drop. Implement a state machine or " + diff --git a/src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java b/src/main/java/org/jboss/netty/util/internal/DeadLockProofWorker.java similarity index 56% rename from src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java rename to src/main/java/org/jboss/netty/util/internal/DeadLockProofWorker.java index 58821f3adc..1fc3f6ac3f 100644 --- a/src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java +++ b/src/main/java/org/jboss/netty/util/internal/DeadLockProofWorker.java @@ -15,38 +15,42 @@ */ package org.jboss.netty.util.internal; -import org.jboss.netty.channel.ChannelFuture; +import java.util.concurrent.Executor; /** * @author Trustin Lee * @version $Rev$, $Date$ */ -public class IoWorkerRunnable implements Runnable { +public final class DeadLockProofWorker { /** - * An internal use only thread-local variable that determines if - * the caller is running on an I/O worker thread, which is the case where - * the caller enters a dead lock if the caller calls - * {@link ChannelFuture#await()} or {@link ChannelFuture#awaitUninterruptibly()}. + * An internal use only thread-local variable that tells the + * {@link Executor} that this worker acquired a worker thread from. */ - public static final ThreadLocal IN_IO_THREAD = new ThreadLocalBoolean(); + public static final ThreadLocal PARENT = new ThreadLocal(); - private final Runnable runnable; - - public IoWorkerRunnable(Runnable runnable) { + public static void start(final Executor parent, final Runnable runnable) { + if (parent == null) { + throw new NullPointerException("parent"); + } if (runnable == null) { throw new NullPointerException("runnable"); } - this.runnable = runnable; + + parent.execute(new Runnable() { + @Override + public void run() { + PARENT.set(parent); + try { + runnable.run(); + } finally { + PARENT.remove(); + } + } + }); } - @Override - public void run() { - IN_IO_THREAD.set(Boolean.TRUE); - try { - runnable.run(); - } finally { - IN_IO_THREAD.remove(); - } + private DeadLockProofWorker() { + super(); } } diff --git a/src/main/java/org/jboss/netty/util/internal/ExecutorUtil.java b/src/main/java/org/jboss/netty/util/internal/ExecutorUtil.java index 763ad69465..08409b7f22 100644 --- a/src/main/java/org/jboss/netty/util/internal/ExecutorUtil.java +++ b/src/main/java/org/jboss/netty/util/internal/ExecutorUtil.java @@ -50,6 +50,11 @@ public class ExecutorUtil { * Shuts down the specified executors. */ public static void terminate(Executor... executors) { + // Check nulls. + if (executors == null) { + throw new NullPointerException("executors"); + } + Executor[] executorsCopy = new Executor[executors.length]; for (int i = 0; i < executors.length; i ++) { if (executors[i] == null) { @@ -58,6 +63,21 @@ public class ExecutorUtil { executorsCopy[i] = executors[i]; } + // Check dead lock. + final Executor currentParent = DeadLockProofWorker.PARENT.get(); + if (currentParent != null) { + for (Executor e: executorsCopy) { + if (e == currentParent) { + throw new IllegalStateException( + "An Executor cannot be shut down from the thread " + + "acquired from itself. Please make sure you are " + + "not calling releaseExternalResources() from an " + + "I/O worker thread."); + } + } + } + + // Shut down all executors. boolean interrupted = false; for (Executor e: executorsCopy) { if (!(e instanceof ExecutorService)) { diff --git a/src/main/java/org/jboss/netty/util/internal/StackTraceSimplifier.java b/src/main/java/org/jboss/netty/util/internal/StackTraceSimplifier.java index ecd72b7f23..9aef4d06a6 100644 --- a/src/main/java/org/jboss/netty/util/internal/StackTraceSimplifier.java +++ b/src/main/java/org/jboss/netty/util/internal/StackTraceSimplifier.java @@ -42,8 +42,8 @@ public class StackTraceSimplifier { private static final Pattern EXCLUDED_STACK_TRACE = Pattern.compile( "^org\\.jboss\\.netty\\." + - "(util\\.(ThreadRenamingRunnable)" + - "|channel\\.(SimpleChannel(Upstream|Downstream)?Handler|(Default|Static)ChannelPipeline.*))$"); + "(util\\.(ThreadRenamingRunnable|internal\\.DeadLockProofWorker)" + + "|channel\\.(SimpleChannel(Upstream|Downstream)?Handler|(Default|Static)ChannelPipeline.*))(\\$.*)?$"); /** * Removes unnecessary {@link StackTraceElement}s from the specified