[#776] Introduce the possiblilty to specify a BossPool when construct ChannelFactory instances.

* Beside this allow to set a ThreadNameDeterminer per BossPool and WorkerPool. This allows to have different determiner instances in the same JVM.
This fixes #771.
* With the possiblity to share BossPools it is even easier to limit the Thread counts even if you need to create many ChannelFactory instances. So this gives a lot of flexibility.
This commit is contained in:
Norman Maurer 2012-11-28 09:35:34 +01:00
parent 1e515b092c
commit 15c67f87ff
18 changed files with 1279 additions and 852 deletions

View File

@ -0,0 +1,92 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class AbstractNioBossPool<E extends Boss>
implements BossPool<E>, ExternalResourceReleasable {
private final Boss[] bosses;
private final AtomicInteger bossIndex = new AtomicInteger();
private final Executor bossExecutor;
private volatile boolean initDone;
/**
* Create a new instance
*
* @param bossExecutor the {@link Executor} to use for the {@link Boss}'s
* @param bossCount the count of {@link Boss}'s to create
*/
AbstractNioBossPool(Executor bossExecutor, int bossCount) {
this(bossExecutor, bossCount, true);
}
AbstractNioBossPool(Executor bossExecutor, int bossCount, boolean autoInit) {
if (bossExecutor == null) {
throw new NullPointerException("bossExecutor");
}
if (bossCount <= 0) {
throw new IllegalArgumentException(
"bossCount (" + bossCount + ") " +
"must be a positive integer.");
}
bosses = new Boss[bossCount];
this.bossExecutor = bossExecutor;
if (autoInit) {
init();
}
}
protected void init() {
if (initDone) {
throw new IllegalStateException("Init was done before");
}
initDone = true;
for (int i = 0; i < bosses.length; i++) {
bosses[i] = newBoss(bossExecutor);
}
}
/**
* Create a new {@link Boss} which uses the given {@link Executor} to service IO
*
*
* @param executor the {@link Executor} to use
* @return worker the new {@link Boss}
*/
protected abstract E newBoss(Executor executor);
@SuppressWarnings("unchecked")
public E nextBoss() {
return (E) bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
}
public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor);
for (Boss boss: bosses) {
if (boss instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) boss).releaseExternalResources();
}
}
}
}

View File

