Allow ThreadNameDeterminer everywhere.

Find a couple of places where threads are created but no
ThreadNameDeterminer can be passed in to control the name of the
threads created. Keep all existing c'tors for backwards compatibility.
This commit is contained in:
Henning Schmiedehausen 2013-03-18 23:12:59 -07:00 committed by Trustin Lee
parent bea6f57011
commit 7013315fd4
7 changed files with 88 additions and 12 deletions

View File

@ -24,6 +24,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel; import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
/** /**
@ -95,11 +96,24 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory
* the {@link Executor} which will execute the I/O worker threads * the {@link Executor} which will execute the I/O worker threads
*/ */
public OioClientSocketChannelFactory(Executor workerExecutor) { public OioClientSocketChannelFactory(Executor workerExecutor) {
this(workerExecutor, null);
}
/**
* Creates a new instance.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param determiner
* the {@link ThreadNameDeterminer} to set the thread names.
*/
public OioClientSocketChannelFactory(Executor workerExecutor,
ThreadNameDeterminer determiner) {
if (workerExecutor == null) { if (workerExecutor == null) {
throw new NullPointerException("workerExecutor"); throw new NullPointerException("workerExecutor");
} }
this.workerExecutor = workerExecutor; this.workerExecutor = workerExecutor;
sink = new OioClientSocketPipelineSink(workerExecutor); sink = new OioClientSocketPipelineSink(workerExecutor, determiner);
} }
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {

View File

@ -22,6 +22,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
@ -35,9 +36,11 @@ import static org.jboss.netty.channel.Channels.*;
class OioClientSocketPipelineSink extends AbstractOioChannelSink { class OioClientSocketPipelineSink extends AbstractOioChannelSink {
private final Executor workerExecutor; private final Executor workerExecutor;
private final ThreadNameDeterminer determiner;
OioClientSocketPipelineSink(Executor workerExecutor) { OioClientSocketPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
this.workerExecutor = workerExecutor; this.workerExecutor = workerExecutor;
this.determiner = determiner;
} }
public void eventSunk( public void eventSunk(
@ -123,7 +126,8 @@ class OioClientSocketPipelineSink extends AbstractOioChannelSink {
workerExecutor, workerExecutor,
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new OioWorker(channel), new OioWorker(channel),
"Old I/O client worker (" + channel + ')')); "Old I/O client worker (" + channel + ')',
determiner));
workerStarted = true; workerStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
if (t instanceof ConnectException) { if (t instanceof ConnectException) {

View File

@ -24,6 +24,7 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.DatagramChannel; import org.jboss.netty.channel.socket.DatagramChannel;
import org.jboss.netty.channel.socket.DatagramChannelFactory; import org.jboss.netty.channel.socket.DatagramChannelFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
/** /**
@ -94,11 +95,24 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory {
* the {@link Executor} which will execute the I/O worker threads * the {@link Executor} which will execute the I/O worker threads
*/ */
public OioDatagramChannelFactory(Executor workerExecutor) { public OioDatagramChannelFactory(Executor workerExecutor) {
this(workerExecutor, null);
}
/**
* Creates a new instance.
*
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param determiner
* the {@link ThreadNameDeterminer} to set the thread names.
*/
public OioDatagramChannelFactory(Executor workerExecutor,
ThreadNameDeterminer determiner) {
if (workerExecutor == null) { if (workerExecutor == null) {
throw new NullPointerException("workerExecutor"); throw new NullPointerException("workerExecutor");
} }
this.workerExecutor = workerExecutor; this.workerExecutor = workerExecutor;
sink = new OioDatagramPipelineSink(workerExecutor); sink = new OioDatagramPipelineSink(workerExecutor, determiner);
} }
public DatagramChannel newChannel(ChannelPipeline pipeline) { public DatagramChannel newChannel(ChannelPipeline pipeline) {

View File

@ -27,15 +27,18 @@ import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
class OioDatagramPipelineSink extends AbstractOioChannelSink { class OioDatagramPipelineSink extends AbstractOioChannelSink {
private final Executor workerExecutor; private final Executor workerExecutor;
private final ThreadNameDeterminer determiner;
OioDatagramPipelineSink(Executor workerExecutor) { OioDatagramPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
this.workerExecutor = workerExecutor; this.workerExecutor = workerExecutor;
this.determiner = determiner;
} }
public void eventSunk( public void eventSunk(
@ -95,7 +98,8 @@ class OioDatagramPipelineSink extends AbstractOioChannelSink {
workerExecutor, workerExecutor,
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new OioDatagramWorker(channel), new OioDatagramWorker(channel),
"Old I/O datagram worker (" + channel + ')')); "Old I/O datagram worker (" + channel + ')',
determiner));
workerStarted = true; workerStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -138,7 +142,7 @@ class OioDatagramPipelineSink extends AbstractOioChannelSink {
DeadLockProofWorker.start( DeadLockProofWorker.start(
workerExecutor, workerExecutor,
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new OioDatagramWorker(channel), threadName)); new OioDatagramWorker(channel), threadName, determiner));
} else { } else {
// Worker started by bind() - just rename. // Worker started by bind() - just rename.
Thread workerThread = channel.workerThread; Thread workerThread = channel.workerThread;

View File

@ -21,6 +21,7 @@ import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.socket.ServerSocketChannel; import org.jboss.netty.channel.socket.ServerSocketChannel;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory; import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -112,6 +113,21 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory
*/ */
public OioServerSocketChannelFactory( public OioServerSocketChannelFactory(
Executor bossExecutor, Executor workerExecutor) { Executor bossExecutor, Executor workerExecutor) {
this(bossExecutor, workerExecutor, null);
}
/**
* Creates a new instance.
*
* @param bossExecutor
* the {@link Executor} which will execute the boss threads
* @param workerExecutor
* the {@link Executor} which will execute the I/O worker threads
* @param determiner
* the {@link ThreadNameDeterminer} to set the thread names.
*/
public OioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor,
ThreadNameDeterminer determiner) {
if (bossExecutor == null) { if (bossExecutor == null) {
throw new NullPointerException("bossExecutor"); throw new NullPointerException("bossExecutor");
} }
@ -120,7 +136,7 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory
} }
this.bossExecutor = bossExecutor; this.bossExecutor = bossExecutor;
this.workerExecutor = workerExecutor; this.workerExecutor = workerExecutor;
sink = new OioServerSocketPipelineSink(workerExecutor); sink = new OioServerSocketPipelineSink(workerExecutor, determiner);
} }
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {

View File

@ -32,6 +32,7 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
@ -41,9 +42,11 @@ class OioServerSocketPipelineSink extends AbstractOioChannelSink {
InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class); InternalLoggerFactory.getInstance(OioServerSocketPipelineSink.class);
final Executor workerExecutor; final Executor workerExecutor;
private final ThreadNameDeterminer determiner;
OioServerSocketPipelineSink(Executor workerExecutor) { OioServerSocketPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
this.workerExecutor = workerExecutor; this.workerExecutor = workerExecutor;
this.determiner = determiner;
} }
public void eventSunk( public void eventSunk(
@ -138,7 +141,8 @@ class OioServerSocketPipelineSink extends AbstractOioChannelSink {
bossExecutor, bossExecutor,
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new Boss(channel), new Boss(channel),
"Old I/O server boss (" + channel + ')')); "Old I/O server boss (" + channel + ')',
determiner));
bossStarted = true; bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
@ -206,7 +210,8 @@ class OioServerSocketPipelineSink extends AbstractOioChannelSink {
new ThreadRenamingRunnable( new ThreadRenamingRunnable(
new OioWorker(acceptedChannel), new OioWorker(acceptedChannel),
"Old I/O server worker (parentId: " + "Old I/O server worker (parentId: " +
channel.getId() + ", " + channel + ')')); channel.getId() + ", " + channel + ')',
determiner));
} catch (Exception e) { } catch (Exception e) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn( logger.warn(

View File

@ -171,6 +171,24 @@ public class HashedWheelTimer implements Timer {
public HashedWheelTimer( public HashedWheelTimer(
ThreadFactory threadFactory, ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) { long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, null, tickDuration, unit, ticksPerWheel);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param determiner thread name determiner to control thread name.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
ThreadNameDeterminer determiner,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
if (threadFactory == null) { if (threadFactory == null) {
throw new NullPointerException("threadFactory"); throw new NullPointerException("threadFactory");
@ -206,7 +224,8 @@ public class HashedWheelTimer implements Timer {
roundDuration = tickDuration * wheel.length; roundDuration = tickDuration * wheel.length;
workerThread = threadFactory.newThread(new ThreadRenamingRunnable( workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
worker, "Hashed wheel timer #" + id.incrementAndGet())); worker, "Hashed wheel timer #" + id.incrementAndGet(),
determiner));
// Misuse check // Misuse check
misuseDetector.increase(); misuseDetector.increase();