Use ThreadRenamingRunnable in Boss and Workers again. See #289

This commit is contained in:
Norman Maurer 2012-04-26 21:52:13 +02:00
parent 7c5ae2787d
commit 7c95d475d7
3 changed files with 34 additions and 10 deletions

View File

@ -25,6 +25,7 @@ import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.QueueFactory;
@ -42,10 +43,17 @@ import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
abstract class AbstractNioWorker implements Worker { abstract class AbstractNioWorker implements Worker {
private static final AtomicInteger nextId = new AtomicInteger();
final int id = nextId.incrementAndGet();
/** /**
* Internal Netty logger. * Internal Netty logger.
*/ */
@ -124,7 +132,7 @@ abstract class AbstractNioWorker implements Worker {
this.executor = executor; this.executor = executor;
this.allowShutdownOnIdle = allowShutdownOnIdle; this.allowShutdownOnIdle = allowShutdownOnIdle;
} }
void register(AbstractNioChannel<?> channel, ChannelFuture future) { void register(AbstractNioChannel<?> channel, ChannelFuture future) {
Runnable registerTask = createRegisterTask(channel, future); Runnable registerTask = createRegisterTask(channel, future);
@ -157,7 +165,7 @@ abstract class AbstractNioWorker implements Worker {
// Start the worker thread with the new Selector. // Start the worker thread with the new Selector.
boolean success = false; boolean success = false;
try { try {
DeadLockProofWorker.start(executor, this); DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -42,17 +42,21 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.QueueFactory;
import org.jboss.netty.util.internal.SocketUtil; import org.jboss.netty.util.internal.SocketUtil;
class NioClientSocketPipelineSink extends AbstractNioChannelSink { class NioClientSocketPipelineSink extends AbstractNioChannelSink {
private static final AtomicInteger nextId = new AtomicInteger();
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
final Executor bossExecutor; final Executor bossExecutor;
final int id = nextId.incrementAndGet();
private final Boss[] bosses; private final Boss[] bosses;
private final AtomicInteger bossIndex = new AtomicInteger(); private final AtomicInteger bossIndex = new AtomicInteger();
@ -66,11 +70,12 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
bosses = new Boss[bossCount]; bosses = new Boss[bossCount];
for (int i = 0; i < bosses.length; i ++) { for (int i = 0; i < bosses.length; i ++) {
bosses[i] = new Boss(); bosses[i] = new Boss(i);
} }
this.workerPool = workerPool; this.workerPool = workerPool;
} }
public void eventSunk( public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception { ChannelPipeline pipeline, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) { if (e instanceof ChannelStateEvent) {
@ -172,9 +177,11 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
private boolean started; private boolean started;
private final AtomicBoolean wakenUp = new AtomicBoolean(); private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);; private final Queue<Runnable> registerTaskQueue = QueueFactory.createQueue(Runnable.class);
private final int subId;;
Boss() { Boss(int subId) {
this.subId = subId;
} }
void register(NioClientSocketChannel channel) { void register(NioClientSocketChannel channel) {
@ -194,9 +201,10 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
// Start the worker thread with the new Selector. // Start the worker thread with the new Selector.
boolean success = false; boolean success = false;
try { try {
DeadLockProofWorker.start( DeadLockProofWorker.start(bossExecutor,
bossExecutor, this); new ThreadRenamingRunnable(this,
"New I/O client boss #" + id + '-' + subId));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -27,6 +27,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
@ -37,12 +38,18 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
class NioServerSocketPipelineSink extends AbstractNioChannelSink { class NioServerSocketPipelineSink extends AbstractNioChannelSink {
private static final AtomicInteger nextId = new AtomicInteger();
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
final int id = nextId.incrementAndGet();
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) { NioServerSocketPipelineSink(WorkerPool<NioWorker> workerPool) {
@ -136,8 +143,9 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
Executor bossExecutor = Executor bossExecutor =
((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start( DeadLockProofWorker.start(bossExecutor,
bossExecutor, new Boss(channel)); new ThreadRenamingRunnable(new Boss(channel),
"New I/O server boss #" + id + " (" + channel + ')'));
bossStarted = true; bossStarted = true;
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);