From 00b74f24afd6cf39fa6cee62e3bce7731d4b484a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Thu, 7 Nov 2019 22:02:34 +0100 Subject: [PATCH] wip --- .../channel/DefaultChannelHandlerContext.java | 85 +++++++++++++++++++ .../netty/channel/DefaultChannelPipeline.java | 3 + 2 files changed, 88 insertions(+) diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index ae2c4883a7..c0d9e356ed 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -183,6 +183,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeChannelRegistered() { DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_REGISTERED); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeChannelRegistered(); } else { @@ -218,6 +222,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeChannelUnregistered() { DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_UNREGISTERED); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeChannelUnregistered(); } else { @@ -253,6 +261,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeChannelActive() { DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_ACTIVE); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeChannelActive(); } else { @@ -288,6 +300,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeChannelInactive() { DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_INACTIVE); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeChannelInactive(); } else { @@ -331,6 +347,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeExceptionCaught(Throwable cause) { DefaultChannelHandlerContext context = findContextInbound(MASK_EXCEPTION_CAUGHT); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeExceptionCaught(cause); } else { @@ -378,6 +398,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeUserEventTriggered(Object event) { DefaultChannelHandlerContext context = findContextInbound(MASK_USER_EVENT_TRIGGERED); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeUserEventTriggered(event); } else { @@ -419,6 +443,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeChannelRead(Object msg) { DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_READ); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeChannelRead(msg); } else { @@ -456,6 +484,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeChannelReadComplete() { DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_READ_COMPLETE); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeChannelReadComplete(); } else { @@ -492,6 +524,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeChannelWritabilityChanged() { DefaultChannelHandlerContext context = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessInboundDirectly()) { context.invokeChannelWritabilityChanged(); } else { @@ -568,6 +604,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeBind(SocketAddress localAddress, ChannelPromise promise) { DefaultChannelHandlerContext context = findContextOutbound(MASK_BIND); + if (context == null) { + promise.setFailure(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessOutboundDirectly()) { context.invokeBind(localAddress, promise); } else { @@ -615,6 +655,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { DefaultChannelHandlerContext context = findContextOutbound(MASK_CONNECT); + if (context == null) { + promise.setFailure(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessOutboundDirectly()) { context.invokeConnect(remoteAddress, localAddress, promise); } else { @@ -662,6 +706,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeDisconnect(ChannelPromise promise) { DefaultChannelHandlerContext context = findContextOutbound(MASK_DISCONNECT); + if (context == null) { + promise.setFailure(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessOutboundDirectly()) { context.invokeDisconnect(promise); } else { @@ -702,6 +750,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeClose(ChannelPromise promise) { DefaultChannelHandlerContext context = findContextOutbound(MASK_CLOSE); + if (context == null) { + promise.setFailure(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessOutboundDirectly()) { context.invokeClose(promise); } else { @@ -742,6 +794,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeRegister(ChannelPromise promise) { DefaultChannelHandlerContext context = findContextOutbound(MASK_REGISTER); + if (context == null) { + promise.setFailure(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessOutboundDirectly()) { context.invokeRegister(promise); } else { @@ -782,6 +838,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeDeregister(ChannelPromise promise) { DefaultChannelHandlerContext context = findContextOutbound(MASK_DEREGISTER); + if (context == null) { + promise.setFailure(newHandlerAlreadyRemoveException()); + return; + } if (context.isProcessOutboundDirectly()) { context.invokeDeregister(promise); } else { @@ -818,6 +878,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeRead() { DefaultChannelHandlerContext context = findContextOutbound(MASK_READ); + if (context == null) { + return; + } if (context.isProcessOutboundDirectly()) { context.invokeRead(); } else { @@ -840,11 +903,20 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou } } + private ChannelPipelineException newHandlerAlreadyRemoveException() { + return new ChannelPipelineException("Handler '" + name() + "' + already removed"); + } + private void invokeExceptionCaughtFromOutbound(Throwable t) { if ((executionMask & MASK_EXCEPTION_CAUGHT) != 0) { notifyHandlerException(t); } else { DefaultChannelHandlerContext context = findContextInbound(MASK_EXCEPTION_CAUGHT); + if (context == null) { + pipeline.fireExceptionCaught(newHandlerAlreadyRemoveException()); + return; + } + if (context.isProcessInboundDirectly()) { context.invokeExceptionCaught(t); } else { @@ -896,6 +968,9 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private void findAndInvokeFlush() { DefaultChannelHandlerContext context = findContextOutbound(MASK_FLUSH); + if (context == null) { + return; + } if (context.isProcessOutboundDirectly()) { context.invokeFlush(); } else { @@ -941,6 +1016,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou if (executor.inEventLoop()) { final DefaultChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); + if (next == null) { + ReferenceCountUtil.release(msg); + promise.setFailure(newHandlerAlreadyRemoveException()); + return; + } if (flush) { if (next.isProcessOutboundDirectly()) { next.invokeWrite(msg, promise); @@ -1206,6 +1286,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou try { decrementPendingOutboundBytes(); DefaultChannelHandlerContext next = findContext(ctx); + if (next == null) { + ReferenceCountUtil.release(msg); + promise.setFailure(new ChannelPipelineException("Handler already removed")); + return; + } write(next, msg, promise); } finally { recycle(); diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 14d3f90b8c..6a7804687b 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -501,6 +501,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { DefaultChannelHandlerContext next = ctx.next; prev.next = next; next.prev = prev; + + ctx.next = null; + ctx.prev = null; } private void remove0(DefaultChannelHandlerContext ctx) {