Fixed NotYetConnectedException on setInterestOps by backporting the fix applied to NioWorker

This commit is contained in:
Trustin Lee 2009-06-11 07:01:14 +00:00
parent 899b16678f
commit fa3eb52f4e
2 changed files with 18 additions and 15 deletions

View File

@ -138,7 +138,7 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
/**
* Will bind the DatagramSocket to the passed-in address.
* Every call bind will spawn a new thread using the that basically inturn
* Every call bind will spawn a new thread using the that basically in turn
*/
private void bind(final NioDatagramChannel channel,
final ChannelFuture future, final InetSocketAddress address) {
@ -152,7 +152,7 @@ class NioDatagramPipelineSink extends AbstractChannelSink {
future.setSuccess();
fireChannelBound(channel, address);
nextWorker().register(channel, null);
channel.worker.register(channel, null);
started = true;
} catch (final Throwable t) {
future.setFailure(t);

View File

@ -673,21 +673,23 @@ class NioUdpWorker implements Runnable {
static void setInterestOps(final NioDatagramChannel channel,
ChannelFuture future, int interestOps) {
final NioUdpWorker worker = channel.worker;
final Selector selector = worker.selector;
final SelectionKey key = channel.getDatagramChannel().keyFor(selector);
if (key == null || selector == null) {
Exception cause = new NotYetConnectedException();
future.setFailure(cause);
fireExceptionCaught(channel, cause);
}
boolean changed = false;
try {
// interestOps can change at any time and by any thread.
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
final NioUdpWorker worker = channel.worker;
final Selector selector = worker.selector;
final SelectionKey key = channel.getDatagramChannel().keyFor(selector);
if (key == null || selector == null) {
// Not registered to the worker yet.
// Set the rawInterestOps immediately; RegisterTask will pick it up.
channel.setRawInterestOpsNow(interestOps);
return;
}
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getRawInterestOps() & Channel.OP_WRITE;
@ -778,9 +780,10 @@ class NioUdpWorker implements Runnable {
}
try {
// Register interest in both reads and writes.
channel.getDatagramChannel().register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE, channel);
synchronized (channel.interestOpsLock) {
channel.getDatagramChannel().register(
selector, channel.getRawInterestOps(), channel);
}
if (future != null) {
future.setSuccess();
}
@ -792,7 +795,7 @@ class NioUdpWorker implements Runnable {
throw new ChannelException(
"Failed to register a socket to the selector.", e);
}
// XXX
// XXX: Perhaps channelBind?
fireChannelConnected(channel, localAddress);
}
}