diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java index 8d94ee48e9..e014f37ac2 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java @@ -392,7 +392,10 @@ public final class MqttDecoder extends ReplayingDecoder { final List grantedQos = new ArrayList(); int numberOfBytesConsumed = 0; while (numberOfBytesConsumed < bytesRemainingInVariablePart) { - int qos = buffer.readUnsignedByte() & 0x03; + int qos = buffer.readUnsignedByte(); + if (qos != MqttQoS.FAILURE.value()) { + qos &= 0x03; + } numberOfBytesConsumed++; grantedQos.add(qos); } diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java index 1a5e0ccfd4..f69387ef21 100644 --- a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java @@ -214,6 +214,30 @@ public class MqttCodecTest { validateSubAckPayload(message.payload(), decodedMessage.payload()); } + @Test + public void testSubAckMessageWithFailureInPayload() throws Exception { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345); + MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(MqttQoS.FAILURE.value()); + MqttSubAckMessage message = + new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload); + + ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + + List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + + MqttSubAckMessage decodedMessage = (MqttSubAckMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateMessageIdVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + validateSubAckPayload(message.payload(), decodedMessage.payload()); + assertEquals(1, decodedMessage.payload().grantedQoSLevels().size()); + assertEquals(MqttQoS.FAILURE, MqttQoS.valueOf(decodedMessage.payload().grantedQoSLevels().get(0))); + } + @Test public void testUnSubscribeMessage() throws Exception { final MqttUnsubscribeMessage message = createUnsubscribeMessage();