This commit is contained in:
parent
02f3df55a8
commit
3d44aeca50
@ -24,9 +24,7 @@ import io.netty.logging.InternalLogger;
|
|||||||
import io.netty.logging.InternalLoggerFactory;
|
import io.netty.logging.InternalLoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.channels.CancelledKeyException;
|
import java.nio.channels.*;
|
||||||
import java.nio.channels.SelectionKey;
|
|
||||||
import java.nio.channels.Selector;
|
|
||||||
import java.nio.channels.spi.SelectorProvider;
|
import java.nio.channels.spi.SelectorProvider;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@ -55,8 +53,10 @@ final class NioEventLoop extends SingleThreadEventLoop {
|
|||||||
/**
|
/**
|
||||||
* The NIO {@link Selector}.
|
* The NIO {@link Selector}.
|
||||||
*/
|
*/
|
||||||
protected final Selector selector;
|
protected Selector selector;
|
||||||
|
|
||||||
|
protected final SelectorProvider provider;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Boolean that controls determines if a blocked Selector.select should
|
* Boolean that controls determines if a blocked Selector.select should
|
||||||
* break out of its selection process. In our case we use a timeone for
|
* break out of its selection process. In our case we use a timeone for
|
||||||
@ -75,10 +75,11 @@ final class NioEventLoop extends SingleThreadEventLoop {
|
|||||||
if (selectorProvider == null) {
|
if (selectorProvider == null) {
|
||||||
throw new NullPointerException("selectorProvider");
|
throw new NullPointerException("selectorProvider");
|
||||||
}
|
}
|
||||||
selector = openSelector(selectorProvider);
|
provider = selectorProvider;
|
||||||
|
selector = openSelector();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Selector openSelector(SelectorProvider provider) {
|
private Selector openSelector() {
|
||||||
try {
|
try {
|
||||||
return provider.openSelector();
|
return provider.openSelector();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -92,15 +93,79 @@ final class NioEventLoop extends SingleThreadEventLoop {
|
|||||||
return new ConcurrentLinkedQueue<Runnable>();
|
return new ConcurrentLinkedQueue<Runnable>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a new selector and "transfer" all channels from the old
|
||||||
|
// selector to the new one
|
||||||
|
private Selector recreateSelector() {
|
||||||
|
Selector newSelector = openSelector();
|
||||||
|
Selector selector = this.selector;
|
||||||
|
this.selector = newSelector;
|
||||||
|
|
||||||
|
// loop over all the keys that are registered with the old Selector
|
||||||
|
// and register them with the new one
|
||||||
|
for (SelectionKey key: selector.keys()) {
|
||||||
|
SelectableChannel ch = key.channel();
|
||||||
|
int ops = key.interestOps();
|
||||||
|
Object att = key.attachment();
|
||||||
|
// cancel the old key
|
||||||
|
cancel(key);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// register the channel with the new selector now
|
||||||
|
ch.register(newSelector, ops, att);
|
||||||
|
} catch (ClosedChannelException e) {
|
||||||
|
// close channel
|
||||||
|
AbstractNioChannel channel = (AbstractNioChannel) att;
|
||||||
|
channel.unsafe().close(channel.unsafe().voidFuture());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// time to close the old selector as everything else is registered to the new one
|
||||||
|
selector.close();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("Failed to close a selector.", t);
|
||||||
|
}
|
||||||
|
logger.warn("Recreated Selector because of possible jdk epoll(..) bug");
|
||||||
|
return newSelector;
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
protected void run() {
|
protected void run() {
|
||||||
Selector selector = this.selector;
|
Selector selector = this.selector;
|
||||||
|
int selectReturnsImmediately = 0;
|
||||||
|
|
||||||
|
// use 80% of the timeout for measure
|
||||||
|
long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS / 100 * 80;
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
|
||||||
wakenUp.set(false);
|
wakenUp.set(false);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
SelectorUtil.select(selector);
|
long beforeSelect = System.nanoTime();
|
||||||
|
int selected = SelectorUtil.select(selector);
|
||||||
|
if (selected == 0) {
|
||||||
|
long timeBlocked = System.nanoTime() - beforeSelect;
|
||||||
|
if (timeBlocked < minSelectTimeout) {
|
||||||
|
// returned before the minSelectTimeout elapsed with nothing select.
|
||||||
|
// this may be the cause of the jdk epoll(..) bug, so increment the counter
|
||||||
|
// which we use later to see if its really the jdk bug.
|
||||||
|
selectReturnsImmediately ++;
|
||||||
|
} else {
|
||||||
|
selectReturnsImmediately = 0;
|
||||||
|
}
|
||||||
|
if (selectReturnsImmediately == 10) {
|
||||||
|
// The selector returned immediately for 10 times in a row,
|
||||||
|
// so recreate one selector as it seems like we hit the
|
||||||
|
// famous epoll(..) jdk bug.
|
||||||
|
selector = recreateSelector();
|
||||||
|
selectReturnsImmediately = 0;
|
||||||
|
|
||||||
|
// try to select again
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// reset counter
|
||||||
|
selectReturnsImmediately = 0;
|
||||||
|
}
|
||||||
|
|
||||||
// 'wakenUp.compareAndSet(false, true)' is always evaluated
|
// 'wakenUp.compareAndSet(false, true)' is always evaluated
|
||||||
// before calling 'selector.wakeup()' to reduce the wake-up
|
// before calling 'selector.wakeup()' to reduce the wake-up
|
||||||
|
@ -21,12 +21,14 @@ import io.netty.logging.InternalLoggerFactory;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.channels.CancelledKeyException;
|
import java.nio.channels.CancelledKeyException;
|
||||||
import java.nio.channels.Selector;
|
import java.nio.channels.Selector;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
final class SelectorUtil {
|
final class SelectorUtil {
|
||||||
private static final InternalLogger logger =
|
private static final InternalLogger logger =
|
||||||
InternalLoggerFactory.getInstance(SelectorUtil.class);
|
InternalLoggerFactory.getInstance(SelectorUtil.class);
|
||||||
static final long DEFAULT_SELECT_TIMEOUT = 10;
|
static final long DEFAULT_SELECT_TIMEOUT = 10;
|
||||||
static final long SELECT_TIMEOUT;
|
static final long SELECT_TIMEOUT;
|
||||||
|
static final long SELECT_TIMEOUT_NANOS;
|
||||||
|
|
||||||
// Workaround for JDK NIO bug.
|
// Workaround for JDK NIO bug.
|
||||||
//
|
//
|
||||||
@ -53,12 +55,13 @@ final class SelectorUtil {
|
|||||||
selectTimeout = DEFAULT_SELECT_TIMEOUT;
|
selectTimeout = DEFAULT_SELECT_TIMEOUT;
|
||||||
}
|
}
|
||||||
SELECT_TIMEOUT = selectTimeout;
|
SELECT_TIMEOUT = selectTimeout;
|
||||||
|
SELECT_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toMicros(SELECT_TIMEOUT);
|
||||||
logger.debug("Using select timeout of " + SELECT_TIMEOUT);
|
logger.debug("Using select timeout of " + SELECT_TIMEOUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void select(Selector selector) throws IOException {
|
static int select(Selector selector) throws IOException {
|
||||||
try {
|
try {
|
||||||
selector.select(SELECT_TIMEOUT);
|
return selector.select(SELECT_TIMEOUT);
|
||||||
} catch (CancelledKeyException e) {
|
} catch (CancelledKeyException e) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@ -67,6 +70,7 @@ final class SelectorUtil {
|
|||||||
}
|
}
|
||||||
// Harmless exception - log anyway
|
// Harmless exception - log anyway
|
||||||
}
|
}
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cleanupKeys(Selector selector) {
|
static void cleanupKeys(Selector selector) {
|
||||||
|
Loading…
Reference in New Issue
Block a user