From be6cdb4a11d4991e31bb0c3de2e4a8f9103a3295 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Tue, 1 Feb 2011 10:56:59 +0900 Subject: [PATCH] Fixed issue: NETTY-380 releaseExternalResources() hang indefinitely when called from a handler * Replaced IoWorkerRunnable with DeadLockProofWorker * ExecutorUtil now checks dead lock --- .../netty/channel/DefaultChannelFuture.java | 4 +- .../group/DefaultChannelGroupFuture.java | 4 +- .../nio/NioClientSocketPipelineSink.java | 10 ++--- .../nio/NioServerSocketPipelineSink.java | 13 +++--- .../netty/channel/socket/nio/NioWorker.java | 7 ++-- .../oio/OioClientSocketPipelineSink.java | 12 +++--- .../socket/oio/OioDatagramPipelineSink.java | 20 +++++----- .../oio/OioServerSocketPipelineSink.java | 24 +++++------ .../handler/queue/BlockingReadHandler.java | 4 +- ...Runnable.java => DeadLockProofWorker.java} | 40 ++++++++++--------- .../netty/util/internal/ExecutorUtil.java | 20 ++++++++++ .../util/internal/StackTraceSimplifier.java | 4 +- 12 files changed, 92 insertions(+), 70 deletions(-) rename src/main/java/org/jboss/netty/util/internal/{IoWorkerRunnable.java => DeadLockProofWorker.java} (58%) diff --git a/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java b/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java index 346796b0c1..1c0e8e12a4 100644 --- a/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java +++ b/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * The default {@link ChannelFuture} implementation. It is recommended to @@ -292,7 +292,7 @@ public class DefaultChannelFuture implements ChannelFuture { } private void checkDeadLock() { - if (isUseDeadLockChecker() && IoWorkerRunnable.IN_IO_THREAD.get()) { + if (isUseDeadLockChecker() && DeadLockProofWorker.PARENT.get() != null) { throw new IllegalStateException( "await*() in I/O thread causes a dead lock or " + "sudden performance drop. Use addListener() instead or " + diff --git a/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroupFuture.java b/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroupFuture.java index fc942cdc17..a30d6cc8f6 100644 --- a/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroupFuture.java +++ b/src/main/java/org/jboss/netty/channel/group/DefaultChannelGroupFuture.java @@ -31,7 +31,7 @@ import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * The default {@link ChannelGroupFuture} implementation. @@ -320,7 +320,7 @@ public class DefaultChannelGroupFuture implements ChannelGroupFuture { } private void checkDeadLock() { - if (IoWorkerRunnable.IN_IO_THREAD.get()) { + if (DeadLockProofWorker.PARENT.get() != null) { throw new IllegalStateException( "await*() in I/O thread causes a dead lock or " + "sudden performance drop. Use addListener() instead or " + diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index 8eea02d8a3..b38f3c18b6 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -43,7 +43,7 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.LinkedTransferQueue; /** @@ -194,10 +194,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { // Start the worker thread with the new Selector. boolean success = false; try { - bossExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - this, "New I/O client boss #" + id))); + DeadLockProofWorker.start( + bossExecutor, + new ThreadRenamingRunnable( + this, "New I/O client boss #" + id)); success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index f0b64b72bb..9552db08d0 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -40,7 +40,7 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * @@ -153,12 +153,11 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; - bossExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new Boss(channel), - "New I/O server boss #" + id + - " (" + channel + ')'))); + DeadLockProofWorker.start( + bossExecutor, + new ThreadRenamingRunnable( + new Boss(channel), + "New I/O server boss #" + id + " (" + channel + ')')); bossStarted = true; } catch (Throwable t) { future.setFailure(t); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index f0c4fecaaa..1dec6576a7 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -47,7 +47,7 @@ import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.LinkedTransferQueue; /** @@ -112,9 +112,8 @@ class NioWorker implements Runnable { boolean success = false; try { - executor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable(this, threadName))); + DeadLockProofWorker.start( + executor, new ThreadRenamingRunnable(this, threadName)); success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java index 402ff37cde..a180514603 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java @@ -30,7 +30,7 @@ import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * @@ -127,11 +127,11 @@ class OioClientSocketPipelineSink extends AbstractChannelSink { fireChannelConnected(channel, channel.getRemoteAddress()); // Start the business. - workerExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new OioWorker(channel), - "Old I/O client worker (" + channel + ')'))); + DeadLockProofWorker.start( + workerExecutor, + new ThreadRenamingRunnable( + new OioWorker(channel), + "Old I/O client worker (" + channel + ')')); workerStarted = true; } catch (Throwable t) { future.setFailure(t); diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java index 71324177ef..a0101be7e5 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java @@ -29,7 +29,7 @@ import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * @@ -100,11 +100,11 @@ class OioDatagramPipelineSink extends AbstractChannelSink { fireChannelBound(channel, channel.getLocalAddress()); // Start the business. - workerExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new OioDatagramWorker(channel), - "Old I/O datagram worker (" + channel + ')'))); + DeadLockProofWorker.start( + workerExecutor, + new ThreadRenamingRunnable( + new OioDatagramWorker(channel), + "Old I/O datagram worker (" + channel + ')')); workerStarted = true; } catch (Throwable t) { future.setFailure(t); @@ -144,10 +144,10 @@ class OioDatagramPipelineSink extends AbstractChannelSink { String threadName = "Old I/O datagram worker (" + channel + ')'; if (!bound) { // Start the business. - workerExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new OioDatagramWorker(channel), threadName))); + DeadLockProofWorker.start( + workerExecutor, + new ThreadRenamingRunnable( + new OioDatagramWorker(channel), threadName)); } else { // Worker started by bind() - just rename. Thread workerThread = channel.workerThread; diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java index 40c3beb542..16664297b2 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java @@ -34,7 +34,7 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.util.ThreadRenamingRunnable; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; /** * @@ -143,11 +143,11 @@ class OioServerSocketPipelineSink extends AbstractChannelSink { Executor bossExecutor = ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor; - bossExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new Boss(channel), - "Old I/O server boss (" + channel + ')'))); + DeadLockProofWorker.start( + bossExecutor, + new ThreadRenamingRunnable( + new Boss(channel), + "Old I/O server boss (" + channel + ')')); bossStarted = true; } catch (Throwable t) { future.setFailure(t); @@ -210,12 +210,12 @@ class OioServerSocketPipelineSink extends AbstractChannelSink { pipeline, OioServerSocketPipelineSink.this, acceptedSocket); - workerExecutor.execute( - new IoWorkerRunnable( - new ThreadRenamingRunnable( - new OioWorker(acceptedChannel), - "Old I/O server worker (parentId: " + - channel.getId() + ", " + channel + ')'))); + DeadLockProofWorker.start( + workerExecutor, + new ThreadRenamingRunnable( + new OioWorker(acceptedChannel), + "Old I/O server worker (parentId: " + + channel.getId() + ", " + channel + ')')); } catch (Exception e) { logger.warn( "Failed to initialize an accepted socket.", e); diff --git a/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java b/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java index 94699bce63..177a4594ac 100644 --- a/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java +++ b/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java @@ -28,7 +28,7 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.LinkedTransferQueue; /** @@ -244,7 +244,7 @@ public class BlockingReadHandler extends SimpleChannelUpstreamHandler { } private void detectDeadLock() { - if (IoWorkerRunnable.IN_IO_THREAD.get()) { + if (DeadLockProofWorker.PARENT.get() != null) { throw new IllegalStateException( "read*(...) in I/O thread causes a dead lock or " + "sudden performance drop. Implement a state machine or " + diff --git a/src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java b/src/main/java/org/jboss/netty/util/internal/DeadLockProofWorker.java similarity index 58% rename from src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java rename to src/main/java/org/jboss/netty/util/internal/DeadLockProofWorker.java index e12a26503e..3e4ed6f227 100644 --- a/src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java +++ b/src/main/java/org/jboss/netty/util/internal/DeadLockProofWorker.java @@ -15,37 +15,41 @@ */ package org.jboss.netty.util.internal; -import org.jboss.netty.channel.ChannelFuture; +import java.util.concurrent.Executor; /** * @author Trustin Lee * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ */ -public class IoWorkerRunnable implements Runnable { +public final class DeadLockProofWorker { /** - * An internal use only thread-local variable that determines if - * the caller is running on an I/O worker thread, which is the case where - * the caller enters a dead lock if the caller calls - * {@link ChannelFuture#await()} or {@link ChannelFuture#awaitUninterruptibly()}. + * An internal use only thread-local variable that tells the + * {@link Executor} that this worker acquired a worker thread from. */ - public static final ThreadLocal IN_IO_THREAD = new ThreadLocalBoolean(); + public static final ThreadLocal PARENT = new ThreadLocal(); - private final Runnable runnable; - - public IoWorkerRunnable(Runnable runnable) { + public static void start(final Executor parent, final Runnable runnable) { + if (parent == null) { + throw new NullPointerException("parent"); + } if (runnable == null) { throw new NullPointerException("runnable"); } - this.runnable = runnable; + + parent.execute(new Runnable() { + public void run() { + PARENT.set(parent); + try { + runnable.run(); + } finally { + PARENT.remove(); + } + } + }); } - public void run() { - IN_IO_THREAD.set(Boolean.TRUE); - try { - runnable.run(); - } finally { - IN_IO_THREAD.remove(); - } + private DeadLockProofWorker() { + super(); } } diff --git a/src/main/java/org/jboss/netty/util/internal/ExecutorUtil.java b/src/main/java/org/jboss/netty/util/internal/ExecutorUtil.java index 2a723e2a7a..2e2f2250f3 100644 --- a/src/main/java/org/jboss/netty/util/internal/ExecutorUtil.java +++ b/src/main/java/org/jboss/netty/util/internal/ExecutorUtil.java @@ -50,6 +50,11 @@ public class ExecutorUtil { * Shuts down the specified executors. */ public static void terminate(Executor... executors) { + // Check nulls. + if (executors == null) { + throw new NullPointerException("executors"); + } + Executor[] executorsCopy = new Executor[executors.length]; for (int i = 0; i < executors.length; i ++) { if (executors[i] == null) { @@ -58,6 +63,21 @@ public class ExecutorUtil { executorsCopy[i] = executors[i]; } + // Check dead lock. + final Executor currentParent = DeadLockProofWorker.PARENT.get(); + if (currentParent != null) { + for (Executor e: executorsCopy) { + if (e == currentParent) { + throw new IllegalStateException( + "An Executor cannot be shut down from the thread " + + "acquired from itself. Please make sure you are " + + "not calling releaseExternalResources() from an " + + "I/O worker thread."); + } + } + } + + // Shut down all executors. boolean interrupted = false; for (Executor e: executorsCopy) { if (!(e instanceof ExecutorService)) { diff --git a/src/main/java/org/jboss/netty/util/internal/StackTraceSimplifier.java b/src/main/java/org/jboss/netty/util/internal/StackTraceSimplifier.java index cfd537d934..8af8f8fa72 100644 --- a/src/main/java/org/jboss/netty/util/internal/StackTraceSimplifier.java +++ b/src/main/java/org/jboss/netty/util/internal/StackTraceSimplifier.java @@ -42,8 +42,8 @@ public class StackTraceSimplifier { private static final Pattern EXCLUDED_STACK_TRACE = Pattern.compile( "^org\\.jboss\\.netty\\." + - "(util\\.(ThreadRenamingRunnable)" + - "|channel\\.(SimpleChannel(Upstream|Downstream)?Handler|(Default|Static)ChannelPipeline.*))$"); + "(util\\.(ThreadRenamingRunnable|internal\\.DeadLockProofWorker)" + + "|channel\\.(SimpleChannel(Upstream|Downstream)?Handler|(Default|Static)ChannelPipeline.*))(\\$.*)?$"); /** * Removes unnecessary {@link StackTraceElement}s from the specified