Fix handling of non-auto read for ByteToMessageDecoder and SslHandler
Motivation: Our automatically handling of non-auto-read failed because it not detected the need of calling read again by itself if nothing was decoded. Beside this handling of non-auto-read never worked for SslHandler as it always triggered a read even if it decoded a message and auto-read was false. This fixes [#3529] and [#3587]. Modifications: - Implement handling of calling read when nothing was decoded (with non-auto-read) to ByteToMessageDecoder again - Correctly respect non-auto-read by SslHandler Result: No more stales and correctly respecting of non-auto-read by SslHandler.
This commit is contained in:
parent
3b1bf6348a
commit
5cd541c537
@ -46,6 +46,7 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||
private SpdySettingsFrame spdySettingsFrame;
|
||||
|
||||
private ChannelHandlerContext ctx;
|
||||
private boolean read;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified {@code version} and
|
||||
@ -95,6 +96,17 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||
spdyFrameDecoder.decode(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
if (!read) {
|
||||
if (!ctx.channel().config().isAutoRead()) {
|
||||
ctx.read();
|
||||
}
|
||||
}
|
||||
read = false;
|
||||
super.channelReadComplete(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
|
||||
ctx.bind(localAddress, promise);
|
||||
@ -252,6 +264,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||
|
||||
@Override
|
||||
public void readDataFrame(int streamId, boolean last, ByteBuf data) {
|
||||
read = true;
|
||||
|
||||
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId, data);
|
||||
spdyDataFrame.setLast(last);
|
||||
ctx.fireChannelRead(spdyDataFrame);
|
||||
@ -275,12 +289,16 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||
|
||||
@Override
|
||||
public void readRstStreamFrame(int streamId, int statusCode) {
|
||||
read = true;
|
||||
|
||||
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, statusCode);
|
||||
ctx.fireChannelRead(spdyRstStreamFrame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readSettingsFrame(boolean clearPersisted) {
|
||||
read = true;
|
||||
|
||||
spdySettingsFrame = new DefaultSpdySettingsFrame();
|
||||
spdySettingsFrame.setClearPreviouslyPersistedSettings(clearPersisted);
|
||||
}
|
||||
@ -292,6 +310,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||
|
||||
@Override
|
||||
public void readSettingsEnd() {
|
||||
read = true;
|
||||
|
||||
Object frame = spdySettingsFrame;
|
||||
spdySettingsFrame = null;
|
||||
ctx.fireChannelRead(frame);
|
||||
@ -299,12 +319,16 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||
|
||||
@Override
|
||||
public void readPingFrame(int id) {
|
||||
read = true;
|
||||
|
||||
SpdyPingFrame spdyPingFrame = new DefaultSpdyPingFrame(id);
|
||||
ctx.fireChannelRead(spdyPingFrame);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readGoAwayFrame(int lastGoodStreamId, int statusCode) {
|
||||
read = true;
|
||||
|
||||
SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, statusCode);
|
||||
ctx.fireChannelRead(spdyGoAwayFrame);
|
||||
}
|
||||
@ -317,6 +341,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||
|
||||
@Override
|
||||
public void readWindowUpdateFrame(int streamId, int deltaWindowSize) {
|
||||
read = true;
|
||||
|
||||
SpdyWindowUpdateFrame spdyWindowUpdateFrame = new DefaultSpdyWindowUpdateFrame(streamId, deltaWindowSize);
|
||||
ctx.fireChannelRead(spdyWindowUpdateFrame);
|
||||
}
|
||||
@ -343,6 +369,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder
|
||||
ctx.fireExceptionCaught(e);
|
||||
}
|
||||
if (frame != null) {
|
||||
read = true;
|
||||
|
||||
ctx.fireChannelRead(frame);
|
||||
}
|
||||
}
|
||||
|
@ -252,6 +252,17 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
discardSomeReadBytes();
|
||||
if (decodeWasNull) {
|
||||
decodeWasNull = false;
|
||||
if (!ctx.channel().config().isAutoRead()) {
|
||||
ctx.read();
|
||||
}
|
||||
}
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
protected final void discardSomeReadBytes() {
|
||||
if (cumulation != null && !first && cumulation.refCnt() == 1) {
|
||||
// discard some bytes if possible to make more room in the
|
||||
// buffer but only if the refCnt == 1 as otherwise the user may have
|
||||
@ -262,13 +273,6 @@ public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter
|
||||
// - https://github.com/netty/netty/issues/1764
|
||||
cumulation.discardSomeReadBytes();
|
||||
}
|
||||
if (decodeWasNull) {
|
||||
decodeWasNull = false;
|
||||
if (!ctx.channel().config().isAutoRead()) {
|
||||
ctx.read();
|
||||
}
|
||||
}
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.buffer.CompositeByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelConfig;
|
||||
import io.netty.channel.ChannelException;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
@ -234,6 +235,12 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
|
||||
private int packetLength;
|
||||
|
||||
/**
|
||||
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
|
||||
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
|
||||
*/
|
||||
private boolean firedChannelRead;
|
||||
|
||||
private volatile long handshakeTimeoutMillis = 10000;
|
||||
private volatile long closeNotifyTimeoutMillis = 3000;
|
||||
|
||||
@ -294,7 +301,7 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
* {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
|
||||
* and which does not need to do extra memory copies.
|
||||
*/
|
||||
setCumulator(opensslEngine ? COMPOSITE_CUMULATOR : MERGE_CUMULATOR);
|
||||
setCumulator(opensslEngine? COMPOSITE_CUMULATOR : MERGE_CUMULATOR);
|
||||
}
|
||||
|
||||
public long getHandshakeTimeoutMillis() {
|
||||
@ -908,6 +915,8 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
}
|
||||
|
||||
if (totalLength > 0) {
|
||||
boolean decoded = false;
|
||||
|
||||
// The buffer contains one or more full SSL records.
|
||||
// Slice out the whole packet so unwrap will only be called with complete packets.
|
||||
// Also directly reset the packetLength. This is needed as unwrap(..) may trigger
|
||||
@ -926,12 +935,18 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
ByteBuf copy = ctx.alloc().heapBuffer(totalLength);
|
||||
try {
|
||||
copy.writeBytes(in, startOffset, totalLength);
|
||||
unwrap(ctx, copy, 0, totalLength);
|
||||
decoded = unwrap(ctx, copy, 0, totalLength);
|
||||
} finally {
|
||||
copy.release();
|
||||
}
|
||||
} else {
|
||||
unwrap(ctx, in, startOffset, totalLength);
|
||||
decoded = unwrap(ctx, in, startOffset, totalLength);
|
||||
}
|
||||
|
||||
if (!firedChannelRead) {
|
||||
// Check first if firedChannelRead is not set yet as it may have been set in a
|
||||
// previous decode(...) call.
|
||||
firedChannelRead = decoded;
|
||||
}
|
||||
}
|
||||
|
||||
@ -947,17 +962,23 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||
// Discard bytes of the cumulation buffer if needed.
|
||||
discardSomeReadBytes();
|
||||
|
||||
if (needsFlush) {
|
||||
needsFlush = false;
|
||||
ctx.flush();
|
||||
}
|
||||
|
||||
// If handshake is not finished yet, we need more data.
|
||||
if (!handshakePromise.isDone() && !ctx.channel().config().isAutoRead()) {
|
||||
if (!ctx.channel().config().isAutoRead() && (!firedChannelRead || !handshakePromise.isDone())) {
|
||||
// No auto-read used and no message passed through the ChannelPipeline or the handhshake was not complete
|
||||
// yet, which means we need to trigger the read to ensure we not encounter any stalls.
|
||||
ctx.read();
|
||||
}
|
||||
|
||||
super.channelReadComplete(ctx);
|
||||
firedChannelRead = false;
|
||||
ctx.fireChannelReadComplete();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -970,9 +991,10 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
/**
|
||||
* Unwraps inbound SSL records.
|
||||
*/
|
||||
private void unwrap(
|
||||
private boolean unwrap(
|
||||
ChannelHandlerContext ctx, ByteBuf packet, int offset, int length) throws SSLException {
|
||||
|
||||
boolean decoded = false;
|
||||
boolean wrapLater = false;
|
||||
boolean notifyClosure = false;
|
||||
ByteBuf decodeOut = allocate(ctx, length);
|
||||
@ -1041,11 +1063,14 @@ public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundH
|
||||
throw e;
|
||||
} finally {
|
||||
if (decodeOut.isReadable()) {
|
||||
decoded = true;
|
||||
|
||||
ctx.fireChannelRead(decodeOut);
|
||||
} else {
|
||||
decodeOut.release();
|
||||
}
|
||||
}
|
||||
return decoded;
|
||||
}
|
||||
|
||||
private SSLEngineResult unwrap(
|
||||
|
Loading…
Reference in New Issue
Block a user