* Resolved issue: NETTY-336 Fine-grained control over thread renaming

** Redefined ThreadNameDeterminer
* Improved caching in AbstractChannel.toString()
* Improved overall OIO thread naming (thread ID number is a large number which doesn't look good though.  will fix it later)
This commit is contained in:
Trustin Lee 2010-06-30 09:17:08 +00:00
parent cd6e7f0c3e
commit 9d89ad0837
15 changed files with 197 additions and 124 deletions

View File

@ -68,6 +68,7 @@ public abstract class AbstractChannel implements Channel {
private volatile int interestOps = OP_READ;
/** Cache for the string representation of this channel */
private boolean strValConnected;
private String strVal;
/**
@ -271,7 +272,7 @@ public abstract class AbstractChannel implements Channel {
@Override
public String toString() {
boolean connected = isConnected();
if (connected && strVal != null) {
if (strValConnected == connected && strVal != null) {
return strVal;
}
@ -285,11 +286,11 @@ public abstract class AbstractChannel implements Channel {
buf.append(", ");
if (getParent() == null) {
buf.append(localAddress);
buf.append(" => ");
buf.append(connected? " => " : " :> ");
buf.append(remoteAddress);
} else {
buf.append(remoteAddress);
buf.append(" => ");
buf.append(connected? " => " : " :> ");
buf.append(localAddress);
}
} else if (localAddress != null) {
@ -300,11 +301,8 @@ public abstract class AbstractChannel implements Channel {
buf.append(']');
String strVal = buf.toString();
if (connected) {
this.strVal = strVal;
} else {
this.strVal = null;
}
this.strVal = strVal;
strValConnected = connected;
return strVal;
}

View File

@ -195,9 +195,8 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
boolean success = false;
try {
bossExecutor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(
this, "New I/O client boss #" + id)));
new IoWorkerRunnable(new ThreadRenamingRunnable(
this, "New I/O", "client boss", String.valueOf(id), null)));
success = true;
} finally {
if (!success) {

View File

@ -46,9 +46,7 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
private static final AtomicInteger nextId = new AtomicInteger();
private final int id = nextId.incrementAndGet();
private final NioDatagramWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger();
/**

View File

@ -168,8 +168,8 @@ class NioDatagramWorker implements Runnable {
boolean success = false;
try {
// Start the main selector loop. See run() for details.
executor.execute(new ThreadRenamingRunnable(this,
"New I/O datagram worker #" + bossId + "'-'" + id));
executor.execute(new ThreadRenamingRunnable(
this, "New I/O", "datagram worker", bossId + "-" + id, null));
success = true;
} finally {
if (!success) {

View File

@ -154,12 +154,10 @@ class NioServerSocketPipelineSink extends AbstractChannelSink {
Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
bossExecutor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new Boss(channel),
"New I/O server boss #" + id +
" (channelId: " + channel.getId() +
", " + channel.getLocalAddress() + ')')));
new IoWorkerRunnable(new ThreadRenamingRunnable(
new Boss(channel),
"New I/O", "server boss", String.valueOf(id),
channel.toString())));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);

View File

@ -106,15 +106,13 @@ class NioWorker implements Runnable {
}
// Start the worker thread with the new Selector.
String threadName =
(server ? "New I/O server worker #"
: "New I/O client worker #") + bossId + '-' + id;
boolean success = false;
try {
executor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(this, threadName)));
new IoWorkerRunnable(new ThreadRenamingRunnable(
this, "New I/O",
server? "server worker" : "client worker",
bossId + "-" + id, null)));
success = true;
} finally {
if (!success) {

View File

@ -20,6 +20,7 @@ import static org.jboss.netty.channel.Channels.*;
import java.io.PushbackInputStream;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.ChannelEvent;
@ -42,6 +43,9 @@ import org.jboss.netty.util.internal.IoWorkerRunnable;
*/
class OioClientSocketPipelineSink extends AbstractChannelSink {
private static final AtomicInteger nextId = new AtomicInteger();
private final int id = nextId.incrementAndGet();
private final Executor workerExecutor;
OioClientSocketPipelineSink(Executor workerExecutor) {
@ -131,10 +135,9 @@ class OioClientSocketPipelineSink extends AbstractChannelSink {
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new OioWorker(channel),
"Old I/O client worker (channelId: " +
channel.getId() + ", " +
channel.getLocalAddress() + " => " +
channel.getRemoteAddress() + ')')));
"Old I/O", "client worker",
id + "-" + channel.getId(),
channel.toString())));
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.oio;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
@ -77,8 +78,11 @@ import org.jboss.netty.util.internal.ExecutorUtil;
*/
public class OioDatagramChannelFactory implements DatagramChannelFactory {
private static final AtomicInteger nextId = new AtomicInteger();
private final Executor workerExecutor;
final OioDatagramPipelineSink sink;
final int id = nextId.incrementAndGet();
/**
* Creates a new instance.
@ -91,7 +95,7 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory {
throw new NullPointerException("workerExecutor");
}
this.workerExecutor = workerExecutor;
sink = new OioDatagramPipelineSink(workerExecutor);
sink = new OioDatagramPipelineSink(id, workerExecutor);
}
public DatagramChannel newChannel(ChannelPipeline pipeline) {

View File

@ -41,9 +41,11 @@ import org.jboss.netty.util.internal.IoWorkerRunnable;
*/
class OioDatagramPipelineSink extends AbstractChannelSink {
private final int id;
private final Executor workerExecutor;
OioDatagramPipelineSink(Executor workerExecutor) {
OioDatagramPipelineSink(int id, Executor workerExecutor) {
this.id = id;
this.workerExecutor = workerExecutor;
}
@ -104,9 +106,10 @@ class OioDatagramPipelineSink extends AbstractChannelSink {
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new OioDatagramWorker(channel),
"Old I/O datagram worker (channelId: " +
channel.getId() + ", " +
channel.getLocalAddress() + ')')));
"Old I/O",
"datagram worker",
id + "-" + channel.getId(),
channel.toString())));
workerStarted = true;
} catch (Throwable t) {
future.setFailure(t);
@ -143,25 +146,21 @@ class OioDatagramPipelineSink extends AbstractChannelSink {
}
fireChannelConnected(channel, channel.getRemoteAddress());
String threadName =
"Old I/O datagram worker (channelId: " + channel.getId() + ", " +
channel.getLocalAddress() + " => " +
channel.getRemoteAddress() + ')';
final String service = "Old I/O";
final String category = "datagram worker";
final String comment = channel.toString();
if (!bound) {
// Start the business.
workerExecutor.execute(
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new OioDatagramWorker(channel), threadName)));
workerExecutor.execute(new IoWorkerRunnable(new ThreadRenamingRunnable(
new OioDatagramWorker(channel),
service, category, id + "-" + channel.getId(), comment)));
} else {
// Worker started by bind() - just rename.
Thread workerThread = channel.workerThread;
if (workerThread != null) {
try {
workerThread.setName(threadName);
} catch (SecurityException e) {
// Ignore.
}
ThreadRenamingRunnable.renameThread(
workerThread, service, category, id + "-" + channel.getId(), comment);
}
}

