[#775] Rework closing logic of Worker/Boss

Added a shutdown() method on ChannelFactory, Boss, Worker, BossPool, WorkerPool and Bootstrap which can be used to just shutdown the instance and so release all internal created resources. This method is also called when releaseExternalResources() which will also release external created resources like Executors.

This commit also fixes the problem that the Worker/Boss Thread will never be released if you use an Executor (no ExecutorService).
This commit is contained in:
Norman Maurer 2012-12-04 15:51:31 +01:00
parent 95684d92eb
commit 7aa2cfad65
48 changed files with 867 additions and 1241 deletions

View File

@ -319,6 +319,16 @@ public class Bootstrap implements ExternalResourceReleasable {
}
}
/**
* This method simply delegates the call to
* {@link ChannelFactory#shutdown()}.
*/
public void shutdown() {
ChannelFactory factory = this.factory;
if (factory != null) {
factory.shutdown();
}
}
/**
* Returns {@code true} if and only if the specified {@code map} is an
* ordered map, like {@link LinkedHashMap} is.

View File

@ -67,6 +67,11 @@ public interface ChannelFactory extends ExternalResourceReleasable {
*/
Channel newChannel(ChannelPipeline pipeline);
/**
* Shudown the ChannelFactory and all the resource it created internal.
*/
void shutdown();
/**
* Releases the external resources that this factory depends on to function.
* An external resource is a resource that this factory didn't create by
@ -76,6 +81,8 @@ public interface ChannelFactory extends ExternalResourceReleasable {
* this factory or any other part of your application. An unexpected
* behavior will be resulted in if the resources are released when there's
* an open channel which is managed by this factory.
*
* This will also call {@link #shutdown()} before do any action
*/
void releaseExternalResources();
}

View File

@ -44,4 +44,8 @@ public class DefaultLocalClientChannelFactory implements LocalClientChannelFacto
public void releaseExternalResources() {
// No external resources.
}
public void shutdown() {
// nothing to shutdown
}
}

View File

@ -41,4 +41,8 @@ public class DefaultLocalServerChannelFactory implements LocalServerChannelFacto
public void releaseExternalResources() {
group.close().awaitUninterruptibly();
}
public void shutdown() {
// nothing to shutdown
}
}

View File

@ -16,8 +16,6 @@
package org.jboss.netty.channel.socket;
import java.nio.channels.Selector;
/**
* A {@link Worker} is responsible to dispatch IO operations
*
@ -33,9 +31,4 @@ public interface Worker extends Runnable {
*/
void executeInIoThread(Runnable task);
/**
* Replaces the current {@link Selector} with a new {@link Selector} to work around the infamous epoll 100% CPU
* bug.
*/
void rebuildSelector();
}

View File

@ -47,4 +47,8 @@ public class HttpTunnelingClientSocketChannelFactory implements ClientSocketChan
public void releaseExternalResources() {
clientSocketChannelFactory.releaseExternalResources();
}
public void shutdown() {
clientSocketChannelFactory.shutdown();
}
}

View File

@ -87,11 +87,13 @@ public abstract class AbstractNioBossPool<E extends Boss>
}
public void releaseExternalResources() {
shutdown();
ExecutorUtil.terminate(bossExecutor);
}
public void shutdown() {
for (Boss boss: bosses) {
if (boss instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) boss).releaseExternalResources();
}
boss.shutdown();
}
}
}

View File

@ -0,0 +1,418 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
abstract class AbstractNioSelector implements NioSelector {
private static final AtomicInteger nextId = new AtomicInteger();
private final int id = nextId.incrementAndGet();
/**
* Internal Netty logger.
*/
protected static final InternalLogger logger = InternalLoggerFactory
.getInstance(AbstractNioSelector.class);
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/**
* Executor used to execute {@link Runnable}s such as channel registration
* task.
*/
private final Executor executor;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
/**
* The NIO {@link Selector}.
*/
protected volatile Selector selector;
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeone for
* the select method and the select method will block for that time unless
* waken up.
*/
protected final AtomicBoolean wakenUp = new AtomicBoolean();
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private volatile boolean shutdown;
AbstractNioSelector(Executor executor) {
this(executor, null);
}
AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
this.executor = executor;
openSelector(determiner);
}
public void register(Channel channel, ChannelFuture future) {
Runnable task = createRegisterTask(channel, future);
registerTask(task);
}
protected final void registerTask(Runnable task) {
taskQueue.add(task);
Selector selector = this.selector;
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
} else {
if (taskQueue.remove(task)) {
// the selector was null this means the Worker has already been shutdown.
throw new RejectedExecutionException("Worker has already been shutdown");
}
}
}
protected final boolean isIoThread() {
return Thread.currentThread() == thread;
}
public void rebuildSelector() {
if (!isIoThread()) {
taskQueue.add(new Runnable() {
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = Selector.open();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, key.attachment());
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector,", e);
close(key);
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
}
public void run() {
thread = Thread.currentThread();
int selectReturnsImmediately = 0;
Selector selector = this.selector;
if (selector == null) {
return;
}
// use 80% of the timeout for measure
final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop = false;
for (;;) {
wakenUp.set(false);
try {
long beforeSelect = System.nanoTime();
int selected = select(selector);
if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
long timeBlocked = System.nanoTime() - beforeSelect;
if (timeBlocked < minSelectTimeout) {
boolean notConnected = false;
// loop over all keys as the selector may was unblocked because of a closed channel
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
try {
if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
notConnected = true;
// cancel the key just to be on the safe side
key.cancel();
}
} catch (CancelledKeyException e) {
// ignore
}
}
if (notConnected) {
selectReturnsImmediately = 0;
} else {
// returned before the minSelectTimeout elapsed with nothing select.
// this may be the cause of the jdk epoll(..) bug, so increment the counter
// which we use later to see if its really the jdk bug.
selectReturnsImmediately ++;
}
} else {
selectReturnsImmediately = 0;
}
if (selectReturnsImmediately == 1024) {
// The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug.
rebuildSelector();
selector = this.selector;
selectReturnsImmediately = 0;
wakenupFromLoop = false;
// try to select again
continue;
}
} else {
// reset counter
selectReturnsImmediately = 0;
}
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
cancelledKeys = 0;
processTaskQueue();
selector = this.selector; // processTaskQueue() can call rebuildSelector()
if (shutdown) {
this.selector = null;
// process one time again
processTaskQueue();
for (Iterator<SelectionKey> i = selector.keys().iterator(); i.hasNext();) {
close(i.next());
}
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
}
shutdownLatch.countDown();
break;
} else {
process(selector);
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
/**
* Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
* the {@link AbstractNioChannel}'s when they get registered
*/
private void openSelector(ThreadNameDeterminer determiner) {
try {
selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();
}
private void processTaskQueue() {
for (;;) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
task.run();
try {
cleanUpCancelledKeys();
} catch (IOException e) {
// Ignore
}
}
}
protected final void increaseCancelledKeys() {
cancelledKeys ++;
}
protected final boolean cleanUpCancelledKeys() throws IOException {
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
selector.selectNow();
return true;
}
return false;
}
public void shutdown() {
if (isIoThread()) {
throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
}
Selector selector = this.selector;
shutdown = true;
if (selector != null) {
selector.wakeup();
}
try {
shutdownLatch.await();
} catch (InterruptedException e) {
logger.error("Interrupted while wait for resources to be released #" + id);
Thread.currentThread().interrupt();
}
}
protected abstract void process(Selector selector) throws IOException;
protected int select(Selector selector) throws IOException {
return SelectorUtil.select(selector);
}
protected abstract void close(SelectionKey k);
protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);
protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
}

