This commit is contained in:
Norman Maurer 2019-11-07 22:02:34 +01:00
parent cca886d26c
commit 00b74f24af
2 changed files with 88 additions and 0 deletions

View File

@ -183,6 +183,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeChannelRegistered() { private void findAndInvokeChannelRegistered() {
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_REGISTERED); DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_REGISTERED);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeChannelRegistered(); context.invokeChannelRegistered();
} else { } else {
@ -218,6 +222,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeChannelUnregistered() { private void findAndInvokeChannelUnregistered() {
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_UNREGISTERED); DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_UNREGISTERED);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeChannelUnregistered(); context.invokeChannelUnregistered();
} else { } else {
@ -253,6 +261,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeChannelActive() { private void findAndInvokeChannelActive() {
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_ACTIVE); DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_ACTIVE);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeChannelActive(); context.invokeChannelActive();
} else { } else {
@ -288,6 +300,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeChannelInactive() { private void findAndInvokeChannelInactive() {
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_INACTIVE); DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_INACTIVE);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeChannelInactive(); context.invokeChannelInactive();
} else { } else {
@ -331,6 +347,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeExceptionCaught(Throwable cause) { private void findAndInvokeExceptionCaught(Throwable cause) {
DefaultChannelHandlerContext context = findContextInbound(MASK_EXCEPTION_CAUGHT); DefaultChannelHandlerContext context = findContextInbound(MASK_EXCEPTION_CAUGHT);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeExceptionCaught(cause); context.invokeExceptionCaught(cause);
} else { } else {
@ -378,6 +398,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeUserEventTriggered(Object event) { private void findAndInvokeUserEventTriggered(Object event) {
DefaultChannelHandlerContext context = findContextInbound(MASK_USER_EVENT_TRIGGERED); DefaultChannelHandlerContext context = findContextInbound(MASK_USER_EVENT_TRIGGERED);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeUserEventTriggered(event); context.invokeUserEventTriggered(event);
} else { } else {
@ -419,6 +443,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeChannelRead(Object msg) { private void findAndInvokeChannelRead(Object msg) {
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_READ); DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_READ);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeChannelRead(msg); context.invokeChannelRead(msg);
} else { } else {
@ -456,6 +484,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeChannelReadComplete() { private void findAndInvokeChannelReadComplete() {
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_READ_COMPLETE); DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeChannelReadComplete(); context.invokeChannelReadComplete();
} else { } else {
@ -492,6 +524,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeChannelWritabilityChanged() { private void findAndInvokeChannelWritabilityChanged() {
DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED); DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeChannelWritabilityChanged(); context.invokeChannelWritabilityChanged();
} else { } else {
@ -568,6 +604,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeBind(SocketAddress localAddress, ChannelPromise promise) { private void findAndInvokeBind(SocketAddress localAddress, ChannelPromise promise) {
DefaultChannelHandlerContext context = findContextOutbound(MASK_BIND); DefaultChannelHandlerContext context = findContextOutbound(MASK_BIND);
if (context == null) {
promise.setFailure(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessOutboundDirectly()) { if (context.isProcessOutboundDirectly()) {
context.invokeBind(localAddress, promise); context.invokeBind(localAddress, promise);
} else { } else {
@ -615,6 +655,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { private void findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
DefaultChannelHandlerContext context = findContextOutbound(MASK_CONNECT); DefaultChannelHandlerContext context = findContextOutbound(MASK_CONNECT);
if (context == null) {
promise.setFailure(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessOutboundDirectly()) { if (context.isProcessOutboundDirectly()) {
context.invokeConnect(remoteAddress, localAddress, promise); context.invokeConnect(remoteAddress, localAddress, promise);
} else { } else {
@ -662,6 +706,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeDisconnect(ChannelPromise promise) { private void findAndInvokeDisconnect(ChannelPromise promise) {
DefaultChannelHandlerContext context = findContextOutbound(MASK_DISCONNECT); DefaultChannelHandlerContext context = findContextOutbound(MASK_DISCONNECT);
if (context == null) {
promise.setFailure(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessOutboundDirectly()) { if (context.isProcessOutboundDirectly()) {
context.invokeDisconnect(promise); context.invokeDisconnect(promise);
} else { } else {
@ -702,6 +750,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeClose(ChannelPromise promise) { private void findAndInvokeClose(ChannelPromise promise) {
DefaultChannelHandlerContext context = findContextOutbound(MASK_CLOSE); DefaultChannelHandlerContext context = findContextOutbound(MASK_CLOSE);
if (context == null) {
promise.setFailure(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessOutboundDirectly()) { if (context.isProcessOutboundDirectly()) {
context.invokeClose(promise); context.invokeClose(promise);
} else { } else {
@ -742,6 +794,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeRegister(ChannelPromise promise) { private void findAndInvokeRegister(ChannelPromise promise) {
DefaultChannelHandlerContext context = findContextOutbound(MASK_REGISTER); DefaultChannelHandlerContext context = findContextOutbound(MASK_REGISTER);
if (context == null) {
promise.setFailure(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessOutboundDirectly()) { if (context.isProcessOutboundDirectly()) {
context.invokeRegister(promise); context.invokeRegister(promise);
} else { } else {
@ -782,6 +838,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeDeregister(ChannelPromise promise) { private void findAndInvokeDeregister(ChannelPromise promise) {
DefaultChannelHandlerContext context = findContextOutbound(MASK_DEREGISTER); DefaultChannelHandlerContext context = findContextOutbound(MASK_DEREGISTER);
if (context == null) {
promise.setFailure(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessOutboundDirectly()) { if (context.isProcessOutboundDirectly()) {
context.invokeDeregister(promise); context.invokeDeregister(promise);
} else { } else {
@ -818,6 +878,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeRead() { private void findAndInvokeRead() {
DefaultChannelHandlerContext context = findContextOutbound(MASK_READ); DefaultChannelHandlerContext context = findContextOutbound(MASK_READ);
if (context == null) {
return;
}
if (context.isProcessOutboundDirectly()) { if (context.isProcessOutboundDirectly()) {
context.invokeRead(); context.invokeRead();
} else { } else {
@ -840,11 +903,20 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
} }
} }
private ChannelPipelineException newHandlerAlreadyRemoveException() {
return new ChannelPipelineException("Handler '" + name() + "' + already removed");
}
private void invokeExceptionCaughtFromOutbound(Throwable t) { private void invokeExceptionCaughtFromOutbound(Throwable t) {
if ((executionMask & MASK_EXCEPTION_CAUGHT) != 0) { if ((executionMask & MASK_EXCEPTION_CAUGHT) != 0) {
notifyHandlerException(t); notifyHandlerException(t);
} else { } else {
DefaultChannelHandlerContext context = findContextInbound(MASK_EXCEPTION_CAUGHT); DefaultChannelHandlerContext context = findContextInbound(MASK_EXCEPTION_CAUGHT);
if (context == null) {
pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException());
return;
}
if (context.isProcessInboundDirectly()) { if (context.isProcessInboundDirectly()) {
context.invokeExceptionCaught(t); context.invokeExceptionCaught(t);
} else { } else {
@ -896,6 +968,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
private void findAndInvokeFlush() { private void findAndInvokeFlush() {
DefaultChannelHandlerContext context = findContextOutbound(MASK_FLUSH); DefaultChannelHandlerContext context = findContextOutbound(MASK_FLUSH);
if (context == null) {
return;
}
if (context.isProcessOutboundDirectly()) { if (context.isProcessOutboundDirectly()) {
context.invokeFlush(); context.invokeFlush();
} else { } else {
@ -941,6 +1016,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
if (executor.inEventLoop()) { if (executor.inEventLoop()) {
final DefaultChannelHandlerContext next = findContextOutbound(flush ? final DefaultChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE); (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
if (next == null) {
ReferenceCountUtil.release(msg);
promise.setFailure(newHandlerAlreadyRemoveException());
return;
}
if (flush) { if (flush) {
if (next.isProcessOutboundDirectly()) { if (next.isProcessOutboundDirectly()) {
next.invokeWrite(msg, promise); next.invokeWrite(msg, promise);
@ -1206,6 +1286,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou
try { try {
decrementPendingOutboundBytes(); decrementPendingOutboundBytes();
DefaultChannelHandlerContext next = findContext(ctx); DefaultChannelHandlerContext next = findContext(ctx);
if (next == null) {
ReferenceCountUtil.release(msg);
promise.setFailure(new ChannelPipelineException("Handler already removed"));
return;
}
write(next, msg, promise); write(next, msg, promise);
} finally { } finally {
recycle(); recycle();

View File

@ -501,6 +501,9 @@ public class DefaultChannelPipeline implements ChannelPipeline {
DefaultChannelHandlerContext next = ctx.next; DefaultChannelHandlerContext next = ctx.next;
prev.next = next; prev.next = next;
next.prev = prev; next.prev = prev;
ctx.next = null;
ctx.prev = null;
} }
private void remove0(DefaultChannelHandlerContext ctx) { private void remove0(DefaultChannelHandlerContext ctx) {