Resolved issue: NETTY-140 ChannelFuture.await*() should throw an IllegalStateException if dead lock is expected

* Added IoWorkerRunnable which maintains a thread local boolean variable
* Improved DefaultChannelFuture to check IoWorkerRunnable.IN_IO_THREAD to detect possible dead lock
* All I/O worker runnables are wrapped by IoWorkerRunnable.
This commit is contained in:
Trustin Lee 2009-04-06 07:09:11 +00:00
parent c86bf34b30
commit 5c97c7fd1c
9 changed files with 128 additions and 37 deletions

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.IoWorkerRunnable;
/** /**
* The default {@link ChannelFuture} implementation. It is recommended to * The default {@link ChannelFuture} implementation. It is recommended to
@ -144,6 +145,7 @@ public class DefaultChannelFuture implements ChannelFuture {
public ChannelFuture await() throws InterruptedException { public ChannelFuture await() throws InterruptedException {
synchronized (this) { synchronized (this) {
while (!done) { while (!done) {
checkDeadLock();
waiters++; waiters++;
try { try {
this.wait(); this.wait();
@ -167,6 +169,7 @@ public class DefaultChannelFuture implements ChannelFuture {
public ChannelFuture awaitUninterruptibly() { public ChannelFuture awaitUninterruptibly() {
synchronized (this) { synchronized (this) {
while (!done) { while (!done) {
checkDeadLock();
waiters++; waiters++;
try { try {
this.wait(); this.wait();
@ -208,6 +211,7 @@ public class DefaultChannelFuture implements ChannelFuture {
return done; return done;
} }
checkDeadLock();
waiters++; waiters++;
try { try {
for (;;) { for (;;) {
@ -234,6 +238,14 @@ public class DefaultChannelFuture implements ChannelFuture {
} }
} }
private void checkDeadLock() {
if (IoWorkerRunnable.IN_IO_THREAD.get()) {
throw new IllegalStateException(
"await*() in I/O thread causes a dead lock or " +
"sudden performance drop.");
}
}
public boolean setSuccess() { public boolean setSuccess() {
synchronized (this) { synchronized (this) {
// Allow only once. // Allow only once.

View File

@ -35,6 +35,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.IoWorkerRunnable;
import org.jboss.netty.util.internal.ThreadRenamingRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable;
/** /**
@ -123,12 +124,14 @@ final class HttpTunnelingClientSocketPipelineSink extends AbstractChannelSink {
fireChannelConnected(channel, channel.getRemoteAddress()); fireChannelConnected(channel, channel.getRemoteAddress());
// Start the business. // Start the business.
workerExecutor.execute(new ThreadRenamingRunnable( workerExecutor.execute(
new HttpTunnelWorker(channel), new IoWorkerRunnable(
"Old I/O client worker (channelId: " + channel.getId() + ", " + new ThreadRenamingRunnable(
channel.getLocalAddress() + " => " + new HttpTunnelWorker(channel),
channel.getRemoteAddress() + ')')); "Old I/O client worker (channelId: " +
channel.getId() + ", " +
channel.getLocalAddress() + " => " +
channel.getRemoteAddress() + ')')));
workerStarted = true; workerStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);

View File

@ -48,6 +48,7 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.IoWorkerRunnable;
import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadRenamingRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable;
@ -189,8 +190,10 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
// Start the worker thread with the new Selector. // Start the worker thread with the new Selector.
boolean success = false; boolean success = false;
try { try {
bossExecutor.execute(new ThreadRenamingRunnable( bossExecutor.execute(
this, "New I/O client boss #" + id)); new IoWorkerRunnable(
new ThreadRenamingRunnable(
this, "New I/O client boss #" + id)));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -42,6 +42,7 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.IoWorkerRunnable;
import org.jboss.netty.util.internal.ThreadRenamingRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable;
/** /**
@ -155,10 +156,13 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
Executor bossExecutor = Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
bossExecutor.execute(new ThreadRenamingRunnable( bossExecutor.execute(
new Boss(channel), new IoWorkerRunnable(
"New I/O server boss #" + id +" (channelId: " + channel.getId() + new ThreadRenamingRunnable(
", " + channel.getLocalAddress() + ')')); new Boss(channel),
"New I/O server boss #" + id +
" (channelId: " + channel.getId() +
", " + channel.getLocalAddress() + ')')));
bossStarted = true; bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);

View File

@ -51,6 +51,7 @@ import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.ReceiveBufferSizePredictor; import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.IoWorkerRunnable;
import org.jboss.netty.util.internal.LinkedTransferQueue; import org.jboss.netty.util.internal.LinkedTransferQueue;
import org.jboss.netty.util.internal.ThreadRenamingRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable;
@ -110,7 +111,9 @@ class NioWorker implements Runnable {
boolean success = false; boolean success = false;
try { try {
executor.execute(new ThreadRenamingRunnable(this, threadName)); executor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(this, threadName)));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -36,6 +36,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.IoWorkerRunnable;
import org.jboss.netty.util.internal.ThreadRenamingRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable;
/** /**
@ -133,12 +134,14 @@ class OioClientSocketPipelineSink extends AbstractChannelSink {
fireChannelConnected(channel, channel.getRemoteAddress()); fireChannelConnected(channel, channel.getRemoteAddress());
// Start the business. // Start the business.
workerExecutor.execute(new ThreadRenamingRunnable( workerExecutor.execute(
new OioWorker(channel), new IoWorkerRunnable(
"Old I/O client worker (channelId: " + channel.getId() + ", " + new ThreadRenamingRunnable(
channel.getLocalAddress() + " => " + new OioWorker(channel),
channel.getRemoteAddress() + ')')); "Old I/O client worker (channelId: " +
channel.getId() + ", " +
channel.getLocalAddress() + " => " +
channel.getRemoteAddress() + ')')));
workerStarted = true; workerStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);

View File

@ -35,6 +35,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.internal.IoWorkerRunnable;
import org.jboss.netty.util.internal.ThreadRenamingRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable;
/** /**
@ -106,11 +107,13 @@ class OioDatagramPipelineSink extends AbstractChannelSink {
fireChannelBound(channel, channel.getLocalAddress()); fireChannelBound(channel, channel.getLocalAddress());
// Start the business. // Start the business.
workerExecutor.execute(new ThreadRenamingRunnable( workerExecutor.execute(
new OioDatagramWorker(channel), new IoWorkerRunnable(
"Old I/O datagram worker (channelId: " + channel.getId() + ", " + new ThreadRenamingRunnable(
channel.getLocalAddress() + ')')); new OioDatagramWorker(channel),
"Old I/O datagram worker (channelId: " +
channel.getId() + ", " +
channel.getLocalAddress() + ')')));
workerStarted = true; workerStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -149,8 +152,10 @@ class OioDatagramPipelineSink extends AbstractChannelSink {
channel.getRemoteAddress() + ')'; channel.getRemoteAddress() + ')';
if (!bound) { if (!bound) {
// Start the business. // Start the business.
workerExecutor.execute(new ThreadRenamingRunnable( workerExecutor.execute(
new OioDatagramWorker(channel), threadName)); new IoWorkerRunnable(
new ThreadRenamingRunnable(
new OioDatagramWorker(channel), threadName)));
} else { } else {
// Worker started by bind() - just rename. // Worker started by bind() - just rename.
Thread workerThread = channel.workerThread; Thread workerThread = channel.workerThread;

View File

@ -40,6 +40,7 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.IoWorkerRunnable;
import org.jboss.netty.util.internal.ThreadRenamingRunnable; import org.jboss.netty.util.internal.ThreadRenamingRunnable;
/** /**
@ -149,10 +150,12 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
Executor bossExecutor = Executor bossExecutor =
((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor; ((OioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
bossExecutor.execute(new ThreadRenamingRunnable( bossExecutor.execute(
new Boss(channel), new IoWorkerRunnable(
"Old I/O server boss (channelId: " + channel.getId() + new ThreadRenamingRunnable(
", " + localAddress + ')')); new Boss(channel),
"Old I/O server boss (channelId: " +
channel.getId() + ", " + localAddress + ')')));
bossStarted = true; bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -203,13 +206,14 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
OioServerSocketPipelineSink.this, OioServerSocketPipelineSink.this,
acceptedSocket); acceptedSocket);
workerExecutor.execute( workerExecutor.execute(
new ThreadRenamingRunnable( new IoWorkerRunnable(
new OioWorker(acceptedChannel), new ThreadRenamingRunnable(
"Old I/O server worker (parentId: " + new OioWorker(acceptedChannel),
channel.getId() + "Old I/O server worker (parentId: " +
", channelId: " + acceptedChannel.getId() + ", " + channel.getId() + ", channelId: " +
channel.getRemoteAddress() + " => " + acceptedChannel.getId() + ", " +
channel.getLocalAddress() + ')')); channel.getRemoteAddress() + " => " +
channel.getLocalAddress() + ')')));
} catch (Exception e) { } catch (Exception e) {
logger.warn( logger.warn(
"Failed to initialize an accepted socket.", e); "Failed to initialize an accepted socket.", e);

View File

@ -0,0 +1,54 @@
/*
* Copyright (C) 2009 Trustin Heuiseung Lee
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.netty.util.internal;
import org.jboss.netty.channel.ChannelFuture;
/**
* @author Trustin Heui-seung Lee (trustin@gmail.com)
* @version $Rev$, $Date$
*/
public class IoWorkerRunnable implements Runnable {
/**
* An <em>internal use only</em> thread-local variable that determines if
* the caller is running on an I/O worker thread, which is the case where
* the caller enters a dead lock if the caller calls
* {@link ChannelFuture#await()} or {@link ChannelFuture#awaitUninterruptibly()}.
*/
public static final ThreadLocal<Boolean> IN_IO_THREAD = new ThreadLocalBoolean();
private final Runnable runnable;
public IoWorkerRunnable(Runnable runnable) {
if (runnable == null) {
throw new NullPointerException("runnable");
}
this.runnable = runnable;
}
public void run() {
IN_IO_THREAD.set(Boolean.TRUE);
try {
runnable.run();
} finally {
IN_IO_THREAD.remove();
}
}
}