View File

@ -16,358 +16,39 @@
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.channel.socket.nio.SocketSendBufferPool.SendBuffer;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.jboss.netty.channel.Channels.*;
abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
private static final AtomicInteger nextId = new AtomicInteger();
final int id = nextId.incrementAndGet();
/**
* Internal Netty logger.
*/
private static final InternalLogger logger = InternalLoggerFactory
.getInstance(AbstractNioWorker.class);
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/**
* Executor used to execute {@link Runnable}s such as channel registration
* task.
*/
private final Executor executor;
/**
* If this worker has been started thread will be a reference to the thread
* used when starting. i.e. the current thread when the run method is executed.
*/
protected volatile Thread thread;
/**
* The NIO {@link Selector}.
*/
volatile Selector selector;
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeone for
* the select method and the select method will block for that time unless
* waken up.
*/
protected final AtomicBoolean wakenUp = new AtomicBoolean();
/**
* Monitor object used to synchronize selector open/close.
*/
private final Object startStopLock = new Object();
final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
abstract class AbstractNioWorker extends AbstractNioSelector implements Worker {
protected final SocketSendBufferPool sendBufferPool = new SocketSendBufferPool();
AbstractNioWorker(Executor executor) {
this(executor, null);
super(executor);
}
AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {
this.executor = executor;
openSelector(determiner);
}
void register(AbstractNioChannel<?> channel, ChannelFuture future) {
synchronized (startStopLock) {
if (selector == null) {
// the selector was null this means the Worker has already been shutdown.
throw new RejectedExecutionException("Worker has already been shutdown");
}
taskQueue.add(createRegisterTask(channel, future));
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
}
public void rebuildSelector() {
if (Thread.currentThread() != thread) {
executeInIoThread(new Runnable() {
public void run() {
rebuildSelector();
}
}, true);
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = Selector.open();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, key.attachment());
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector,", e);
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) key.attachment();
close(channel, succeededFuture(channel));
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
}
/**
* Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
* the {@link AbstractNioChannel}'s when they get registered
*/
private void openSelector(ThreadNameDeterminer determiner) {
try {
selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();
}
public void run() {
thread = Thread.currentThread();
boolean shutdown = false;
int selectReturnsImmediately = 0;
Selector selector = this.selector;
// use 80% of the timeout for measure
final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop = false;
for (;;) {
wakenUp.set(false);
try {
long beforeSelect = System.nanoTime();
int selected = SelectorUtil.select(selector);
if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
long timeBlocked = System.nanoTime() - beforeSelect;
if (timeBlocked < minSelectTimeout) {
boolean notConnected = false;
// loop over all keys as the selector may was unblocked because of a closed channel
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
try {
if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
notConnected = true;
// cancel the key just to be on the safe side
key.cancel();
}
} catch (CancelledKeyException e) {
// ignore
}
}
if (notConnected) {
selectReturnsImmediately = 0;
} else {
// returned before the minSelectTimeout elapsed with nothing select.
// this may be the cause of the jdk epoll(..) bug, so increment the counter
// which we use later to see if its really the jdk bug.
selectReturnsImmediately ++;
}
} else {
selectReturnsImmediately = 0;
}
if (selectReturnsImmediately == 1024) {
// The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug.
rebuildSelector();
selector = this.selector;
selectReturnsImmediately = 0;
wakenupFromLoop = false;
// try to select again
continue;
}
} else {
// reset counter
selectReturnsImmediately = 0;
}
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
cancelledKeys = 0;
processTaskQueue();
selector = this.selector; // processTaskQueue() can call rebuildSelector()
processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
synchronized (startStopLock) {
if (taskQueue.isEmpty() && selector.keys().isEmpty()) {
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
}
} else {
shutdown = false;
}
} catch (Throwable t) {
logger.warn(
"Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
super(executor, determiner);
}
public void executeInIoThread(Runnable task) {
@ -384,48 +65,33 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
* in an async fashion even if the current Thread == IO Thread
*/
public void executeInIoThread(Runnable task, boolean alwaysAsync) {
if (!alwaysAsync && Thread.currentThread() == thread) {
if (!alwaysAsync && isIoThread()) {
task.run();
} else {
taskQueue.offer(task);
synchronized (startStopLock) {
// check if the selector was shutdown already or was not started yet. If so execute all
// submitted tasks in the calling thread
if (selector == null) {
// execute everything in the event queue as the
for (;;) {
Runnable r = taskQueue.poll();
if (r == null) {
break;
}
r.run();
}
} else {
if (wakenUp.compareAndSet(false, true)) {
// wake up the selector to speed things
Selector selector = this.selector;
if (selector != null) {
selector.wakeup();
}
}
}
}
registerTask(task);
}
}
private void processTaskQueue() throws IOException {
for (;;) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
@Override
protected void close(SelectionKey k) {
AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
close(ch, succeededFuture(ch));
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
@Override
protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);
}
@Override
public void run() {
super.run();
sendBufferPool.releaseExternalResources();
}
@Override
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
@ -456,20 +122,6 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
}
}
private boolean cleanUpCancelledKeys() throws IOException {
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
selector.selectNow();
return true;
}
return false;
}
private void close(SelectionKey k) {
AbstractNioChannel<?> ch = (AbstractNioChannel<?>) k.attachment();
close(ch, succeededFuture(ch));
}
void writeFromUserCode(final AbstractNioChannel<?> channel) {
if (!channel.isConnected()) {
cleanUpWriteBuffer(channel);
@ -668,14 +320,14 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
}
}
void close(AbstractNioChannel<?> channel, ChannelFuture future) {
protected void close(AbstractNioChannel<?> channel, ChannelFuture future) {
boolean connected = channel.isConnected();
boolean bound = channel.isBound();
boolean iothread = isIoThread(channel);
try {
channel.channel.close();
cancelledKeys ++;
increaseCancelledKeys();
if (channel.setClosed()) {
future.setSuccess();
@ -835,10 +487,6 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
}
}
public void releaseExternalResources() {
sendBufferPool.releaseExternalResources();
}
/**
* Read is called when a Selector has been notified that the underlying channel
* was something to be read. The channel would previously have registered its interest
@ -848,9 +496,4 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
*/
protected abstract boolean read(SelectionKey k);
/**
* Create a new {@link Runnable} which will register the {@link AbstractNioWorker} with the {@link Channel}
*/
protected abstract Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future);
}

View File

@ -105,15 +105,19 @@ public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
}
public void rebuildSelectors() {
for (Worker worker: workers) {
for (AbstractNioWorker worker: workers) {
worker.rebuildSelector();
}
}
public void releaseExternalResources() {
shutdown();
ExecutorUtil.terminate(workerExecutor);
}
public void shutdown() {
for (AbstractNioWorker worker: workers) {
worker.releaseExternalResources();
worker.shutdown();
}
}

View File

@ -15,15 +15,8 @@
*/
package org.jboss.netty.channel.socket.nio;
import java.nio.channels.Selector;
/**
* Serves the boss tasks like connecting/accepting
*/
public interface Boss extends Runnable {
/**
* Replaces the current {@link Selector} with a new {@link Selector} to work around the infamous epoll 100% CPU
* bug.
*/
void rebuildSelector();
public interface Boss extends NioSelector {
}

View File

@ -15,21 +15,13 @@
*/
package org.jboss.netty.channel.socket.nio;
import java.nio.channels.Selector;
/**
* A Pool that holds {@link Boss} instances
*/
public interface BossPool<E extends Boss> {
public interface BossPool<E extends Boss> extends NioSelectorPool {
/**
* Return the next {@link Boss} to use
*
*/
E nextBoss();
/**
* Replaces the current {@link Selector}s of the {@link Boss}es with new {@link Selector}s to work around the
* infamous epoll 100% CPU bug.
*/
void rebuildSelectors();
}

View File

@ -15,55 +15,31 @@
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.Timer;
import org.jboss.netty.util.TimerTask;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.jboss.netty.channel.Channels.*;
/**
* {@link Boss} implementation that handles the connection attempts of clients
*/
public final class NioClientBoss implements Boss {
public final class NioClientBoss extends AbstractNioSelector implements Boss {
private static final AtomicInteger nextId = new AtomicInteger();
private final int id = nextId.incrementAndGet();
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientBoss.class);
private volatile Selector selector;
private volatile Thread thread;
private boolean started;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object();
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
private final TimerTask wakeupTask = new TimerTask() {
public void run(Timeout timeout) throws Exception {
// This is needed to prevent a possible race that can lead to a NPE
@ -79,242 +55,35 @@ public final class NioClientBoss implements Boss {
}
}
};
private final Executor bossExecutor;
private final ThreadNameDeterminer determiner;
private final Timer timer;
NioClientBoss(Executor bossExecutor, Timer timer, ThreadNameDeterminer determiner) {
this.bossExecutor = bossExecutor;
this.determiner = determiner;
super(bossExecutor, determiner);
this.timer = timer;
}
void register(NioClientSocketChannel channel) {
Runnable registerTask = new RegisterTask(this, channel);
Selector selector;
synchronized (startStopLock) {
if (!started) {
// Open a selector if this worker didn't start yet.
try {
this.selector = selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException(
"Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor,
new ThreadRenamingRunnable(this,
"New I/O client boss #" + id , determiner));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", t);
}
}
this.selector = selector = null;
// The method will return to the caller at this point.
}
}
} else {
// Use the existing selector if this worker has been started.
selector = this.selector;
}
assert selector != null && selector.isOpen();
started = true;
boolean offered = taskQueue.offer(registerTask);
assert offered;
}
int timeout = channel.getConfig().getConnectTimeoutMillis();
if (timeout > 0) {
if (!channel.isConnected()) {
channel.timoutTimer = timer.newTimeout(wakeupTask,
timeout, TimeUnit.MILLISECONDS);
}
}
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
@Override
protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
return new ThreadRenamingRunnable(this, "New I/O boss #" + id, determiner);
}
public void run() {
thread = Thread.currentThread();
boolean shutdown = false;
int selectReturnsImmediately = 0;
Selector selector = this.selector;
// use 80% of the timeout for measure
final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
boolean wakenupFromLoop = false;
for (;;) {
wakenUp.set(false);
try {
long beforeSelect = System.nanoTime();
int selected = SelectorUtil.select(selector);
if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
long timeBlocked = System.nanoTime() - beforeSelect;
if (timeBlocked < minSelectTimeout) {
boolean notConnected = false;
// loop over all keys as the selector may was unblocked because of a closed channel
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
try {
if (ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
notConnected = true;
// cancel the key just to be on the safe side
key.cancel();
}
} catch (CancelledKeyException e) {
// ignore
}
}
if (notConnected) {
selectReturnsImmediately = 0;
} else {
// returned before the minSelectTimeout elapsed with nothing select.
// this may be the cause of the jdk epoll(..) bug, so increment the counter
// which we use later to see if its really the jdk bug.
selectReturnsImmediately ++;
}
} else {
selectReturnsImmediately = 0;
}
if (selectReturnsImmediately == 1024) {
// The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug.
rebuildSelector();
selector = this.selector;
selectReturnsImmediately = 0;
wakenupFromLoop = false;
// try to select again
continue;
}
} else {
// reset counter
selectReturnsImmediately = 0;
}
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
wakenupFromLoop = true;
selector.wakeup();
} else {
wakenupFromLoop = false;
}
processTaskQueue();
selector = this.selector; // processTaskQueue() can call rebuildSelector()
processSelectedKeys(selector.selectedKeys());
// Handle connection timeout every 10 milliseconds approximately.
long currentTimeNanos = System.nanoTime();
processConnectTimeout(selector.keys(), currentTimeNanos);
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connection attempts are made in a one-by-one manner
// instead of concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown ||
bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) {
if (taskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false;
try {
selector.close();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a selector.", e);
}
} finally {
this.selector = null;
}
break;
} else {
shutdown = false;
}
}
} else {
// Give one more second.
shutdown = true;
}
} else {
shutdown = false;
}
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn(
"Unexpected exception in the selector loop.", t);
}
// Prevent possible consecutive immediate failures.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
@Override
protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
return new RegisterTask(this, (NioClientSocketChannel) channel);
}
private void processTaskQueue() {
for (;;) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
@Override
protected void process(Selector selector) throws IOException {
processSelectedKeys(selector.selectedKeys());
task.run();
}
// Handle connection timeout every 10 milliseconds approximately.
long currentTimeNanos = System.nanoTime();
processConnectTimeout(selector.keys(), currentTimeNanos);
}
private static void processSelectedKeys(Set<SelectionKey> selectedKeys) {
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
@ -385,82 +154,13 @@ public final class NioClientBoss implements Boss {
}
}
private static void close(SelectionKey k) {
@Override
protected void close(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
ch.worker.close(ch, succeededFuture(ch));
}
public void rebuildSelector() {
if (Thread.currentThread() != thread) {
Selector selector = this.selector;
if (selector != null) {
taskQueue.add(new Runnable() {
public void run() {
rebuildSelector();
}
});
selector.wakeup();
}
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = Selector.open();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, key.attachment());
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector,", e);
NioClientSocketChannel ch = (NioClientSocketChannel) key.attachment();
ch.worker.close(ch, succeededFuture(ch));
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
}
private static final class RegisterTask implements Runnable {
private final class RegisterTask implements Runnable {
private final NioClientBoss boss;
private final NioClientSocketChannel channel;
@ -470,6 +170,13 @@ public final class NioClientBoss implements Boss {
}
public void run() {
int timeout = channel.getConfig().getConnectTimeoutMillis();
if (timeout > 0) {
if (!channel.isConnected()) {
channel.timoutTimer = timer.newTimeout(wakeupTask,
timeout, TimeUnit.MILLISECONDS);
}
}
try {
channel.channel.register(
boss.selector, SelectionKey.OP_CONNECT, channel);

View File

@ -27,6 +27,7 @@ import java.util.concurrent.Executor;
public class NioClientBossPool extends AbstractNioBossPool<NioClientBoss> {
private final ThreadNameDeterminer determiner;
private final Timer timer;
private boolean stopTimer;
/**
* Create a new instance
@ -52,6 +53,7 @@ public class NioClientBossPool extends AbstractNioBossPool<NioClientBoss> {
*/
public NioClientBossPool(Executor bossExecutor, int bossCount) {
this(bossExecutor, bossCount, new HashedWheelTimer(), null);
stopTimer = true;
}
@Override
@ -59,6 +61,14 @@ public class NioClientBossPool extends AbstractNioBossPool<NioClientBoss> {
return new NioClientBoss(executor, timer, determiner);
}
@Override
public void shutdown() {
super.shutdown();
if (stopTimer) {
timer.stop();
}
}
@Override
public void releaseExternalResources() {
super.releaseExternalResources();

View File

@ -87,6 +87,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
private final BossPool<NioClientBoss> bossPool;
private final WorkerPool<NioWorker> workerPool;
private final NioClientSocketPipelineSink sink;
private boolean releasePools;
/**
* Creates a new {@link NioClientSocketChannelFactory} which uses {@link Executors#newCachedThreadPool()}
@ -96,6 +97,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
*/
public NioClientSocketChannelFactory() {
this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
releasePools = true;
}
/**
@ -211,7 +213,20 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
return new NioClientSocketChannel(this, pipeline, sink, workerPool.nextWorker());
}
public void shutdown() {
bossPool.shutdown();
workerPool.shutdown();
if (releasePools) {
releasePools();
}
}
public void releaseExternalResources() {
shutdown();
releasePools();
}
private void releasePools() {
if (bossPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) bossPool).releaseExternalResources();
}

View File

@ -116,7 +116,7 @@ class NioClientSocketPipelineSink extends AbstractNioChannelSink {
});
cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
channel.connectFuture = cf;
nextBoss().register(channel);
nextBoss().register(channel, cf);
}
} catch (Throwable t) {

View File

@ -43,7 +43,7 @@ import static org.jboss.netty.channel.Channels.*;
/**
* Provides an NIO based {@link org.jboss.netty.channel.socket.DatagramChannel}.
*/
public final class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
public class NioDatagramChannel extends AbstractNioChannel<DatagramChannel>
implements org.jboss.netty.channel.socket.DatagramChannel {
/**

View File

@ -81,6 +81,7 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
private final NioDatagramPipelineSink sink;
private final WorkerPool<NioDatagramWorker> workerPool;
private final InternetProtocolFamily family;
private boolean releasePool;
/**
* Create a new {@link NioDatagramChannelFactory} with a {@link Executors#newCachedThreadPool()}
@ -91,7 +92,7 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
* See {@link #NioDatagramChannelFactory(Executor)}
*/
public NioDatagramChannelFactory() {
this(Executors.newCachedThreadPool(), null);
this((InternetProtocolFamily) null);
}
/**
@ -100,7 +101,10 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
* See {@link #NioDatagramChannelFactory(Executor)}
*/
public NioDatagramChannelFactory(InternetProtocolFamily family) {
this(Executors.newCachedThreadPool(), family);
workerPool = new NioDatagramWorkerPool(Executors.newCachedThreadPool(), SelectorUtil.DEFAULT_IO_THREADS);
this.family = family;
sink = new NioDatagramPipelineSink(workerPool);
releasePool = true;
}
/**
@ -203,7 +207,19 @@ public class NioDatagramChannelFactory implements DatagramChannelFactory {
return new NioDatagramChannel(this, pipeline, sink, sink.nextWorker(), family);
}
public void shutdown() {
workerPool.shutdown();
if (releasePool) {
releasePool();
}
}
public void releaseExternalResources() {
shutdown();
releasePool();
}
private void releasePool() {
if (workerPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) workerPool).releaseExternalResources();
}

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.MessageEvent;
@ -108,26 +109,13 @@ public class NioDatagramWorker extends AbstractNioWorker {
return true;
}
@Override
public void releaseExternalResources() {
super.releaseExternalResources();
bufferAllocator.releaseExternalResources();
}
@Override
protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
final Thread workerThread = thread;
if (workerThread == null || Thread.currentThread() != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
// "add" the channels writeTask to the writeTaskQueue.
taskQueue.add(channel.writeTask);
}
final Selector selector = this.selector;
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
registerTask(channel.writeTask);
}
return true;
}
@ -159,7 +147,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
}
@Override
protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
return new ChannelRegistionTask((NioDatagramChannel) channel, future);
}
@ -350,4 +338,9 @@ public class NioDatagramWorker extends AbstractNioWorker {
fireWriteComplete(channel, writtenBytes);
}
@Override
public void run() {
super.run();
bufferAllocator.releaseExternalResources();
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
public interface NioSelector extends Runnable {
void register(Channel channel, ChannelFuture future);
/**
* Replaces the current {@link java.nio.channels.Selector} with a
* new {@link java.nio.channels.Selector} to work around the infamous epoll 100% CPU
* bug.
*/
void rebuildSelector();
void shutdown();
}

View File

@ -0,0 +1,31 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
public interface NioSelectorPool {
/**
* Replaces the current {@link java.nio.channels.Selector}s of the {@link Boss}es with new
* {@link java.nio.channels.Selector}s to work around the infamous epoll 100% CPU bug.
*/
void rebuildSelectors();
/**
* Shutdown the {@link NioSelectorPool} and all internal created resources
*/
void shutdown();
}

View File

@ -15,15 +15,12 @@
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.jboss.netty.util.internal.DeadLockProofWorker;
import java.io.IOException;
import java.net.SocketAddress;
@ -33,102 +30,34 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.jboss.netty.channel.Channels.*;
/**
* Boss implementation which handles accepting of new connections
*/
public final class NioServerBoss implements Boss {
private static final AtomicInteger nextId = new AtomicInteger();
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioServerBoss.class);
private final int id = nextId.incrementAndGet();
private volatile Selector selector;
private volatile Thread thread;
private final Executor bossExecutor;
/**
* Queue of channel registration tasks.
*/
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
/**
* Monitor object used to synchronize selector open/close.
*/
private final Object startStopLock = new Object();
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeone for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
public final class NioServerBoss extends AbstractNioSelector implements Boss {
NioServerBoss(Executor bossExecutor) {
this(bossExecutor, null);
super(bossExecutor);
}
NioServerBoss(Executor bossExecutor, ThreadNameDeterminer determiner) {
this.bossExecutor = bossExecutor;
openSelector(determiner);
super(bossExecutor, determiner);
}
void bind(final NioServerSocketChannel channel, final ChannelFuture future,
final SocketAddress localAddress) {
synchronized (startStopLock) {
if (selector == null) {
// the selector was null this means the Worker has already been shutdown.
throw new RejectedExecutionException("Worker has already been shutdown");
}
registerTask(new RegisterTask(channel, future, localAddress));
}
boolean offered = taskQueue.offer(new Runnable() {
public void run() {
boolean bound = false;
boolean registered = false;
try {
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
}
}
});
assert offered;
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
@Override
protected void close(SelectionKey k) {
NioServerSocketChannel ch = (NioServerSocketChannel) k.attachment();
close(ch, succeededFuture(ch));
}
void close(NioServerSocketChannel channel, ChannelFuture future) {
@ -136,7 +65,7 @@ public final class NioServerBoss implements Boss {
try {
channel.socket.close();
cancelledKeys ++;
increaseCancelledKeys();
if (channel.setClosed()) {
future.setSuccess();
@ -154,144 +83,9 @@ public final class NioServerBoss implements Boss {
}
}
private void openSelector(ThreadNameDeterminer determiner) {
try {
selector = Selector.open();
} catch (Throwable t) {
throw new ChannelException("Failed to create a selector.", t);
}
// Start the worker thread with the new Selector.
boolean success = false;
try {
DeadLockProofWorker.start(bossExecutor, new ThreadRenamingRunnable(this,
"New I/O server boss #" + id, determiner));
success = true;
} finally {
if (!success) {
// Release the Selector if the execution fails.
try {
selector.close();
} catch (Throwable t) {
logger.warn("Failed to close a selector.", t);
}
selector = null;
// The method will return to the caller at this point.
}
}
assert selector != null && selector.isOpen();
}
public void run() {
thread = Thread.currentThread();
boolean shutdown = false;
for (;;) {
wakenUp.set(false);
try {
// Just do a blocking select without any timeout
// as this thread does not execute anything else.
selector.select();
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
processTaskQueue();
selector = this.selector; // processTaskQueue() can call rebuildSelector()
processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle.
// The shutdown flag is used to delay the shutdown of this
// loop to avoid excessive Selector creation when
// connections are registered in a one-by-one manner instead of
// concurrent manner.
if (selector.keys().isEmpty()) {
if (shutdown || bossExecutor instanceof ExecutorService &&
((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) {
if (selector.keys().isEmpty()) {
try {
selector.close();
} catch (IOException e) {
logger.warn(
"Failed to close a selector.", e);
} finally {
selector = null;
}
break;
} else {
shutdown = false;
}
}
}
} else {
shutdown = false;
}
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to accept a connection.", e);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
// Ignore
}
}
}
}
private void processTaskQueue() throws IOException {
for (;;) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private boolean cleanUpCancelledKeys() throws IOException {
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
selector.selectNow();
return true;
}
return false;
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
@Override
protected void process(Selector selector) {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
@ -362,73 +156,56 @@ public final class NioServerBoss implements Boss {
}
}
public void rebuildSelector() {
if (Thread.currentThread() != thread) {
Selector selector = this.selector;
if (selector != null) {
taskQueue.add(new Runnable() {
public void run() {
rebuildSelector();
}
});
selector.wakeup();
}
return;
@Override
protected int select(Selector selector) throws IOException {
// Just do a blocking select without any timeout
// as this thread does not execute anything else.
return selector.select();
}
@Override
protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
return new ThreadRenamingRunnable(this,
"New I/O server boss #" + id, determiner);
}
@Override
protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
return new RegisterTask((NioServerSocketChannel) channel, future, null);
}
private final class RegisterTask implements Runnable {
private final NioServerSocketChannel channel;
private final ChannelFuture future;
private final SocketAddress localAddress;
public RegisterTask(final NioServerSocketChannel channel, final ChannelFuture future,
final SocketAddress localAddress) {
this.channel = channel;
this.future = future;
this.localAddress = localAddress;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = Selector.open();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
public void run() {
boolean bound = false;
boolean registered = false;
try {
for (SelectionKey key: oldSelector.keys()) {
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, key.attachment());
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector,", e);
NioServerSocketChannel ch = (NioServerSocketChannel) key.attachment();
close(ch, succeededFuture(ch));
}
future.setSuccess();
fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);
registered = true;
} catch (Throwable t) {
future.setFailure(t);
fireExceptionCaught(channel, t);
} finally {
if (!registered && bound) {
close(channel, future);
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
}
}

View File

@ -86,6 +86,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
private final WorkerPool<NioWorker> workerPool;
private final NioServerSocketPipelineSink sink;
private final BossPool<NioServerBoss> bossPool;
private boolean releasePools;
/**
* Create a new {@link NioServerSocketChannelFactory} using {@link Executors#newCachedThreadPool()}
@ -95,6 +96,7 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
*/
public NioServerSocketChannelFactory() {
this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
releasePools = true;
}
/**
@ -194,7 +196,6 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
if (workerPool == null) {
throw new NullPointerException("workerPool");
}
this.bossPool = bossPool;
this.workerPool = workerPool;
sink = new NioServerSocketPipelineSink();
@ -204,7 +205,20 @@ public class NioServerSocketChannelFactory implements ServerSocketChannelFactory
return new NioServerSocketChannel(this, pipeline, sink, bossPool.nextBoss(), workerPool);
}
public void shutdown() {
bossPool.shutdown();
workerPool.shutdown();
if (releasePools) {
releasePools();
}
}
public void releaseExternalResources() {
shutdown();
releasePools();
}
private void releasePools() {
if (bossPool instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) bossPool).releaseExternalResources();
}

View File

@ -37,7 +37,7 @@ class NioServerSocketPipelineSink extends AbstractNioChannelSink {
}
}
private void handleServerSocket(ChannelEvent e) {
private static void handleServerSocket(ChannelEvent e) {
if (!(e instanceof ChannelStateEvent)) {
return;
}

View File

@ -17,6 +17,7 @@ package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
@ -27,7 +28,6 @@ import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
@ -103,27 +103,7 @@ public class NioWorker extends AbstractNioWorker {
final Thread workerThread = thread;
if (currentThread != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
taskQueue.add(channel.writeTask);
}
if (!(channel instanceof NioAcceptedSocketChannel) ||
((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
final Selector workerSelector = selector;
if (workerSelector != null) {
if (wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup();
}
}
} else {
// A write request can be made from an acceptor thread (boss)
// when a user attempted to write something in:
//
// * channelOpen()
// * channelBound()
// * channelConnected().
//
// In this case, there's no need to wake up the selector because
// the channel is not even registered yet at this moment.
registerTask(channel.writeTask);
}
return true;
@ -133,13 +113,7 @@ public class NioWorker extends AbstractNioWorker {
}
@Override
public void releaseExternalResources() {
super.releaseExternalResources();
recvBufferPool.releaseExternalResources();
}
@Override
protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
boolean server = !(channel instanceof NioClientSocketChannel);
return new RegisterTask((NioSocketChannel) channel, future, server);
}
@ -198,4 +172,10 @@ public class NioWorker extends AbstractNioWorker {
}
}
}
@Override
public void run() {
super.run();
recvBufferPool.releaseExternalResources();
}
}

View File

@ -45,8 +45,13 @@ public final class ShareableWorkerPool<E extends Worker> implements WorkerPool<E
* Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore
*/
public void destroy() {
wrapped.shutdown();
if (wrapped instanceof ExternalResourceReleasable) {
((ExternalResourceReleasable) wrapped).releaseExternalResources();
}
}
public void shutdown() {
// do nothing
}
}

View File

@ -18,13 +18,11 @@ package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker;
import java.nio.channels.Selector;
/**
* The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand
*
*/
public interface WorkerPool<E extends Worker> {
public interface WorkerPool<E extends Worker> extends NioSelectorPool {
/**
* Return the next {@link Worker} to use
@ -32,10 +30,4 @@ public interface WorkerPool<E extends Worker> {
* @return worker
*/
E nextWorker();
/**
* Replaces the current {@link Selector}s of the {@link Worker}s with new {@link Selector}s to work around the
* infamous epoll 100% CPU bug.
*/
void rebuildSelectors();
}

View File

@ -53,7 +53,6 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
public void run() {
thread = channel.workerThread = Thread.currentThread();
while (channel.isOpen()) {
synchronized (channel.interestOpsLock) {
while (!channel.isReadable()) {
@ -106,10 +105,6 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
processEventQueue();
}
public void rebuildSelector() {
// OIO has no selector.
}
static boolean isIoThread(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread;
}

View File

@ -76,6 +76,7 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory
private final Executor workerExecutor;
final OioClientSocketPipelineSink sink;
private boolean shutdownExecutor;
/**
* Creates a new instance with a {@link Executors#newCachedThreadPool()} as worker executor.
@ -84,6 +85,7 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory
*/
public OioClientSocketChannelFactory() {
this(Executors.newCachedThreadPool());
shutdownExecutor = true;
}
/**
@ -104,7 +106,14 @@ public class OioClientSocketChannelFactory implements ClientSocketChannelFactory
return new OioClientSocketChannel(this, pipeline, sink);
}
public void shutdown() {
if (shutdownExecutor) {
ExecutorUtil.terminate(workerExecutor);
}
}
public void releaseExternalResources() {
shutdown();
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -75,6 +75,7 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory {
private final Executor workerExecutor;
final OioDatagramPipelineSink sink;
private boolean shutdownExecutor;
/**
* Creates a new instance with a {@link Executors#newCachedThreadPool()}
@ -83,6 +84,7 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory {
*/
public OioDatagramChannelFactory() {
this(Executors.newCachedThreadPool());
shutdownExecutor = true;
}
/**
@ -103,7 +105,14 @@ public class OioDatagramChannelFactory implements DatagramChannelFactory {
return new OioDatagramChannel(this, pipeline, sink);
}
public void shutdown() {
if (shutdownExecutor) {
ExecutorUtil.terminate(workerExecutor);
}
}
public void releaseExternalResources() {
shutdown();
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -89,6 +89,7 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory
final Executor bossExecutor;
private final Executor workerExecutor;
private final ChannelSink sink;
private boolean shutdownExecutor;
/**
* Create a new {@link OioServerSocketChannelFactory} with a {@link Executors#newCachedThreadPool()}
@ -98,6 +99,7 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory
*/
public OioServerSocketChannelFactory() {
this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
this.shutdownExecutor = true;
}
/**
@ -125,7 +127,14 @@ public class OioServerSocketChannelFactory implements ServerSocketChannelFactory
return new OioServerSocketChannel(this, pipeline, sink);
}
public void shutdown() {
if (shutdownExecutor) {
ExecutorUtil.terminate(workerExecutor);
}
}
public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor, workerExecutor);
shutdown();
ExecutorUtil.terminate(workerExecutor);
}
}

View File

@ -35,4 +35,8 @@ final class EmbeddedChannelFactory implements ChannelFactory {
public void releaseExternalResources() {
// No external resources
}
public void shutdown() {
// Nothing to shutdown
}
}

View File

@ -22,18 +22,15 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipelineException;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.util.DummyHandler;
import org.jboss.netty.util.TestUtil;
import org.jboss.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -42,43 +39,32 @@ import org.junit.Test;
*/
public abstract class AbstractSocketClientBootstrapTest {
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
@Test(timeout = 10000)
public void testFailedConnectionAttempt() throws Exception {
ClientBootstrap bootstrap = new ClientBootstrap();
bootstrap.setFactory(newClientSocketChannelFactory(executor));
bootstrap.setFactory(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
bootstrap.getPipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption("remoteAddress", new InetSocketAddress("255.255.255.255", 1));
ChannelFuture future = bootstrap.connect();
future.awaitUninterruptibly();
assertFalse(future.isSuccess());
assertTrue(future.getCause() instanceof IOException);
bootstrap.releaseExternalResources();
}
@Test(timeout = 10000)
public void testSuccessfulConnectionAttempt() throws Throwable {
ClientBootstrap bootstrap =
new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(0));
try {
serverSocket.configureBlocking(false);
ClientBootstrap bootstrap =
new ClientBootstrap(newClientSocketChannelFactory(executor));
bootstrap.getPipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption(
"remoteAddress",
@ -102,19 +88,22 @@ public abstract class AbstractSocketClientBootstrapTest {
} catch (IOException e) {
// Ignore.
}
bootstrap.shutdown();
bootstrap.releaseExternalResources();
}
}
@Test(timeout = 10000)
public void testSuccessfulConnectionAttemptWithLocalAddress() throws Throwable {
ClientBootstrap bootstrap =
new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(0));
try {
serverSocket.configureBlocking(false);
ClientBootstrap bootstrap =
new ClientBootstrap(newClientSocketChannelFactory(executor));
bootstrap.getPipeline().addLast("dummy", new DummyHandler());
bootstrap.setOption(
@ -140,6 +129,8 @@ public abstract class AbstractSocketClientBootstrapTest {
} catch (IOException e) {
// Ignore.
}
bootstrap.shutdown();
bootstrap.releaseExternalResources();
}
}
@ -152,7 +143,10 @@ public abstract class AbstractSocketClientBootstrapTest {
expect(pipelineFactory.getPipeline()).andThrow(new ChannelPipelineException());
replay(pipelineFactory);
bootstrap.connect(new InetSocketAddress(TestUtil.getLocalHost(), 1));
ChannelFuture future = bootstrap.connect(new InetSocketAddress(TestUtil.getLocalHost(), 1));
future.awaitUninterruptibly();
bootstrap.shutdown();
bootstrap.releaseExternalResources();
}
@Test(expected = IllegalStateException.class)

View File

@ -23,7 +23,6 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.channel.Channel;
@ -38,9 +37,6 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.SocketChannelConfig;
import org.jboss.netty.util.DummyHandler;
import org.jboss.netty.util.TestUtil;
import org.jboss.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -80,38 +76,28 @@ public abstract class AbstractSocketServerBootstrapTest {
}
}
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
@Test(timeout = 30000, expected = ChannelException.class)
public void testFailedBindAttempt() throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap();
final ServerSocket ss = new ServerSocket(0);
final int boundPort = ss.getLocalPort();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.setFactory(newServerSocketChannelFactory(executor));
bootstrap.setFactory(newServerSocketChannelFactory(Executors.newCachedThreadPool()));
bootstrap.setOption("localAddress", new InetSocketAddress(boundPort));
bootstrap.bind().close().awaitUninterruptibly();
} finally {
ss.close();
bootstrap.releaseExternalResources();
}
}
@Test(timeout = 30000)
public void testSuccessfulBindAttempt() throws Exception {
ServerBootstrap bootstrap = new ServerBootstrap(
newServerSocketChannelFactory(executor));
newServerSocketChannelFactory(Executors.newCachedThreadPool()));
bootstrap.setParentHandler(new ParentChannelHandler());
bootstrap.setOption("localAddress", new InetSocketAddress(0));
@ -172,6 +158,7 @@ public abstract class AbstractSocketServerBootstrapTest {
// Confirm the received child events.
assertEquals("12", pch.result.toString());
bootstrap.releaseExternalResources();
}
@Test(expected = ChannelPipelineException.class)
@ -184,6 +171,7 @@ public abstract class AbstractSocketServerBootstrapTest {
replay(pipelineFactory);
bootstrap.connect(new InetSocketAddress(TestUtil.getLocalHost(), 1));
bootstrap.releaseExternalResources();
}
@Test(expected = IllegalStateException.class)

View File

@ -60,6 +60,7 @@ public class BootstrapTest {
} catch (IllegalStateException e) {
// Success.
}
b.releaseExternalResources();
}
@Test(expected = NullPointerException.class)
@ -93,6 +94,7 @@ public class BootstrapTest {
ChannelPipelineFactory oldPipelineFactory = b.getPipelineFactory();
b.setPipeline(createMock(ChannelPipeline.class));
assertNotSame(oldPipelineFactory, b.getPipelineFactory());
b.releaseExternalResources();
}
@Test(expected = IllegalStateException.class)
@ -100,6 +102,7 @@ public class BootstrapTest {
Bootstrap b = new Bootstrap();
b.setPipelineFactory(createMock(ChannelPipelineFactory.class));
b.getPipeline();
b.releaseExternalResources();
}
@Test(expected = IllegalStateException.class)
@ -107,6 +110,7 @@ public class BootstrapTest {
Bootstrap b = new Bootstrap();
b.setPipelineFactory(createMock(ChannelPipelineFactory.class));
b.getPipelineAsMap();
b.releaseExternalResources();
}
@Test(expected = NullPointerException.class)
@ -145,6 +149,7 @@ public class BootstrapTest {
assertSame(p.get("d"), e.getValue());
assertFalse(m.hasNext());
b.releaseExternalResources();
}
@Test(expected = IllegalArgumentException.class)
@ -157,6 +162,7 @@ public class BootstrapTest {
Bootstrap b = new Bootstrap();
b.setPipelineAsMap(m);
b.releaseExternalResources();
}
@Test
@ -191,6 +197,7 @@ public class BootstrapTest {
} catch (NoSuchElementException e) {
// Success.
}
b.releaseExternalResources();
}
@Test
@ -210,6 +217,7 @@ public class BootstrapTest {
assertEquals("x", o.get("s"));
assertEquals(true, o.get("b"));
assertEquals(42, o.get("i"));
b.releaseExternalResources();
}
@Test
@ -229,6 +237,7 @@ public class BootstrapTest {
assertNotSame(o1, o2);
assertEquals(o1, o2);
b.releaseExternalResources();
}
@Test
@ -241,6 +250,7 @@ public class BootstrapTest {
b.setOption("s", null);
assertNull(b.getOption("s"));
assertTrue(b.getOptions().isEmpty());
b.releaseExternalResources();
}
@Test(expected = NullPointerException.class)

View File

@ -24,7 +24,6 @@ import java.net.SocketException;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -36,34 +35,22 @@ import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.TestUtil;
import org.jboss.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class AbstractDatagramMulticastTest {
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor);
@Test
public void testMulticast() throws Throwable {
ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor));
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor));
ConnectionlessBootstrap sb = new ConnectionlessBootstrap(
newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(
newClientSocketChannelFactory(Executors.newCachedThreadPool()));
DatagramChannel sc = null;
DatagramChannel cc = null;
try {

View File

@ -38,26 +38,16 @@ import org.junit.Test;
public abstract class AbstractDatagramTest {
private static ExecutorService executor;
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract DatagramChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract DatagramChannelFactory newClientSocketChannelFactory(Executor executor);
@Test
public void testSimpleSend() throws Throwable {
ConnectionlessBootstrap sb = new ConnectionlessBootstrap(newServerSocketChannelFactory(executor));
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(newClientSocketChannelFactory(executor));
ConnectionlessBootstrap sb = new ConnectionlessBootstrap(
newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ConnectionlessBootstrap cb = new ConnectionlessBootstrap(
newClientSocketChannelFactory(Executors.newCachedThreadPool()));
final CountDownLatch latch = new CountDownLatch(1);
sb.getPipeline().addFirst("handler", new SimpleChannelUpstreamHandler() {
@ -83,6 +73,9 @@ public abstract class AbstractDatagramTest {
assertTrue(latch.await(10, TimeUnit.SECONDS));
sc.close().awaitUninterruptibly();
cc.close().awaitUninterruptibly();
cb.shutdown();
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
}
}

View File

@ -124,6 +124,11 @@ public abstract class AbstractSocketEchoTest {
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
sc.close().awaitUninterruptibly();
cb.shutdown();
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
@ -137,6 +142,7 @@ public abstract class AbstractSocketEchoTest {
if (ch.exception.get() != null) {
throw ch.exception.get();
}
}
private static class EchoHandler extends SimpleChannelUpstreamHandler {

View File

@ -74,7 +74,7 @@ public class NioClientSocketShutdownTimeTest {
// Ignore.
}
}
b.releaseExternalResources();
long shutdownTime = stopTime - startTime;
assertTrue("Shutdown takes too long: " + shutdownTime + " ms", shutdownTime < 500);
}

View File

@ -32,7 +32,7 @@ public abstract class AbstractNioWorkerTest {
@Test
public void testShutdownWorkerThrowsException() throws InterruptedException {
AbstractNioChannel<?> mockChannel = createMock(AbstractNioChannel.class);
AbstractNioChannel<?> mockChannel = createMockChannel();
replay(mockChannel);
ChannelFuture mockFuture = createMock(ChannelFuture.class);
@ -41,8 +41,7 @@ public abstract class AbstractNioWorkerTest {
ExecutorService executor = Executors.newCachedThreadPool();
AbstractNioWorker worker = createWorker(executor);
executor.shutdownNow();
worker.shutdown();
// give the Selector time to detect the shutdown
Thread.sleep(SelectorUtil.DEFAULT_SELECT_TIMEOUT * 10);
@ -57,4 +56,6 @@ public abstract class AbstractNioWorkerTest {
}
protected abstract AbstractNioWorker createWorker(Executor executor);
protected abstract AbstractNioChannel<?> createMockChannel();
}

View File

@ -15,6 +15,8 @@
*/
package org.jboss.netty.channel.socket.nio;
import org.easymock.EasyMock;
import java.util.concurrent.Executor;
public class NioDatagramWorkerTest extends AbstractNioWorkerTest {
@ -23,4 +25,9 @@ public class NioDatagramWorkerTest extends AbstractNioWorkerTest {
protected AbstractNioWorker createWorker(Executor executor) {
return new NioDatagramWorker(executor);
}
@Override
protected AbstractNioChannel<?> createMockChannel() {
return EasyMock.createMock(NioDatagramChannel.class);
}
}

View File

@ -15,6 +15,8 @@
*/
package org.jboss.netty.channel.socket.nio;
import org.easymock.EasyMock;
import java.util.concurrent.Executor;
public class NioWorkerTest extends AbstractNioWorkerTest {
@ -23,4 +25,9 @@ public class NioWorkerTest extends AbstractNioWorkerTest {
protected AbstractNioWorker createWorker(Executor executor) {
return new NioWorker(executor);
}
@Override
protected AbstractNioChannel<?> createMockChannel() {
return EasyMock.createMock(NioSocketChannel.class);
}
}

View File

@ -123,6 +123,11 @@ public abstract class AbstractSocketFixedLengthEchoTest {
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
sc.close().awaitUninterruptibly();
cb.shutdown();
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@ -36,9 +35,6 @@ import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.TestUtil;
import org.jboss.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class AbstractSocketCompatibleObjectStreamEchoTest {
@ -46,7 +42,6 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest {
static final Random random = new Random();
static final String[] data = new String[1024];
private static ExecutorService executor;
static {
for (int i = 0; i < data.length; i ++) {
@ -60,15 +55,6 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest {
}
}
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
@ -76,8 +62,8 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest {
@Test
@SuppressWarnings("deprecation")
public void testCompatibleObjectEcho() throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
@ -134,6 +120,10 @@ public abstract class AbstractSocketCompatibleObjectStreamEchoTest {
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
sc.close().awaitUninterruptibly();
cb.shutdown();
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();

View File

@ -21,7 +21,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@ -36,9 +35,6 @@ import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.TestUtil;
import org.jboss.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class AbstractSocketObjectStreamEchoTest {
@ -46,8 +42,6 @@ public abstract class AbstractSocketObjectStreamEchoTest {
static final Random random = new Random();
static final String[] data = new String[1024];
private static ExecutorService executor;
static {
for (int i = 0; i < data.length; i ++) {
int eLen = random.nextInt(512);
@ -60,23 +54,14 @@ public abstract class AbstractSocketObjectStreamEchoTest {
}
}
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
@Test
public void testObjectEcho() throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
@ -135,6 +120,10 @@ public abstract class AbstractSocketObjectStreamEchoTest {
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
sc.close().awaitUninterruptibly();
cb.shutdown();
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@ -40,9 +39,6 @@ import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.util.TestUtil;
import org.jboss.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class AbstractSocketSpdyEchoTest {
@ -50,8 +46,6 @@ public abstract class AbstractSocketSpdyEchoTest {
private static final Random random = new Random();
static final int ignoredBytes = 20;
private static ExecutorService executor;
private static ChannelBuffer createFrames(int version) {
int length = version < 3 ? 1176 : 1174;
ChannelBuffer frames = ChannelBuffers.buffer(length);
@ -174,15 +168,6 @@ public abstract class AbstractSocketSpdyEchoTest {
return frames;
}
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
@ -195,8 +180,8 @@ public abstract class AbstractSocketSpdyEchoTest {
}
private void testSpdyEcho(int version) throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
ChannelBuffer frames = createFrames(version);
@ -236,6 +221,10 @@ public abstract class AbstractSocketSpdyEchoTest {
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
sc.close().awaitUninterruptibly();
cb.shutdown();
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();

View File

@ -49,8 +49,6 @@ public abstract class AbstractSocketStringEchoTest {
static final Random random = new Random();
static final String[] data = new String[1024];
private static ExecutorService executor;
static {
for (int i = 0; i < data.length; i ++) {
int eLen = random.nextInt(512);
@ -63,23 +61,13 @@ public abstract class AbstractSocketStringEchoTest {
}
}
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor);
}
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
@Test
public void testStringEcho() throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
EchoHandler sh = new EchoHandler();
EchoHandler ch = new EchoHandler();
@ -139,6 +127,12 @@ public abstract class AbstractSocketStringEchoTest {
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
sc.close().awaitUninterruptibly();
cc.close().awaitUninterruptibly();
cb.shutdown();
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();

View File

@ -45,9 +45,6 @@ import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.TestUtil;
import org.jboss.netty.util.internal.ExecutorUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public abstract class AbstractSocketSslEchoTest {
@ -57,23 +54,10 @@ public abstract class AbstractSocketSslEchoTest {
private static final Random random = new Random();
static final byte[] data = new byte[1048576];
private static ExecutorService executor;
private static ExecutorService eventExecutor;
static {
random.nextBytes(data);
}
@BeforeClass
public static void init() {
executor = Executors.newCachedThreadPool();
eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 0, 0);
}
@AfterClass
public static void destroy() {
ExecutorUtil.terminate(executor, eventExecutor);
}
protected abstract ChannelFactory newServerSocketChannelFactory(Executor executor);
protected abstract ChannelFactory newClientSocketChannelFactory(Executor executor);
@ -84,8 +68,8 @@ public abstract class AbstractSocketSslEchoTest {
@Test
public void testSslEcho() throws Throwable {
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(executor));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(executor));
ServerBootstrap sb = new ServerBootstrap(newServerSocketChannelFactory(Executors.newCachedThreadPool()));
ClientBootstrap cb = new ClientBootstrap(newClientSocketChannelFactory(Executors.newCachedThreadPool()));
EchoHandler sh = new EchoHandler(true);
EchoHandler ch = new EchoHandler(false);
@ -103,8 +87,9 @@ public abstract class AbstractSocketSslEchoTest {
sb.getPipeline().addLast("handler", sh);
cb.getPipeline().addFirst("ssl", new SslHandler(cse));
cb.getPipeline().addLast("handler", ch);
ExecutorService eventExecutor = null;
if (isExecutorRequired()) {
eventExecutor = new OrderedMemoryAwareThreadPoolExecutor(16, 0, 0);
sb.getPipeline().addFirst("executor", new ExecutionHandler(eventExecutor));
cb.getPipeline().addFirst("executor", new ExecutionHandler(eventExecutor));
}
@ -171,7 +156,14 @@ public abstract class AbstractSocketSslEchoTest {
sh.channel.close().awaitUninterruptibly();
ch.channel.close().awaitUninterruptibly();
sc.close().awaitUninterruptibly();
cb.shutdown();
sb.shutdown();
cb.releaseExternalResources();
sb.releaseExternalResources();
if (eventExecutor != null) {
eventExecutor.shutdown();
}
if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
throw sh.exception.get();
}