Fix handling of non-auto read for ByteToMessageDecoder and SslHandler

Motivation:

Our automatically handling of non-auto-read failed because it not detected the need of calling read again by itself if nothing was decoded. Beside this handling of non-auto-read never worked for SslHandler as it always triggered a read even if it decoded a message and auto-read was false.

This fixes [#3529] and [#3587].

Modifications:

- Implement handling of calling read when nothing was decoded (with non-auto-read) to ByteToMessageDecoder again
- Correctly respect non-auto-read by SslHandler

Result:

No more stales and correctly respecting of non-auto-read by SslHandler.
This commit is contained in:
Norman Maurer 2015-04-15 18:04:02 +02:00
parent 77e112a6d5
commit 62057f73d6
3 changed files with 70 additions and 13 deletions

View File

@ -46,6 +46,7 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
private SpdySettingsFrame spdySettingsFrame; private SpdySettingsFrame spdySettingsFrame;
private ChannelHandlerContext ctx; private ChannelHandlerContext ctx;
private boolean read;
/** /**
* Creates a new instance with the specified {@code version} and * Creates a new instance with the specified {@code version} and
@ -95,6 +96,17 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
spdyFrameDecoder.decode(in); spdyFrameDecoder.decode(in);
} }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (!read) {
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
read = false;
super.channelReadComplete(ctx);
}
@Override @Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise); ctx.bind(localAddress, promise);
@ -252,6 +264,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
@Override @Override
public void readDataFrame(int streamId, boolean last, ByteBuf data) { public void readDataFrame(int streamId, boolean last, ByteBuf data) {
read = true;
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId, data); SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId, data);
spdyDataFrame.setLast(last); spdyDataFrame.setLast(last);
ctx.fireChannelRead(spdyDataFrame); ctx.fireChannelRead(spdyDataFrame);
@ -275,12 +289,16 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
@Override @Override
public void readRstStreamFrame(int streamId, int statusCode) { public void readRstStreamFrame(int streamId, int statusCode) {
read = true;
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, statusCode); SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, statusCode);
ctx.fireChannelRead(spdyRstStreamFrame); ctx.fireChannelRead(spdyRstStreamFrame);
} }
@Override @Override
public void readSettingsFrame(boolean clearPersisted) { public void readSettingsFrame(boolean clearPersisted) {
read = true;
spdySettingsFrame = new DefaultSpdySettingsFrame(); spdySettingsFrame = new DefaultSpdySettingsFrame();
spdySettingsFrame.setClearPreviouslyPersistedSettings(clearPersisted); spdySettingsFrame.setClearPreviouslyPersistedSettings(clearPersisted);
} }
@ -292,6 +310,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
@Override @Override
public void readSettingsEnd() { public void readSettingsEnd() {
read = true;
Object frame = spdySettingsFrame; Object frame = spdySettingsFrame;
spdySettingsFrame = null; spdySettingsFrame = null;
ctx.fireChannelRead(frame); ctx.fireChannelRead(frame);
@ -299,12 +319,16 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
@Override @Override
public void readPingFrame(int id) { public void readPingFrame(int id) {
read = true;
SpdyPingFrame spdyPingFrame = new DefaultSpdyPingFrame(id); SpdyPingFrame spdyPingFrame = new DefaultSpdyPingFrame(id);
ctx.fireChannelRead(spdyPingFrame); ctx.fireChannelRead(spdyPingFrame);
} }
@Override @Override
public void readGoAwayFrame(int lastGoodStreamId, int statusCode) { public void readGoAwayFrame(int lastGoodStreamId, int statusCode) {
read = true;
SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, statusCode); SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, statusCode);
ctx.fireChannelRead(spdyGoAwayFrame); ctx.fireChannelRead(spdyGoAwayFrame);
} }
@ -317,6 +341,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
@Override @Override
public void readWindowUpdateFrame(int streamId, int deltaWindowSize) { public void readWindowUpdateFrame(int streamId, int deltaWindowSize) {
read = true;
SpdyWindowUpdateFrame spdyWindowUpdateFrame = new DefaultSpdyWindowUpdateFrame(streamId, deltaWindowSize); SpdyWindowUpdateFrame spdyWindowUpdateFrame = new DefaultSpdyWindowUpdateFrame(streamId, deltaWindowSize);
ctx.fireChannelRead(spdyWindowUpdateFrame); ctx.fireChannelRead(spdyWindowUpdateFrame);
} }
@ -343,6 +369,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
ctx.fireExceptionCaught(e); ctx.fireExceptionCaught(e);
} }
if (frame != null) { if (frame != null) {
read = true;
ctx.fireChannelRead(frame); ctx.fireChannelRead(frame);
} }
} }

View File

@ -252,6 +252,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
discardSomeReadBytes();
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}
protected final void discardSomeReadBytes() {
if (cumulation != null && !first && cumulation.refCnt() == 1) { if (cumulation != null && !first && cumulation.refCnt() == 1) {
// discard some bytes if possible to make more room in the // discard some bytes if possible to make more room in the
// buffer but only if the refCnt == 1 as otherwise the user may have // buffer but only if the refCnt == 1 as otherwise the user may have
@ -262,13 +273,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
// - https://github.com/netty/netty/issues/1764 // - https://github.com/netty/netty/issues/1764
cumulation.discardSomeReadBytes(); cumulation.discardSomeReadBytes();
} }
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
} }
@Override @Override

View File

@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelException; import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -234,6 +235,12 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
private int packetLength; private int packetLength;
/**
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
*/
private boolean firedChannelRead;
private volatile long handshakeTimeoutMillis = 10000; private volatile long handshakeTimeoutMillis = 10000;
private volatile long closeNotifyTimeoutMillis = 3000; private volatile long closeNotifyTimeoutMillis = 3000;
@ -294,7 +301,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
* {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
* and which does not need to do extra memory copies. * and which does not need to do extra memory copies.
*/ */
setCumulator(opensslEngine ? COMPOSITE_CUMULATOR : MERGE_CUMULATOR); setCumulator(opensslEngine? COMPOSITE_CUMULATOR : MERGE_CUMULATOR);
} }
public long getHandshakeTimeoutMillis() { public long getHandshakeTimeoutMillis() {
@ -908,6 +915,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
} }
if (totalLength > 0) { if (totalLength > 0) {
boolean decoded = false;
// The buffer contains one or more full SSL records. // The buffer contains one or more full SSL records.
// Slice out the whole packet so unwrap will only be called with complete packets. // Slice out the whole packet so unwrap will only be called with complete packets.
// Also directly reset the packetLength. This is needed as unwrap(..) may trigger // Also directly reset the packetLength. This is needed as unwrap(..) may trigger
@ -926,12 +935,18 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
ByteBuf copy = ctx.alloc().heapBuffer(totalLength); ByteBuf copy = ctx.alloc().heapBuffer(totalLength);
try { try {
copy.writeBytes(in, startOffset, totalLength); copy.writeBytes(in, startOffset, totalLength);
unwrap(ctx, copy, 0, totalLength); decoded = unwrap(ctx, copy, 0, totalLength);
} finally { } finally {
copy.release(); copy.release();
} }
} else { } else {
unwrap(ctx, in, startOffset, totalLength); decoded = unwrap(ctx, in, startOffset, totalLength);
}
if (!firedChannelRead) {
// Check first if firedChannelRead is not set yet as it may have been set in a
// previous decode(...) call.
firedChannelRead = decoded;
} }
} }
@ -947,17 +962,23 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// Discard bytes of the cumulation buffer if needed.
discardSomeReadBytes();
if (needsFlush) { if (needsFlush) {
needsFlush = false; needsFlush = false;
ctx.flush(); ctx.flush();
} }
// If handshake is not finished yet, we need more data. // If handshake is not finished yet, we need more data.
if (!handshakePromise.isDone() && !ctx.channel().config().isAutoRead()) { if (!ctx.channel().config().isAutoRead() && (!firedChannelRead || !handshakePromise.isDone())) {
// No auto-read used and no message passed through the ChannelPipeline or the handhshake was not complete
// yet, which means we need to trigger the read to ensure we not encounter any stalls.
ctx.read(); ctx.read();
} }
super.channelReadComplete(ctx); firedChannelRead = false;
ctx.fireChannelReadComplete();
} }
/** /**
@ -970,9 +991,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
/** /**
* Unwraps inbound SSL records. * Unwraps inbound SSL records.
*/ */
private void unwrap( private boolean unwrap(
ChannelHandlerContext ctx, ByteBuf packet, int offset, int length) throws SSLException { ChannelHandlerContext ctx, ByteBuf packet, int offset, int length) throws SSLException {
boolean decoded = false;
boolean wrapLater = false; boolean wrapLater = false;
boolean notifyClosure = false; boolean notifyClosure = false;
ByteBuf decodeOut = allocate(ctx, length); ByteBuf decodeOut = allocate(ctx, length);
@ -1041,11 +1063,14 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
throw e; throw e;
} finally { } finally {
if (decodeOut.isReadable()) { if (decodeOut.isReadable()) {
decoded = true;
ctx.fireChannelRead(decodeOut); ctx.fireChannelRead(decodeOut);
} else { } else {
decodeOut.release(); decodeOut.release();
} }
} }
return decoded;
} }
private SSLEngineResult unwrap( private SSLEngineResult unwrap(