Fixed another race condition which occurs when OP_WRITE and OP_READ flags are set/cleared at the same time

This commit is contained in:
Trustin Lee 2008-10-01 08:42:26 +00:00
parent cfa6794292
commit 8d5d8fd172
2 changed files with 106 additions and 95 deletions

View File

@ -50,9 +50,11 @@ abstract class NioSocketChannel extends AbstractChannel
final SocketChannel socket; final SocketChannel socket;
private final NioSocketChannelConfig config; private final NioSocketChannelConfig config;
final Object interestOpsLock = new Object();
final Object writeLock = new Object();
final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean(); final AtomicBoolean writeTaskInTaskQueue = new AtomicBoolean();
final Runnable writeTask = new WriteTask(); final Runnable writeTask = new WriteTask();
final Object writeLock = new Object();
final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>(); final Queue<MessageEvent> writeBuffer = new LinkedTransferQueue<MessageEvent>();
MessageEvent currentWriteEvent; MessageEvent currentWriteEvent;
int currentWriteIndex; int currentWriteIndex;

View File

@ -569,102 +569,107 @@ class NioWorker implements Runnable {
} }
int interestOps; int interestOps;
boolean changed = false; boolean changed = false;
if (opWrite) {
if (!mightNeedWakeup) { // interestOps can change at any time and at any thread.
interestOps = channel.getInterestOps(); // Acquire a lock to avoid possible race condition.
if ((interestOps & SelectionKey.OP_WRITE) == 0) { synchronized (channel.interestOpsLock) {
interestOps |= SelectionKey.OP_WRITE; if (opWrite) {
key.interestOps(interestOps); if (!mightNeedWakeup) {
changed = true;
}
} else {
switch (CONSTRAINT_LEVEL) {
case 0:
interestOps = channel.getInterestOps(); interestOps = channel.getInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true; changed = true;
} }
break; } else {
case 1: switch (CONSTRAINT_LEVEL) {
case 2: case 0:
interestOps = channel.getInterestOps(); interestOps = channel.getInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) { if ((interestOps & SelectionKey.OP_WRITE) == 0) {
if (Thread.currentThread() == worker.thread) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true; changed = true;
} else { }
worker.selectorGuard.readLock().lock(); break;
try { case 1:
if (worker.wakenUp.compareAndSet(false, true)) { case 2:
selector.wakeup(); interestOps = channel.getInterestOps();
} if ((interestOps & SelectionKey.OP_WRITE) == 0) {
if (Thread.currentThread() == worker.thread) {
interestOps |= SelectionKey.OP_WRITE; interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} finally { } else {
worker.selectorGuard.readLock().unlock(); worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} finally {
worker.selectorGuard.readLock().unlock();
}
} }
} }
break;
default:
throw new Error();
} }
break;
default:
throw new Error();
}
}
} else {
if (!mightNeedWakeup) {
interestOps = channel.getInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} }
} else { } else {
switch (CONSTRAINT_LEVEL) { if (!mightNeedWakeup) {
case 0:
interestOps = channel.getInterestOps(); interestOps = channel.getInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true; changed = true;
} }
break; } else {
case 1: switch (CONSTRAINT_LEVEL) {
case 2: case 0:
interestOps = channel.getInterestOps(); interestOps = channel.getInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) { if ((interestOps & SelectionKey.OP_WRITE) != 0) {
if (Thread.currentThread() == worker.thread) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true; changed = true;
} else { }
worker.selectorGuard.readLock().lock(); break;
try { case 1:
if (worker.wakenUp.compareAndSet(false, true)) { case 2:
selector.wakeup(); interestOps = channel.getInterestOps();
} if ((interestOps & SelectionKey.OP_WRITE) != 0) {
if (Thread.currentThread() == worker.thread) {
interestOps &= ~SelectionKey.OP_WRITE; interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} finally { } else {
worker.selectorGuard.readLock().unlock(); worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} finally {
worker.selectorGuard.readLock().unlock();
}
} }
} }
break;
default:
throw new Error();
} }
break;
default:
throw new Error();
} }
} }
} }
@ -763,45 +768,49 @@ class NioWorker implements Runnable {
fireExceptionCaught(channel, cause); fireExceptionCaught(channel, cause);
} }
// Override OP_WRITE flag - a user cannot change this flag.
interestOps &= ~Channel.OP_WRITE;
interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
boolean changed = false; boolean changed = false;
try { try {
switch (CONSTRAINT_LEVEL) { // interestOps can change at any time and at any thread.
case 0: // Acquire a lock to avoid possible race condition.
if (channel.getInterestOps() != interestOps) { synchronized (channel.interestOpsLock) {
key.interestOps(interestOps); // Override OP_WRITE flag - a user cannot change this flag.
if (Thread.currentThread() != worker.thread && interestOps &= ~Channel.OP_WRITE;
worker.wakenUp.compareAndSet(false, true)) { interestOps |= channel.getInterestOps() & Channel.OP_WRITE;
selector.wakeup();
} switch (CONSTRAINT_LEVEL) {
changed = true; case 0:
} if (channel.getInterestOps() != interestOps) {
break;
case 1:
case 2:
if (channel.getInterestOps() != interestOps) {
if (Thread.currentThread() == worker.thread) {
key.interestOps(interestOps); key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true; changed = true;
} else { }
worker.selectorGuard.readLock().lock(); break;
try { case 1:
if (worker.wakenUp.compareAndSet(false, true)) { case 2:
selector.wakeup(); if (channel.getInterestOps() != interestOps) {
} if (Thread.currentThread() == worker.thread) {
key.interestOps(interestOps); key.interestOps(interestOps);
changed = true; changed = true;
} finally { } else {
worker.selectorGuard.readLock().unlock(); worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
key.interestOps(interestOps);
changed = true;
} finally {
worker.selectorGuard.readLock().unlock();
}
} }
} }
break;
default:
throw new Error();
} }
break;
default:
throw new Error();
} }
future.setSuccess(); future.setSuccess();