Add headers to MqttMessage returned by MqttDecoder in case of DecoderException (#8219)

Motivation:
When the MqttDecoder decodes a message larger than the 'maxBytesInMessage' a DecoderException is thrown and a MqttMessage with just the failure cause is returned. Even if I can't handle the message, I might want to send an ACK so that I won't have to worry about it again.

Modification:
The DecoderException is thrown after the variableHeader is decoded. The fixed and variable headers are then added to the MqttMessage along with the failure cause.

Result:
The invalid MqttMessage will have headers if available.
This commit is contained in:
Chi-Joung So 2018-08-31 15:06:09 +02:00 committed by Norman Maurer
parent a644563625
commit a9863f8128
3 changed files with 173 additions and 5 deletions

View File

@ -82,11 +82,11 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
}
case READ_VARIABLE_HEADER: try {
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
if (bytesRemainingInVariablePart > maxBytesInMessage) {
throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes");
}
final Result<?> decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
variableHeader = decodedVariableHeader.value;
bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
checkpoint(DecoderState.READ_PAYLOAD);
// fall through
@ -133,7 +133,7 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
private MqttMessage invalidMessage(Throwable cause) {
checkpoint(DecoderState.BAD_MESSAGE);
return MqttMessageFactory.newInvalidMessage(cause);
return MqttMessageFactory.newInvalidMessage(mqttFixedHeader, variableHeader, cause);
}
/**

View File

@ -85,5 +85,10 @@ public final class MqttMessageFactory {
return new MqttMessage(null, null, null, DecoderResult.failure(cause));
}
public static MqttMessage newInvalidMessage(MqttFixedHeader mqttFixedHeader, Object variableHeader,
Throwable cause) {
return new MqttMessage(mqttFixedHeader, variableHeader, null, DecoderResult.failure(cause));
}
private MqttMessageFactory() { }
}

View File

@ -58,6 +58,11 @@ public class MqttCodecTest {
private final MqttDecoder mqttDecoder = new MqttDecoder();
/**
* MqttDecoder with an unrealistic max payload size of 1 byte.
*/
private final MqttDecoder mqttDecoderLimitedMessageSize = new MqttDecoder(1);
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
@ -297,6 +302,154 @@ public class MqttCodecTest {
}
}
@Test
public void testConnectMessageForMqtt31TooLarge() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(),
(MqttConnectVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
}
@Test
public void testConnectMessageForMqtt311TooLarge() throws Exception {
final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateConnectVariableHeader(message.variableHeader(),
(MqttConnectVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
}
@Test
public void testConnAckMessageTooLarge() throws Exception {
final MqttConnAckMessage message = createConnAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
}
@Test
public void testPublishMessageTooLarge() throws Exception {
final MqttPublishMessage message = createPublishMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validatePublishVariableHeader(message.variableHeader(),
(MqttPublishVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
}
@Test
public void testSubscribeMessageTooLarge() throws Exception {
final MqttSubscribeMessage message = createSubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
}
@Test
public void testSubAckMessageTooLarge() throws Exception {
final MqttSubAckMessage message = createSubAckMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
}
@Test
public void testUnSubscribeMessageTooLarge() throws Exception {
final MqttUnsubscribeMessage message = createUnsubscribeMessage();
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
try {
final List<Object> out = new LinkedList<Object>();
mqttDecoderLimitedMessageSize.decode(ctx, byteBuf, out);
assertEquals("Expected one object but got " + out.size(), 1, out.size());
final MqttMessage decodedMessage = (MqttMessage) out.get(0);
validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader());
validateMessageIdVariableHeader(message.variableHeader(),
(MqttMessageIdVariableHeader) decodedMessage.variableHeader());
validateDecoderExceptionTooLargeMessage(decodedMessage);
} finally {
byteBuf.release();
}
}
private void testMessageWithOnlyFixedHeader(MqttMessageType messageType) throws Exception {
MqttMessage message = createMessageWithFixedHeader(messageType);
ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message);
@ -340,7 +493,7 @@ public class MqttCodecTest {
new MqttFixedHeader(
messageType,
false,
messageType == MqttMessageType.PUBREL ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE,
messageType == MqttMessageType.PUBREL ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE,
false,
0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
@ -378,7 +531,7 @@ public class MqttCodecTest {
MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0);
MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader("/abc", 1234);
ByteBuf payload = ALLOCATOR.buffer();
ByteBuf payload = ALLOCATOR.buffer();
payload.writeBytes("whatever".getBytes(CharsetUtil.UTF_8));
return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload);
}
@ -530,4 +683,14 @@ public class MqttCodecTest {
expected.topics().toArray(),
actual.topics().toArray());
}
private static void validateDecoderExceptionTooLargeMessage(MqttMessage message) {
assertNull("MqttMessage payload expected null ", message.payload());
assertTrue(message.decoderResult().isFailure());
Throwable cause = message.decoderResult().cause();
assertTrue("MqttMessage DecoderResult cause expected instance of DecoderException ",
cause instanceof DecoderException);
assertTrue("MqttMessage DecoderResult cause reason expect to contain 'too large message' ",
cause.getMessage().contains("too large message:"));
}
}