Handling AUTO_READ should not be the responsibility of DefaultChannel… (#8650)

* Handling AUTO_READ should not be the responsibility of DefaultChannelPipeline but the Channel itself.

Motivation:

At the moment we do automatically call read() in the DefaultChannelPipeline when fireChannelReadComplete() / fireChannelActive() is called and the Channel is using auto read. This is nice in terms of sharing code but imho is not the responsibility of the ChannelPipeline implementation but the responsibility of the Channel implementation.

Modifications:

Move handing of auto read from DefaultChannelPipeline to Channel implementations.

Result:

More clear responsibiliy and not depending on implemention details of the ChannelPipeline.
This commit is contained in:
Norman Maurer 2018-12-14 10:11:34 +00:00 committed by GitHub
parent 05d481d8af
commit cb6ae72df2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 44 additions and 16 deletions

View File

@ -850,6 +850,9 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
pipeline().fireChannelRegistered(); pipeline().fireChannelRegistered();
if (isActive()) { if (isActive()) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
if (config().isAutoRead()) {
read();
}
} }
} }
@ -1047,6 +1050,10 @@ public class Http2MultiplexCodec extends Http2FrameCodec {
} }
allocHandle.readComplete(); allocHandle.readComplete();
pipeline().fireChannelReadComplete(); pipeline().fireChannelReadComplete();
if (config().isAutoRead()) {
read();
}
// Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent // Reading data may result in frames being written (e.g. WINDOW_UPDATE, RST, etc..). If the parent
// channel is not currently reading we need to force a flush at the child channel, because we cannot // channel is not currently reading we need to force a flush at the child channel, because we cannot
// rely upon flush occurring in channelReadComplete on the parent channel. // rely upon flush occurring in channelReadComplete on the parent channel.

View File

@ -611,6 +611,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// because what happened is what happened. // because what happened is what happened.
if (!wasActive && active) { if (!wasActive && active) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
readIfIsAutoRead();
} }
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().

View File

@ -129,6 +129,7 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
if (exception != null) { if (exception != null) {
pipeline.fireExceptionCaught(exception); pipeline.fireExceptionCaught(exception);
} }
readIfIsAutoRead();
} finally { } finally {
epollInFinally(config); epollInFinally(config);
} }

View File

@ -736,6 +736,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) { if (close || cause instanceof IOException) {
shutdownInput(false); shutdownInput(false);
} else {
readIfIsAutoRead();
} }
} }
@ -820,6 +822,8 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
if (close) { if (close) {
shutdownInput(false); shutdownInput(false);
} else {
readIfIsAutoRead();
} }
} catch (Throwable t) { } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle); handleReadException(pipeline, byteBuf, t, close, allocHandle);

View File

@ -499,6 +499,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
if (exception != null) { if (exception != null) {
pipeline.fireExceptionCaught(exception); pipeline.fireExceptionCaught(exception);
} }
readIfIsAutoRead();
} finally { } finally {
epollInFinally(config); epollInFinally(config);
} }

View File

@ -185,6 +185,7 @@ public final class EpollDomainSocketChannel extends AbstractEpollStreamChannel i
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
} finally { } finally {
readIfIsAutoRead();
epollInFinally(config); epollInFinally(config);
} }
} }

View File

@ -617,6 +617,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
// because what happened is what happened. // because what happened is what happened.
if (!wasActive && active) { if (!wasActive && active) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
readIfIsAutoRead();
} }
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().

View File

@ -121,6 +121,7 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel
if (exception != null) { if (exception != null) {
pipeline.fireExceptionCaught(exception); pipeline.fireExceptionCaught(exception);
} }
readIfIsAutoRead();
} finally { } finally {
readReadyFinally(config); readReadyFinally(config);
} }

View File

@ -562,6 +562,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
if (close) { if (close) {
shutdownInput(false); shutdownInput(false);
} else {
readIfIsAutoRead();
} }
} catch (Throwable t) { } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle); handleReadException(pipeline, byteBuf, t, close, allocHandle);
@ -586,6 +588,8 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) { if (close || cause instanceof IOException) {
shutdownInput(false); shutdownInput(false);
} else {
readIfIsAutoRead();
} }
} }
} }

View File

