diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java index d19ce2b8f8..db15993a1a 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/Http2MultiplexCodec.java @@ -850,6 +850,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec { pipeline().fireChannelRegistered(); if (isActive()) { pipeline().fireChannelActive(); + if (config().isAutoRead()) { + read(); + } } } @@ -1047,6 +1050,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec { } allocHandle.readComplete(); pipeline().fireChannelReadComplete(); + if (config().isAutoRead()) { + read(); + } + // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent // channel is not currently reading we need to force a flush at the child channel, because we cannot // rely upon flush occurring in channelReadComplete on the parent channel. diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java index 25ae95b2d4..7f84f77417 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollChannel.java @@ -611,6 +611,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); + readIfIsAutoRead(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java index b7ccf37b2e..fdb2822e8e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollServerChannel.java @@ -129,6 +129,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im if (exception != null) { pipeline.fireExceptionCaught(exception); } + readIfIsAutoRead(); } finally { epollInFinally(config); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java index 70208d2672..299997b364 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/AbstractEpollStreamChannel.java @@ -736,6 +736,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { shutdownInput(false); + } else { + readIfIsAutoRead(); } } @@ -820,6 +822,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im if (close) { shutdownInput(false); + } else { + readIfIsAutoRead(); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java index 714f612634..a773602df8 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDatagramChannel.java @@ -499,6 +499,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements if (exception != null) { pipeline.fireExceptionCaught(exception); } + readIfIsAutoRead(); } finally { epollInFinally(config); } diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java index 4fe574e9e4..b77bbe575f 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollDomainSocketChannel.java @@ -185,6 +185,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t); } finally { + readIfIsAutoRead(); epollInFinally(config); } } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 6073fa9ca1..a98ee5777e 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -617,6 +617,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); + readIfIsAutoRead(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java index 4e7534dff1..4bfc7e107a 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueServerChannel.java @@ -121,6 +121,7 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel if (exception != null) { pipeline.fireExceptionCaught(exception); } + readIfIsAutoRead(); } finally { readReadyFinally(config); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java index 7d604f150a..d3cd99f186 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueStreamChannel.java @@ -562,6 +562,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel if (close) { shutdownInput(false); + } else { + readIfIsAutoRead(); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); @@ -586,6 +588,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { shutdownInput(false); + } else { + readIfIsAutoRead(); } } } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java index 887951a28f..5f582cdfb4 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDatagramChannel.java @@ -464,6 +464,8 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement if (exception != null) { pipeline.fireExceptionCaught(exception); + } else { + readIfIsAutoRead(); } } finally { readReadyFinally(config); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannel.java index 5172d6ea16..dd0c8aa4c8 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueDomainSocketChannel.java @@ -180,6 +180,7 @@ public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel pipeline.fireChannelReadComplete(); pipeline.fireExceptionCaught(t); } finally { + readIfIsAutoRead(); readReadyFinally(config); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index 11c14a9624..1f39a54dfe 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -421,6 +421,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha return pipeline.voidPromise(); } + protected final void readIfIsAutoRead() { + if (config().isAutoRead()) { + read(); + } + } + /** * {@link Unsafe} implementation which sub-classes must extend and use. */ @@ -520,13 +526,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); - } else if (config().isAutoRead()) { - // This channel was registered before and autoRead() is set. This means we need to begin read - // again so that we process inbound data. - // - // See https://github.com/netty/netty/issues/4805 - beginRead(); } + readIfIsAutoRead(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. @@ -571,6 +572,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha @Override public void run() { pipeline.fireChannelActive(); + readIfIsAutoRead(); } }); } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index 8a6f6dae46..368e115eb9 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -1420,8 +1420,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); - - readIfIsAutoRead(); } @Override @@ -1437,14 +1435,6 @@ public class DefaultChannelPipeline implements ChannelPipeline { @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelReadComplete(); - - readIfIsAutoRead(); - } - - private void readIfIsAutoRead() { - if (channel.config().isAutoRead()) { - channel.read(); - } } @Override diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index cf4e29841f..3caf4f9c1b 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -365,6 +365,7 @@ public class EmbeddedChannel extends AbstractChannel { private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) { if (checkOpen(recordException)) { pipeline().fireChannelReadComplete(); + readIfIsAutoRead(); runPendingTasks(); } diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 62bd4d6a1c..8a38dc56c6 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -188,6 +188,7 @@ public class LocalChannel extends AbstractChannel { // connectPromise may be set to null if doClose() was called in the meantime. if (promise != null && promise.trySuccess()) { peer.pipeline().fireChannelActive(); + peer.readIfIsAutoRead(); } } }); @@ -305,6 +306,7 @@ public class LocalChannel extends AbstractChannel { } while (handle.continueReading()); pipeline.fireChannelReadComplete(); + readIfIsAutoRead(); } @Override diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index 6b3437547a..8f42751591 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -158,6 +158,7 @@ public class LocalServerChannel extends AbstractServerChannel { } while (handle.continueReading()); pipeline.fireChannelReadComplete(); + readIfIsAutoRead(); } /** diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index db87b100b0..115f41061b 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -125,6 +125,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { pipeline.fireExceptionCaught(cause); if (close || cause instanceof IOException) { closeOnRead(pipeline); + } else { + readIfIsAutoRead(); } } @@ -169,6 +171,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { if (close) { closeOnRead(pipeline); + } else { + readIfIsAutoRead(); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 01467b9836..87ff6c4a7f 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -309,6 +309,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { // because what happened is what happened. if (!wasActive && active) { pipeline().fireChannelActive(); + readIfIsAutoRead(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java index 57dcc85e58..f0082ba607 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioMessageChannel.java @@ -107,6 +107,8 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel { if (isOpen()) { close(voidPromise()); } + } else { + readIfIsAutoRead(); } } finally { // Check if there is a readPending which was not processed yet. diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java index 697db84af5..e0c9452b49 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTailTest.java @@ -357,6 +357,7 @@ public class DefaultChannelPipelineTailTest { if (!active) { active = true; pipeline().fireChannelActive(); + readIfIsAutoRead(); } promise.setSuccess();