View File

@ -27,6 +27,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.util.ThreadRenamingRunnable;
/**
*
@ -171,20 +172,18 @@ class OioDatagramWorker implements Runnable {
channel.socket.disconnect();
future.setSuccess();
if (connected) {
// Update the worker's thread name to reflect the state change.
Thread workerThread = channel.workerThread;
if (workerThread != null) {
ThreadRenamingRunnable.renameThread(
workerThread, "Old I/O", "datagram worker",
((OioDatagramChannelFactory) channel.getFactory()).id + "-" + channel.getId(),
channel.toString());
}
// Notify.
fireChannelDisconnected(channel);
}
Thread workerThread = channel.workerThread;
if (workerThread != null) {
try {
workerThread.setName(
"Old I/O datagram worker (channelId: " +
channel.getId() + ", " + channel.getLocalAddress() + ')');
} catch (SecurityException e) {
// Ignore.
}
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);

View File

@ -22,6 +22,7 @@ import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.AbstractChannelSink;
import org.jboss.netty.channel.Channel;
@ -49,6 +50,9 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
private static final AtomicInteger nextId = new AtomicInteger();
final int id = nextId.incrementAndGet();
final Executor workerExecutor;
OioServerSocketPipelineSink(Executor workerExecutor) {
@ -147,8 +151,8 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new Boss(channel),
"Old I/O server boss (channelId: " +
channel.getId() + ", " + localAddress + ')')));
"Old I/O", "server boss", String.valueOf(id),
channel.toString())));
bossStarted = true;
} catch (Throwable t) {
future.setFailure(t);
@ -214,11 +218,8 @@ class OioServerSocketPipelineSink extends AbstractChannelSink {
new IoWorkerRunnable(
new ThreadRenamingRunnable(
new OioWorker(acceptedChannel),
"Old I/O server worker (parentId: " +
channel.getId() + ", channelId: " +
acceptedChannel.getId() + ", " +
channel.getRemoteAddress() + " => " +
channel.getLocalAddress() + ')')));
"Old I/O", "server worker", id + "-" + acceptedChannel.getId(),
acceptedChannel.toString())));
} catch (Exception e) {
logger.warn(
"Failed to initialize an accepted socket.", e);

View File

@ -210,7 +210,9 @@ public class HashedWheelTimer implements Timer {
roundDuration = tickDuration * wheel.length;
workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
worker, "Hashed wheel timer #" + id.incrementAndGet()));
worker,
"Hashed wheel timer", null,
String.valueOf(id.incrementAndGet()), null));
// Misuse check
misuseDetector.increase();

View File

@ -25,23 +25,41 @@ package org.jboss.netty.util;
public interface ThreadNameDeterminer {
/**
* {@link ThreadNameDeterminer} that accepts the proposed thread name
* as is.
* The default {@link ThreadNameDeterminer} that generates a thread name
* which contains all specified information.
*/
ThreadNameDeterminer PROPOSED = new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return proposedThreadName;
public String determineThreadName(String current, String service,
String category, String id, String comment) throws Exception {
String newName =
(format("", service, " ") +
format("", category, " ") +
format("#", id, " ") +
format("(", comment, ")")).trim();
if (newName.length() == 0) {
return null;
} else {
return newName;
}
}
private String format(String prefix, String s, String postfix) {
if (s.length() == 0) {
return "";
} else {
return prefix + s + postfix;
}
}
};
/**
* {@link ThreadNameDeterminer} that rejects the proposed thread name and
* retains the current one.
* An alternative {@link ThreadNameDeterminer} that rejects the proposed
* thread name and retains the current one.
*/
ThreadNameDeterminer CURRENT = new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
public String determineThreadName(String current, String service,
String category, String id, String comment) throws Exception {
return null;
}
};
@ -49,11 +67,17 @@ public interface ThreadNameDeterminer {
/**
* Overrides the thread name proposed by {@link ThreadRenamingRunnable}.
*
* @param currentThreadName the current thread name
* @param proposedThreadName the proposed new thread name
* @param current the current thread name
* @param service the service name (e.g. <tt>"New I/O"</tt>)
* @param category the category name (e.g. <tt>"server boss"</tt>)
* @param id the thread ID (e.g. <tt>"3"</tt> or <tt>"1-3"</tt>)
* @param comment the optional comment which might help debugging
*
* @return the actual new thread name.
* If {@code null} is returned, the proposed thread name is
* discarded (i.e. no rename).
*/
String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception;
String determineThreadName(
String current,
String service, String category, String id, String comment) throws Exception;
}

