Fixed an infinite loop in NioClientSocketPipelineSink.register() when Selector fails to open
This commit is contained in:
parent
f2807aaf51
commit
878eda0d26
@ -30,6 +30,7 @@ import java.nio.channels.ClosedChannelException;
|
|||||||
import java.nio.channels.SelectionKey;
|
import java.nio.channels.SelectionKey;
|
||||||
import java.nio.channels.Selector;
|
import java.nio.channels.Selector;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
@ -47,6 +48,7 @@ import org.jboss.netty.channel.ChannelStateEvent;
|
|||||||
import org.jboss.netty.channel.MessageEvent;
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
import org.jboss.netty.logging.InternalLogger;
|
import org.jboss.netty.logging.InternalLogger;
|
||||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||||
|
import org.jboss.netty.util.LinkedTransferQueue;
|
||||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -167,55 +169,61 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
|||||||
|
|
||||||
private final class Boss implements Runnable {
|
private final class Boss implements Runnable {
|
||||||
|
|
||||||
private final AtomicBoolean started = new AtomicBoolean();
|
volatile Selector selector;
|
||||||
private volatile Selector selector;
|
private boolean started;
|
||||||
private final Object selectorGuard = new Object();
|
private final AtomicBoolean wakenUp = new AtomicBoolean();
|
||||||
|
private final Object startStopLock = new Object();
|
||||||
|
private final Queue<Runnable> registerTaskQueue = new LinkedTransferQueue<Runnable>();
|
||||||
|
|
||||||
Boss() {
|
Boss() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
void register(NioSocketChannel channel) {
|
void register(NioSocketChannel channel) {
|
||||||
// FIXME: Infinite loop on selector creation failure.
|
Runnable registerTask = new RegisterTask(this, channel);
|
||||||
// Apply the same fix with what's applied in NioWorker.register()
|
|
||||||
boolean firstChannel = started.compareAndSet(false, true);
|
|
||||||
Selector selector;
|
Selector selector;
|
||||||
if (firstChannel) {
|
|
||||||
try {
|
synchronized (startStopLock) {
|
||||||
this.selector = selector = Selector.open();
|
if (!started) {
|
||||||
} catch (IOException e) {
|
// Open a selector if this worker didn't start yet.
|
||||||
throw new ChannelException(
|
try {
|
||||||
"Failed to create a selector.", e);
|
this.selector = selector = Selector.open();
|
||||||
}
|
} catch (Throwable t) {
|
||||||
} else {
|
throw new ChannelException(
|
||||||
selector = this.selector;
|
"Failed to create a selector.", t);
|
||||||
if (selector == null) {
|
}
|
||||||
do {
|
|
||||||
Thread.yield();
|
// Start the worker thread with the new Selector.
|
||||||
selector = this.selector;
|
boolean success = false;
|
||||||
} while (selector == null);
|
try {
|
||||||
|
bossExecutor.execute(new ThreadRenamingRunnable(
|
||||||
|
this, "New I/O client boss #" + id));
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
if (!success) {
|
||||||
|
// Release the Selector if the execution fails.
|
||||||
|
try {
|
||||||
|
selector.close();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
logger.warn("Failed to close a selector.", t);
|
||||||
|
}
|
||||||
|
this.selector = selector = null;
|
||||||
|
// The method will return to the caller at this point.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Use the existing selector if this worker has been started.
|
||||||
|
selector = this.selector;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert selector != null && selector.isOpen();
|
||||||
|
|
||||||
|
started = true;
|
||||||
|
registerTaskQueue.offer(registerTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (firstChannel) {
|
if (wakenUp.compareAndSet(false, true)) {
|
||||||
try {
|
selector.wakeup();
|
||||||
channel.socket.register(selector, SelectionKey.OP_CONNECT, channel);
|
|
||||||
} catch (ClosedChannelException e) {
|
|
||||||
throw new ChannelException(
|
|
||||||
"Failed to register a socket to the selector.", e);
|
|
||||||
}
|
|
||||||
bossExecutor.execute(new ThreadRenamingRunnable(
|
|
||||||
this, "New I/O client boss #" + id));
|
|
||||||
} else {
|
|
||||||
synchronized (selectorGuard) {
|
|
||||||
selector.wakeup();
|
|
||||||
try {
|
|
||||||
channel.socket.register(selector, SelectionKey.OP_CONNECT, channel);
|
|
||||||
} catch (ClosedChannelException e) {
|
|
||||||
throw new ChannelException(
|
|
||||||
"Failed to register a socket to the selector.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,12 +231,13 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
|||||||
boolean shutdown = false;
|
boolean shutdown = false;
|
||||||
Selector selector = this.selector;
|
Selector selector = this.selector;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
synchronized (selectorGuard) {
|
wakenUp.set(false);
|
||||||
// This empty synchronization block prevents the selector
|
|
||||||
// from acquiring its lock.
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
int selectedKeyCount = selector.select(500);
|
int selectedKeyCount = selector.select(500);
|
||||||
|
|
||||||
|
processRegisterTaskQueue();
|
||||||
|
|
||||||
if (selectedKeyCount > 0) {
|
if (selectedKeyCount > 0) {
|
||||||
processSelectedKeys(selector.selectedKeys());
|
processSelectedKeys(selector.selectedKeys());
|
||||||
}
|
}
|
||||||
@ -242,18 +251,17 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
|||||||
if (shutdown ||
|
if (shutdown ||
|
||||||
bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
|
bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
|
||||||
|
|
||||||
synchronized (selectorGuard) {
|
synchronized (startStopLock) {
|
||||||
if (selector.keys().isEmpty()) {
|
if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
|
||||||
|
started = false;
|
||||||
try {
|
try {
|
||||||
selector.close();
|
selector.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"Failed to close a selector.",
|
"Failed to close a selector.", e);
|
||||||
e);
|
|
||||||
} finally {
|
} finally {
|
||||||
this.selector = null;
|
this.selector = null;
|
||||||
}
|
}
|
||||||
started.set(false);
|
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
shutdown = false;
|
shutdown = false;
|
||||||
@ -280,6 +288,17 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void processRegisterTaskQueue() {
|
||||||
|
for (;;) {
|
||||||
|
final Runnable task = registerTaskQueue.poll();
|
||||||
|
if (task == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
|
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
|
||||||
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
|
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
|
||||||
SelectionKey k = i.next();
|
SelectionKey k = i.next();
|
||||||
@ -318,4 +337,24 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
|
|||||||
NioWorker.close(ch, ch.getSucceededFuture());
|
NioWorker.close(ch, ch.getSucceededFuture());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private final class RegisterTask implements Runnable {
|
||||||
|
private final Boss boss;
|
||||||
|
private final NioSocketChannel channel;
|
||||||
|
|
||||||
|
RegisterTask(Boss boss, NioSocketChannel channel) {
|
||||||
|
this.boss = boss;
|
||||||
|
this.channel = channel;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
channel.socket.register(
|
||||||
|
boss.selector, SelectionKey.OP_CONNECT, channel);
|
||||||
|
} catch (ClosedChannelException e) {
|
||||||
|
throw new ChannelException(
|
||||||
|
"Failed to register a socket to the selector.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user