Fix handling of non-auto read for ByteToMessageDecoder and SslHandler
Motivation: Our automatically handling of non-auto-read failed because it not detected the need of calling read again by itself if nothing was decoded. Beside this handling of non-auto-read never worked for SslHandler as it always triggered a read even if it decoded a message and auto-read was false. This fixes [#3529] and [#3587]. Modifications: - Implement handling of calling read when nothing was decoded (with non-auto-read) to ByteToMessageDecoder again - Correctly respect non-auto-read by SslHandler Result: No more stales and correctly respecting of non-auto-read by SslHandler.
This commit is contained in:
parent
71838342ce
commit
52878880b4
@ -43,6 +43,7 @@ public class SpdyFrameCodec extends ByteToMessageDecoder implements SpdyFrameDec
|
|||||||
private SpdySettingsFrame spdySettingsFrame;
|
private SpdySettingsFrame spdySettingsFrame;
|
||||||
|
|
||||||
private ChannelHandlerContext ctx;
|
private ChannelHandlerContext ctx;
|
||||||
|
private boolean read;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new instance with the specified {@code version} and
|
* Creates a new instance with the specified {@code version} and
|
||||||
@ -92,6 +93,17 @@ public class SpdyFrameCodec extends ByteToMessageDecoder implements SpdyFrameDec
|
|||||||
spdyFrameDecoder.decode(in);
|
spdyFrameDecoder.decode(in);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
if (!read) {
|
||||||
|
if (!ctx.channel().config().isAutoRead()) {
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
read = false;
|
||||||
|
super.channelReadComplete(ctx);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
ByteBuf frame;
|
ByteBuf frame;
|
||||||
@ -213,6 +225,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder implements SpdyFrameDec
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readDataFrame(int streamId, boolean last, ByteBuf data) {
|
public void readDataFrame(int streamId, boolean last, ByteBuf data) {
|
||||||
|
read = true;
|
||||||
|
|
||||||
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId, data);
|
SpdyDataFrame spdyDataFrame = new DefaultSpdyDataFrame(streamId, data);
|
||||||
spdyDataFrame.setLast(last);
|
spdyDataFrame.setLast(last);
|
||||||
ctx.fireChannelRead(spdyDataFrame);
|
ctx.fireChannelRead(spdyDataFrame);
|
||||||
@ -236,12 +250,16 @@ public class SpdyFrameCodec extends ByteToMessageDecoder implements SpdyFrameDec
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readRstStreamFrame(int streamId, int statusCode) {
|
public void readRstStreamFrame(int streamId, int statusCode) {
|
||||||
|
read = true;
|
||||||
|
|
||||||
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, statusCode);
|
SpdyRstStreamFrame spdyRstStreamFrame = new DefaultSpdyRstStreamFrame(streamId, statusCode);
|
||||||
ctx.fireChannelRead(spdyRstStreamFrame);
|
ctx.fireChannelRead(spdyRstStreamFrame);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readSettingsFrame(boolean clearPersisted) {
|
public void readSettingsFrame(boolean clearPersisted) {
|
||||||
|
read = true;
|
||||||
|
|
||||||
spdySettingsFrame = new DefaultSpdySettingsFrame();
|
spdySettingsFrame = new DefaultSpdySettingsFrame();
|
||||||
spdySettingsFrame.setClearPreviouslyPersistedSettings(clearPersisted);
|
spdySettingsFrame.setClearPreviouslyPersistedSettings(clearPersisted);
|
||||||
}
|
}
|
||||||
@ -253,6 +271,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder implements SpdyFrameDec
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readSettingsEnd() {
|
public void readSettingsEnd() {
|
||||||
|
read = true;
|
||||||
|
|
||||||
Object frame = spdySettingsFrame;
|
Object frame = spdySettingsFrame;
|
||||||
spdySettingsFrame = null;
|
spdySettingsFrame = null;
|
||||||
ctx.fireChannelRead(frame);
|
ctx.fireChannelRead(frame);
|
||||||
@ -260,12 +280,16 @@ public class SpdyFrameCodec extends ByteToMessageDecoder implements SpdyFrameDec
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readPingFrame(int id) {
|
public void readPingFrame(int id) {
|
||||||
|
read = true;
|
||||||
|
|
||||||
SpdyPingFrame spdyPingFrame = new DefaultSpdyPingFrame(id);
|
SpdyPingFrame spdyPingFrame = new DefaultSpdyPingFrame(id);
|
||||||
ctx.fireChannelRead(spdyPingFrame);
|
ctx.fireChannelRead(spdyPingFrame);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readGoAwayFrame(int lastGoodStreamId, int statusCode) {
|
public void readGoAwayFrame(int lastGoodStreamId, int statusCode) {
|
||||||
|
read = true;
|
||||||
|
|
||||||
SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, statusCode);
|
SpdyGoAwayFrame spdyGoAwayFrame = new DefaultSpdyGoAwayFrame(lastGoodStreamId, statusCode);
|
||||||
ctx.fireChannelRead(spdyGoAwayFrame);
|
ctx.fireChannelRead(spdyGoAwayFrame);
|
||||||
}
|
}
|
||||||
@ -278,6 +302,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder implements SpdyFrameDec
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void readWindowUpdateFrame(int streamId, int deltaWindowSize) {
|
public void readWindowUpdateFrame(int streamId, int deltaWindowSize) {
|
||||||
|
read = true;
|
||||||
|
|
||||||
SpdyWindowUpdateFrame spdyWindowUpdateFrame = new DefaultSpdyWindowUpdateFrame(streamId, deltaWindowSize);
|
SpdyWindowUpdateFrame spdyWindowUpdateFrame = new DefaultSpdyWindowUpdateFrame(streamId, deltaWindowSize);
|
||||||
ctx.fireChannelRead(spdyWindowUpdateFrame);
|
ctx.fireChannelRead(spdyWindowUpdateFrame);
|
||||||
}
|
}
|
||||||
@ -304,6 +330,8 @@ public class SpdyFrameCodec extends ByteToMessageDecoder implements SpdyFrameDec
|
|||||||
ctx.fireExceptionCaught(e);
|
ctx.fireExceptionCaught(e);
|
||||||
}
|
}
|
||||||
if (frame != null) {
|
if (frame != null) {
|
||||||
|
read = true;
|
||||||
|
|
||||||
ctx.fireChannelRead(frame);
|
ctx.fireChannelRead(frame);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -253,6 +253,17 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
discardSomeReadBytes();
|
||||||
|
if (decodeWasNull) {
|
||||||
|
decodeWasNull = false;
|
||||||
|
if (!ctx.channel().config().isAutoRead()) {
|
||||||
|
ctx.read();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ctx.fireChannelReadComplete();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final void discardSomeReadBytes() {
|
||||||
if (cumulation != null && !first && cumulation.refCnt() == 1) {
|
if (cumulation != null && !first && cumulation.refCnt() == 1) {
|
||||||
// discard some bytes if possible to make more room in the
|
// discard some bytes if possible to make more room in the
|
||||||
// buffer but only if the refCnt == 1 as otherwise the user may have
|
// buffer but only if the refCnt == 1 as otherwise the user may have
|
||||||
@ -263,13 +274,6 @@ public abstract class ByteToMessageDecoder extends ChannelHandlerAdapter {
|
|||||||
// - https://github.com/netty/netty/issues/1764
|
// - https://github.com/netty/netty/issues/1764
|
||||||
cumulation.discardSomeReadBytes();
|
cumulation.discardSomeReadBytes();
|
||||||
}
|
}
|
||||||
if (decodeWasNull) {
|
|
||||||
decodeWasNull = false;
|
|
||||||
if (!ctx.channel().config().isAutoRead()) {
|
|
||||||
ctx.read();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ctx.fireChannelReadComplete();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil;
|
|||||||
import io.netty.buffer.CompositeByteBuf;
|
import io.netty.buffer.CompositeByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.channel.ChannelConfig;
|
||||||
import io.netty.channel.ChannelException;
|
import io.netty.channel.ChannelException;
|
||||||
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelFuture;
|
||||||
import io.netty.channel.ChannelFutureListener;
|
import io.netty.channel.ChannelFutureListener;
|
||||||
@ -228,6 +229,12 @@ public class SslHandler extends ByteToMessageDecoder {
|
|||||||
|
|
||||||
private int packetLength;
|
private int packetLength;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
|
||||||
|
* when {@link ChannelConfig#isAutoRead()} is {@code false}.
|
||||||
|
*/
|
||||||
|
private boolean firedChannelRead;
|
||||||
|
|
||||||
private volatile long handshakeTimeoutMillis = 10000;
|
private volatile long handshakeTimeoutMillis = 10000;
|
||||||
private volatile long closeNotifyTimeoutMillis = 3000;
|
private volatile long closeNotifyTimeoutMillis = 3000;
|
||||||
|
|
||||||
@ -267,7 +274,7 @@ public class SslHandler extends ByteToMessageDecoder {
|
|||||||
* {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
|
* {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
|
||||||
* and which does not need to do extra memory copies.
|
* and which does not need to do extra memory copies.
|
||||||
*/
|
*/
|
||||||
setCumulator(opensslEngine ? COMPOSITE_CUMULATOR : MERGE_CUMULATOR);
|
setCumulator(opensslEngine? COMPOSITE_CUMULATOR : MERGE_CUMULATOR);
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getHandshakeTimeoutMillis() {
|
public long getHandshakeTimeoutMillis() {
|
||||||
@ -865,6 +872,8 @@ public class SslHandler extends ByteToMessageDecoder {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (totalLength > 0) {
|
if (totalLength > 0) {
|
||||||
|
boolean decoded = false;
|
||||||
|
|
||||||
// The buffer contains one or more full SSL records.
|
// The buffer contains one or more full SSL records.
|
||||||
// Slice out the whole packet so unwrap will only be called with complete packets.
|
// Slice out the whole packet so unwrap will only be called with complete packets.
|
||||||
// Also directly reset the packetLength. This is needed as unwrap(..) may trigger
|
// Also directly reset the packetLength. This is needed as unwrap(..) may trigger
|
||||||
@ -883,12 +892,18 @@ public class SslHandler extends ByteToMessageDecoder {
|
|||||||
ByteBuf copy = ctx.alloc().heapBuffer(totalLength);
|
ByteBuf copy = ctx.alloc().heapBuffer(totalLength);
|
||||||
try {
|
try {
|
||||||
copy.writeBytes(in, startOffset, totalLength);
|
copy.writeBytes(in, startOffset, totalLength);
|
||||||
unwrap(ctx, copy, 0, totalLength);
|
decoded = unwrap(ctx, copy, 0, totalLength);
|
||||||
} finally {
|
} finally {
|
||||||
copy.release();
|
copy.release();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
unwrap(ctx, in, startOffset, totalLength);
|
decoded = unwrap(ctx, in, startOffset, totalLength);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!firedChannelRead) {
|
||||||
|
// Check first if firedChannelRead is not set yet as it may have been set in a
|
||||||
|
// previous decode(...) call.
|
||||||
|
firedChannelRead = decoded;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -904,17 +919,23 @@ public class SslHandler extends ByteToMessageDecoder {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// Discard bytes of the cumulation buffer if needed.
|
||||||
|
discardSomeReadBytes();
|
||||||
|
|
||||||
if (needsFlush) {
|
if (needsFlush) {
|
||||||
needsFlush = false;
|
needsFlush = false;
|
||||||
ctx.flush();
|
ctx.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
// If handshake is not finished yet, we need more data.
|
// If handshake is not finished yet, we need more data.
|
||||||
if (!handshakePromise.isDone() && !ctx.channel().config().isAutoRead()) {
|
if (!ctx.channel().config().isAutoRead() && (!firedChannelRead || !handshakePromise.isDone())) {
|
||||||
|
// No auto-read used and no message passed through the ChannelPipeline or the handhshake was not complete
|
||||||
|
// yet, which means we need to trigger the read to ensure we not encounter any stalls.
|
||||||
ctx.read();
|
ctx.read();
|
||||||
}
|
}
|
||||||
|
|
||||||
super.channelReadComplete(ctx);
|
firedChannelRead = false;
|
||||||
|
ctx.fireChannelReadComplete();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -927,9 +948,10 @@ public class SslHandler extends ByteToMessageDecoder {
|
|||||||
/**
|
/**
|
||||||
* Unwraps inbound SSL records.
|
* Unwraps inbound SSL records.
|
||||||
*/
|
*/
|
||||||
private void unwrap(
|
private boolean unwrap(
|
||||||
ChannelHandlerContext ctx, ByteBuf packet, int offset, int length) throws SSLException {
|
ChannelHandlerContext ctx, ByteBuf packet, int offset, int length) throws SSLException {
|
||||||
|
|
||||||
|
boolean decoded = false;
|
||||||
boolean wrapLater = false;
|
boolean wrapLater = false;
|
||||||
boolean notifyClosure = false;
|
boolean notifyClosure = false;
|
||||||
ByteBuf decodeOut = allocate(ctx, length);
|
ByteBuf decodeOut = allocate(ctx, length);
|
||||||
@ -998,11 +1020,14 @@ public class SslHandler extends ByteToMessageDecoder {
|
|||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
if (decodeOut.isReadable()) {
|
if (decodeOut.isReadable()) {
|
||||||
|
decoded = true;
|
||||||
|
|
||||||
ctx.fireChannelRead(decodeOut);
|
ctx.fireChannelRead(decodeOut);
|
||||||
} else {
|
} else {
|
||||||
decodeOut.release();
|
decodeOut.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return decoded;
|
||||||
}
|
}
|
||||||
|
|
||||||
private SSLEngineResult unwrap(
|
private SSLEngineResult unwrap(
|
||||||
|
Loading…
Reference in New Issue
Block a user