Remove unnecessary inEventLoop() checks in Channel.Unsafe

.. because HeadHandler in the pipeline always ensures those methods are always invoked from the correct I/O thread
This commit is contained in:
Trustin Lee 2013-05-17 19:20:46 +09:00
parent 41f5d5650d
commit fd1d31e7d8
8 changed files with 287 additions and 410 deletions

View File

@ -15,17 +15,16 @@
*/ */
package io.netty.channel.rxtx; package io.netty.channel.rxtx;
import static io.netty.channel.rxtx.RxtxChannelOption.*; import gnu.io.CommPort;
import gnu.io.CommPortIdentifier;
import gnu.io.SerialPort;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.oio.OioByteStreamChannel; import io.netty.channel.oio.OioByteStreamChannel;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import gnu.io.CommPort; import static io.netty.channel.rxtx.RxtxChannelOption.*;
import gnu.io.CommPortIdentifier;
import gnu.io.SerialPort;
/** /**
* A channel to a serial device using the RXTX library. * A channel to a serial device using the RXTX library.
@ -134,50 +133,41 @@ public class RxtxChannel extends OioByteStreamChannel {
public void connect( public void connect(
final SocketAddress remoteAddress, final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) { final SocketAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { if (!ensureOpen(promise)) {
if (!ensureOpen(promise)) { return;
return; }
}
try { try {
final boolean wasActive = isActive(); final boolean wasActive = isActive();
doConnect(remoteAddress, localAddress); doConnect(remoteAddress, localAddress);
int waitTime = config().getOption(WAIT_TIME); int waitTime = config().getOption(WAIT_TIME);
if (waitTime > 0) { if (waitTime > 0) {
eventLoop().schedule(new Runnable() { eventLoop().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
doInit(); doInit();
promise.setSuccess(); promise.setSuccess();
if (!wasActive && isActive()) { if (!wasActive && isActive()) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
}
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
} }
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
} }
}, waitTime, TimeUnit.MILLISECONDS);
} else {
doInit();
promise.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
} }
}, waitTime, TimeUnit.MILLISECONDS);
} else {
doInit();
promise.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
} }
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
} }
} else { } catch (Throwable t) {
eventLoop().execute(new Runnable() { promise.setFailure(t);
@Override closeIfClosed();
public void run() {
connect(remoteAddress, localAddress, promise);
}
});
} }
} }
} }

View File

@ -476,13 +476,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
} }
} }
private final Runnable beginReadTask = new Runnable() {
@Override
public void run() {
beginRead();
}
};
private final Runnable flushLaterTask = new Runnable() { private final Runnable flushLaterTask = new Runnable() {
@Override @Override
public void run() { public void run() {
@ -495,26 +488,16 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override @Override
public final void sendFile(final FileRegion region, final ChannelPromise promise) { public final void sendFile(final FileRegion region, final ChannelPromise promise) {
if (outboundBufSize() > 0) {
if (eventLoop().inEventLoop()) { flushNotifier(newPromise()).addListener(new ChannelFutureListener() {
if (outboundBufSize() > 0) {
flushNotifier(newPromise()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
sendFile0(region, promise);
}
});
} else {
// nothing pending try to send the fileRegion now!
sendFile0(region, promise);
}
} else {
eventLoop().execute(new Runnable() {
@Override @Override
public void run() { public void operationComplete(ChannelFuture cf) throws Exception {
sendFile(region, promise); sendFile0(region, promise);
} }
}); });
} else {
// nothing pending try to send the fileRegion now!
sendFile0(region, promise);
} }
} }
@ -631,114 +614,87 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override @Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { if (!ensureOpen(promise)) {
if (!ensureOpen(promise)) { return;
return; }
try {
boolean wasActive = isActive();
// See: https://github.com/netty/netty/issues/576
if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
} }
try { doBind(localAddress);
boolean wasActive = isActive(); promise.setSuccess();
if (!wasActive && isActive()) {
// See: https://github.com/netty/netty/issues/576 pipeline.fireChannelActive();
if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
doBind(localAddress);
promise.setSuccess();
if (!wasActive && isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
} }
} else { } catch (Throwable t) {
eventLoop().execute(new Runnable() { promise.setFailure(t);
@Override closeIfClosed();
public void run() {
bind(localAddress, promise);
}
});
} }
} }
@Override @Override
public final void disconnect(final ChannelPromise promise) { public final void disconnect(final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { try {
try { boolean wasActive = isActive();
boolean wasActive = isActive(); doDisconnect();
doDisconnect(); promise.setSuccess();
promise.setSuccess(); if (wasActive && !isActive()) {
if (wasActive && !isActive()) { invokeLater(new Runnable() {
invokeLater(new Runnable() { @Override
@Override public void run() {
public void run() { pipeline.fireChannelInactive();
pipeline.fireChannelInactive(); }
} });
});
}
} catch (Throwable t) {
promise.setFailure(t);
closeIfClosed();
} }
} else { } catch (Throwable t) {
eventLoop().execute(new Runnable() { promise.setFailure(t);
@Override closeIfClosed();
public void run() {
disconnect(promise);
}
});
} }
} }
@Override @Override
public final void close(final ChannelPromise promise) { public final void close(final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { boolean wasActive = isActive();
boolean wasActive = isActive(); if (closeFuture.setClosed()) {
if (closeFuture.setClosed()) { try {
try { doClose();
doClose();
promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
}
if (closedChannelException == null) {
closedChannelException = new ClosedChannelException();
}
flushFutureNotifier.notifyFlushFutures(closedChannelException);
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
deregister(voidFuture());
} else {
// Closed already.
promise.setSuccess(); promise.setSuccess();
} catch (Throwable t) {
promise.setFailure(t);
} }
if (closedChannelException == null) {
closedChannelException = new ClosedChannelException();
}
flushFutureNotifier.notifyFlushFutures(closedChannelException);
if (wasActive && !isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelInactive();
}
});
}
deregister(voidFuture());
} else { } else {
eventLoop().execute(new Runnable() { // Closed already.
@Override promise.setSuccess();
public void run() {
close(promise);
}
});
} }
} }
@ -753,45 +709,36 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override @Override
public final void deregister(final ChannelPromise promise) { public final void deregister(final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { if (!registered) {
if (!registered) { promise.setSuccess();
return;
}
Runnable postTask = null;
try {
postTask = doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (registered) {
registered = false;
promise.setSuccess();
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelUnregistered();
}
});
} else {
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered.
promise.setSuccess(); promise.setSuccess();
return;
} }
Runnable postTask = null; if (postTask != null) {
try { postTask.run();
postTask = doDeregister();
} catch (Throwable t) {
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (registered) {
registered = false;
promise.setSuccess();
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelUnregistered();
}
});
} else {
// Some transports like local and AIO does not allow the deregistration of
// an open channel. Their doDeregister() calls close(). Consequently,
// close() calls deregister() again - no need to fire channelUnregistered.
promise.setSuccess();
}
if (postTask != null) {
postTask.run();
}
} }
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
deregister(promise);
}
});
} }
} }
@ -801,48 +748,34 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return; return;
} }
if (eventLoop().inEventLoop()) { try {
try { doBeginRead();
doBeginRead(); } catch (final Exception e) {
} catch (final Exception e) { invokeLater(new Runnable() {
invokeLater(new Runnable() { @Override
@Override public void run() {
public void run() { pipeline.fireExceptionCaught(e);
pipeline.fireExceptionCaught(e); }
} });
}); close(unsafe().voidFuture());
close(unsafe().voidFuture());
}
} else {
eventLoop().execute(beginReadTask);
} }
} }
@Override @Override
public void flush(final ChannelPromise promise) { public void flush(final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { FlushTask task = flushTaskInProgress;
FlushTask task = flushTaskInProgress; if (task == null) {
if (task != null) {
// loop over the tasks to find the last one
for (;;) {
FlushTask t = task.next;
if (t == null) {
break;
}
task = t.next;
}
task.next = new FlushTask(null, promise);
return;
}
flushNotifierAndFlush(promise); flushNotifierAndFlush(promise);
} else { } else {
eventLoop().execute(new Runnable() { // loop over the tasks to find the last one
@Override for (;;) {
public void run() { FlushTask t = task.next;
flush(promise); if (t == null) {
break;
} }
}); task = t.next;
}
task.next = new FlushTask(null, promise);
} }
} }

View File

@ -87,32 +87,14 @@ public abstract class AbstractServerChannel extends AbstractChannel implements S
private final class DefaultServerUnsafe extends AbstractUnsafe { private final class DefaultServerUnsafe extends AbstractUnsafe {
@Override @Override
public void flush(final ChannelPromise future) { public void flush(final ChannelPromise future) {
if (eventLoop().inEventLoop()) { reject(future);
reject(future);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
flush(future);
}
});
}
} }
@Override @Override
public void connect( public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelPromise future) { final ChannelPromise future) {
if (eventLoop().inEventLoop()) { reject(future);
reject(future);
} else {
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress, future);
}
});
}
} }
private void reject(ChannelPromise future) { private void reject(ChannelPromise future) {

View File

@ -150,7 +150,7 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
* {@code null} if this channel is not connected. * {@code null} if this channel is not connected.
* If this channel is not connected but it can receive messages * If this channel is not connected but it can receive messages
* from arbitrary remote addresses (e.g. {@link DatagramChannel}, * from arbitrary remote addresses (e.g. {@link DatagramChannel},
* use {@link DatagramPacket#remoteAddress()} to determine * use {@link DatagramPacket#recipient()} to determine
* the origination of the received message as this method will * the origination of the received message as this method will
* return {@code null}. * return {@code null}.
*/ */
@ -163,13 +163,22 @@ public interface Channel extends AttributeMap, ChannelOutboundInvoker, ChannelPr
ChannelFuture closeFuture(); ChannelFuture closeFuture();
/** /**
* <strong>Caution</strong> for transport implementations use only! * Returns an <em>internal-use-only</em> object that provides unsafe operations.
*/ */
Unsafe unsafe(); Unsafe unsafe();
/** /**
* <strong>Unsafe</strong> operations that should <strong>never</strong> be called * <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
* from user-code. These methods are only provided to implement the actual transport. * are only provided to implement the actual transport, and must be invoked from an I/O thread except for the
* following methods:
* <ul>
* <li>{@link #headContext()}</li>
* <li>{@link #voidFuture()}</li>
* <li>{@link #localAddress()}</li>
* <li>{@link #remoteAddress()}</li>
* <li>{@link #closeForcibly()}</li>
* <li>{@link #register(EventLoop, ChannelPromise)}</li>
* </ul>
*/ */
interface Unsafe { interface Unsafe {
/** /**

View File

@ -94,51 +94,42 @@ public abstract class AbstractAioChannel extends AbstractChannel {
@Override @Override
public void connect(final SocketAddress remoteAddress, public void connect(final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) { final SocketAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { if (!ensureOpen(promise)) {
if (!ensureOpen(promise)) { return;
return; }
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
} }
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
try { doConnect(remoteAddress, localAddress, promise);
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
}
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
doConnect(remoteAddress, localAddress, promise); // Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
// Schedule connect timeout. if (connectTimeoutMillis > 0) {
int connectTimeoutMillis = config().getConnectTimeoutMillis(); connectTimeoutFuture = eventLoop().schedule(new Runnable() {
if (connectTimeoutMillis > 0) { @Override
connectTimeoutFuture = eventLoop().schedule(new Runnable() { public void run() {
@Override ChannelPromise connectFuture = connectPromise;
public void run() { ConnectTimeoutException cause =
ChannelPromise connectFuture = connectPromise; new ConnectTimeoutException("connection timed out: " + remoteAddress);
ConnectTimeoutException cause = if (connectFuture != null && connectFuture.tryFailure(cause)) {
new ConnectTimeoutException("connection timed out: " + remoteAddress); close(voidFuture());
if (connectFuture != null && connectFuture.tryFailure(cause)) {
close(voidFuture());
}
} }
}, connectTimeoutMillis, TimeUnit.MILLISECONDS); }
} }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.setFailure(t);
closeIfClosed();
} }
} else { } catch (Throwable t) {
eventLoop().execute(new Runnable() { if (t instanceof ConnectException) {
@Override Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
public void run() { newT.setStackTrace(t.getStackTrace());
connect(remoteAddress, localAddress, promise); t = newT;
} }
}); promise.setFailure(t);
closeIfClosed();
} }
} }

View File

@ -277,61 +277,51 @@ public class LocalChannel extends AbstractChannel {
@Override @Override
public void connect(final SocketAddress remoteAddress, public void connect(final SocketAddress remoteAddress,
SocketAddress localAddress, final ChannelPromise promise) { SocketAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { if (!ensureOpen(promise)) {
if (!ensureOpen(promise)) { return;
return; }
if (state == 2) {
Exception cause = new AlreadyConnectedException();
promise.setFailure(cause);
pipeline().fireExceptionCaught(cause);
return;
}
if (connectPromise != null) {
throw new ConnectionPendingException();
}
connectPromise = promise;
if (state != 1) {
// Not bound yet and no localAddress specified - get one.
if (localAddress == null) {
localAddress = new LocalAddress(LocalChannel.this);
} }
}
if (state == 2) { if (localAddress != null) {
Exception cause = new AlreadyConnectedException(); try {
promise.setFailure(cause); doBind(localAddress);
pipeline().fireExceptionCaught(cause); } catch (Throwable t) {
return; promise.setFailure(t);
} pipeline().fireExceptionCaught(t);
if (connectPromise != null) {
throw new ConnectionPendingException();
}
connectPromise = promise;
if (state != 1) {
// Not bound yet and no localAddress specified - get one.
if (localAddress == null) {
localAddress = new LocalAddress(LocalChannel.this);
}
}
if (localAddress != null) {
try {
doBind(localAddress);
} catch (Throwable t) {
promise.setFailure(t);
pipeline().fireExceptionCaught(t);
close(voidFuture());
return;
}
}
Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause = new ChannelException("connection refused");
promise.setFailure(cause);
close(voidFuture()); close(voidFuture());
return; return;
} }
LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
peer = serverChannel.serve(LocalChannel.this);
} else {
final SocketAddress localAddress0 = localAddress;
eventLoop().execute(new Runnable() {
@Override
public void run() {
connect(remoteAddress, localAddress0, promise);
}
});
} }
Channel boundChannel = LocalChannelRegistry.get(remoteAddress);
if (!(boundChannel instanceof LocalServerChannel)) {
Exception cause = new ChannelException("connection refused");
promise.setFailure(cause);
close(voidFuture());
return;
}
LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
peer = serverChannel.serve(LocalChannel.this);
} }
} }
} }

View File

@ -157,58 +157,49 @@ public abstract class AbstractNioChannel extends AbstractChannel {
@Override @Override
public void connect( public void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { if (!ensureOpen(promise)) {
if (!ensureOpen(promise)) { return;
return; }
try {
if (connectPromise != null) {
throw new IllegalStateException("connection attempt already made");
} }
try { boolean wasActive = isActive();
if (connectPromise != null) { if (doConnect(remoteAddress, localAddress)) {
throw new IllegalStateException("connection attempt already made"); promise.setSuccess();
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
} }
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
boolean wasActive = isActive(); // Schedule connect timeout.
if (doConnect(remoteAddress, localAddress)) { int connectTimeoutMillis = config().getConnectTimeoutMillis();
promise.setSuccess(); if (connectTimeoutMillis > 0) {
if (!wasActive && isActive()) { connectTimeoutFuture = eventLoop().schedule(new Runnable() {
pipeline().fireChannelActive(); @Override
} public void run() {
} else { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
connectPromise = promise; ConnectTimeoutException cause =
requestedRemoteAddress = remoteAddress; new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
// Schedule connect timeout. close(voidFuture());
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidFuture());
}
} }
}, connectTimeoutMillis, TimeUnit.MILLISECONDS); }
} }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
} }
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.setFailure(t);
closeIfClosed();
} }
} else { } catch (Throwable t) {
eventLoop().execute(new Runnable() { if (t instanceof ConnectException) {
@Override Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
public void run() { newT.setStackTrace(t.getStackTrace());
connect(remoteAddress, localAddress, promise); t = newT;
} }
}); promise.setFailure(t);
closeIfClosed();
} }
} }

View File

@ -58,34 +58,25 @@ public abstract class AbstractOioChannel extends AbstractChannel {
public void connect( public void connect(
final SocketAddress remoteAddress, final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) { final SocketAddress localAddress, final ChannelPromise promise) {
if (eventLoop().inEventLoop()) { if (!ensureOpen(promise)) {
if (!ensureOpen(promise)) { return;
return; }
}
try { try {
boolean wasActive = isActive(); boolean wasActive = isActive();
doConnect(remoteAddress, localAddress); doConnect(remoteAddress, localAddress);
promise.setSuccess(); promise.setSuccess();
if (!wasActive && isActive()) { if (!wasActive && isActive()) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
}
} catch (Throwable t) {
if (t instanceof ConnectException) {
Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
newT.setStackTrace(t.getStackTrace());
t = newT;
}
promise.setFailure(t);
closeIfClosed();
} }
} else { } catch (Throwable t) {
eventLoop().execute(new Runnable() { if (t instanceof ConnectException) {
@Override Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
public void run() { newT.setStackTrace(t.getStackTrace());
connect(remoteAddress, localAddress, promise); t = newT;
} }
}); promise.setFailure(t);
closeIfClosed();
} }
} }
} }