* NioClientSocketChannel.worker doesn't need to be volatile - simplified overall code

This commit is contained in:
Trustin Lee 2009-01-15 02:01:19 +00:00
parent 01bfefa6a4
commit ad595a3a5b
6 changed files with 123 additions and 160 deletions

View File

@ -39,35 +39,19 @@ import org.jboss.netty.channel.ChannelSink;
* @version $Rev$, $Date$
*
*/
class NioAcceptedSocketChannel extends NioSocketChannel {
final NioWorker worker;
final class NioAcceptedSocketChannel extends NioSocketChannel {
NioAcceptedSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline,
Channel parent, ChannelSink sink,
SocketChannel socket, NioWorker worker) {
super(parent, factory, pipeline, sink, socket);
super(parent, factory, pipeline, sink, socket, worker);
this.worker = worker;
try {
socket.configureBlocking(false);
} catch (IOException e) {
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
@Override
NioWorker getWorker() {
return worker;
}
@Override
void setWorker(NioWorker worker) {
// worker never changes.
if (this.worker != worker) {
throw new IllegalStateException("Should not reach here.");
}
}
}

View File

@ -43,7 +43,7 @@ import org.jboss.netty.logging.InternalLoggerFactory;
* @version $Rev$, $Date$
*
*/
class NioClientSocketChannel extends NioSocketChannel {
final class NioClientSocketChannel extends NioSocketChannel {
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(NioClientSocketChannel.class);
@ -77,29 +77,14 @@ class NioClientSocketChannel extends NioSocketChannel {
return socket;
}
volatile NioWorker worker;
volatile ChannelFuture connectFuture;
volatile boolean boundManually;
NioClientSocketChannel(
ChannelFactory factory, ChannelPipeline pipeline, ChannelSink sink) {
ChannelFactory factory, ChannelPipeline pipeline,
ChannelSink sink, NioWorker worker) {
super(null, factory, pipeline, sink, newSocket());
super(null, factory, pipeline, sink, newSocket(), worker);
fireChannelOpen(this);
}
@Override
NioWorker getWorker() {
return worker;
}
@Override
void setWorker(NioWorker worker) {
if (this.worker == null) {
this.worker = worker;
} else if (this.worker != worker) {
// worker never changes.
throw new IllegalStateException("Should not reach here.");
}
}
}

View File

@ -29,7 +29,6 @@ import java.util.concurrent.RejectedExecutionException;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelSink;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.SocketChannel;
import org.jboss.netty.util.ExecutorShutdownUtil;
@ -93,7 +92,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
private final Executor bossExecutor;
private final Executor workerExecutor;
private final ChannelSink sink;
private final NioClientSocketPipelineSink sink;
/**
* Creates a new instance. Calling this constructor is same with calling
@ -142,7 +141,7 @@ public class NioClientSocketChannelFactory implements ClientSocketChannelFactory
}
public SocketChannel newChannel(ChannelPipeline pipeline) {
return new NioClientSocketChannel(this, pipeline, sink);
return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());
}
public void releaseExternalResources() {

View File

@ -141,9 +141,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
SocketAddress remoteAddress) {
try {
if (channel.socket.connect(remoteAddress)) {
NioWorker worker = nextWorker();
channel.setWorker(worker);
worker.register(channel, future);
channel.worker.register(channel, future);
} else {
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
@ -320,9 +318,7 @@ class NioClientSocketPipelineSink extends AbstractChannelSink {
try {
if (ch.socket.finishConnect()) {
k.cancel();
NioWorker worker = nextWorker();
ch.setWorker(worker);
worker.register(ch, ch.connectFuture);
ch.worker.register(ch, ch.connectFuture);
}
} catch (Throwable t) {
ch.connectFuture.setFailure(t);

View File

@ -48,10 +48,11 @@ import org.jboss.netty.util.LinkedTransferQueue;
* @version $Rev$, $Date$
*
*/
abstract class NioSocketChannel extends AbstractChannel
class NioSocketChannel extends AbstractChannel
implements org.jboss.netty.channel.socket.SocketChannel {
final SocketChannel socket;
final NioWorker worker;
private final NioSocketChannelConfig config;
final Object interestOpsLock = new Object();
@ -70,16 +71,14 @@ abstract class NioSocketChannel extends AbstractChannel
public NioSocketChannel(
Channel parent, ChannelFactory factory,
ChannelPipeline pipeline, ChannelSink sink,
SocketChannel socket) {
SocketChannel socket, NioWorker worker) {
super(parent, factory, pipeline, sink);
this.socket = socket;
this.worker = worker;
config = new DefaultNioSocketChannelConfig(socket.socket());
}
abstract NioWorker getWorker();
abstract void setWorker(NioWorker worker);
public NioSocketChannelConfig getConfig() {
return config;
}

View File

@ -315,21 +315,19 @@ class NioWorker implements Runnable {
}
if (mightNeedWakeup) {
NioWorker worker = channel.getWorker();
if (worker != null) {
Thread workerThread = worker.thread;
if (workerThread != null && Thread.currentThread() != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
worker.writeTaskQueue.offer(channel.writeTask);
}
Selector workerSelector = worker.selector;
if (workerSelector != null) {
if (worker.wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup();
}
}
return;
NioWorker worker = channel.worker;
Thread workerThread = worker.thread;
if (workerThread == null || Thread.currentThread() != workerThread) {
if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
worker.writeTaskQueue.offer(channel.writeTask);
}
Selector workerSelector = worker.selector;
if (workerSelector != null) {
if (worker.wakenUp.compareAndSet(false, true)) {
workerSelector.wakeup();
}
}
return;
}
}
@ -406,23 +404,15 @@ class NioWorker implements Runnable {
if (open) {
if (addOpWrite) {
setOpWrite(channel, true, mightNeedWakeup);
setOpWrite(channel, mightNeedWakeup);
} else if (removeOpWrite) {
setOpWrite(channel, false, mightNeedWakeup);
clearOpWrite(channel, mightNeedWakeup);
}
}
}
private static void setOpWrite(
NioSocketChannel channel, boolean opWrite, boolean mightNeedWakeup) {
NioWorker worker = channel.getWorker();
if (worker == null) {
IllegalStateException cause =
new IllegalStateException("Channel not connected yet (null worker)");
fireExceptionCaught(channel, cause);
return;
}
private static void setOpWrite(NioSocketChannel channel, boolean mightNeedWakeup) {
NioWorker worker = channel.worker;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector);
if (key == null) {
@ -438,103 +428,124 @@ class NioWorker implements Runnable {
// interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
if (opWrite) {
if (!mightNeedWakeup) {
if (!mightNeedWakeup) {
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
} else {
switch (CONSTRAINT_LEVEL) {
case 0:
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true;
}
} else {
switch (CONSTRAINT_LEVEL) {
case 0:
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
break;
case 1:
case 2:
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
if (Thread.currentThread() == worker.thread) {
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true;
}
break;
case 1:
case 2:
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
if (Thread.currentThread() == worker.thread) {
} else {
worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} else {
worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
interestOps |= SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} finally {
worker.selectorGuard.readLock().unlock();
}
} finally {
worker.selectorGuard.readLock().unlock();
}
}
break;
default:
throw new Error();
}
break;
default:
throw new Error();
}
}
}
if (changed) {
channel.setRawInterestOpsNow(interestOps);
}
}
private static void clearOpWrite(NioSocketChannel channel, boolean mightNeedWakeup) {
NioWorker worker = channel.worker;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector);
if (key == null) {
return;
}
if (!key.isValid()) {
close(key);
return;
}
int interestOps;
boolean changed = false;
// interestOps can change at any time and at any thread.
// Acquire a lock to avoid possible race condition.
synchronized (channel.interestOpsLock) {
if (!mightNeedWakeup) {
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
}
} else {
if (!mightNeedWakeup) {
switch (CONSTRAINT_LEVEL) {
case 0:
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true;
}
} else {
switch (CONSTRAINT_LEVEL) {
case 0:
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
break;
case 1:
case 2:
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
if (Thread.currentThread() == worker.thread) {
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
if (Thread.currentThread() != worker.thread &&
worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
changed = true;
}
break;
case 1:
case 2:
interestOps = channel.getRawInterestOps();
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
if (Thread.currentThread() == worker.thread) {
} else {
worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} else {
worker.selectorGuard.readLock().lock();
try {
if (worker.wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
interestOps &= ~SelectionKey.OP_WRITE;
key.interestOps(interestOps);
changed = true;
} finally {
worker.selectorGuard.readLock().unlock();
}
} finally {
worker.selectorGuard.readLock().unlock();
}
}
break;
default:
throw new Error();
}
break;
default:
throw new Error();
}
}
}
@ -545,13 +556,11 @@ class NioWorker implements Runnable {
}
static void close(NioSocketChannel channel, ChannelFuture future) {
NioWorker worker = channel.getWorker();
if (worker != null) {
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector);
if (key != null) {
key.cancel();
}
NioWorker worker = channel.worker;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector);
if (key != null) {
key.cancel();
}
boolean connected = channel.isConnected();
@ -612,20 +621,11 @@ class NioWorker implements Runnable {
static void setInterestOps(
NioSocketChannel channel, ChannelFuture future, int interestOps) {
NioWorker worker = channel.getWorker();
if (worker == null) {
IllegalStateException cause =
new IllegalStateException("Channel not connected yet (null worker)");
future.setFailure(cause);
fireExceptionCaught(channel, cause);
return;
}
NioWorker worker = channel.worker;
Selector selector = worker.selector;
SelectionKey key = channel.socket.keyFor(selector);
if (key == null || selector == null) {
IllegalStateException cause =
new IllegalStateException("Channel not connected yet (SelectionKey not found)");
Exception cause = new NotYetConnectedException();
future.setFailure(cause);
fireExceptionCaught(channel, cause);
}