@ -24,6 +24,7 @@ import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ExternalResourceReleasable; import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable; import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker; import org.jboss.netty.util.internal.DeadLockProofWorker;
@ -123,8 +124,12 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool(); protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
AbstractNioWorker(Executor executor) { AbstractNioWorker(Executor executor) {
this(executor, null);
}
AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
this.executor = executor; this.executor = executor;
openSelector(); openSelector(determiner);
} }
void register(AbstractNioChannel<?> channel, ChannelFuture future) { void register(AbstractNioChannel<?> channel, ChannelFuture future) {
@ -189,7 +194,7 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
* Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
* the {@link AbstractNioChannel}'s when they get registered * the {@link AbstractNioChannel}'s when they get registered
*/ */
private void openSelector() { private void openSelector(ThreadNameDeterminer determiner) {
try { try {
selector = Selector.open(); selector = Selector.open();
} catch (Throwable t) { } catch (Throwable t) {
@ -199,7 +204,7 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
// Start the worker thread with the new Selector. // Start the worker thread with the new Selector.
boolean success = false; boolean success = false;
try { try {
DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id)); DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner));
success = true; success = true;
} finally { } finally {
if (!success) { if (!success) {

View File

@ -33,7 +33,7 @@ public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
private final AbstractNioWorker[] workers; private final AbstractNioWorker[] workers;
private final AtomicInteger workerIndex = new AtomicInteger(); private final AtomicInteger workerIndex = new AtomicInteger();
private final Executor workerExecutor; private final Executor workerExecutor;
private volatile boolean initDone;
/** /**
* Create a new instance * Create a new instance
@ -42,30 +42,61 @@ public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
* @param workerCount the count of {@link Worker}'s to create * @param workerCount the count of {@link Worker}'s to create
*/ */
AbstractNioWorkerPool(Executor workerExecutor, int workerCount) { AbstractNioWorkerPool(Executor workerExecutor, int workerCount) {
this(workerExecutor, workerCount, true);
}
AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
if (workerExecutor == null) { if (workerExecutor == null) {
throw new NullPointerException("workerExecutor"); throw new NullPointerException("workerExecutor");
} }
if (workerCount <= 0) { if (workerCount <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"workerCount (" + workerCount + ") " + "workerCount (" + workerCount + ") " + "must be a positive integer.");
"must be a positive integer.");
} }
workers = new AbstractNioWorker[workerCount]; workers = new AbstractNioWorker[workerCount];
this.workerExecutor = workerExecutor;
if (autoInit) {
init();
}
}
protected void init() {
if (initDone) {
throw new IllegalStateException("Init was done before");
}
initDone = true;
for (int i = 0; i < workers.length; i++) { for (int i = 0; i < workers.length; i++) {
workers[i] = createWorker(workerExecutor); workers[i] = newWorker(workerExecutor);
} }
this.workerExecutor = workerExecutor;
} }
/** /**
* Create a new {@link Worker} which uses the given {@link Executor} to service IO * Only here for backward compability and will be removed in later releases. Please use
* {@link #newWorker(java.util.concurrent.Executor)}
*
*
* @param executor the {@link Executor} to use
* @return worker the new {@link Worker}
* @deprecated use {@link #newWorker(java.util.concurrent.Executor)}
*/
@Deprecated
protected E createWorker(Executor executor) {
throw new IllegalStateException("This will be removed. Override this and the newWorker(..) method!");
}
/**
* Create a new {@link Worker} which uses the given {@link Executor} to service IO.
*
* This method will be made abstract in further releases (once {@link #createWorker(java.util.concurrent.Executor)}
* was removed).
* *
* *
* @param executor the {@link Executor} to use * @param executor the {@link Executor} to use
* @return worker the new {@link Worker} * @return worker the new {@link Worker}
*/ */
protected abstract E createWorker(Executor executor); protected E newWorker(Executor executor) {
return createWorker(executor);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public E nextWorker() { public E nextWorker() {

View File

@ -0,0 +1,22 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
/**
* Serves the boss tasks like connecting/accepting
*/
public interface Boss extends Runnable {
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
/**
* A Pool that holds {@link Boss} instances
*/
public interface BossPool<E extends Boss> {
/**
* Return the next {@link Boss} to use
*
*/
E nextBoss();
}

View File

@ -0,0 +1,454 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.jboss.netty.channel.Channels.*;
/**
* {@link Boss} implementation that handles the connection attempts of clients
*/
public final class NioClientBoss implements Boss {
private static final AtomicInteger nextId = new AtomicInteger();
private final int id = nextId.incrementAndGet();
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientBoss.class);
private volatile Selector selector;
private boolean started;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
private final TimerTask wakeupTask = new TimerTask() {
public void run(Timeout timeout) throws Exception {
// This is needed to prevent a possible race that can lead to a NPE
// when the selector is closed before this is run
//
// See https://github.com/netty/netty/issues/685
Selector selector = NioClientBoss.this.selector;
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
};
private final Executor bossExecutor;
private final ThreadNameDeterminer determiner;
private final Timer timer;
NioClientBoss(Executor bossExecutor, Timer timer, ThreadNameDeterminer determiner) {
this.bossExecutor = bossExecutor;
this.determiner = determiner;
this.timer = timer;
}
void register(NioClientSocketChannel channel) {
Runnable registerTask = new RegisterTask(this, channel);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(this,
"New I/O client boss #" + id , determiner));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", t);
}
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
int timeout = channel.getConfig().getConnectTimeoutMillis();
if (timeout > 0) {
if (!channel.isConnected()) {
channel.timoutTimer = timer.newTimeout(wakeupTask,
timeout, TimeUnit.MILLISECONDS);
}
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
public void run() {
boolean shutdown = false;
int selectReturnsImmediately = 0;
Selector selector = this.selector;
// use 80% of the timeout for measure
final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop = false;
for (;;) {
wakenUp.set(false);
try {
long beforeSelect = System.nanoTime();
int selected = SelectorUtil.select(selector);
if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
long timeBlocked = System.nanoTime() - beforeSelect;
if (timeBlocked < minSelectTimeout) {
boolean notConnected = false;
// loop over all keys as the selector may was unblocked because of a closed channel
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
try {
if (ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
notConnected = true;
// cancel the key just to be on the safe side
key.cancel();
}
} catch (CancelledKeyException e) {
// ignore
}
}
if (notConnected) {
selectReturnsImmediately = 0;
} else {
// returned before the minSelectTimeout elapsed with nothing select.
// this may be the cause of the jdk epoll(..) bug, so increment the counter
// which we use later to see if its really the jdk bug.
selectReturnsImmediately ++;
}
} else {
selectReturnsImmediately = 0;
}
if (selectReturnsImmediately == 1024) {
// The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug.
selector = recreateSelector();
selectReturnsImmediately = 0;
wakenupFromLoop = false;
// try to select again
continue;
}
} else {
// reset counter
selectReturnsImmediately = 0;
}
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
processRegisterTaskQueue();
processSelectedKeys(selector.selectedKeys());
// Handle connection timeout every 10 milliseconds approximately.
long currentTimeNanos = System.nanoTime();
processConnectTimeout(selector.keys(), currentTimeNanos);
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connection attempts are made in a one-by-one manner
// instead of concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false;
try {
selector.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a selector.", e);
}
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
} else {
// Give one more second.
shutdown = true;
}
} else {
shutdown = false;
}
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"Unexpected exception in the selector loop.", t);
}
// Prevent possible consecutive immediate failures.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
private void processRegisterTaskQueue() {
for (;;) {
final Runnable task = registerTaskQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
if (!k.isValid()) {
close(k);
continue;
}
try {
if (k.isConnectable()) {
connect(k);
}
} catch (Throwable t) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
ch.connectFuture.setFailure(t);
fireExceptionCaught(ch, t);
k.cancel(); // Some JDK implementations run into an infinite loop without this.
ch.worker.close(ch, succeededFuture(ch));
}
}
}
private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
ConnectException cause = null;
for (SelectionKey k: keys) {
if (!k.isValid()) {
// Comment the close call again as it gave us major problems
// with ClosedChannelExceptions.
//
// See:
// * https://github.com/netty/netty/issues/142
// * https://github.com/netty/netty/issues/138
//
// close(k);
continue;
}
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
if (ch.connectDeadlineNanos > 0 &&
currentTimeNanos >= ch.connectDeadlineNanos) {
if (cause == null) {
cause = new ConnectException("connection timed out");
}
ch.connectFuture.setFailure(cause);
fireExceptionCaught(ch, cause);
ch.worker.close(ch, succeededFuture(ch));
}
}
}
private void connect(SelectionKey k) throws IOException {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
if (ch.channel.finishConnect()) {
k.cancel();
if (ch.timoutTimer != null) {
ch.timoutTimer.cancel();
}
ch.worker.register(ch, ch.connectFuture);
}
}
private void close(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
ch.worker.close(ch, succeededFuture(ch));
}
// Create a new selector and "transfer" all channels from the old
// selector to the new one
private Selector recreateSelector() throws IOException {
Selector newSelector = Selector.open();
Selector selector = this.selector;
this.selector = newSelector;
// loop over all the keys that are registered with the old Selector
// and register them with the new one
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
int ops = key.interestOps();
Object att = key.attachment();
// cancel the old key
key.cancel();
try {
// register the channel with the new selector now
ch.register(newSelector, ops, att);
} catch (ClosedChannelException e) {
// close the Channel if we can't register it
close(key);
}
}
try {
// time to close the old selector as everything else is registered to the new one
selector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", t);
}
}
if (logger.isWarnEnabled()) {
logger.warn("Recreated Selector because of possible jdk epoll(..) bug");
}
return newSelector;
}
private static final class RegisterTask implements Runnable {
private final NioClientBoss boss;
private final NioClientSocketChannel channel;
RegisterTask(NioClientBoss boss, NioClientSocketChannel channel) {
this.boss = boss;
this.channel = channel;
}
public void run() {
try {
channel.channel.register(
boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel));
}
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.Timer;
import java.util.concurrent.Executor;
/**
* Holds {@link NioClientBoss} instances to use
*/
public class NioClientBossPool extends AbstractNioBossPool<NioClientBoss> {
private final ThreadNameDeterminer determiner;
private final Timer timer;
/**
* Create a new instance
*
* @param bossExecutor the Executor to use for server the {@link NioClientBoss}
* @param bossCount the number of {@link NioClientBoss} instances this {@link NioClientBossPool} will hold
* @param timer the Timer to use for handle connect timeouts
* @param determiner the {@link ThreadNameDeterminer} to use for name the threads. Use <code>null</code>
* if you not want to set one explicit.
*/
public NioClientBossPool(Executor bossExecutor, int bossCount, Timer timer, ThreadNameDeterminer determiner) {
super(bossExecutor, bossCount, false);
this.determiner = determiner;
this.timer = timer;
init();
}
/**
* Create a new instance using a new {@link HashedWheelTimer} and no {@link ThreadNameDeterminer}
*
* @param bossExecutor the Executor to use for server the {@link NioClientBoss}
* @param bossCount the number of {@link NioClientBoss} instances this {@link NioClientBoss} will hold
*/
public NioClientBossPool(Executor bossExecutor, int bossCount) {
this(bossExecutor, bossCount, new HashedWheelTimer(), null);
}
@Override
protected NioClientBoss newBoss(Executor executor) {
return new NioClientBoss(executor, timer, determiner);
}
@Override
public void releaseExternalResources() {
super.releaseExternalResources();
timer.stop();
}
}

View File

@ -28,7 +28,6 @@ import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.util.ExternalResourceReleasable; import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.HashedWheelTimer; import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer; import org.jboss.netty.util.Timer;
import org.jboss.netty.util.internal.ExecutorUtil;
/** /**
* A {@link ClientSocketChannelFactory} which creates a client-side NIO-based * A {@link ClientSocketChannelFactory} which creates a client-side NIO-based
@ -85,10 +84,9 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
private static final int DEFAULT_BOSS_COUNT = 1; private static final int DEFAULT_BOSS_COUNT = 1;
private final Executor bossExecutor; private final BossPool<NioClientBoss> bossPool;
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
private final NioClientSocketPipelineSink sink; private final NioClientSocketPipelineSink sink;
private final Timer timer;
/** /**
* Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()} * Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()}
@ -183,34 +181,41 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
public NioClientSocketChannelFactory( public NioClientSocketChannelFactory(
Executor bossExecutor, int bossCount, Executor bossExecutor, int bossCount,
WorkerPool<NioWorker> workerPool, Timer timer) { WorkerPool<NioWorker> workerPool, Timer timer) {
this(new NioClientBossPool(bossExecutor, bossCount, timer, null), workerPool);
}
if (bossExecutor == null) { /**
throw new NullPointerException("bossExecutor"); * Creates a new instance.
*
* @param bossPool
* the {@link BossPool} to use to handle the connects
* @param workerPool
* the {@link WorkerPool} to use to do the IO
*/
public NioClientSocketChannelFactory(
BossPool<NioClientBoss> bossPool,
WorkerPool<NioWorker> workerPool) {
if (bossPool == null) {
throw new NullPointerException("bossPool");
} }
if (workerPool == null) { if (workerPool == null) {
throw new NullPointerException("workerPool"); throw new NullPointerException("workerPool");
} }
if (bossCount <= 0) { this.bossPool = bossPool;
throw new IllegalArgumentException(
"bossCount (" + bossCount + ") " +
"must be a positive integer.");
}
this.bossExecutor = bossExecutor;
this.workerPool = workerPool; this.workerPool = workerPool;
this.timer = timer; sink = new NioClientSocketPipelineSink(bossPool);
sink = new NioClientSocketPipelineSink(
bossExecutor, bossCount, workerPool, timer);
} }
public SocketChannel newChannel(ChannelPipeline pipeline) { public SocketChannel newChannel(ChannelPipeline pipeline) {
return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker()); return new NioClientSocketChannel(this, pipeline, sink, workerPool.nextWorker());
} }
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor); if (bossPool instanceof ExternalResourceReleasable) {
timer.stop(); ((ExternalResourceReleasable) bossPool).releaseExternalResources();
}
if (workerPool instanceof ExternalResourceReleasable) { if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources(); ((ExternalResourceReleasable) workerPool).releaseExternalResources();
} }

