From 5c97c7fd1c3784bcf1ad2e0b9fb5d5fc66d82f1a Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 6 Apr 2009 07:09:11 +0000 Subject: [PATCH] Resolved issue: NETTY-140 ChannelFuture.await*() should throw an IllegalStateException if dead lock is expected * Added IoWorkerRunnable which maintains a thread local boolean variable * Improved DefaultChannelFuture to check IoWorkerRunnable.IN_IO_THREAD to detect possible dead lock * All I/O worker runnables are wrapped by IoWorkerRunnable. --- .../netty/channel/DefaultChannelFuture.java | 12 +++++ ...HttpTunnelingClientSocketPipelineSink.java | 15 +++--- .../nio/NioClientSocketPipelineSink.java | 7 ++- .../nio/NioServerSocketPipelineSink.java | 12 +++-- .../netty/channel/socket/nio/NioWorker.java | 5 +- .../oio/OioClientSocketPipelineSink.java | 15 +++--- .../socket/oio/OioDatagramPipelineSink.java | 19 ++++--- .../oio/OioServerSocketPipelineSink.java | 26 +++++---- .../netty/util/internal/IoWorkerRunnable.java | 54 +++++++++++++++++++ 9 files changed, 128 insertions(+), 37 deletions(-) create mode 100644 src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java diff --git a/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java b/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java index 3563ed8f6f..b8dd193d90 100644 --- a/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java +++ b/src/main/java/org/jboss/netty/channel/DefaultChannelFuture.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.internal.IoWorkerRunnable; /** * The default {@link ChannelFuture} implementation. It is recommended to @@ -144,6 +145,7 @@ public class DefaultChannelFuture implements ChannelFuture { public ChannelFuture await() throws InterruptedException { synchronized (this) { while (!done) { + checkDeadLock(); waiters++; try { this.wait(); @@ -167,6 +169,7 @@ public class DefaultChannelFuture implements ChannelFuture { public ChannelFuture awaitUninterruptibly() { synchronized (this) { while (!done) { + checkDeadLock(); waiters++; try { this.wait(); @@ -208,6 +211,7 @@ public class DefaultChannelFuture implements ChannelFuture { return done; } + checkDeadLock(); waiters++; try { for (;;) { @@ -234,6 +238,14 @@ public class DefaultChannelFuture implements ChannelFuture { } } + private void checkDeadLock() { + if (IoWorkerRunnable.IN_IO_THREAD.get()) { + throw new IllegalStateException( + "await*() in I/O thread causes a dead lock or " + + "sudden performance drop."); + } + } + public boolean setSuccess() { synchronized (this) { // Allow only once. diff --git a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketPipelineSink.java index bd8e15f1c3..df10854b84 100644 --- a/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/http/HttpTunnelingClientSocketPipelineSink.java @@ -35,6 +35,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable; /** @@ -123,12 +124,14 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink { fireChannelConnected(channel, channel.getRemoteAddress()); // Start the business. - workerExecutor.execute(new ThreadRenamingRunnable( - new HttpTunnelWorker(channel), - "Old I/O client worker (channelId: " + channel.getId() + ", " + - channel.getLocalAddress() + " => " + - channel.getRemoteAddress() + ')')); - + workerExecutor.execute( + new IoWorkerRunnable( + new ThreadRenamingRunnable( + new HttpTunnelWorker(channel), + "Old I/O client worker (channelId: " + + channel.getId() + ", " + + channel.getLocalAddress() + " => " + + channel.getRemoteAddress() + ')'))); workerStarted = true; } catch (Throwable t) { future.setFailure(t); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index b5cfb0ef01..56a6293e87 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -48,6 +48,7 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.ThreadRenamingRunnable; @@ -189,8 +190,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink { // Start the worker thread with the new Selector. boolean success = false; try { - bossExecutor.execute(new ThreadRenamingRunnable( - this, "New I/O client boss #" + id)); + bossExecutor.execute( + new IoWorkerRunnable( + new ThreadRenamingRunnable( + this, "New I/O client boss #" + id))); success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 06d9d25a18..53248e32cb 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -42,6 +42,7 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable; /** @@ -155,10 +156,13 @@ class NioServerSocketPipelineSink extends AbstractChannelSink { Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; - bossExecutor.execute(new ThreadRenamingRunnable( - new Boss(channel), - "New I/O server boss #" + id +" (channelId: " + channel.getId() + - ", " + channel.getLocalAddress() + ')')); + bossExecutor.execute( + new IoWorkerRunnable( + new ThreadRenamingRunnable( + new Boss(channel), + "New I/O server boss #" + id + + " (channelId: " + channel.getId() + + ", " + channel.getLocalAddress() + ')'))); bossStarted = true; } catch (Throwable t) { future.setFailure(t); diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index 98702ad17b..9df7ca97f0 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -51,6 +51,7 @@ import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.ReceiveBufferSizePredictor; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.ThreadRenamingRunnable; @@ -110,7 +111,9 @@ class NioWorker implements Runnable { boolean success = false; try { - executor.execute(new ThreadRenamingRunnable(this, threadName)); + executor.execute( + new IoWorkerRunnable( + new ThreadRenamingRunnable(this, threadName))); success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java index 60cb439255..13bfb48652 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioClientSocketPipelineSink.java @@ -36,6 +36,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable; /** @@ -133,12 +134,14 @@ class OioClientSocketPipelineSink extends AbstractChannelSink { fireChannelConnected(channel, channel.getRemoteAddress()); // Start the business. - workerExecutor.execute(new ThreadRenamingRunnable( - new OioWorker(channel), - "Old I/O client worker (channelId: " + channel.getId() + ", " + - channel.getLocalAddress() + " => " + - channel.getRemoteAddress() + ')')); - + workerExecutor.execute( + new IoWorkerRunnable( + new ThreadRenamingRunnable( + new OioWorker(channel), + "Old I/O client worker (channelId: " + + channel.getId() + ", " + + channel.getLocalAddress() + " => " + + channel.getRemoteAddress() + ')'))); workerStarted = true; } catch (Throwable t) { future.setFailure(t); diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java index 3daf7e70db..2a4f7a47fb 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioDatagramPipelineSink.java @@ -35,6 +35,7 @@ import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable; /** @@ -106,11 +107,13 @@ class OioDatagramPipelineSink extends AbstractChannelSink { fireChannelBound(channel, channel.getLocalAddress()); // Start the business. - workerExecutor.execute(new ThreadRenamingRunnable( - new OioDatagramWorker(channel), - "Old I/O datagram worker (channelId: " + channel.getId() + ", " + - channel.getLocalAddress() + ')')); - + workerExecutor.execute( + new IoWorkerRunnable( + new ThreadRenamingRunnable( + new OioDatagramWorker(channel), + "Old I/O datagram worker (channelId: " + + channel.getId() + ", " + + channel.getLocalAddress() + ')'))); workerStarted = true; } catch (Throwable t) { future.setFailure(t); @@ -149,8 +152,10 @@ class OioDatagramPipelineSink extends AbstractChannelSink { channel.getRemoteAddress() + ')'; if (!bound) { // Start the business. - workerExecutor.execute(new ThreadRenamingRunnable( - new OioDatagramWorker(channel), threadName)); + workerExecutor.execute( + new IoWorkerRunnable( + new ThreadRenamingRunnable( + new OioDatagramWorker(channel), threadName))); } else { // Worker started by bind() - just rename. Thread workerThread = channel.workerThread; diff --git a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java index c8334ce46a..00769135a9 100644 --- a/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/oio/OioServerSocketPipelineSink.java @@ -40,6 +40,7 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.internal.IoWorkerRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable; /** @@ -149,10 +150,12 @@ class OioServerSocketPipelineSink extends AbstractChannelSink { Executor bossExecutor = ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor; - bossExecutor.execute(new ThreadRenamingRunnable( - new Boss(channel), - "Old I/O server boss (channelId: " + channel.getId() + - ", " + localAddress + ')')); + bossExecutor.execute( + new IoWorkerRunnable( + new ThreadRenamingRunnable( + new Boss(channel), + "Old I/O server boss (channelId: " + + channel.getId() + ", " + localAddress + ')'))); bossStarted = true; } catch (Throwable t) { future.setFailure(t); @@ -203,13 +206,14 @@ class OioServerSocketPipelineSink extends AbstractChannelSink { OioServerSocketPipelineSink.this, acceptedSocket); workerExecutor.execute( - new ThreadRenamingRunnable( - new OioWorker(acceptedChannel), - "Old I/O server worker (parentId: " + - channel.getId() + - ", channelId: " + acceptedChannel.getId() + ", " + - channel.getRemoteAddress() + " => " + - channel.getLocalAddress() + ')')); + new IoWorkerRunnable( + new ThreadRenamingRunnable( + new OioWorker(acceptedChannel), + "Old I/O server worker (parentId: " + + channel.getId() + ", channelId: " + + acceptedChannel.getId() + ", " + + channel.getRemoteAddress() + " => " + + channel.getLocalAddress() + ')'))); } catch (Exception e) { logger.warn( "Failed to initialize an accepted socket.", e); diff --git a/src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java b/src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java new file mode 100644 index 0000000000..54456e59c6 --- /dev/null +++ b/src/main/java/org/jboss/netty/util/internal/IoWorkerRunnable.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2009 Trustin Heuiseung Lee + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.util.internal; + +import org.jboss.netty.channel.ChannelFuture; + +/** + * @author Trustin Heui-seung Lee (trustin@gmail.com) + * @version $Rev$, $Date$ + */ +public class IoWorkerRunnable implements Runnable { + + /** + * An internal use only thread-local variable that determines if + * the caller is running on an I/O worker thread, which is the case where + * the caller enters a dead lock if the caller calls + * {@link ChannelFuture#await()} or {@link ChannelFuture#awaitUninterruptibly()}. + */ + public static final ThreadLocal IN_IO_THREAD = new ThreadLocalBoolean(); + + private final Runnable runnable; + + public IoWorkerRunnable(Runnable runnable) { + if (runnable == null) { + throw new NullPointerException("runnable"); + } + this.runnable = runnable; + } + + public void run() { + IN_IO_THREAD.set(Boolean.TRUE); + try { + runnable.run(); + } finally { + IN_IO_THREAD.remove(); + } + } +}