diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index 69cf681e04..cdde9172c8 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -19,7 +19,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandler.Skip; import io.netty.util.Attribute; import io.netty.util.AttributeKey; -import io.netty.util.ReferenceCountUtil; import io.netty.util.ResourceLeakHint; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.FastThreadLocal; @@ -355,8 +354,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelHandlerContext fireChannelRead(Object msg) { AbstractChannelHandlerContext next = findContextInbound(); - ReferenceCountUtil.touch(msg, next); - next.invoker().invokeChannelRead(next, msg); + next.invoker().invokeChannelRead(next, pipeline.touch(msg, next)); return this; } @@ -463,8 +461,7 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelFuture write(Object msg, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); - ReferenceCountUtil.touch(msg, next); - next.invoker().invokeWrite(next, msg, promise); + next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise); return promise; } @@ -477,12 +474,10 @@ abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, R @Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - AbstractChannelHandlerContext next; - next = findContextOutbound(); - ReferenceCountUtil.touch(msg, next); - next.invoker().invokeWrite(next, msg, promise); - next = findContextOutbound(); - next.invoker().invokeFlush(next); + AbstractChannelHandlerContext next = findContextOutbound(); + ChannelHandlerInvoker invoker = next.invoker(); + invoker.invokeWrite(next, pipeline.touch(msg, next) , promise); + invoker.invokeFlush(next); return promise; } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 52cdcbed15..0ed2790418 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -17,6 +17,7 @@ package io.netty.channel; import io.netty.channel.Channel.Unsafe; import io.netty.util.ReferenceCountUtil; +import io.netty.util.ResourceLeakDetector; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.PausableEventExecutor; @@ -64,6 +65,7 @@ final class DefaultChannelPipeline implements ChannelPipeline { private final Map name2ctx = new HashMap(4); + private final boolean touch = ResourceLeakDetector.getLevel() != ResourceLeakDetector.Level.DISABLED; /** * @see #findInvoker(EventExecutorGroup) @@ -83,6 +85,10 @@ final class DefaultChannelPipeline implements ChannelPipeline { tail.prev = head; } + Object touch(Object msg, AbstractChannelHandlerContext next) { + return touch ? ReferenceCountUtil.touch(msg, next) : msg; + } + @Override public Channel channel() { return channel;