View File

@ -16,7 +16,6 @@
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
@ -25,62 +24,22 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger; import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory; import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.jboss.netty.channel.Channels.*; import static org.jboss.netty.channel.Channels.*;
class NioClientSocketPipelineSink extends AbstractNioChannelSink { class NioClientSocketPipelineSink extends AbstractNioChannelSink {
private static final AtomicInteger nextId = new AtomicInteger();
static final InternalLogger logger = static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class); InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
final Executor bossExecutor;
final int id = nextId.incrementAndGet(); private final BossPool<NioClientBoss> bossPool;
private final Boss[] bosses;
private final AtomicInteger bossIndex = new AtomicInteger(); NioClientSocketPipelineSink(BossPool<NioClientBoss> bossPool) {
this.bossPool = bossPool;
private final WorkerPool<NioWorker> workerPool;
private final Timer timer;
NioClientSocketPipelineSink(
Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool, Timer timer) {
this.bossExecutor = bossExecutor;
this.timer = timer;
bosses = new Boss[bossCount];
for (int i = 0; i < bosses.length; i ++) {
bosses[i] = new Boss(i);
}
this.workerPool = workerPool;
} }
public void eventSunk( public void eventSunk(
@ -168,407 +127,8 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
} }
} }
private Boss nextBoss() { private NioClientBoss nextBoss() {
return bosses[Math.abs( return bossPool.nextBoss();
bossIndex.getAndIncrement() % bosses.length)];
} }
NioWorker nextWorker() {
return workerPool.nextWorker();
}
private final class Boss implements Runnable {
volatile Selector selector;
private boolean started;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
private final int subId;
private final TimerTask wakeupTask = new TimerTask() {
public void run(Timeout timeout) throws Exception {
// This is needed to prevent a possible race that can lead to a NPE
// when the selector is closed before this is run
//
// See https://github.com/netty/netty/issues/685
Selector selector = Boss.this.selector;
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
};
Boss(int subId) {
this.subId = subId;
}
void register(NioClientSocketChannel channel) {
Runnable registerTask = new RegisterTask(this, channel);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(this,
"New I/O client boss #" + id + '-' + subId));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", t);
}
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = registerTaskQueue.offer(registerTask);
assert offered;
}
int timeout = channel.getConfig().getConnectTimeoutMillis();
if (timeout > 0) {
if (!channel.isConnected()) {
channel.timoutTimer = timer.newTimeout(wakeupTask,
timeout, TimeUnit.MILLISECONDS);
}
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
public void run() {
boolean shutdown = false;
int selectReturnsImmediately = 0;
Selector selector = this.selector;
// use 80% of the timeout for measure
final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop = false;
for (;;) {
wakenUp.set(false);
try {
long beforeSelect = System.nanoTime();
int selected = SelectorUtil.select(selector);
if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
long timeBlocked = System.nanoTime() - beforeSelect;
if (timeBlocked < minSelectTimeout) {
boolean notConnected = false;
// loop over all keys as the selector may was unblocked because of a closed channel
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
try {
if (ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
notConnected = true;
// cancel the key just to be on the safe side
key.cancel();
}
} catch (CancelledKeyException e) {
// ignore
}
}
if (notConnected) {
selectReturnsImmediately = 0;
} else {
// returned before the minSelectTimeout elapsed with nothing select.
// this may be the cause of the jdk epoll(..) bug, so increment the counter
// which we use later to see if its really the jdk bug.
selectReturnsImmediately ++;
}
} else {
selectReturnsImmediately = 0;
}
if (selectReturnsImmediately == 1024) {
// The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug.
selector = recreateSelector();
selectReturnsImmediately = 0;
wakenupFromLoop = false;
// try to select again
continue;
}
} else {
// reset counter
selectReturnsImmediately = 0;
}
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
processRegisterTaskQueue();
processSelectedKeys(selector.selectedKeys());
// Handle connection timeout every 10 milliseconds approximately.
long currentTimeNanos = System.nanoTime();
processConnectTimeout(selector.keys(), currentTimeNanos);
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connection attempts are made in a one-by-one manner
// instead of concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false;
try {
selector.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a selector.", e);
}
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
} else {
// Give one more second.
shutdown = true;
}
} else {
shutdown = false;
}
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"Unexpected exception in the selector loop.", t);
}
// Prevent possible consecutive immediate failures.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
private void processRegisterTaskQueue() {
for (;;) {
final Runnable task = registerTaskQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
if (!k.isValid()) {
close(k);
continue;
}
try {
if (k.isConnectable()) {
connect(k);
}
} catch (Throwable t) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
ch.connectFuture.setFailure(t);
fireExceptionCaught(ch, t);
k.cancel(); // Some JDK implementations run into an infinite loop without this.
ch.worker.close(ch, succeededFuture(ch));
}
}
}
private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
ConnectException cause = null;
for (SelectionKey k: keys) {
if (!k.isValid()) {
// Comment the close call again as it gave us major problems
// with ClosedChannelExceptions.
//
// See:
// * https://github.com/netty/netty/issues/142
// * https://github.com/netty/netty/issues/138
//
// close(k);
continue;
}
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
if (ch.connectDeadlineNanos > 0 &&
currentTimeNanos >= ch.connectDeadlineNanos) {
if (cause == null) {
cause = new ConnectException("connection timed out");
}
ch.connectFuture.setFailure(cause);
fireExceptionCaught(ch, cause);
ch.worker.close(ch, succeededFuture(ch));
}
}
}
private void connect(SelectionKey k) throws IOException {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
if (ch.channel.finishConnect()) {
k.cancel();
if (ch.timoutTimer != null) {
ch.timoutTimer.cancel();
}
ch.worker.register(ch, ch.connectFuture);
}
}
private void close(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
ch.worker.close(ch, succeededFuture(ch));
}
// Create a new selector and "transfer" all channels from the old
// selector to the new one
private Selector recreateSelector() throws IOException {
Selector newSelector = Selector.open();
Selector selector = this.selector;
this.selector = newSelector;
// loop over all the keys that are registered with the old Selector
// and register them with the new one
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
int ops = key.interestOps();
Object att = key.attachment();
// cancel the old key
key.cancel();
try {
// register the channel with the new selector now
ch.register(newSelector, ops, att);
} catch (ClosedChannelException e) {
// close the Channel if we can't register it
close(key);
}
}
try {
// time to close the old selector as everything else is registered to the new one
selector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", t);
}
}
if (logger.isWarnEnabled()) {
logger.warn("Recreated Selector because of possible jdk epoll(..) bug");
}
return newSelector;
}
}
private static final class RegisterTask implements Runnable {
private final Boss boss;
private final NioClientSocketChannel channel;
RegisterTask(Boss boss, NioClientSocketChannel channel) {
this.boss = boss;
this.channel = channel;
}
public void run() {
try {
channel.channel.register(
boss.selector, SelectionKey.OP_CONNECT, channel);
} catch (ClosedChannelException e) {
channel.worker.close(channel, succeededFuture(channel));
}
int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
if (connectTimeout > 0) {
channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
}
}
}
} }

