From 62057f73d6cda6294c44f562ba0ae8bde1923c96 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Wed, 15 Apr 2015 18:04:02 +0200 Subject: [PATCH] Fix handling of non-auto read for ByteToMessageDecoder and SslHandler Motivation: Our automatically handling of non-auto-read failed because it not detected the need of calling read again by itself if nothing was decoded. Beside this handling of non-auto-read never worked for SslHandler as it always triggered a read even if it decoded a message and auto-read was false. This fixes [#3529] and [#3587]. Modifications: - Implement handling of calling read when nothing was decoded (with non-auto-read) to ByteToMessageDecoder again - Correctly respect non-auto-read by SslHandler Result: No more stales and correctly respecting of non-auto-read by SslHandler. --- .../handler/codec/spdy/SpdyFrameCodec.java | 28 ++++++++++++++ .../handler/codec/ByteToMessageDecoder.java | 18 +++++---- .../java/io/netty/handler/ssl/SslHandler.java | 37 ++++++++++++++++--- 3 files changed, 70 insertions(+), 13 deletions(-) diff --git a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java index 2c771f842e..2595d31b7f 100644 --- a/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java +++ b/codec-http/src/main/java/io/netty/handler/codec/spdy/SpdyFrameCodec.java @@ -46,6 +46,7 @@ public class SpdyFrameCodec extends ByteToMessageDecoder private SpdySettingsFrame spdySettingsFrame; private ChannelHandlerContext ctx; + private boolean read; /** * Creates a new instance with the specified {@code version} and @@ -95,6 +96,17 @@ public class SpdyFrameCodec extends ByteToMessageDecoder spdyFrameDecoder.decode(in); } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + if (!read) { + if (!ctx.channel().config().isAutoRead()) { + ctx.read(); + } + } + read = false; + super.channelReadComplete(ctx); + } + @Override public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { ctx.bind(localAddress, promise); @@ -252,6 +264,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder @Override public void readDataFrame(int streamId, boolean last, ByteBuf data) { + read = true; + SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId, data); spdyDataFrame.setLast(last); ctx.fireChannelRead(spdyDataFrame); @@ -275,12 +289,16 @@ public class SpdyFrameCodec extends ByteToMessageDecoder @Override public void readRstStreamFrame(int streamId, int statusCode) { + read = true; + SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, statusCode); ctx.fireChannelRead(spdyRstStreamFrame); } @Override public void readSettingsFrame(boolean clearPersisted) { + read = true; + spdySettingsFrame = new DefaultSpdySettingsFrame(); spdySettingsFrame.setClearPreviouslyPersistedSettings(clearPersisted); } @@ -292,6 +310,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder @Override public void readSettingsEnd() { + read = true; + Object frame = spdySettingsFrame; spdySettingsFrame = null; ctx.fireChannelRead(frame); @@ -299,12 +319,16 @@ public class SpdyFrameCodec extends ByteToMessageDecoder @Override public void readPingFrame(int id) { + read = true; + SpdyPingFrame spdyPingFrame = new DefaultSpdyPingFrame(id); ctx.fireChannelRead(spdyPingFrame); } @Override public void readGoAwayFrame(int lastGoodStreamId, int statusCode) { + read = true; + SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, statusCode); ctx.fireChannelRead(spdyGoAwayFrame); } @@ -317,6 +341,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder @Override public void readWindowUpdateFrame(int streamId, int deltaWindowSize) { + read = true; + SpdyWindowUpdateFrame spdyWindowUpdateFrame = new DefaultSpdyWindowUpdateFrame(streamId, deltaWindowSize); ctx.fireChannelRead(spdyWindowUpdateFrame); } @@ -343,6 +369,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder ctx.fireExceptionCaught(e); } if (frame != null) { + read = true; + ctx.fireChannelRead(frame); } } diff --git a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java index 87c16e7e67..b0ebfda9a2 100644 --- a/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java @@ -252,6 +252,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + discardSomeReadBytes(); + if (decodeWasNull) { + decodeWasNull = false; + if (!ctx.channel().config().isAutoRead()) { + ctx.read(); + } + } + ctx.fireChannelReadComplete(); + } + + protected final void discardSomeReadBytes() { if (cumulation != null && !first && cumulation.refCnt() == 1) { // discard some bytes if possible to make more room in the // buffer but only if the refCnt == 1 as otherwise the user may have @@ -262,13 +273,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter // - https://github.com/netty/netty/issues/1764 cumulation.discardSomeReadBytes(); } - if (decodeWasNull) { - decodeWasNull = false; - if (!ctx.channel().config().isAutoRead()) { - ctx.read(); - } - } - ctx.fireChannelReadComplete(); } @Override diff --git a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java index 9970eff926..be89e7d866 100644 --- a/handler/src/main/java/io/netty/handler/ssl/SslHandler.java +++ b/handler/src/main/java/io/netty/handler/ssl/SslHandler.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -234,6 +235,12 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH private int packetLength; + /** + * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data + * when {@link ChannelConfig#isAutoRead()} is {@code false}. + */ + private boolean firedChannelRead; + private volatile long handshakeTimeoutMillis = 10000; private volatile long closeNotifyTimeoutMillis = 3000; @@ -294,7 +301,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s * and which does not need to do extra memory copies. */ - setCumulator(opensslEngine ? COMPOSITE_CUMULATOR : MERGE_CUMULATOR); + setCumulator(opensslEngine? COMPOSITE_CUMULATOR : MERGE_CUMULATOR); } public long getHandshakeTimeoutMillis() { @@ -908,6 +915,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH } if (totalLength > 0) { + boolean decoded = false; + // The buffer contains one or more full SSL records. // Slice out the whole packet so unwrap will only be called with complete packets. // Also directly reset the packetLength. This is needed as unwrap(..) may trigger @@ -926,12 +935,18 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH ByteBuf copy = ctx.alloc().heapBuffer(totalLength); try { copy.writeBytes(in, startOffset, totalLength); - unwrap(ctx, copy, 0, totalLength); + decoded = unwrap(ctx, copy, 0, totalLength); } finally { copy.release(); } } else { - unwrap(ctx, in, startOffset, totalLength); + decoded = unwrap(ctx, in, startOffset, totalLength); + } + + if (!firedChannelRead) { + // Check first if firedChannelRead is not set yet as it may have been set in a + // previous decode(...) call. + firedChannelRead = decoded; } } @@ -947,17 +962,23 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + // Discard bytes of the cumulation buffer if needed. + discardSomeReadBytes(); + if (needsFlush) { needsFlush = false; ctx.flush(); } // If handshake is not finished yet, we need more data. - if (!handshakePromise.isDone() && !ctx.channel().config().isAutoRead()) { + if (!ctx.channel().config().isAutoRead() && (!firedChannelRead || !handshakePromise.isDone())) { + // No auto-read used and no message passed through the ChannelPipeline or the handhshake was not complete + // yet, which means we need to trigger the read to ensure we not encounter any stalls. ctx.read(); } - super.channelReadComplete(ctx); + firedChannelRead = false; + ctx.fireChannelReadComplete(); } /** @@ -970,9 +991,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH /** * Unwraps inbound SSL records. */ - private void unwrap( + private boolean unwrap( ChannelHandlerContext ctx, ByteBuf packet, int offset, int length) throws SSLException { + boolean decoded = false; boolean wrapLater = false; boolean notifyClosure = false; ByteBuf decodeOut = allocate(ctx, length); @@ -1041,11 +1063,14 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH throw e; } finally { if (decodeOut.isReadable()) { + decoded = true; + ctx.fireChannelRead(decodeOut); } else { decodeOut.release(); } } + return decoded; } private SSLEngineResult unwrap(