Made NIO constraint detector much more robust against indefinite blocking by enabling timeout (7 secs)
This commit is contained in:
parent
288c693f62
commit
14a49ed042
@ -30,13 +30,16 @@ import java.nio.channels.Selector;
|
||||
import java.nio.channels.ServerSocketChannel;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.SystemPropertyUtil;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
|
||||
/**
|
||||
* Provides information which is specific to a NIO service provider
|
||||
@ -54,6 +57,8 @@ class NioProviderMetadata {
|
||||
|
||||
private static final String CONSTRAINT_LEVEL_PROPERTY =
|
||||
"java.nio.channels.spi.constraintLevel";
|
||||
|
||||
private static final long AUTODETECTION_TIMEOUT = 7000L;
|
||||
|
||||
/**
|
||||
* 0 - no need to wake up to get / set interestOps (most cases)
|
||||
@ -86,7 +91,16 @@ class NioProviderMetadata {
|
||||
if (constraintLevel < 0) {
|
||||
logger.debug(
|
||||
"Couldn't get the NIO constraint level from the system properties.");
|
||||
constraintLevel = detectConstraintLevel();
|
||||
ConstraintLevelAutodetector autodetector =
|
||||
new ConstraintLevelAutodetector();
|
||||
|
||||
try {
|
||||
constraintLevel = autodetector.autodetectWithTimeout();
|
||||
} catch (Exception e) {
|
||||
// Probably because of security manager - try again without
|
||||
// creating a new thread directly.
|
||||
constraintLevel = autodetector.autodetectWithoutTimeout();
|
||||
}
|
||||
}
|
||||
|
||||
if (constraintLevel < 0) {
|
||||
@ -211,85 +225,91 @@ class NioProviderMetadata {
|
||||
// Others (untested)
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
private static int detectConstraintLevel() {
|
||||
// TODO Code cleanup - what a mess.
|
||||
final int constraintLevel;
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
boolean success;
|
||||
long startTime;
|
||||
int interestOps;
|
||||
private static class ConstraintLevelAutodetector {
|
||||
|
||||
ServerSocketChannel ch = null;
|
||||
SelectorLoop loop = null;
|
||||
ConstraintLevelAutodetector() {
|
||||
super();
|
||||
}
|
||||
|
||||
try {
|
||||
// Open a channel.
|
||||
ch = ServerSocketChannel.open();
|
||||
|
||||
// Configure the channel
|
||||
try {
|
||||
ch.socket().bind(new InetSocketAddress(0));
|
||||
ch.configureBlocking(false);
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to configure a temporary socket.", e);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Prepare the selector loop.
|
||||
try {
|
||||
loop = new SelectorLoop();
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to open a temporary selector.", e);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Register the channel
|
||||
try {
|
||||
ch.register(loop.selector, 0);
|
||||
} catch (ClosedChannelException e) {
|
||||
logger.warn("Failed to register a temporary selector.", e);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SelectionKey key = ch.keyFor(loop.selector);
|
||||
|
||||
// Start the selector loop.
|
||||
executor.execute(loop);
|
||||
|
||||
// Level 0
|
||||
success = true;
|
||||
for (int i = 0; i < 10; i ++) {
|
||||
|
||||
// Increase the probability of calling interestOps
|
||||
// while select() is running.
|
||||
do {
|
||||
while (!loop.selecting) {
|
||||
Thread.yield();
|
||||
}
|
||||
|
||||
// Wait a little bit more.
|
||||
int autodetectWithTimeout() {
|
||||
final BlockingQueue<Integer> resultQueue = new LinkedBlockingQueue<Integer>();
|
||||
Runnable detector = new ThreadRenamingRunnable(new Runnable() {
|
||||
public void run() {
|
||||
int level = -1;
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
level = autodetectWithoutTimeout();
|
||||
} finally {
|
||||
resultQueue.offer(Integer.valueOf(level));
|
||||
}
|
||||
} while (!loop.selecting);
|
||||
}
|
||||
}, "NIO constraint level detector");
|
||||
|
||||
Thread detectorThread = new Thread(detector);
|
||||
detectorThread.start();
|
||||
|
||||
startTime = System.nanoTime();
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);
|
||||
|
||||
if (System.nanoTime() - startTime >= 500000000L) {
|
||||
success = false;
|
||||
break;
|
||||
for (;;) {
|
||||
try {
|
||||
Integer result = resultQueue.poll(AUTODETECTION_TIMEOUT, TimeUnit.MILLISECONDS);
|
||||
if (result == null) {
|
||||
logger.warn("NIO constraint level autodetection timed out.");
|
||||
return -1;
|
||||
} else {
|
||||
return result.intValue();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Ignored
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (success) {
|
||||
constraintLevel = 0;
|
||||
} else {
|
||||
// Level 1
|
||||
int autodetectWithoutTimeout() {
|
||||
// TODO Code cleanup - what a mess.
|
||||
final int constraintLevel;
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
boolean success;
|
||||
long startTime;
|
||||
int interestOps;
|
||||
|
||||
ServerSocketChannel ch = null;
|
||||
SelectorLoop loop = null;
|
||||
|
||||
try {
|
||||
// Open a channel.
|
||||
ch = ServerSocketChannel.open();
|
||||
|
||||
// Configure the channel
|
||||
try {
|
||||
ch.socket().bind(new InetSocketAddress(0));
|
||||
ch.configureBlocking(false);
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to configure a temporary socket.", e);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Prepare the selector loop.
|
||||
try {
|
||||
loop = new SelectorLoop();
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to open a temporary selector.", e);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Register the channel
|
||||
try {
|
||||
ch.register(loop.selector, 0);
|
||||
} catch (ClosedChannelException e) {
|
||||
logger.warn("Failed to register a temporary selector.", e);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SelectionKey key = ch.keyFor(loop.selector);
|
||||
|
||||
// Start the selector loop.
|
||||
executor.execute(loop);
|
||||
|
||||
// Level 0
|
||||
success = true;
|
||||
for (int i = 0; i < 10; i ++) {
|
||||
|
||||
@ -309,62 +329,95 @@ class NioProviderMetadata {
|
||||
} while (!loop.selecting);
|
||||
|
||||
startTime = System.nanoTime();
|
||||
interestOps = key.interestOps();
|
||||
synchronized (loop) {
|
||||
loop.selector.wakeup();
|
||||
key.interestOps(interestOps | SelectionKey.OP_ACCEPT);
|
||||
key.interestOps(interestOps & ~SelectionKey.OP_ACCEPT);
|
||||
}
|
||||
key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT);
|
||||
key.interestOps(key.interestOps() & ~SelectionKey.OP_ACCEPT);
|
||||
|
||||
if (System.nanoTime() - startTime >= 500000000L) {
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (success) {
|
||||
constraintLevel = 1;
|
||||
} else {
|
||||
constraintLevel = 2;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return -1;
|
||||
} finally {
|
||||
if (ch != null) {
|
||||
try {
|
||||
ch.close();
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to close a temporary socket.", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (loop != null) {
|
||||
loop.done = true;
|
||||
loop.selector.wakeup();
|
||||
try {
|
||||
executor.shutdownNow();
|
||||
for (;;) {
|
||||
try {
|
||||
if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
|
||||
break;
|
||||
if (success) {
|
||||
constraintLevel = 0;
|
||||
} else {
|
||||
// Level 1
|
||||
success = true;
|
||||
for (int i = 0; i < 10; i ++) {
|
||||
|
||||
// Increase the probability of calling interestOps
|
||||
// while select() is running.
|
||||
do {
|
||||
while (!loop.selecting) {
|
||||
Thread.yield();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
|
||||
// Wait a little bit more.
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
} while (!loop.selecting);
|
||||
|
||||
startTime = System.nanoTime();
|
||||
interestOps = key.interestOps();
|
||||
synchronized (loop) {
|
||||
loop.selector.wakeup();
|
||||
key.interestOps(interestOps | SelectionKey.OP_ACCEPT);
|
||||
key.interestOps(interestOps & ~SelectionKey.OP_ACCEPT);
|
||||
}
|
||||
|
||||
if (System.nanoTime() - startTime >= 500000000L) {
|
||||
success = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Perhaps security exception.
|
||||
if (success) {
|
||||
constraintLevel = 1;
|
||||
} else {
|
||||
constraintLevel = 2;
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
return -1;
|
||||
} finally {
|
||||
if (ch != null) {
|
||||
try {
|
||||
ch.close();
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to close a temporary socket.", e);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
loop.selector.close();
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to close a temporary selector.", e);
|
||||
if (loop != null) {
|
||||
loop.done = true;
|
||||
executor.shutdownNow();
|
||||
try {
|
||||
for (;;) {
|
||||
loop.selector.wakeup();
|
||||
try {
|
||||
if (executor.awaitTermination(1, TimeUnit.SECONDS)) {
|
||||
break;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// Perhaps security exception.
|
||||
}
|
||||
|
||||
try {
|
||||
loop.selector.close();
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to close a temporary selector.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return constraintLevel;
|
||||
return constraintLevel;
|
||||
}
|
||||
}
|
||||
|
||||
private static class SelectorLoop implements Runnable {
|
||||
|
Loading…
x
Reference in New Issue
Block a user