Simplify handler invocations in DefaultChannelHandlerContext
- Sometimes we delegated executor.inEventLoop() to the next context and sometimes we did not. Now we always check inEventLoop() before delegating.
This commit is contained in:
parent
acb28e3ac8
commit
e81368af06
@ -43,8 +43,8 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
// Lazily instantiated tasks used to trigger events to a handler with different executor.
|
||||
private Runnable invokeChannelReadCompleteTask;
|
||||
private Runnable invokeRead0Task;
|
||||
private Runnable invokeFlush0Task;
|
||||
private Runnable invokeReadTask;
|
||||
private Runnable invokeFlushTask;
|
||||
private Runnable invokeChannelWritableStateChangedTask;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -273,20 +273,17 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
throw new NullPointerException("cause");
|
||||
}
|
||||
|
||||
next.invokeExceptionCaught(cause);
|
||||
return this;
|
||||
}
|
||||
final DefaultChannelHandlerContext next = this.next;
|
||||
|
||||
private void invokeExceptionCaught(final Throwable cause) {
|
||||
EventExecutor executor = executor();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeExceptionCaught0(cause);
|
||||
next.invokeExceptionCaught(cause);
|
||||
} else {
|
||||
try {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeExceptionCaught0(cause);
|
||||
next.invokeExceptionCaught(cause);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
@ -296,9 +293,11 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void invokeExceptionCaught0(Throwable cause) {
|
||||
private void invokeExceptionCaught(final Throwable cause) {
|
||||
ChannelHandler handler = handler();
|
||||
try {
|
||||
handler.exceptionCaught(this, cause);
|
||||
@ -460,30 +459,29 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
|
||||
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
|
||||
if (localAddress == null) {
|
||||
throw new NullPointerException("localAddress");
|
||||
}
|
||||
validatePromise(promise, false);
|
||||
return findContextOutbound().invokeBind(localAddress, promise);
|
||||
}
|
||||
|
||||
private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {
|
||||
EventExecutor executor = executor();
|
||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeBind0(localAddress, promise);
|
||||
next.invokeBind(localAddress, promise);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeBind0(localAddress, promise);
|
||||
next.invokeBind(localAddress, promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
|
||||
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
@ -497,24 +495,23 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
public ChannelFuture connect(
|
||||
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
||||
|
||||
if (remoteAddress == null) {
|
||||
throw new NullPointerException("remoteAddress");
|
||||
}
|
||||
validatePromise(promise, false);
|
||||
return findContextOutbound().invokeConnect(remoteAddress, localAddress, promise);
|
||||
}
|
||||
|
||||
private ChannelFuture invokeConnect(
|
||||
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
|
||||
EventExecutor executor = executor();
|
||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeConnect0(remoteAddress, localAddress, promise);
|
||||
next.invokeConnect(remoteAddress, localAddress, promise);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeConnect0(remoteAddress, localAddress, promise);
|
||||
next.invokeConnect(remoteAddress, localAddress, promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -522,7 +519,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeConnect0(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
|
||||
} catch (Throwable t) {
|
||||
@ -531,27 +528,25 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture disconnect(ChannelPromise promise) {
|
||||
public ChannelFuture disconnect(final ChannelPromise promise) {
|
||||
validatePromise(promise, false);
|
||||
|
||||
// Translate disconnect to close if the channel has no notion of disconnect-reconnect.
|
||||
// So far, UDP/IP is the only transport that has such behavior.
|
||||
if (!channel().metadata().hasDisconnect()) {
|
||||
return findContextOutbound().invokeClose(promise);
|
||||
findContextOutbound().invokeClose(promise);
|
||||
return promise;
|
||||
}
|
||||
|
||||
return findContextOutbound().invokeDisconnect(promise);
|
||||
}
|
||||
|
||||
private ChannelFuture invokeDisconnect(final ChannelPromise promise) {
|
||||
EventExecutor executor = executor();
|
||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeDisconnect0(promise);
|
||||
next.invokeDisconnect(promise);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeDisconnect0(promise);
|
||||
next.invokeDisconnect(promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -559,7 +554,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeDisconnect0(ChannelPromise promise) {
|
||||
private void invokeDisconnect(ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).disconnect(this, promise);
|
||||
} catch (Throwable t) {
|
||||
@ -568,20 +563,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture close(ChannelPromise promise) {
|
||||
public ChannelFuture close(final ChannelPromise promise) {
|
||||
validatePromise(promise, false);
|
||||
return findContextOutbound().invokeClose(promise);
|
||||
}
|
||||
|
||||
private ChannelFuture invokeClose(final ChannelPromise promise) {
|
||||
EventExecutor executor = executor();
|
||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeClose0(promise);
|
||||
next.invokeClose(promise);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeClose0(promise);
|
||||
next.invokeClose(promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -589,7 +582,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeClose0(ChannelPromise promise) {
|
||||
private void invokeClose(ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).close(this, promise);
|
||||
} catch (Throwable t) {
|
||||
@ -598,20 +591,18 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture deregister(ChannelPromise promise) {
|
||||
public ChannelFuture deregister(final ChannelPromise promise) {
|
||||
validatePromise(promise, false);
|
||||
return findContextOutbound().invokeDeregister(promise);
|
||||
}
|
||||
|
||||
private ChannelFuture invokeDeregister(final ChannelPromise promise) {
|
||||
EventExecutor executor = executor();
|
||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeDeregister0(promise);
|
||||
next.invokeDeregister(promise);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeDeregister0(promise);
|
||||
next.invokeDeregister(promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -619,7 +610,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeDeregister0(ChannelPromise promise) {
|
||||
private void invokeDeregister(ChannelPromise promise) {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).deregister(this, promise);
|
||||
} catch (Throwable t) {
|
||||
@ -629,29 +620,27 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext read() {
|
||||
findContextOutbound().invokeRead();
|
||||
return this;
|
||||
}
|
||||
|
||||
private void invokeRead() {
|
||||
EventExecutor executor = executor();
|
||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeRead0();
|
||||
next.invokeRead();
|
||||
} else {
|
||||
Runnable task = invokeRead0Task;
|
||||
Runnable task = next.invokeReadTask;
|
||||
if (task == null) {
|
||||
invokeRead0Task = task = new Runnable() {
|
||||
next.invokeReadTask = task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeRead0();
|
||||
next.invokeRead();
|
||||
}
|
||||
};
|
||||
}
|
||||
executor.execute(task);
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void invokeRead0() {
|
||||
private void invokeRead() {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).read(this);
|
||||
} catch (Throwable t) {
|
||||
@ -665,30 +654,29 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelFuture write(Object msg, ChannelPromise promise) {
|
||||
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
|
||||
if (msg == null) {
|
||||
throw new NullPointerException("msg");
|
||||
}
|
||||
validatePromise(promise, true);
|
||||
findContextOutbound().invokeWrite(msg, promise);
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeWrite(final Object msg, final ChannelPromise promise) {
|
||||
EventExecutor executor = executor();
|
||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeWrite0(msg, promise);
|
||||
next.invokeWrite(msg, promise);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeWrite0(msg, promise);
|
||||
next.invokeWrite(msg, promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
return promise;
|
||||
}
|
||||
|
||||
private void invokeWrite0(Object msg, ChannelPromise promise) {
|
||||
private void invokeWrite(Object msg, ChannelPromise promise) {
|
||||
ChannelOutboundHandler handler = (ChannelOutboundHandler) handler();
|
||||
try {
|
||||
handler.write(this, msg, promise);
|
||||
@ -699,29 +687,27 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
|
||||
|
||||
@Override
|
||||
public ChannelHandlerContext flush() {
|
||||
findContextOutbound().invokeFlush();
|
||||
return this;
|
||||
}
|
||||
|
||||
private void invokeFlush() {
|
||||
EventExecutor executor = executor();
|
||||
final DefaultChannelHandlerContext next = findContextOutbound();
|
||||
EventExecutor executor = next.executor();
|
||||
if (executor.inEventLoop()) {
|
||||
invokeFlush0();
|
||||
next.invokeFlush();
|
||||
} else {
|
||||
Runnable task = invokeFlush0Task;
|
||||
Runnable task = next.invokeFlushTask;
|
||||
if (task == null) {
|
||||
invokeFlush0Task = task = new Runnable() {
|
||||
next.invokeFlushTask = task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
invokeFlush0();
|
||||
next.invokeFlush();
|
||||
}
|
||||
};
|
||||
}
|
||||
executor.execute(task);
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
private void invokeFlush0() {
|
||||
private void invokeFlush() {
|
||||
try {
|
||||
((ChannelOutboundHandler) handler()).flush(this);
|
||||
} catch (Throwable t) {
|
||||
|
Loading…
Reference in New Issue
Block a user