@ -464,6 +464,8 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
if (exception != null) { if (exception != null) {
pipeline.fireExceptionCaught(exception); pipeline.fireExceptionCaught(exception);
} else {
readIfIsAutoRead();
} }
} finally { } finally {
readReadyFinally(config); readReadyFinally(config);

View File

@ -180,6 +180,7 @@ public final class KQueueDomainSocketChannel extends AbstractKQueueStreamChannel
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t); pipeline.fireExceptionCaught(t);
} finally { } finally {
readIfIsAutoRead();
readReadyFinally(config); readReadyFinally(config);
} }
} }

View File

@ -421,6 +421,12 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
return pipeline.voidPromise(); return pipeline.voidPromise();
} }
protected final void readIfIsAutoRead() {
if (config().isAutoRead()) {
read();
}
}
/** /**
* {@link Unsafe} implementation which sub-classes must extend and use. * {@link Unsafe} implementation which sub-classes must extend and use.
*/ */
@ -520,13 +526,8 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
if (isActive()) { if (isActive()) {
if (firstRegistration) { if (firstRegistration) {
pipeline.fireChannelActive(); pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
} }
readIfIsAutoRead();
} }
} catch (Throwable t) { } catch (Throwable t) {
// Close the channel directly to avoid FD leak. // Close the channel directly to avoid FD leak.
@ -571,6 +572,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
@Override @Override
public void run() { public void run() {
pipeline.fireChannelActive(); pipeline.fireChannelActive();
readIfIsAutoRead();
} }
}); });
} }

View File

@ -1420,8 +1420,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive(); ctx.fireChannelActive();
readIfIsAutoRead();
} }
@Override @Override
@ -1437,14 +1435,6 @@ public class DefaultChannelPipeline implements ChannelPipeline {
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete(); ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
} }
@Override @Override

View File

@ -365,6 +365,7 @@ public class EmbeddedChannel extends AbstractChannel {
private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) { private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
if (checkOpen(recordException)) { if (checkOpen(recordException)) {
pipeline().fireChannelReadComplete(); pipeline().fireChannelReadComplete();
readIfIsAutoRead();
runPendingTasks(); runPendingTasks();
} }

View File

@ -188,6 +188,7 @@ public class LocalChannel extends AbstractChannel {
// connectPromise may be set to null if doClose() was called in the meantime. // connectPromise may be set to null if doClose() was called in the meantime.
if (promise != null && promise.trySuccess()) { if (promise != null && promise.trySuccess()) {
peer.pipeline().fireChannelActive(); peer.pipeline().fireChannelActive();
peer.readIfIsAutoRead();
} }
} }
}); });
@ -305,6 +306,7 @@ public class LocalChannel extends AbstractChannel {
} while (handle.continueReading()); } while (handle.continueReading());
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
readIfIsAutoRead();
} }
@Override @Override

View File

@ -158,6 +158,7 @@ public class LocalServerChannel extends AbstractServerChannel {
} while (handle.continueReading()); } while (handle.continueReading());
pipeline.fireChannelReadComplete(); pipeline.fireChannelReadComplete();
readIfIsAutoRead();
} }
/** /**

View File

@ -125,6 +125,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
pipeline.fireExceptionCaught(cause); pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) { if (close || cause instanceof IOException) {
closeOnRead(pipeline); closeOnRead(pipeline);
} else {
readIfIsAutoRead();
} }
} }
@ -169,6 +171,8 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
if (close) { if (close) {
closeOnRead(pipeline); closeOnRead(pipeline);
} else {
readIfIsAutoRead();
} }
} catch (Throwable t) { } catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle); handleReadException(pipeline, byteBuf, t, close, allocHandle);

View File

@ -309,6 +309,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
// because what happened is what happened. // because what happened is what happened.
if (!wasActive && active) { if (!wasActive && active) {
pipeline().fireChannelActive(); pipeline().fireChannelActive();
readIfIsAutoRead();
} }
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().

View File

@ -107,6 +107,8 @@ public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
if (isOpen()) { if (isOpen()) {
close(voidPromise()); close(voidPromise());
} }
} else {
readIfIsAutoRead();
} }
} finally { } finally {
// Check if there is a readPending which was not processed yet. // Check if there is a readPending which was not processed yet.

View File

@ -357,6 +357,7 @@ public class DefaultChannelPipelineTailTest {
if (!active) { if (!active) {
active = true; active = true;
pipeline().fireChannelActive(); pipeline().fireChannelActive();
readIfIsAutoRead();
} }
promise.setSuccess(); promise.setSuccess();