Before throwing TooLongFrameException,should skip the bytes to be read in MqttDecoder (#11204)

Motivation:

Before throwing TooLongFrameException, should call the skipBytes method to skip the bytes to be read

Modification:
- skip bytes before throw

Result:
Actually skip the bytes when we detect too much data

Signed-off-by: xingrufei <xingrufei@sogou-inc.com>

Co-authored-by: xingrufei <xingrufei@sogou-inc.com>
This commit is contained in:
skyguard1 2021-04-29 14:13:11 +08:00 committed by Norman Maurer
parent 438632a3ac
commit bf721c84f4
2 changed files with 12 additions and 0 deletions

View File

@ -98,6 +98,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
final Result<?> decodedVariableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader); final Result<?> decodedVariableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value; variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) { if (bytesRemainingInVariablePart > maxBytesInMessage) {
buffer.skipBytes(buffer.readableBytes());
throw new TooLongFrameException("too large message: " + bytesRemainingInVariablePart + " bytes"); throw new TooLongFrameException("too large message: " + bytesRemainingInVariablePart + " bytes");
} }
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed; bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;

View File

@ -327,6 +327,7 @@ public class MqttCodecTest {
verify(ctx).fireChannelRead(captor.capture()); verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue(); final MqttMessage decodedMessage = captor.getValue();
assertEquals(0, byteBuf.readableBytes());
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(), validateConnectVariableHeader(message.variableHeader(),
@ -348,6 +349,7 @@ public class MqttCodecTest {
verify(ctx).fireChannelRead(captor.capture()); verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue(); final MqttMessage decodedMessage = captor.getValue();
assertEquals(0, byteBuf.readableBytes());
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(), validateConnectVariableHeader(message.variableHeader(),
@ -369,6 +371,8 @@ public class MqttCodecTest {
verify(ctx).fireChannelRead(captor.capture()); verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue(); final MqttMessage decodedMessage = captor.getValue();
assertEquals(0, byteBuf.readableBytes());
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage); validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally { } finally {
@ -387,6 +391,7 @@ public class MqttCodecTest {
verify(ctx).fireChannelRead(captor.capture()); verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue(); final MqttMessage decodedMessage = captor.getValue();
assertEquals(0, byteBuf.readableBytes());
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePublishVariableHeader(message.variableHeader(), validatePublishVariableHeader(message.variableHeader(),
@ -408,6 +413,8 @@ public class MqttCodecTest {
verify(ctx).fireChannelRead(captor.capture()); verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue(); final MqttMessage decodedMessage = captor.getValue();
assertEquals(0, byteBuf.readableBytes());
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(), validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader()); (MqttMessageIdVariableHeader) decodedMessage.variableHeader());
@ -428,6 +435,8 @@ public class MqttCodecTest {
verify(ctx).fireChannelRead(captor.capture()); verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue(); final MqttMessage decodedMessage = captor.getValue();
assertEquals(0, byteBuf.readableBytes());
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(), validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader()); (MqttMessageIdVariableHeader) decodedMessage.variableHeader());
@ -448,6 +457,8 @@ public class MqttCodecTest {
verify(ctx).fireChannelRead(captor.capture()); verify(ctx).fireChannelRead(captor.capture());
final MqttMessage decodedMessage = captor.getValue(); final MqttMessage decodedMessage = captor.getValue();
assertEquals(0, byteBuf.readableBytes());
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(), validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader()); (MqttMessageIdVariableHeader) decodedMessage.variableHeader());