diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java index 59c7dfd271..81c17d25bc 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java @@ -287,18 +287,22 @@ public final class MqttEncoder extends MessageToMessageEncoder { // Payload for (MqttTopicSubscription topic : payload.topicSubscriptions()) { writeUnsafeUTF8String(buf, topic.topicName()); - final MqttSubscriptionOption option = topic.option(); + if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_3_1) { + buf.writeByte(topic.qualityOfService().value()); + } else { + final MqttSubscriptionOption option = topic.option(); - int optionEncoded = option.retainHandling().value() << 4; - if (option.isRetainAsPublished()) { - optionEncoded |= 0x08; - } - if (option.isNoLocal()) { - optionEncoded |= 0x04; - } - optionEncoded |= option.qos().value(); + int optionEncoded = option.retainHandling().value() << 4; + if (option.isRetainAsPublished()) { + optionEncoded |= 0x08; + } + if (option.isNoLocal()) { + optionEncoded |= 0x04; + } + optionEncoded |= option.qos().value(); - buf.writeByte(optionEncoded); + buf.writeByte(optionEncoded); + } } return buf; diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java index 22f6f8ee94..e5459d5446 100644 --- a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java @@ -620,6 +620,43 @@ public class MqttCodecTest { validateSubscribePayload(message.payload(), decodedMessage.payload()); } + @Test + public void testSubscribeMessageMqtt5EncodeAsMqtt3() throws Exception { + when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_3_1_1); + + //Set parameters only available in MQTT5 to see if they're dropped when encoding as MQTT3 + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); + final MqttSubscribeMessage message = MqttMessageBuilders.subscribe() + .messageId((short) 1) + .properties(props) + .addSubscription("/topic", new MqttSubscriptionOption(AT_LEAST_ONCE, + true, + true, + SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS)) + .build(); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(MqttSubscribeMessage.class); + mqttDecoder.decode(ctx, byteBuf); + verify(ctx).fireChannelRead(captor.capture()); + final MqttSubscribeMessage decodedMessage = captor.getValue(); + + final MqttSubscribeMessage expectedMessage = MqttMessageBuilders.subscribe() + .messageId((short) 1) + .addSubscription("/topic", MqttSubscriptionOption.onlyFromQos(AT_LEAST_ONCE)) + .build(); + validateFixedHeaders(expectedMessage.fixedHeader(), decodedMessage.fixedHeader()); + final MqttMessageIdAndPropertiesVariableHeader expectedHeader = + (MqttMessageIdAndPropertiesVariableHeader) expectedMessage.variableHeader(); + final MqttMessageIdAndPropertiesVariableHeader actualHeader = + (MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader(); + validatePacketIdAndPropertiesVariableHeader(expectedHeader, actualHeader); + validateSubscribePayload(expectedMessage.payload(), decodedMessage.payload()); + } + @Test public void testUnsubAckMessageForMqtt5() throws Exception { when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);