From cb6ae72df2dbaeeb73b9daa9edf075dccd298218 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Fri, 14 Dec 2018 10:11:34 +0000 Subject: [PATCH] =?UTF-8?q?Handling=20AUTO=5FREAD=20should=20not=20be=20th?= =?UTF-8?q?e=20responsibility=20of=20DefaultChannel=E2=80=A6=20(#8650)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Handling AUTO_READ should not be the responsibility of DefaultChannelPipeline but the Channel itself. Motivation: At the moment we do automatically call read() in the DefaultChannelPipeline when fireChannelReadComplete() / fireChannelActive() is called and the Channel is using auto read. This is nice in terms of sharing code but imho is not the responsibility of the ChannelPipeline implementation but the responsibility of the Channel implementation. Modifications: Move handing of auto read from DefaultChannelPipeline to Channel implementations. Result: More clear responsibiliy and not depending on implemention details of the ChannelPipeline. --- .../handler/codec/http2/Http2MultiplexCodec.java | 7 +++++++ .../netty/channel/epoll/AbstractEpollChannel.java | 1 + .../channel/epoll/AbstractEpollServerChannel.java | 1 + .../channel/epoll/AbstractEpollStreamChannel.java | 4 ++++ .../netty/channel/epoll/EpollDatagramChannel.java | 1 + .../channel/epoll/EpollDomainSocketChannel.java | 1 + .../channel/kqueue/AbstractKQueueChannel.java | 1 + .../kqueue/AbstractKQueueServerChannel.java | 1 + .../kqueue/AbstractKQueueStreamChannel.java | 4 ++++ .../channel/kqueue/KQueueDatagramChannel.java | 2 ++ .../channel/kqueue/KQueueDomainSocketChannel.java | 1 + .../java/io/netty/channel/AbstractChannel.java | 14 ++++++++------ .../io/netty/channel/DefaultChannelPipeline.java | 10 ---------- .../io/netty/channel/embedded/EmbeddedChannel.java | 1 + .../java/io/netty/channel/local/LocalChannel.java | 2 ++ .../io/netty/channel/local/LocalServerChannel.java | 1 + .../netty/channel/nio/AbstractNioByteChannel.java | 4 ++++ .../io/netty/channel/nio/AbstractNioChannel.java | 1 + .../channel/nio/AbstractNioMessageChannel.java | 2 ++ .../channel/DefaultChannelPipelineTailTest.java | 1 + 20 files changed, 44 insertions(+), 16 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index d19ce2b8f8..db15993a1a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -850,6 +850,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec { pipeline().fireChannelRegistered(); if (isActive()) { pipeline().fireChannelActive(); + if (config().isAutoRead()) { + read(); + } } } @@ -1047,6 +1050,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } allocHandle.readComplete(); pipeline().fireChannelReadComplete(); + if (config().isAutoRead()) { + read(); + } + // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent // channel is not currently reading we need to force a flush at the child channel, because we cannot // rely upon flush occurring in channelReadComplete on the parent channel. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 25ae95b2d4..7f84f77417 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -611,6 +611,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); + readIfIsAutoRead(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index b7ccf37b2e..fdb2822e8e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -129,6 +129,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im if (exception != null) { pipeline.fireExceptionCaught(exception); } + readIfIsAutoRead(); } finally { epollInFinally(config); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 70208d2672..299997b364 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -736,6 +736,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { shutdownInput(false); + } else { + readIfIsAutoRead(); } } @@ -820,6 +822,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im if (close) { shutdownInput(false); + } else { + readIfIsAutoRead(); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 714f612634..a773602df8 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -499,6 +499,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (exception != null) { pipeline.fireExceptionCaught(exception); } + readIfIsAutoRead(); } finally { epollInFinally(config); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index 4fe574e9e4..b77bbe575f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -185,6 +185,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t); } finally { + readIfIsAutoRead(); epollInFinally(config); } } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 6073fa9ca1..a98ee5777e 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -617,6 +617,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); + readIfIsAutoRead(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java index 4e7534dff1..4bfc7e107a 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java @@ -121,6 +121,7 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel if (exception != null) { pipeline.fireExceptionCaught(exception); } + readIfIsAutoRead(); } finally { readReadyFinally(config); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index 7d604f150a..d3cd99f186 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -562,6 +562,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel if (close) { shutdownInput(false); + } else { + readIfIsAutoRead(); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); @@ -586,6 +588,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { shutdownInput(false); + } else { + readIfIsAutoRead(); } } } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 887951a28f..5f582cdfb4 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -464,6 +464,8 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement if (exception != null) { pipeline.fireExceptionCaught(exception); + } else { + readIfIsAutoRead(); } } finally { readReadyFinally(config); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannel.java index 5172d6ea16..dd0c8aa4c8 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannel.java @@ -180,6 +180,7 @@ public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t); } finally { + readIfIsAutoRead(); readReadyFinally(config); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 11c14a9624..1f39a54dfe 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -421,6 +421,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return pipeline.voidPromise(); } + protected final void readIfIsAutoRead() { + if (config().isAutoRead()) { + read(); + } + } + /** * {@link Unsafe} implementation which sub-classes must extend and use. */ @@ -520,13 +526,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); - } else if (config().isAutoRead()) { - // This channel was registered before and autoRead() is set. This means we need to begin read - // again so that we process inbound data. - // - // See https://github.com/netty/netty/issues/4805 - beginRead(); } + readIfIsAutoRead(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. @@ -571,6 +572,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void run() { pipeline.fireChannelActive(); + readIfIsAutoRead(); } }); } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 8a6f6dae46..368e115eb9 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -1420,8 +1420,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); - - readIfIsAutoRead(); } @Override @@ -1437,14 +1435,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); - - readIfIsAutoRead(); - } - - private void readIfIsAutoRead() { - if (channel.config().isAutoRead()) { - channel.read(); - } } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index cf4e29841f..3caf4f9c1b 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -365,6 +365,7 @@ public class EmbeddedChannel extends AbstractChannel { private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) { if (checkOpen(recordException)) { pipeline().fireChannelReadComplete(); + readIfIsAutoRead(); runPendingTasks(); } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 62bd4d6a1c..8a38dc56c6 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -188,6 +188,7 @@ public class LocalChannel extends AbstractChannel { // connectPromise may be set to null if doClose() was called in the meantime. if (promise != null && promise.trySuccess()) { peer.pipeline().fireChannelActive(); + peer.readIfIsAutoRead(); } } }); @@ -305,6 +306,7 @@ public class LocalChannel extends AbstractChannel { } while (handle.continueReading()); pipeline.fireChannelReadComplete(); + readIfIsAutoRead(); } @Override diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index 6b3437547a..8f42751591 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -158,6 +158,7 @@ public class LocalServerChannel extends AbstractServerChannel { } while (handle.continueReading()); pipeline.fireChannelReadComplete(); + readIfIsAutoRead(); } /** diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index db87b100b0..115f41061b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -125,6 +125,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { closeOnRead(pipeline); + } else { + readIfIsAutoRead(); } } @@ -169,6 +171,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (close) { closeOnRead(pipeline); + } else { + readIfIsAutoRead(); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 01467b9836..87ff6c4a7f 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -309,6 +309,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); + readIfIsAutoRead(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 57dcc85e58..f0082ba607 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -107,6 +107,8 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { if (isOpen()) { close(voidPromise()); } + } else { + readIfIsAutoRead(); } } finally { // Check if there is a readPending which was not processed yet. diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java index 697db84af5..e0c9452b49 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java @@ -357,6 +357,7 @@ public class DefaultChannelPipelineTailTest { if (!active) { active = true; pipeline().fireChannelActive(); + readIfIsAutoRead(); } promise.setSuccess();