[#789] Allow a user to trigger the epoll bug workaround

This commit adds rebuildSelector(s) method to Boss, BossPool, Worker, and WorkerPool.  A user can call rebuildSelector(s) method to initiate the workaround for the infamous epoll 100% CPU bug in Linux based on his or her observations rather than potentially incorrect automatic initiation of the workaround.

Previously, setInterestOps() were executed by a caller thread, which made re-registration of SelectionKeys unsafe.  This commit also ensures setInterestOps() is always executed by an I/O thread.  With this change, we don't need NioProviderMetadata anymore.
This commit is contained in:
Trustin Lee 2012-12-04 19:26:12 +09:00 committed by Norman Maurer
parent d4a7722cfd
commit 95684d92eb
16 changed files with 355 additions and 724 deletions

View File

@ -16,6 +16,8 @@
package org.jboss.netty.channel.socket; package org.jboss.netty.channel.socket;
import java.nio.channels.Selector;
/** /**
* A {@link Worker} is responsible to dispatch IO operations * A {@link Worker} is responsible to dispatch IO operations
* *
@ -29,5 +31,11 @@ public interface Worker extends Runnable {
* @param task * @param task
* the {@link Runnable} to execute * the {@link Runnable} to execute
*/ */
void executeInIoThread(Runnable task); void executeInIoThread(Runnable task);
/**
* Replaces the current {@link Selector} with a new {@link Selector} to work around the infamous epoll 100% CPU
* bug.
*/
void rebuildSelector();
} }

View File

