Port rebuildSelectors() from 3.6 to 4.0

- Fix #813
This commit is contained in:
Trustin Lee 2012-12-14 11:43:04 +09:00
parent be6cc9c324
commit 9e973bbffc
3 changed files with 141 additions and 34 deletions

View File

@ -1,3 +1,19 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2012 The Netty Project
*
@ -15,6 +31,9 @@
*/
package io.netty.channel;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -66,6 +85,12 @@ public abstract class MultithreadEventExecutorGroup implements EventExecutorGrou
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
protected Set<EventExecutor> children() {
Set<EventExecutor> children = Collections.newSetFromMap(new LinkedHashMap<EventExecutor, Boolean>());
Collections.addAll(children, this.children);
return children;
}
protected abstract EventExecutor newChild(
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception;

View File

@ -30,6 +30,22 @@
* under the License.
*/
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2012 The Netty Project
*
@ -64,6 +80,7 @@ import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
@ -174,45 +191,78 @@ public final class NioEventLoop extends SingleThreadEventLoop {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
// Create a new selector and "transfer" all channels from the old
// selector to the new one
private Selector recreateSelector() {
final Selector newSelector = openSelector();
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = Selector.open();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
boolean success = false;
try {
for (SelectionKey key: oldSelector.keys()) {
key.channel().register(newSelector, key.interestOps(), key.attachment());
}
success = true;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
} finally {
if (!success) {
try {
newSelector.close();
} catch (Exception e) {
logger.warn("Failed to close the new Selector.", e);
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
key.channel().register(newSelector, interestOps, a);
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidFuture());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (!success) {
// Keep using the old Selector on failure.
return oldSelector;
}
// Registration to the new Selector is done. Close the old Selector to cancel all old keys.
try {
selector.close();
} catch (Exception e) {
logger.warn("Failed to close the old selector.", e);
}
logger.info("Selector migration complete.");
return selector = newSelector;
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
@Override
@ -245,7 +295,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
// The selector returned immediately for 10 times in a row,
// so recreate one selector as it seems like we hit the
// famous epoll(..) jdk bug.
selector = recreateSelector();
rebuildSelector();
selector = this.selector;
selectReturnsImmediately = 0;
// try to select again
@ -290,8 +341,12 @@ public final class NioEventLoop extends SingleThreadEventLoop {
}
cancelledKeys = 0;
runAllTasks();
selector = this.selector;
processSelectedKeys();
selector = this.selector;
if (isShutdown()) {
closeAll();

View File

@ -1,3 +1,19 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright 2012 The Netty Project
*
@ -15,10 +31,11 @@
*/
package io.netty.channel.socket.nio;
import io.netty.channel.ChannelTaskScheduler;
import io.netty.channel.EventExecutor;
import io.netty.channel.MultithreadEventLoopGroup;
import io.netty.channel.ChannelTaskScheduler;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.concurrent.ThreadFactory;
@ -41,6 +58,16 @@ public class NioEventLoopGroup extends MultithreadEventLoopGroup {
super(nThreads, threadFactory, selectorProvider);
}
/**
* Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
* around the infamous epoll 100% CPU bug.
*/
public void rebuildSelectors() {
for (EventExecutor e: children()) {
((NioEventLoop) e).rebuildSelector();
}
}
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, ChannelTaskScheduler scheduler, Object... args) throws Exception {