diff --git a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java index b7c57e9646..55e4ed04cf 100644 --- a/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java +++ b/src/main/java/org/jboss/netty/channel/socket/nio/NioWorker.java @@ -314,28 +314,33 @@ class NioWorker implements Runnable { return; } - if (mightNeedWakeup) { - NioWorker worker = channel.worker; - Thread workerThread = worker.thread; - if (workerThread == null || Thread.currentThread() != workerThread) { - if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { - worker.writeTaskQueue.offer(channel.writeTask); - } - Selector workerSelector = worker.selector; - if (workerSelector != null) { - if (worker.wakenUp.compareAndSet(false, true)) { - workerSelector.wakeup(); - } - } - return; - } + if (mightNeedWakeup && scheduleWriteIfNecessary(channel)) { + return; } - writeNow(channel, mightNeedWakeup, channel.getConfig().getWriteSpinCount()); + writeNow(channel, channel.getConfig().getWriteSpinCount()); } - private static void writeNow(NioSocketChannel channel, - boolean mightNeedWakeup, final int writeSpinCount) { + private static boolean scheduleWriteIfNecessary(NioSocketChannel channel) { + NioWorker worker = channel.worker; + Thread workerThread = worker.thread; + if (workerThread == null || Thread.currentThread() != workerThread) { + if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) { + worker.writeTaskQueue.offer(channel.writeTask); + } + Selector workerSelector = worker.selector; + if (workerSelector != null) { + if (worker.wakenUp.compareAndSet(false, true)) { + workerSelector.wakeup(); + } + } + return true; + } + + return false; + } + + private static void writeNow(NioSocketChannel channel, int writeSpinCount) { boolean open = true; boolean addOpWrite = false; @@ -404,14 +409,14 @@ class NioWorker implements Runnable { if (open) { if (addOpWrite) { - setOpWrite(channel, mightNeedWakeup); + setOpWrite(channel); } else if (removeOpWrite) { - clearOpWrite(channel, mightNeedWakeup); + clearOpWrite(channel); } } } - private static void setOpWrite(NioSocketChannel channel, boolean mightNeedWakeup) { + private static void setOpWrite(NioSocketChannel channel) { NioWorker worker = channel.worker; Selector selector = worker.selector; SelectionKey key = channel.socket.keyFor(selector); @@ -428,53 +433,11 @@ class NioWorker implements Runnable { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { - if (!mightNeedWakeup) { - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; - } - } else { - switch (CONSTRAINT_LEVEL) { - case 0: - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - if (Thread.currentThread() != worker.thread && - worker.wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) == 0) { - if (Thread.currentThread() == worker.thread) { - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; - } else { - worker.selectorGuard.readLock().lock(); - try { - if (worker.wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - interestOps |= SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; - } finally { - worker.selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } + interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) == 0) { + interestOps |= SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; } } @@ -483,7 +446,7 @@ class NioWorker implements Runnable { } } - private static void clearOpWrite(NioSocketChannel channel, boolean mightNeedWakeup) { + private static void clearOpWrite(NioSocketChannel channel) { NioWorker worker = channel.worker; Selector selector = worker.selector; SelectionKey key = channel.socket.keyFor(selector); @@ -500,53 +463,11 @@ class NioWorker implements Runnable { // interestOps can change at any time and at any thread. // Acquire a lock to avoid possible race condition. synchronized (channel.interestOpsLock) { - if (!mightNeedWakeup) { - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; - } - } else { - switch (CONSTRAINT_LEVEL) { - case 0: - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - if (Thread.currentThread() != worker.thread && - worker.wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - changed = true; - } - break; - case 1: - case 2: - interestOps = channel.getRawInterestOps(); - if ((interestOps & SelectionKey.OP_WRITE) != 0) { - if (Thread.currentThread() == worker.thread) { - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; - } else { - worker.selectorGuard.readLock().lock(); - try { - if (worker.wakenUp.compareAndSet(false, true)) { - selector.wakeup(); - } - interestOps &= ~SelectionKey.OP_WRITE; - key.interestOps(interestOps); - changed = true; - } finally { - worker.selectorGuard.readLock().unlock(); - } - } - } - break; - default: - throw new Error(); - } + interestOps = channel.getRawInterestOps(); + if ((interestOps & SelectionKey.OP_WRITE) != 0) { + interestOps &= ~SelectionKey.OP_WRITE; + key.interestOps(interestOps); + changed = true; } }