View File

@ -0,0 +1,365 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.jboss.netty.channel.Channels.*;
/**
* Boss implementation which handles accepting of new connections
*/
public final class NioServerBoss implements Boss {
private static final AtomicInteger nextId = new AtomicInteger();
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerBoss.class);
private final int id = nextId.incrementAndGet();
private volatile Selector selector;
private final Executor bossExecutor;
/**
* Queue of channel registration tasks.
*/
private final Queue<Runnable> bindTaskQueue = new ConcurrentLinkedQueue<Runnable>();
/**
* Monitor object used to synchronize selector open/close.
*/
private final Object startStopLock = new Object();
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeone for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private Thread currentThread;
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
NioServerBoss(Executor bossExecutor) {
this(bossExecutor, null);
}
NioServerBoss(Executor bossExecutor, ThreadNameDeterminer determiner) {
this.bossExecutor = bossExecutor;
openSelector(determiner);
}
void bind(final NioServerSocketChannel channel, final ChannelFuture future,
final SocketAddress localAddress) {
synchronized (startStopLock) {
if (selector == null) {
// the selector was null this means the Worker has already been shutdown.
throw new RejectedExecutionException("Worker has already been shutdown");
}
boolean offered = bindTaskQueue.offer(new Runnable() {
public void run() {
boolean bound = false;
boolean registered = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}
}
});
assert offered;
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean bound = channel.isBound();
try {
channel.socket.close();
cancelledKeys ++;
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void openSelector(ThreadNameDeterminer determiner) {
try {
selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(this,
"New I/O server boss #" + id, determiner));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();
}
public void run() {
currentThread = Thread.currentThread();
boolean shutdown = false;
for (;;) {
wakenUp.set(false);
try {
// Just do a blocking select without any timeout
// as this thread does not execute anything else.
selector.select();
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
processBindTaskQueue();
processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown || bossExecutor instanceof ExecutorService &&
((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) {
if (selector.keys().isEmpty()) {
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
selector = null;
}
break;
} else {
shutdown = false;
}
}
}
} else {
shutdown = false;
}
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
}
private void processBindTaskQueue() throws IOException {
for (;;) {
final Runnable task = bindTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private boolean cleanUpCancelledKeys() throws IOException {
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
selector.selectNow();
return true;
}
return false;
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();
try {
// accept connections in a for loop until no new connection is ready
for (;;) {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
}
registerAcceptedChannel(channel, acceptedSocket, currentThread);
}
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
k.cancel();
channel.close();
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (ClosedChannelException e) {
// Closed as requested.
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", t);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
}
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
Thread currentThread) {
try {
ChannelSink sink = parent.getPipeline().getSink();
ChannelPipeline pipeline =
parent.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = parent.workerPool.nextWorker();
worker.register(new NioAcceptedSocketChannel(
parent.getFactory(), pipeline, parent, sink
, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to initialize an accepted socket.", e);
}
try {
acceptedSocket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.util.ThreadNameDeterminer;
import java.util.concurrent.Executor;
/**
* Holds {@link NioServerBoss} instances to use
*/
public class NioServerBossPool extends AbstractNioBossPool<NioServerBoss> {
private final ThreadNameDeterminer determiner;
/**
* Create a new instance
*
* @param bossExecutor the {@link Executor} to use for server the {@link NioServerBoss}
* @param bossCount the number of {@link NioServerBoss} instances this {@link NioServerBossPool} will hold
* @param determiner the {@link ThreadNameDeterminer} to use for name the threads. Use <code>null</code>
* if you not want to set one explicit.
*/
public NioServerBossPool(Executor bossExecutor, int bossCount, ThreadNameDeterminer determiner) {
super(bossExecutor, bossCount);
this.determiner = determiner;
}
/**
* Create a new instance using no {@link ThreadNameDeterminer}
*
* @param bossExecutor the {@link Executor} to use for server the {@link NioServerBoss}
* @param bossCount the number of {@link NioServerBoss} instances this {@link NioServerBossPool} will hold
*/
public NioServerBossPool(Executor bossExecutor, int bossCount) {
this(bossExecutor, bossCount, null);
}
@Override
protected NioServerBoss newBoss(Executor executor) {
return new NioServerBoss(executor, determiner);
}
}

View File

@ -38,18 +38,19 @@ class NioServerSocketChannel extends AbstractServerChannel
InternalLoggerFactory.getInstance(NioServerSocketChannel.class); InternalLoggerFactory.getInstance(NioServerSocketChannel.class);
final ServerSocketChannel socket; final ServerSocketChannel socket;
final Runnable boss; final Boss boss;
final WorkerPool<NioWorker> workerPool;
private final ServerSocketChannelConfig config; private final ServerSocketChannelConfig config;
NioServerSocketChannel( NioServerSocketChannel(
ChannelFactory factory, ChannelFactory factory,
ChannelPipeline pipeline, ChannelPipeline pipeline,
ChannelSink sink, Runnable boss) { ChannelSink sink, Boss boss, WorkerPool<NioWorker> workerPool) {
super(factory, pipeline, sink); super(factory, pipeline, sink);
this.boss = boss; this.boss = boss;
this.workerPool = workerPool;
try { try {
socket = ServerSocketChannel.open(); socket = ServerSocketChannel.open();
} catch (IOException e) { } catch (IOException e) {

View File

@ -83,9 +83,9 @@ import org.jboss.netty.util.ExternalResourceReleasable;
*/ */
public class NioServerSocketChannelFactory implements ServerSocketChannelFactory { public class NioServerSocketChannelFactory implements ServerSocketChannelFactory {
final Executor bossExecutor;
private final WorkerPool<NioWorker> workerPool; private final WorkerPool<NioWorker> workerPool;
private final NioServerSocketPipelineSink sink; private final NioServerSocketPipelineSink sink;
private final BossPool<NioServerBoss> bossPool;
/** /**
* Create a new {@link NioServerSocketChannelFactory} using {@link Executors#newCachedThreadPool()} * Create a new {@link NioServerSocketChannelFactory} using {@link Executors#newCachedThreadPool()}
@ -174,24 +174,41 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
*/ */
public NioServerSocketChannelFactory( public NioServerSocketChannelFactory(
Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) { Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
if (bossExecutor == null) { this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool);
}
/**
* Create a new instance.
*
* @param bossPool
* the {@link BossPool} which will be used to obtain the {@link NioServerBoss} that execute
* the I/O for accept new connections
* @param workerPool
* the {@link WorkerPool} which will be used to obtain the {@link NioWorker} that execute
* the I/O worker threads
*/
public NioServerSocketChannelFactory(BossPool<NioServerBoss> bossPool, WorkerPool<NioWorker> workerPool) {
if (bossPool == null) {
throw new NullPointerException("bossExecutor"); throw new NullPointerException("bossExecutor");
} }
if (workerPool == null) { if (workerPool == null) {
throw new NullPointerException("workerPool"); throw new NullPointerException("workerPool");
} }
this.bossExecutor = bossExecutor; this.bossPool = bossPool;
this.workerPool = workerPool; this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink(bossExecutor, bossCount, workerPool); sink = new NioServerSocketPipelineSink();
} }
public ServerSocketChannel newChannel(ChannelPipeline pipeline) { public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
return new NioServerSocketChannel(this, pipeline, sink, sink.nextBoss()); return new NioServerSocketChannel(this, pipeline, sink, bossPool.nextBoss(), workerPool);
} }
public void releaseExternalResources() { public void releaseExternalResources() {
sink.releaseExternalResources(); if (bossPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) bossPool).releaseExternalResources();
}
if (workerPool instanceof ExternalResourceReleasable) { if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources(); ((ExternalResourceReleasable) workerPool).releaseExternalResources();
} }

View File

@ -17,59 +17,15 @@ package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelState; import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import org.jboss.netty.util.internal.ExecutorUtil;
import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.jboss.netty.channel.Channels.*; class NioServerSocketPipelineSink extends AbstractNioChannelSink {
class NioServerSocketPipelineSink extends AbstractNioChannelSink implements ExternalResourceReleasable {
private static final AtomicInteger nextId = new AtomicInteger();
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerSocketPipelineSink.class);
final int id = nextId.incrementAndGet();
private final Boss[] bosses;
private final AtomicInteger bossIndex = new AtomicInteger();
private final WorkerPool<NioWorker> workerPool;
NioServerSocketPipelineSink(Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
this.workerPool = workerPool;
bosses = new Boss[bossCount];
for (int i = 0; i < bossCount; i++) {
bosses[i] = new Boss(bossExecutor);
}
}
public void eventSunk( public void eventSunk(
@ -97,14 +53,14 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink implements Exte
switch (state) { switch (state) {
case OPEN: case OPEN:
if (Boolean.FALSE.equals(value)) { if (Boolean.FALSE.equals(value)) {
((Boss) channel.boss).close(channel, future); ((NioServerBoss) channel.boss).close(channel, future);
} }
break; break;
case BOUND: case BOUND:
if (value != null) { if (value != null) {
((Boss) channel.boss).bind(channel, future, (SocketAddress) value); ((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);
} else { } else {
((Boss) channel.boss).close(channel, future); ((NioServerBoss) channel.boss).close(channel, future);
} }
break; break;
default: default:
@ -144,321 +100,4 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink implements Exte
channel.worker.writeFromUserCode(channel); channel.worker.writeFromUserCode(channel);
} }
} }
NioWorker nextWorker() {
return workerPool.nextWorker();
}
Boss nextBoss() {
return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
}
public void releaseExternalResources() {
for (Boss boss: bosses) {
ExecutorUtil.terminate(boss.bossExecutor);
}
}
final class Boss implements Runnable {
volatile Selector selector;
private final Executor bossExecutor;
/**
* Queue of channel registration tasks.
*/
private final Queue<Runnable> bindTaskQueue = new ConcurrentLinkedQueue<Runnable>();
/**
* Monitor object used to synchronize selector open/close.
*/
private final Object startStopLock = new Object();
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeone for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private Thread currentThread;
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
Boss(Executor bossExecutor) {
this.bossExecutor = bossExecutor;
openSelector();
}
void bind(final NioServerSocketChannel channel, final ChannelFuture future,
final SocketAddress localAddress) {
synchronized (startStopLock) {
if (selector == null) {
// the selector was null this means the Worker has already been shutdown.
throw new RejectedExecutionException("Worker has already been shutdown");
}
boolean offered = bindTaskQueue.offer(new Runnable() {
public void run() {
boolean bound = false;
boolean registered = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}
}
});
assert offered;
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
void close(NioServerSocketChannel channel, ChannelFuture future) {
boolean bound = channel.isBound();
try {
channel.socket.close();
cancelledKeys ++;
if (channel.setClosed()) {
future.setSuccess();
if (bound) {
fireChannelUnbound(channel);
}
fireChannelClosed(channel);
} else {
future.setSuccess();
}
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
}
}
private void openSelector() {
try {
selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(this,
"New I/O server boss #" + id));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();
}
public void run() {
currentThread = Thread.currentThread();
boolean shutdown = false;
for (;;) {
wakenUp.set(false);
try {
// Just do a blocking select without any timeout
// as this thread does not execute anything else.
selector.select();
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
processBindTaskQueue();
processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown || bossExecutor instanceof ExecutorService &&
((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) {
if (selector.keys().isEmpty()) {
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
selector = null;
}
break;
} else {
shutdown = false;
}
}
}
} else {
shutdown = false;
}
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
}
private void processBindTaskQueue() throws IOException {
for (;;) {
final Runnable task = bindTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private boolean cleanUpCancelledKeys() throws IOException {
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
selector.selectNow();
return true;
}
return false;
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey k = i.next();
i.remove();
NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();
try {
// accept connections in a for loop until no new connection is ready
for (;;) {
SocketChannel acceptedSocket = channel.socket.accept();
if (acceptedSocket == null) {
break;
}
registerAcceptedChannel(channel, acceptedSocket, currentThread);
}
} catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed.
k.cancel();
channel.close();
} catch (SocketTimeoutException e) {
// Thrown every second to get ClosedChannelException
// raised.
} catch (ClosedChannelException e) {
// Closed as requested.
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", t);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
}
private void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
Thread currentThread) {
try {
ChannelPipeline pipeline =
parent.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker();
worker.register(new NioAcceptedSocketChannel(
parent.getFactory(), pipeline, parent,
NioServerSocketPipelineSink.this, acceptedSocket,
worker, currentThread), null);
} catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to initialize an accepted socket.", e);
}
try {
acceptedSocket.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially accepted socket.",
e2);
}
}
}
}
}
} }

View File

@ -31,6 +31,7 @@ import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.ChannelException; import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ReceiveBufferSizePredictor; import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.util.ThreadNameDeterminer;
public class NioWorker extends AbstractNioWorker { public class NioWorker extends AbstractNioWorker {
@ -40,6 +41,10 @@ public class NioWorker extends AbstractNioWorker {
super(executor); super(executor);
} }
public NioWorker(Executor executor, ThreadNameDeterminer determiner) {
super(executor, determiner);
}
@Override @Override
protected boolean read(SelectionKey k) { protected boolean read(SelectionKey k) {
final SocketChannel ch = (SocketChannel) k.channel(); final SocketChannel ch = (SocketChannel) k.channel();

View File

@ -15,6 +15,8 @@
*/ */
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.util.ThreadNameDeterminer;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -25,13 +27,22 @@ import java.util.concurrent.Executor;
*/ */
public class NioWorkerPool extends AbstractNioWorkerPool<NioWorker> { public class NioWorkerPool extends AbstractNioWorkerPool<NioWorker> {
private final ThreadNameDeterminer determiner;
public NioWorkerPool(Executor workerExecutor, int workerCount) { public NioWorkerPool(Executor workerExecutor, int workerCount) {
super(workerExecutor, workerCount); this(workerExecutor, workerCount, null);
}
public NioWorkerPool(Executor workerExecutor, int workerCount, ThreadNameDeterminer determiner) {
super(workerExecutor, workerCount, false);
this.determiner = determiner;
init();
} }
@Override @Override
protected NioWorker createWorker(Executor executor) { protected NioWorker createWorker(Executor executor) {
return new NioWorker(executor); return new NioWorker(executor, determiner);
} }
} }

View File

@ -34,6 +34,7 @@ public class ThreadRenamingRunnable implements Runnable {
private static volatile ThreadNameDeterminer threadNameDeterminer = private static volatile ThreadNameDeterminer threadNameDeterminer =
ThreadNameDeterminer.PROPOSED; ThreadNameDeterminer.PROPOSED;
private final ThreadNameDeterminer determiner;
/** /**
* Returns the {@link ThreadNameDeterminer} which overrides the proposed * Returns the {@link ThreadNameDeterminer} which overrides the proposed
@ -68,7 +69,7 @@ public class ThreadRenamingRunnable implements Runnable {
* and changes the thread name to the specified thread name when the * and changes the thread name to the specified thread name when the
* specified {@code runnable} is running. * specified {@code runnable} is running.
*/ */
public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName) { public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName, ThreadNameDeterminer determiner) {
if (runnable == null) { if (runnable == null) {
throw new NullPointerException("runnable"); throw new NullPointerException("runnable");
} }
@ -76,9 +77,14 @@ public class ThreadRenamingRunnable implements Runnable {
throw new NullPointerException("proposedThreadName"); throw new NullPointerException("proposedThreadName");
} }
this.runnable = runnable; this.runnable = runnable;
this.determiner = determiner;
this.proposedThreadName = proposedThreadName; this.proposedThreadName = proposedThreadName;
} }
public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName) {
this(runnable, proposedThreadName, null);
}
public void run() { public void run() {
final Thread currentThread = Thread.currentThread(); final Thread currentThread = Thread.currentThread();
final String oldThreadName = currentThread.getName(); final String oldThreadName = currentThread.getName();
@ -113,8 +119,12 @@ public class ThreadRenamingRunnable implements Runnable {
String newThreadName = null; String newThreadName = null;
try { try {
ThreadNameDeterminer nameDeterminer = determiner;
if (nameDeterminer == null) {
nameDeterminer = getThreadNameDeterminer();
}
newThreadName = newThreadName =
getThreadNameDeterminer().determineThreadName( nameDeterminer.determineThreadName(
currentThreadName, proposedThreadName); currentThreadName, proposedThreadName);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Failed to determine the thread name", t); logger.warn("Failed to determine the thread name", t);

View File

@ -17,10 +17,13 @@ package org.jboss.netty.util;
import static org.easymock.EasyMock.*; import static org.easymock.EasyMock.*;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.security.Permission; import java.security.Permission;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Test; import org.junit.Test;
public class ThreadRenamingRunnableTest { public class ThreadRenamingRunnableTest {
@ -84,6 +87,58 @@ public class ThreadRenamingRunnableTest {
} }
} }
// Tests mainly changed which were introduced as part of #711
@Test
public void testThreadNameDeterminer() {
final String oldThreadName = Thread.currentThread().getName();
final String newThreadName = "new";
final String proposed = "proposed";
ThreadNameDeterminer determiner = new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
assertEquals(proposed, proposedThreadName);
assertEquals(oldThreadName, currentThreadName);
return newThreadName;
}
};
ThreadRenamingRunnable.setThreadNameDeterminer(new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
assertEquals(proposed, proposedThreadName);
assertEquals(oldThreadName, currentThreadName);
return proposed;
}
});
Executor e = new ImmediateExecutor();
try {
e.execute(new ThreadRenamingRunnable(new Runnable() {
public void run() {
assertEquals("Should use the given ThreadNameDEterminer",
newThreadName, Thread.currentThread().getName());
}
}, proposed, determiner));
} finally {
assertEquals(oldThreadName, Thread.currentThread().getName());
}
try {
e.execute(new ThreadRenamingRunnable(new Runnable() {
public void run() {
assertEquals("Should use the static set ThreadNameDeterminer",
proposed, Thread.currentThread().getName());
}
}, proposed));
} finally {
assertEquals(oldThreadName, Thread.currentThread().getName());
}
}
@AfterClass
public static void after() {
// reset to default
ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.PROPOSED);
}
private static class ImmediateExecutor implements Executor { private static class ImmediateExecutor implements Executor {
ImmediateExecutor() { ImmediateExecutor() {