View File

@ -66,35 +66,41 @@ public class ThreadRenamingRunnable implements Runnable {
ThreadRenamingRunnable.threadNameDeterminer = threadNameDeterminer;
}
private final Runnable runnable;
private final String proposedThreadName;
/**
* Creates a new instance which wraps the specified {@code runnable}
* and changes the thread name to the specified thread name when the
* specified {@code runnable} is running.
* Renames the specified thread.
*
* @return {@code true} if and only if the thread was renamed
*/
public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName) {
if (runnable == null) {
throw new NullPointerException("runnable");
public static boolean renameThread(Thread thread, String service, String category, String id, String comment) {
if (thread == null) {
throw new NullPointerException("thread");
}
if (proposedThreadName == null) {
throw new NullPointerException("proposedThreadName");
}
this.runnable = runnable;
this.proposedThreadName = proposedThreadName;
}
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldThreadName = currentThread.getName();
final String newThreadName = getNewThreadName(oldThreadName);
// Normalize the parameters.
service = service != null? service : "";
category = category != null? category : "";
id = id != null? id : "";
comment = comment != null? comment : "";
// Get the old & new thread names.
String oldThreadName = thread.getName();
String newThreadName = null;
try {
newThreadName = getThreadNameDeterminer().determineThreadName(
oldThreadName, service, category, id, comment);
} catch (Throwable t) {
logger.warn("Failed to determine the thread name", t);
}
if (newThreadName == null || newThreadName.length() == 0) {
newThreadName = oldThreadName;
}
// Change the thread name before starting the actual runnable.
boolean renamed = false;
if (!oldThreadName.equals(newThreadName)) {
try {
currentThread.setName(newThreadName);
System.out.println(newThreadName);
thread.setName(newThreadName);
renamed = true;
} catch (SecurityException e) {
logger.debug(
@ -103,6 +109,42 @@ public class ThreadRenamingRunnable implements Runnable {
}
}
return renamed;
}
private final Runnable runnable;
private final String service;
private final String category;
private final String id;
private final String comment;
/**
* Creates a new instance which wraps the specified {@code runnable}
* and changes the thread name to the specified thread name when the
* specified {@code runnable} is running.
*/
public ThreadRenamingRunnable(
Runnable runnable,
String service, String category, String id, String comment) {
if (runnable == null) {
throw new NullPointerException("runnable");
}
this.runnable = runnable;
this.service = service;
this.category = category;
this.id = id;
this.comment = comment;
}
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldThreadName = currentThread.getName();
// Change the thread name before starting the actual runnable.
final boolean renamed = renameThread(
Thread.currentThread(), service, category, id, comment);
// Run the actual runnable and revert the name back when it ends.
try {
runnable.run();
@ -114,18 +156,4 @@ public class ThreadRenamingRunnable implements Runnable {
}
}
}
private String getNewThreadName(String currentThreadName) {
String newThreadName = null;
try {
newThreadName =
getThreadNameDeterminer().determineThreadName(
currentThreadName, proposedThreadName);
} catch (Throwable t) {
logger.warn("Failed to determine the thread name", t);
}
return newThreadName == null? currentThreadName : newThreadName;
}
}

View File

@ -15,7 +15,6 @@
*/
package org.jboss.netty.util;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
import java.security.Permission;
@ -34,13 +33,36 @@ import org.junit.Test;
public class ThreadRenamingRunnableTest {
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullName() throws Exception {
new ThreadRenamingRunnable(createMock(Runnable.class), null);
public void shouldNotAllowNullRunnable() throws Exception {
new ThreadRenamingRunnable(null, "a", "b", "c", "d");
}
@Test(expected = NullPointerException.class)
public void shouldNotAllowNullRunnable() throws Exception {
new ThreadRenamingRunnable(null, "foo");
@Test
public void testWithNulls() throws Exception {
final String oldThreadName = Thread.currentThread().getName();
Executor e = new ImmediateExecutor();
e.execute(new ThreadRenamingRunnable(
new Runnable() {
public void run() {
assertEquals(oldThreadName, Thread.currentThread().getName());
}
}, null, null, null, null));
assertEquals(oldThreadName, Thread.currentThread().getName());
}
@Test
public void testWithEmptyNames() throws Exception {
final String oldThreadName = Thread.currentThread().getName();
Executor e = new ImmediateExecutor();
e.execute(new ThreadRenamingRunnable(
new Runnable() {
public void run() {
assertEquals(oldThreadName, Thread.currentThread().getName());
}
}, "", "", "", ""));
assertEquals(oldThreadName, Thread.currentThread().getName());
}
@Test
@ -50,10 +72,10 @@ public class ThreadRenamingRunnableTest {
e.execute(new ThreadRenamingRunnable(
new Runnable() {
public void run() {
assertEquals("foo", Thread.currentThread().getName());
assertEquals("a b #c (d)", Thread.currentThread().getName());
assertFalse(oldThreadName.equals(Thread.currentThread().getName()));
}
}, "foo"));
}, "a", "b", "c", "d"));
assertEquals(oldThreadName, Thread.currentThread().getName());
}
@ -85,7 +107,7 @@ public class ThreadRenamingRunnableTest {
public void run() {
assertEquals(oldThreadName, Thread.currentThread().getName());
}
}, "foo"));
}, "a", "b", "c", "d"));
} finally {
System.setSecurityManager(null);
assertEquals(oldThreadName, Thread.currentThread().getName());