From 7c95d475d7b351fd8c937aaf0b96b992f11e29d6 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 26 Apr 2012 21:52:13 +0200 Subject: [PATCH] Use ThreadRenamingRunnable in Boss and Workers again. See #289 --- .../channel/socket/nio/AbstractNioWorker.java | 12 +++++++++-- .../nio/NioClientSocketPipelineSink.java | 20 +++++++++++++------ .../nio/NioServerSocketPipelineSink.java | 12 +++++++++-- 3 files changed, 34 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java index db71385dc9..6f8d91ea6c 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/AbstractNioWorker.java @@ -25,6 +25,7 @@ import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.QueueFactory; @@ -42,10 +43,17 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; abstract class AbstractNioWorker implements Worker { + + + private static final AtomicInteger nextId = new AtomicInteger(); + + final int id = nextId.incrementAndGet(); + /** * Internal Netty logger. */ @@ -124,7 +132,7 @@ abstract class AbstractNioWorker implements Worker { this.executor = executor; this.allowShutdownOnIdle = allowShutdownOnIdle; } - + void register(AbstractNioChannel channel, ChannelFuture future) { Runnable registerTask = createRegisterTask(channel, future); @@ -157,7 +165,7 @@ abstract class AbstractNioWorker implements Worker { // Start the worker thread with the new Selector. boolean success = false; try { - DeadLockProofWorker.start(executor, this); + DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id)); success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java index ea45dc94fa..2a20569bf1 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioClientSocketPipelineSink.java @@ -42,17 +42,21 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.QueueFactory; import org.jboss.netty.util.internal.SocketUtil; class NioClientSocketPipelineSink extends AbstractNioChannelSink { + private static final AtomicInteger nextId = new AtomicInteger(); + static final InternalLogger logger = InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); final Executor bossExecutor; + final int id = nextId.incrementAndGet(); private final Boss[] bosses; private final AtomicInteger bossIndex = new AtomicInteger(); @@ -66,11 +70,12 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { bosses = new Boss[bossCount]; for (int i = 0; i < bosses.length; i ++) { - bosses[i] = new Boss(); + bosses[i] = new Boss(i); } this.workerPool = workerPool; } + public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { @@ -172,9 +177,11 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { private boolean started; private final AtomicBoolean wakenUp = new AtomicBoolean(); private final Object startStopLock = new Object(); - private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class);; + private final Queue registerTaskQueue = QueueFactory.createQueue(Runnable.class); + private final int subId;; - Boss() { + Boss(int subId) { + this.subId = subId; } void register(NioClientSocketChannel channel) { @@ -194,9 +201,10 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink { // Start the worker thread with the new Selector. boolean success = false; try { - DeadLockProofWorker.start( - bossExecutor, this); - + DeadLockProofWorker.start(bossExecutor, + new ThreadRenamingRunnable(this, + "New I/O client boss #" + id + '-' + subId)); + success = true; } finally { if (!success) { diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java index 246d3fed17..e55025e6c0 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioServerSocketPipelineSink.java @@ -27,6 +27,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelEvent; @@ -37,12 +38,18 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLoggerFactory; +import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.internal.DeadLockProofWorker; class NioServerSocketPipelineSink extends AbstractNioChannelSink { + private static final AtomicInteger nextId = new AtomicInteger(); + static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class); + + final int id = nextId.incrementAndGet(); + private final WorkerPool workerPool; NioServerSocketPipelineSink(WorkerPool workerPool) { @@ -136,8 +143,9 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink { Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; - DeadLockProofWorker.start( - bossExecutor, new Boss(channel)); + DeadLockProofWorker.start(bossExecutor, + new ThreadRenamingRunnable(new Boss(channel), + "New I/O server boss #" + id + " (" + channel + ')')); bossStarted = true; } catch (Throwable t) { future.setFailure(t);