From 0bc7b48deb95b4bccd54e115a4c820bbfe1e490a Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 29 Nov 2019 08:59:47 +0100 Subject: [PATCH] WIP --- .../channel/DefaultChannelHandlerContext.java | 33 +++-- .../netty/channel/DefaultChannelPipeline.java | 140 ++++++++++++++---- 2 files changed, 133 insertions(+), 40 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 929075f6c7..3afad81828 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -76,6 +76,11 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou this.handler = handler; } + void unlink() { + prev = null; + next = null; + } + private Tasks invokeTasks() { Tasks tasks = invokeTasks; if (tasks == null) { @@ -796,6 +801,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private DefaultChannelHandlerContext findContextInbound(int mask) { DefaultChannelHandlerContext ctx = this; + if (ctx.next == null) { + assert handlerState == REMOVE_COMPLETE; + return pipeline.empty; + } do { ctx = ctx.next; } while ((ctx.executionMask & mask) == 0); @@ -804,6 +813,10 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou private DefaultChannelHandlerContext findContextOutbound(int mask) { DefaultChannelHandlerContext ctx = this; + if (ctx.prev == null) { + assert handlerState == REMOVE_COMPLETE; + return pipeline.empty; + } do { ctx = ctx.prev; } while ((ctx.executionMask & mask) == 0); @@ -925,7 +938,8 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou } } - protected abstract DefaultChannelHandlerContext findContext(DefaultChannelHandlerContext ctx); + protected abstract DefaultChannelHandlerContext findContext( + DefaultChannelHandlerContext ctx); @Override public final void run() { try { @@ -966,13 +980,7 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable { - private static final ObjectPool RECYCLER = ObjectPool.newPool( - new ObjectPool.ObjectCreator() { - @Override - public WriteTask newObject(ObjectPool.Handle handle) { - return new WriteTask(handle); - } - }); + private static final ObjectPool RECYCLER = ObjectPool.newPool(WriteTask::new); static WriteTask newInstance( DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) { @@ -993,13 +1001,8 @@ final class DefaultChannelHandlerContext implements ChannelHandlerContext, Resou static final class WriteAndFlushTask extends AbstractWriteTask { - private static final ObjectPool RECYCLER = ObjectPool.newPool( - new ObjectPool.ObjectCreator() { - @Override - public WriteAndFlushTask newObject(ObjectPool.Handle handle) { - return new WriteAndFlushTask(handle); - } - }); + private static final ObjectPool RECYCLER = ObjectPool.newPool(WriteAndFlushTask::new); + static WriteAndFlushTask newInstance( DefaultChannelHandlerContext ctx, Object msg, ChannelPromise promise) { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 30190c4926..7f05201f03 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -48,13 +48,16 @@ public class DefaultChannelPipeline implements ChannelPipeline { private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); private static final String HEAD_NAME = generateName0(HeadHandler.class); private static final String TAIL_NAME = generateName0(TailHandler.class); + private static final String EMPTY_NAME = generateName0(EmptyHandler.class); + private static final ChannelHandler HEAD_HANDLER = new HeadHandler(); private static final ChannelHandler TAIL_HANDLER = new TailHandler(); + private static final ChannelHandler EMPTY_HANDLER = new EmptyHandler(); private static final FastThreadLocal, String>> nameCaches = new FastThreadLocal, String>>() { @Override - protected Map, String> initialValue() throws Exception { + protected Map, String> initialValue() { return new WeakHashMap<>(); } }; @@ -64,6 +67,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle"); private final DefaultChannelHandlerContext head; private final DefaultChannelHandlerContext tail; + final DefaultChannelHandlerContext empty; private final Channel channel; private final ChannelFuture succeededFuture; private final VoidChannelPromise voidPromise; @@ -76,10 +80,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { this.channel = requireNonNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, channel.eventLoop()); voidPromise = new VoidChannelPromise(channel, true); - + empty = new DefaultChannelHandlerContext(this, EMPTY_NAME, EMPTY_HANDLER); tail = new DefaultChannelHandlerContext(this, TAIL_NAME, TAIL_HANDLER); head = new DefaultChannelHandlerContext(this, HEAD_NAME, HEAD_HANDLER); - head.next = tail; tail.prev = head; head.setAddComplete(); @@ -496,7 +499,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { return (T) ctx.handler(); } - private void unlink(DefaultChannelHandlerContext ctx) { + private void relink(DefaultChannelHandlerContext ctx) { assert ctx != head && ctx != tail; DefaultChannelHandlerContext prev = ctx.prev; DefaultChannelHandlerContext next = ctx.next; @@ -505,8 +508,9 @@ public class DefaultChannelPipeline implements ChannelPipeline { } private void remove0(DefaultChannelHandlerContext ctx) { - unlink(ctx); + relink(ctx); callHandlerRemoved0(ctx); + ctx.unlink(); } @Override @@ -570,27 +574,31 @@ public class DefaultChannelPipeline implements ChannelPipeline { } private void replace0(DefaultChannelHandlerContext oldCtx, DefaultChannelHandlerContext newCtx) { - DefaultChannelHandlerContext prev = oldCtx.prev; - DefaultChannelHandlerContext next = oldCtx.next; - newCtx.prev = prev; - newCtx.next = next; + try { + DefaultChannelHandlerContext prev = oldCtx.prev; + DefaultChannelHandlerContext next = oldCtx.next; + newCtx.prev = prev; + newCtx.next = next; - // Finish the replacement of oldCtx with newCtx in the linked list. - // Note that this doesn't mean events will be sent to the new handler immediately - // because we are currently at the event handler thread and no more than one handler methods can be invoked - // at the same time (we ensured that in replace().) - prev.next = newCtx; - next.prev = newCtx; + // Finish the replacement of oldCtx with newCtx in the linked list. + // Note that this doesn't mean events will be sent to the new handler immediately + // because we are currently at the event handler thread and no more than one handler methods can be invoked + // at the same time (we ensured that in replace().) + prev.next = newCtx; + next.prev = newCtx; - // update the reference to the replacement so forward of buffered content will work correctly - oldCtx.prev = newCtx; - oldCtx.next = newCtx; + // update the reference to the replacement so forward of buffered content will work correctly + oldCtx.prev = newCtx; + oldCtx.next = newCtx; - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those - // event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(oldCtx); + // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) + // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those + // event handlers must be called after handlerAdded(). + callHandlerAdded0(newCtx); + callHandlerRemoved0(oldCtx); + } finally { + oldCtx.unlink(); + } } private static void checkMultiplicity(ChannelHandler handler) { @@ -615,7 +623,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { handlers.remove(ctx); } - unlink(ctx); + relink(ctx); ctx.callHandlerRemoved(); removed = true; @@ -623,6 +631,8 @@ public class DefaultChannelPipeline implements ChannelPipeline { if (logger.isWarnEnabled()) { logger.warn("Failed to remove a handler: " + ctx.name(), t2); } + } finally { + ctx.unlink(); } if (removed) { @@ -833,9 +843,10 @@ public class DefaultChannelPipeline implements ChannelPipeline { synchronized (handlers) { handlers.remove(ctx); } + DefaultChannelHandlerContext prev = ctx.prev; remove0(ctx); - ctx = ctx.prev; + ctx = prev; } } @@ -1207,4 +1218,83 @@ public class DefaultChannelPipeline implements ChannelPipeline { } } } + + private static final class EmptyHandler implements ChannelHandler { + + @Override + public void channelRegistered(ChannelHandlerContext ctx) { } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { } + + @Override + public void channelActive(ChannelHandlerContext ctx) { } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ReferenceCountUtil.release(msg); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + ReferenceCountUtil.release(evt); + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } + + @Override + public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) { + promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already")); + } + + @Override + public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, + ChannelPromise promise) { + promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already")); + } + + @Override + public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) { + promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already")); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) { + promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already")); + } + + @Override + public void register(ChannelHandlerContext ctx, ChannelPromise promise) { + promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already")); + } + + @Override + public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) { + promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already")); + } + + @Override + public void read(ChannelHandlerContext ctx) { } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + ReferenceCountUtil.release(msg); + promise.setFailure(new ChannelPipelineException("Handler " + ctx.handler() + " removed already")); + } + + @Override + public void flush(ChannelHandlerContext ctx) { } + } }