@ -80,6 +80,12 @@ public abstract class AbstractNioBossPool<E extends Boss>
return (E) bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; return (E) bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
} }
public void rebuildSelectors() {
for (Boss boss: bosses) {
boss.rebuildSelector();
}
}
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(bossExecutor); ExecutorUtil.terminate(bossExecutor);
for (Boss boss: bosses) { for (Boss boss: bosses) {

View File

@ -44,11 +44,6 @@ abstract class AbstractNioChannel<C extends SelectableChannel & WritableByteChan
*/ */
final AbstractNioWorker worker; final AbstractNioWorker worker;
/**
* Monitor object to synchronize access to InterestedOps.
*/
final Object interestOpsLock = new Object();
/** /**
* Monitor object for synchronizing access to the {@link WriteRequestQueue}. * Monitor object for synchronizing access to the {@link WriteRequestQueue}.
*/ */

View File

@ -39,6 +39,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.nio.channels.WritableByteChannel; import java.nio.channels.WritableByteChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
@ -48,8 +49,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.jboss.netty.channel.Channels.*; import static org.jboss.netty.channel.Channels.*;
@ -65,8 +64,6 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
private static final InternalLogger logger = InternalLoggerFactory private static final InternalLogger logger = InternalLoggerFactory
.getInstance(AbstractNioWorker.class); .getInstance(AbstractNioWorker.class);
private static final int CONSTRAINT_LEVEL = NioProviderMetadata.CONSTRAINT_LEVEL;
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
/** /**
@ -94,27 +91,12 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
*/ */
protected final AtomicBoolean wakenUp = new AtomicBoolean(); protected final AtomicBoolean wakenUp = new AtomicBoolean();
/**
* Lock for this workers Selector.
*/
private final ReadWriteLock selectorGuard = new ReentrantReadWriteLock();
/** /**
* Monitor object used to synchronize selector open/close. * Monitor object used to synchronize selector open/close.
*/ */
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
/** final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
* Queue of channel registration tasks.
*/
private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
/**
* Queue of WriteTasks
*/
protected final Queue<Runnable> writeTaskQueue = new ConcurrentLinkedQueue<Runnable>();
private final Queue<Runnable> eventQueue = new ConcurrentLinkedQueue<Runnable>();
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
@ -136,10 +118,8 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
// the selector was null this means the Worker has already been shutdown. // the selector was null this means the Worker has already been shutdown.
throw new RejectedExecutionException("Worker has already been shutdown"); throw new RejectedExecutionException("Worker has already been shutdown");
} }
Runnable registerTask = createRegisterTask(channel, future);
boolean offered = registerTaskQueue.offer(registerTask); taskQueue.add(createRegisterTask(channel, future));
assert offered;
if (wakenUp.compareAndSet(false, true)) { if (wakenUp.compareAndSet(false, true)) {
selector.wakeup(); selector.wakeup();
@ -147,43 +127,70 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
} }
} }
// Create a new selector and "transfer" all channels from the old public void rebuildSelector() {
// selector to the new one if (Thread.currentThread() != thread) {
private Selector recreateSelector() throws IOException { executeInIoThread(new Runnable() {
Selector newSelector = Selector.open(); public void run() {
Selector selector = this.selector; rebuildSelector();
this.selector = newSelector; }
}, true);
// loop over all the keys that are registered with the old Selector return;
// and register them with the new one
for (SelectionKey key: selector.keys()) {
SelectableChannel ch = key.channel();
int ops = key.interestOps();
Object att = key.attachment();
// cancel the old key
key.cancel();
try {
// register the channel with the new selector now
ch.register(newSelector, ops, att);
} catch (ClosedChannelException e) {
// close channel
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) att;
close(channel, succeededFuture(channel));
}
} }
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = Selector.open();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, key.attachment());
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector,", e);
AbstractNioChannel<?> channel = (AbstractNioChannel<?>) key.attachment();
close(channel, succeededFuture(channel));
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try { try {
// time to close the old selector as everything else is registered to the new one // time to close the old selector as everything else is registered to the new one
selector.close(); oldSelector.close();
} catch (Throwable t) { } catch (Throwable t) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", t); logger.warn("Failed to close the old Selector.", t);
} }
} }
if (logger.isWarnEnabled()) {
logger.warn("Recreated Selector because of possible jdk epoll(..) bug"); logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
}
return newSelector;
} }
/** /**
@ -230,13 +237,6 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
for (;;) { for (;;) {
wakenUp.set(false); wakenUp.set(false);
if (CONSTRAINT_LEVEL != 0) {
selectorGuard.writeLock().lock();
// This empty synchronization block prevents the selector
// from acquiring its lock.
selectorGuard.writeLock().unlock();
}
try { try {
long beforeSelect = System.nanoTime(); long beforeSelect = System.nanoTime();
int selected = SelectorUtil.select(selector); int selected = SelectorUtil.select(selector);
@ -275,7 +275,8 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
// The selector returned immediately for 10 times in a row, // The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the // so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug. // famous epoll(..) jdk bug.
selector = recreateSelector(); rebuildSelector();
selector = this.selector;
selectReturnsImmediately = 0; selectReturnsImmediately = 0;
wakenupFromLoop = false; wakenupFromLoop = false;
// try to select again // try to select again
@ -322,9 +323,8 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
} }
cancelledKeys = 0; cancelledKeys = 0;
processRegisterTaskQueue(); processTaskQueue();
processEventQueue(); selector = this.selector; // processTaskQueue() can call rebuildSelector()
processWriteTaskQueue();
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle. // Exit the loop when there's nothing to handle.
@ -337,7 +337,7 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) { executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {
synchronized (startStopLock) { synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { if (taskQueue.isEmpty() && selector.keys().isEmpty()) {
try { try {
selector.close(); selector.close();
} catch (IOException e) { } catch (IOException e) {
@ -387,7 +387,7 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
if (!alwaysAsync && Thread.currentThread() == thread) { if (!alwaysAsync && Thread.currentThread() == thread) {
task.run(); task.run();
} else { } else {
eventQueue.offer(task); taskQueue.offer(task);
synchronized (startStopLock) { synchronized (startStopLock) {
// check if the selector was shutdown already or was not started yet. If so execute all // check if the selector was shutdown already or was not started yet. If so execute all
@ -395,7 +395,7 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
if (selector == null) { if (selector == null) {
// execute everything in the event queue as the // execute everything in the event queue as the
for (;;) { for (;;) {
Runnable r = eventQueue.poll(); Runnable r = taskQueue.poll();
if (r == null) { if (r == null) {
break; break;
} }
@ -414,33 +414,9 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
} }
} }
private void processRegisterTaskQueue() throws IOException { private void processTaskQueue() throws IOException {
for (;;) { for (;;) {
final Runnable task = registerTaskQueue.poll(); final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private void processWriteTaskQueue() throws IOException {
for (;;) {
final Runnable task = writeTaskQueue.poll();
if (task == null) {
break;
}
task.run();
cleanUpCancelledKeys();
}
}
private void processEventQueue() throws IOException {
for (;;) {
final Runnable task = eventQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
@ -665,15 +641,11 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
return; return;
} }
// interestOps can change at any time and at any thread. int interestOps = channel.getRawInterestOps();
// Acquire a lock to avoid possible race condition. if ((interestOps & SelectionKey.OP_WRITE) == 0) {
synchronized (channel.interestOpsLock) { interestOps |= SelectionKey.OP_WRITE;
int interestOps = channel.getRawInterestOps(); key.interestOps(interestOps);
if ((interestOps & SelectionKey.OP_WRITE) == 0) { channel.setRawInterestOpsNow(interestOps);
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
channel.setRawInterestOpsNow(interestOps);
}
} }
} }
@ -688,15 +660,11 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
return; return;
} }
// interestOps can change at any time and at any thread. int interestOps = channel.getRawInterestOps();
// Acquire a lock to avoid possible race condition. if ((interestOps & SelectionKey.OP_WRITE) != 0) {
synchronized (channel.interestOpsLock) { interestOps &= ~SelectionKey.OP_WRITE;
int interestOps = channel.getRawInterestOps(); key.interestOps(interestOps);
if ((interestOps & SelectionKey.OP_WRITE) != 0) { channel.setRawInterestOpsNow(interestOps);
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
channel.setRawInterestOpsNow(interestOps);
}
} }
} }
@ -803,105 +771,67 @@ abstract class AbstractNioWorker implements Worker, ExternalResourceReleasable {
} }
} }
void setInterestOps(AbstractNioChannel<?> channel, ChannelFuture future, int interestOps) { void setInterestOps(final AbstractNioChannel<?> channel, final ChannelFuture future, final int interestOps) {
boolean changed = false;
boolean iothread = isIoThread(channel); boolean iothread = isIoThread(channel);
if (!iothread) {
channel.getPipeline().execute(new Runnable() {
public void run() {
setInterestOps(channel, future, interestOps);
}
});
return;
}
boolean changed = false;
try { try {
// interestOps can change at any time and at any thread. Selector selector = this.selector;
// Acquire a lock to avoid possible race condition. SelectionKey key = channel.channel.keyFor(selector);
synchronized (channel.interestOpsLock) {
Selector selector = this.selector;
SelectionKey key = channel.channel.keyFor(selector);
// Override OP_WRITE flag - a user cannot change this flag. // Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE; int newInterestOps = interestOps & ~Channel.OP_WRITE | channel.getRawInterestOps() & Channel.OP_WRITE;
interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
if (key == null || selector == null) { if (key == null || selector == null) {
if (channel.getRawInterestOps() != interestOps) { if (channel.getRawInterestOps() != newInterestOps) {
changed = true; changed = true;
}
// Not registered to the worker yet.
// Set the rawInterestOps immediately; RegisterTask will pick it up.
channel.setRawInterestOpsNow(interestOps);
future.setSuccess();
if (changed) {
if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
}
return;
} }
switch (CONSTRAINT_LEVEL) { // Not registered to the worker yet.
case 0: // Set the rawInterestOps immediately; RegisterTask will pick it up.
if (channel.getRawInterestOps() != interestOps) { channel.setRawInterestOpsNow(newInterestOps);
key.interestOps(interestOps);
if (Thread.currentThread() != thread &&
wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true;
}
break;
case 1:
case 2:
if (channel.getRawInterestOps() != interestOps) {
if (Thread.currentThread() == thread) {
key.interestOps(interestOps);
changed = true;
} else {
selectorGuard.readLock().lock();
try {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
key.interestOps(interestOps);
changed = true;
} finally {
selectorGuard.readLock().unlock();
}
}
}
break;
default:
throw new Error();
}
future.setSuccess();
if (changed) { if (changed) {
channel.setRawInterestOpsNow(interestOps); if (iothread) {
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
} }
return;
}
if (channel.getRawInterestOps() != newInterestOps) {
key.interestOps(newInterestOps);
if (Thread.currentThread() != thread &&
wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
channel.setRawInterestOpsNow(newInterestOps);
} }
future.setSuccess(); future.setSuccess();
if (changed) { if (changed) {
if (iothread) { fireChannelInterestChanged(channel);
fireChannelInterestChanged(channel);
} else {
fireChannelInterestChangedLater(channel);
}
} }
} catch (CancelledKeyException e) { } catch (CancelledKeyException e) {
// setInterestOps() was called on a closed channel. // setInterestOps() was called on a closed channel.
ClosedChannelException cce = new ClosedChannelException(); ClosedChannelException cce = new ClosedChannelException();
future.setFailure(cce); future.setFailure(cce);
if (iothread) { fireExceptionCaught(channel, cce);
fireExceptionCaught(channel, cce);
} else {
fireExceptionCaughtLater(channel, cce);
}
} catch (Throwable t) { } catch (Throwable t) {
future.setFailure(t); future.setFailure(t);
if (iothread) { fireExceptionCaught(channel, t);
fireExceptionCaught(channel, t);
} else {
fireExceptionCaughtLater(channel, t);
}
} }
} }

View File

@ -16,13 +16,13 @@
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.channel.socket.Worker;
import org.jboss.netty.util.ExternalResourceReleasable; import org.jboss.netty.util.ExternalResourceReleasable;
import org.jboss.netty.util.internal.ExecutorUtil; import org.jboss.netty.util.internal.ExecutorUtil;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s * Abstract base class for {@link WorkerPool} implementations that create the {@link Worker}'s
* up-front and return them in a "fair" fashion when calling {@link #nextWorker()} * up-front and return them in a "fair" fashion when calling {@link #nextWorker()}
@ -72,12 +72,12 @@ public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
/** /**
* Only here for backward compability and will be removed in later releases. Please use * Only here for backward compability and will be removed in later releases. Please use
* {@link #newWorker(java.util.concurrent.Executor)} * {@link #newWorker(Executor)}
* *
* *
* @param executor the {@link Executor} to use * @param executor the {@link Executor} to use
* @return worker the new {@link Worker} * @return worker the new {@link Worker}
* @deprecated use {@link #newWorker(java.util.concurrent.Executor)} * @deprecated use {@link #newWorker(Executor)}
*/ */
@Deprecated @Deprecated
protected E createWorker(Executor executor) { protected E createWorker(Executor executor) {
@ -87,13 +87,14 @@ public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
/** /**
* Create a new {@link Worker} which uses the given {@link Executor} to service IO. * Create a new {@link Worker} which uses the given {@link Executor} to service IO.
* *
* This method will be made abstract in further releases (once {@link #createWorker(java.util.concurrent.Executor)} * This method will be made abstract in further releases (once {@link #createWorker(Executor)}
* was removed). * was removed).
* *
* *
* @param executor the {@link Executor} to use * @param executor the {@link Executor} to use
* @return worker the new {@link Worker} * @return worker the new {@link Worker}
*/ */
@SuppressWarnings("deprecation")
protected E newWorker(Executor executor) { protected E newWorker(Executor executor) {
return createWorker(executor); return createWorker(executor);
} }
@ -103,6 +104,12 @@ public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]; return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
} }
public void rebuildSelectors() {
for (Worker worker: workers) {
worker.rebuildSelector();
}
}
public void releaseExternalResources() { public void releaseExternalResources() {
ExecutorUtil.terminate(workerExecutor); ExecutorUtil.terminate(workerExecutor);
for (AbstractNioWorker worker: workers) { for (AbstractNioWorker worker: workers) {

View File

@ -15,8 +15,15 @@
*/ */
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import java.nio.channels.Selector;
/** /**
* Serves the boss tasks like connecting/accepting * Serves the boss tasks like connecting/accepting
*/ */
public interface Boss extends Runnable { public interface Boss extends Runnable {
/**
* Replaces the current {@link Selector} with a new {@link Selector} to work around the infamous epoll 100% CPU
* bug.
*/
void rebuildSelector();
} }

View File

@ -15,6 +15,8 @@
*/ */
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import java.nio.channels.Selector;
/** /**
* A Pool that holds {@link Boss} instances * A Pool that holds {@link Boss} instances
*/ */
@ -25,4 +27,9 @@ public interface BossPool<E extends Boss> {
*/ */
E nextBoss(); E nextBoss();
/**
* Replaces the current {@link Selector}s of the {@link Boss}es with new {@link Selector}s to work around the
* infamous epoll 100% CPU bug.
*/
void rebuildSelectors();
} }

View File

@ -33,6 +33,7 @@ import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
@ -58,10 +59,11 @@ public final class NioClientBoss implements Boss {
InternalLoggerFactory.getInstance(NioClientBoss.class); InternalLoggerFactory.getInstance(NioClientBoss.class);
private volatile Selector selector; private volatile Selector selector;
private volatile Thread thread;
private boolean started; private boolean started;
private final AtomicBoolean wakenUp = new AtomicBoolean(); private final AtomicBoolean wakenUp = new AtomicBoolean();
private final Object startStopLock = new Object(); private final Object startStopLock = new Object();
private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>(); private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
private final TimerTask wakeupTask = new TimerTask() { private final TimerTask wakeupTask = new TimerTask() {
public void run(Timeout timeout) throws Exception { public void run(Timeout timeout) throws Exception {
// This is needed to prevent a possible race that can lead to a NPE // This is needed to prevent a possible race that can lead to a NPE
@ -131,7 +133,7 @@ public final class NioClientBoss implements Boss {
assert selector != null && selector.isOpen(); assert selector != null && selector.isOpen();
started = true; started = true;
boolean offered = registerTaskQueue.offer(registerTask); boolean offered = taskQueue.offer(registerTask);
assert offered; assert offered;
} }
int timeout = channel.getConfig().getConnectTimeoutMillis(); int timeout = channel.getConfig().getConnectTimeoutMillis();
@ -147,6 +149,8 @@ public final class NioClientBoss implements Boss {
} }
public void run() { public void run() {
thread = Thread.currentThread();
boolean shutdown = false; boolean shutdown = false;
int selectReturnsImmediately = 0; int selectReturnsImmediately = 0;
@ -195,7 +199,8 @@ public final class NioClientBoss implements Boss {
// The selector returned immediately for 10 times in a row, // The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the // so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug. // famous epoll(..) jdk bug.
selector = recreateSelector(); rebuildSelector();
selector = this.selector;
selectReturnsImmediately = 0; selectReturnsImmediately = 0;
wakenupFromLoop = false; wakenupFromLoop = false;
// try to select again // try to select again
@ -240,7 +245,8 @@ public final class NioClientBoss implements Boss {
} else { } else {
wakenupFromLoop = false; wakenupFromLoop = false;
} }
processRegisterTaskQueue(); processTaskQueue();
selector = this.selector; // processTaskQueue() can call rebuildSelector()
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
// Handle connection timeout every 10 milliseconds approximately. // Handle connection timeout every 10 milliseconds approximately.
@ -257,7 +263,7 @@ public final class NioClientBoss implements Boss {
bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) { bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
synchronized (startStopLock) { synchronized (startStopLock) {
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) { if (taskQueue.isEmpty() && selector.keys().isEmpty()) {
started = false; started = false;
try { try {
selector.close(); selector.close();
@ -297,9 +303,9 @@ public final class NioClientBoss implements Boss {
} }
} }
private void processRegisterTaskQueue() { private void processTaskQueue() {
for (;;) { for (;;) {
final Runnable task = registerTaskQueue.poll(); final Runnable task = taskQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
@ -308,7 +314,7 @@ public final class NioClientBoss implements Boss {
} }
} }
private void processSelectedKeys(Set<SelectionKey> selectedKeys) { private static void processSelectedKeys(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by // check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process. // creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597 // See https://github.com/netty/netty/issues/597
@ -338,7 +344,7 @@ public final class NioClientBoss implements Boss {
} }
} }
private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) { private static void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
ConnectException cause = null; ConnectException cause = null;
for (SelectionKey k: keys) { for (SelectionKey k: keys) {
if (!k.isValid()) { if (!k.isValid()) {
@ -368,7 +374,7 @@ public final class NioClientBoss implements Boss {
} }
} }
private void connect(SelectionKey k) throws IOException { private static void connect(SelectionKey k) throws IOException {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
if (ch.channel.finishConnect()) { if (ch.channel.finishConnect()) {
k.cancel(); k.cancel();
@ -379,48 +385,79 @@ public final class NioClientBoss implements Boss {
} }
} }
private void close(SelectionKey k) { private static void close(SelectionKey k) {
NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
ch.worker.close(ch, succeededFuture(ch)); ch.worker.close(ch, succeededFuture(ch));
} }
// Create a new selector and "transfer" all channels from the old public void rebuildSelector() {
// selector to the new one if (Thread.currentThread() != thread) {
private Selector recreateSelector() throws IOException { Selector selector = this.selector;
Selector newSelector = Selector.open(); if (selector != null) {
Selector selector = this.selector; taskQueue.add(new Runnable() {
this.selector = newSelector; public void run() {
rebuildSelector();
// loop over all the keys that are registered with the old Selector }
// and register them with the new one });
for (SelectionKey key: selector.keys()) { selector.wakeup();
SelectableChannel ch = key.channel();
int ops = key.interestOps();
Object att = key.attachment();
// cancel the old key
key.cancel();
try {
// register the channel with the new selector now
ch.register(newSelector, ops, att);
} catch (ClosedChannelException e) {
// close the Channel if we can't register it
close(key);
} }
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
} }
try {
newSelector = Selector.open();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, key.attachment());
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector,", e);
NioClientSocketChannel ch = (NioClientSocketChannel) key.attachment();
ch.worker.close(ch, succeededFuture(ch));
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try { try {
// time to close the old selector as everything else is registered to the new one // time to close the old selector as everything else is registered to the new one
selector.close(); oldSelector.close();
} catch (Throwable t) { } catch (Throwable t) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("Failed to close a selector.", t); logger.warn("Failed to close the old Selector.", t);
} }
} }
if (logger.isWarnEnabled()) {
logger.warn("Recreated Selector because of possible jdk epoll(..) bug"); logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
}
return newSelector;
} }
private static final class RegisterTask implements Runnable { private static final class RegisterTask implements Runnable {

View File

@ -120,8 +120,7 @@ public class NioDatagramWorker extends AbstractNioWorker {
if (workerThread == null || Thread.currentThread() != workerThread) { if (workerThread == null || Thread.currentThread() != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
// "add" the channels writeTask to the writeTaskQueue. // "add" the channels writeTask to the writeTaskQueue.
boolean offered = writeTaskQueue.offer(channel.writeTask); taskQueue.add(channel.writeTask);
assert offered;
} }
final Selector selector = this.selector; final Selector selector = this.selector;
@ -194,10 +193,9 @@ public class NioDatagramWorker extends AbstractNioWorker {
} }
try { try {
synchronized (channel.interestOpsLock) { channel.getDatagramChannel().register(
channel.getDatagramChannel().register( selector, channel.getRawInterestOps(), channel);
selector, channel.getRawInterestOps(), channel);
}
if (future != null) { if (future != null) {
future.setSuccess(); future.setSuccess();
} }

View File

@ -1,450 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.internal.SystemPropertyUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Provides information which is specific to a NIO service provider
* implementation.
*/
final class NioProviderMetadata {
static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioProviderMetadata.class);
private static final String CONSTRAINT_LEVEL_PROPERTY =
"org.jboss.netty.channel.socket.nio.constraintLevel";
private static final String OLD_CONSTRAINT_LEVEL_PROPERTY =
"java.nio.channels.spi.constraintLevel";
/**
* 0 - no need to wake up to get / set interestOps (most cases)
* 1 - no need to wake up to get interestOps, but need to wake up to set.
* 2 - need to wake up to get / set interestOps (old providers)
*/
static final int CONSTRAINT_LEVEL;
static {
int constraintLevel;
// Use the system property if possible.
constraintLevel = SystemPropertyUtil.getInt(CONSTRAINT_LEVEL_PROPERTY, -1);
if (constraintLevel < 0 || constraintLevel > 2) {
// Try the old property.
constraintLevel = SystemPropertyUtil.getInt(OLD_CONSTRAINT_LEVEL_PROPERTY, -1);
if (constraintLevel < 0 || constraintLevel > 2) {
constraintLevel = -1;
} else {
logger.warn(
"System property '" +
OLD_CONSTRAINT_LEVEL_PROPERTY +
"' has been deprecated. Use '" +
CONSTRAINT_LEVEL_PROPERTY + "' instead.");
}
}
if (constraintLevel >= 0) {
logger.debug(
"Setting the NIO constraint level to: " + constraintLevel);
}
if (constraintLevel < 0) {
constraintLevel = detectConstraintLevelFromSystemProperties();
if (constraintLevel < 0) {
constraintLevel = 2;
if (logger.isDebugEnabled()) {
logger.debug(
"Couldn't determine the NIO constraint level from " +
"the system properties; using the safest level (2)");
}
} else if (constraintLevel != 0) {
if (logger.isInfoEnabled()) {
logger.info(
"Using the autodetected NIO constraint level: " +
constraintLevel +
" (Use better NIO provider for better performance)");
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"Using the autodetected NIO constraint level: " +
constraintLevel);
}
}
}
CONSTRAINT_LEVEL = constraintLevel;
if (CONSTRAINT_LEVEL < 0 || CONSTRAINT_LEVEL > 2) {
throw new Error(
"Unexpected NIO constraint level: " +
CONSTRAINT_LEVEL + ", please report this error.");
}
}
private static int detectConstraintLevelFromSystemProperties() {
String version = SystemPropertyUtil.get("java.specification.version");
String vminfo = SystemPropertyUtil.get("java.vm.info", "");
String os = SystemPropertyUtil.get("os.name");
String vendor = SystemPropertyUtil.get("java.vm.vendor");
String provider;
try {
provider = SelectorProvider.provider().getClass().getName();
} catch (Exception e) {
// Perhaps security exception.
provider = null;
}
if (version == null || os == null || vendor == null || provider == null) {
return -1;
}
os = os.toLowerCase();
vendor = vendor.toLowerCase();
// System.out.println(version);
// System.out.println(vminfo);
// System.out.println(os);
// System.out.println(vendor);
// System.out.println(provider);
// Sun JVM
if (vendor.contains("sun")) {
// Linux
if (os.contains("linux")) {
if ("sun.nio.ch.EPollSelectorProvider".equals(provider) ||
"sun.nio.ch.PollSelectorProvider".equals(provider)) {
return 0;
}
// Windows
} else if (os.contains("windows")) {
if ("sun.nio.ch.WindowsSelectorProvider".equals(provider)) {
return 0;
}
// Solaris
} else if (os.contains("sun") || os.contains("solaris")) {
if ("sun.nio.ch.DevPollSelectorProvider".equals(provider)) {
return 0;
}
}
// Apple JVM
} else if (vendor.contains("apple")) {
// Mac OS
if (os.contains("mac") && os.contains("os")) {
if ("sun.nio.ch.KQueueSelectorProvider".equals(provider)) {
return 0;
}
}
// IBM
} else if (vendor.contains("ibm")) {
// Linux or AIX
if (os.contains("linux") || os.contains("aix")) {
if ("1.5".equals(version) || version.matches("^1\\.5\\D.*$")) {
if ("sun.nio.ch.PollSelectorProvider".equals(provider)) {
return 1;
}
} else if ("1.6".equals(version) || version.matches("^1\\.6\\D.*$")) {
// IBM JDK 1.6 has different constraint level for different
// version. The exact version can be determined only by its
// build date.
Pattern datePattern = Pattern.compile(
"(?:^|[^0-9])(" +
"[2-9][0-9]{3}" + // year
"(?:0[1-9]|1[0-2])" + // month
"(?:0[1-9]|[12][0-9]|3[01])" + // day of month
")(?:$|[^0-9])");
Matcher dateMatcher = datePattern.matcher(vminfo);
if (dateMatcher.find()) {
long dateValue = Long.parseLong(dateMatcher.group(1));
if (dateValue < 20081105L) {
// SR0, 1, and 2
return 2;
} else {
// SR3 and later
if ("sun.nio.ch.EPollSelectorProvider".equals(provider)) {
return 0;
} else if ("sun.nio.ch.PollSelectorProvider".equals(provider)) {
return 1;
}
}
}
}
}
// BEA
} else if (vendor.contains("bea") || vendor.contains("oracle")) {
// Linux
if (os.contains("linux")) {
if ("sun.nio.ch.EPollSelectorProvider".equals(provider) ||
"sun.nio.ch.PollSelectorProvider".equals(provider)) {
return 0;
}
// Windows
} else if (os.contains("windows")) {
if ("sun.nio.ch.WindowsSelectorProvider".equals(provider)) {
return 0;
}
}
// Apache Software Foundation
} else if (vendor.contains("apache")) {
if ("org.apache.harmony.nio.internal.SelectorProviderImpl".equals(provider)) {
return 1;
}
}
// Others (untested)
return -1;
}
private static int autodetect() {
final int constraintLevel;
ExecutorService executor = Executors.newCachedThreadPool();
boolean success;
long startTime;
int interestOps;
ServerSocketChannel ch = null;
SelectorLoop loop = null;
try {
// Open a channel.
ch = ServerSocketChannel.open();
// Configure the channel
try {
ch.socket().bind(new InetSocketAddress(0));
ch.configureBlocking(false);
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to configure a temporary socket.", e);
}
return -1;
}
// Prepare the selector loop.
try {
loop = new SelectorLoop();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to open a temporary selector.", e);
}
return -1;
}
// Register the channel
try {
ch.register(loop.selector, 0);
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to register a temporary selector.", e);
}
return -1;
}
SelectionKey key = ch.keyFor(loop.selector);
// Start the selector loop.
executor.execute(loop);
// Level 0
success = true;
for (int i = 0; i < 10; i ++) {
// Increase the probability of calling interestOps
// while select() is running.
do {
while (!loop.selecting) {
Thread.yield();
}
// Wait a little bit more.
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
} while (!loop.selecting);
startTime = System.nanoTime();
key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);
if (System.nanoTime() - startTime >= 500000000L) {
success = false;
break;
}
}
if (success) {
constraintLevel = 0;
} else {
// Level 1
success = true;
for (int i = 0; i < 10; i ++) {
// Increase the probability of calling interestOps
// while select() is running.
do {
while (!loop.selecting) {
Thread.yield();
}
// Wait a little bit more.
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
} while (!loop.selecting);
startTime = System.nanoTime();
interestOps = key.interestOps();
synchronized (loop) {
loop.selector.wakeup();
key.interestOps(interestOps | SelectionKey.OP_ACCEPT);
key.interestOps(interestOps & ~SelectionKey.OP_ACCEPT);
}
if (System.nanoTime() - startTime >= 500000000L) {
success = false;
break;
}
}
if (success) {
constraintLevel = 1;
} else {
constraintLevel = 2;
}
}
} catch (Throwable e) {
return -1;
} finally {
if (ch != null) {
try {
ch.close();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a temporary socket.", e);
}
}
}
if (loop != null) {
loop.done = true;
try {
executor.shutdownNow();
} catch (NullPointerException ex) {
// Some JDK throws NPE here, but shouldn't.
}
try {
for (;;) {
loop.selector.wakeup();
try {
if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
break;
}
} catch (InterruptedException e) {
// Ignore
}
}
} catch (Throwable e) {
// Perhaps security exception.
}
try {
loop.selector.close();
} catch (Throwable e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a temporary selector.", e);
}
}
}
}
return constraintLevel;
}
private static final class SelectorLoop implements Runnable {
final Selector selector;
volatile boolean done;
volatile boolean selecting; // Just an approximation
SelectorLoop() throws IOException {
selector = Selector.open();
}
public void run() {
while (!done) {
synchronized (this) {
// Guard
}
try {
selecting = true;
try {
selector.select(1000);
} finally {
selecting = false;
}
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey k: keys) {
k.interestOps(0);
}
keys.clear();
} catch (IOException e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to wait for a temporary selector.", e);
}
}
}
}
}
public static void main(String[] args) throws Exception {
for (Entry<Object, Object> e: System.getProperties().entrySet()) {
System.out.println(e.getKey() + ": " + e.getValue());
}
System.out.println();
System.out.println("Hard-coded Constraint Level: " + CONSTRAINT_LEVEL);
System.out.println("Auto-detected Constraint Level: " + autodetect());
}
private NioProviderMetadata() {
// Unused
}
}

View File

@ -33,6 +33,7 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
@ -58,11 +59,13 @@ public final class NioServerBoss implements Boss {
private final int id = nextId.incrementAndGet(); private final int id = nextId.incrementAndGet();
private volatile Selector selector; private volatile Selector selector;
private volatile Thread thread;
private final Executor bossExecutor; private final Executor bossExecutor;
/** /**
* Queue of channel registration tasks. * Queue of channel registration tasks.
*/ */
private final Queue<Runnable> bindTaskQueue = new ConcurrentLinkedQueue<Runnable>(); private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
/** /**
* Monitor object used to synchronize selector open/close. * Monitor object used to synchronize selector open/close.
@ -77,8 +80,6 @@ public final class NioServerBoss implements Boss {
*/ */
private final AtomicBoolean wakenUp = new AtomicBoolean(); private final AtomicBoolean wakenUp = new AtomicBoolean();
private Thread currentThread;
private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization. static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
@ -99,7 +100,7 @@ public final class NioServerBoss implements Boss {
throw new RejectedExecutionException("Worker has already been shutdown"); throw new RejectedExecutionException("Worker has already been shutdown");
} }
boolean offered = bindTaskQueue.offer(new Runnable() { boolean offered = taskQueue.offer(new Runnable() {
public void run() { public void run() {
boolean bound = false; boolean bound = false;
boolean registered = false; boolean registered = false;
@ -182,7 +183,7 @@ public final class NioServerBoss implements Boss {
} }
public void run() { public void run() {
currentThread = Thread.currentThread(); thread = Thread.currentThread();
boolean shutdown = false; boolean shutdown = false;
for (;;) { for (;;) {
wakenUp.set(false); wakenUp.set(false);
@ -222,7 +223,9 @@ public final class NioServerBoss implements Boss {
if (wakenUp.get()) { if (wakenUp.get()) {
selector.wakeup(); selector.wakeup();
} }
processBindTaskQueue(); processTaskQueue();
selector = this.selector; // processTaskQueue() can call rebuildSelector()
processSelectedKeys(selector.selectedKeys()); processSelectedKeys(selector.selectedKeys());
// Exit the loop when there's nothing to handle. // Exit the loop when there's nothing to handle.
@ -268,9 +271,9 @@ public final class NioServerBoss implements Boss {
} }
} }
private void processBindTaskQueue() throws IOException { private void processTaskQueue() throws IOException {
for (;;) { for (;;) {
final Runnable task = bindTaskQueue.poll(); final Runnable task = taskQueue.poll();
if (task == null) { if (task == null) {
break; break;
} }
@ -304,7 +307,7 @@ public final class NioServerBoss implements Boss {
if (acceptedSocket == null) { if (acceptedSocket == null) {
break; break;
} }
registerAcceptedChannel(channel, acceptedSocket, currentThread); registerAcceptedChannel(channel, acceptedSocket, thread);
} }
} catch (CancelledKeyException e) { } catch (CancelledKeyException e) {
// Raised by accept() when the server socket was closed. // Raised by accept() when the server socket was closed.
@ -358,4 +361,74 @@ public final class NioServerBoss implements Boss {
} }
} }
} }
public void rebuildSelector() {
if (Thread.currentThread() != thread) {
Selector selector = this.selector;
if (selector != null) {
taskQueue.add(new Runnable() {
public void run() {
rebuildSelector();
}
});
selector.wakeup();
}
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = Selector.open();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, key.attachment());
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector,", e);
NioServerSocketChannel ch = (NioServerSocketChannel) key.attachment();
close(ch, succeededFuture(ch));
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
}
} }

View File

@ -15,7 +15,12 @@
*/ */
package org.jboss.netty.channel.socket.nio; package org.jboss.netty.channel.socket.nio;
import static org.jboss.netty.channel.Channels.*; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.util.ThreadNameDeterminer;
import java.io.IOException; import java.io.IOException;
import java.net.SocketAddress; import java.net.SocketAddress;
@ -26,12 +31,7 @@ import java.nio.channels.Selector;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.jboss.netty.buffer.ChannelBuffer; import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.buffer.ChannelBufferFactory;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import org.jboss.netty.util.ThreadNameDeterminer;
public class NioWorker extends AbstractNioWorker { public class NioWorker extends AbstractNioWorker {
@ -103,8 +103,7 @@ public class NioWorker extends AbstractNioWorker {
final Thread workerThread = thread; final Thread workerThread = thread;
if (currentThread != workerThread) { if (currentThread != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
boolean offered = writeTaskQueue.offer(channel.writeTask); taskQueue.add(channel.writeTask);
assert offered;
} }
if (!(channel instanceof NioAcceptedSocketChannel) || if (!(channel instanceof NioAcceptedSocketChannel) ||
@ -175,10 +174,9 @@ public class NioWorker extends AbstractNioWorker {
channel.channel.configureBlocking(false); channel.channel.configureBlocking(false);
} }
synchronized (channel.interestOpsLock) { channel.channel.register(
channel.channel.register( selector, channel.getRawInterestOps(), channel);
selector, channel.getRawInterestOps(), channel);
}
if (future != null) { if (future != null) {
channel.setConnected(); channel.setConnected();
future.setSuccess(); future.setSuccess();

View File

@ -37,6 +37,10 @@ public final class ShareableWorkerPool<E extends Worker> implements WorkerPool<E
return wrapped.nextWorker(); return wrapped.nextWorker();
} }
public void rebuildSelectors() {
wrapped.rebuildSelectors();
}
/** /**
* Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore * Destroy the {@link ShareableWorkerPool} and release all resources. After this is called its not usable anymore
*/ */

View File

@ -18,6 +18,8 @@ package org.jboss.netty.channel.socket.nio;
import org.jboss.netty.channel.socket.Worker; import org.jboss.netty.channel.socket.Worker;
import java.nio.channels.Selector;
/** /**
* The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand * The {@link WorkerPool} is responsible to hand of {@link Worker}'s on demand
* *
@ -30,4 +32,10 @@ public interface WorkerPool<E extends Worker> {
* @return worker * @return worker
*/ */
E nextWorker(); E nextWorker();
/**
* Replaces the current {@link Selector}s of the {@link Worker}s with new {@link Selector}s to work around the
* infamous epoll 100% CPU bug.
*/
void rebuildSelectors();
} }

View File

@ -106,6 +106,10 @@ abstract class AbstractOioWorker<C extends AbstractOioChannel> implements Worker
processEventQueue(); processEventQueue();
} }
public void rebuildSelector() {
// OIO has no selector.
}
static boolean isIoThread(AbstractOioChannel channel) { static boolean isIoThread(AbstractOioChannel channel) {
return Thread.currentThread() == channel.workerThread; return Thread.currentThread() == channel.workerThread;
} }

View File

@ -15,7 +15,9 @@
*/ */
package org.jboss.netty.channel.socket.oio; package org.jboss.netty.channel.socket.oio;
import static org.jboss.netty.channel.Channels.*; import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -23,9 +25,7 @@ import java.net.DatagramPacket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.jboss.netty.buffer.ChannelBuffer; import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ReceiveBufferSizePredictor;
class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> { class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
@ -122,5 +122,4 @@ class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
} }
} }
} }
} }