From 3b47427cf7abcca72442989c29ae5b9561ed918c Mon Sep 17 00:00:00 2001 From: Paul Lysak Date: Mon, 14 Sep 2020 13:04:55 +0300 Subject: [PATCH] Fix NPE with MqttUnsubAckMessage - regression of MQTT5 support (#10557) Recent changes for MQTT5 may cause NPE if UNSUBACK message is created with `MqttMessageFactory` and client code isn't up-to-date. Modifications: Added default body in `MqttUnsubAckMessage` constructor if null body is passed, added null checks in `encodeUnsubAckMessage` Result: `MqttUnsubAckMessage` created with `MqttMessageFactory` doesn't cause NPE even if null body is supplied. --- .../netty/handler/codec/mqtt/MqttEncoder.java | 9 +- .../codec/mqtt/MqttMessageFactory.java | 4 +- ...tMessageIdAndPropertiesVariableHeader.java | 5 + .../mqtt/MqttMessageIdVariableHeader.java | 4 + .../handler/codec/mqtt/MqttSubAckMessage.java | 2 +- .../codec/mqtt/MqttSubscribeMessage.java | 2 +- .../codec/mqtt/MqttUnsubAckMessage.java | 2 +- .../codec/mqtt/MqttUnsubAckPayload.java | 10 ++ .../codec/mqtt/MqttUnsubscribeMessage.java | 2 +- .../handler/codec/mqtt/MqttCodecTest.java | 128 +------------ .../codec/mqtt/MqttMessageFactoryTest.java | 169 ++++++++++++++++++ .../handler/codec/mqtt/MqttTestUtils.java | 155 ++++++++++++++++ 12 files changed, 359 insertions(+), 133 deletions(-) create mode 100644 codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttMessageFactoryTest.java create mode 100644 codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttTestUtils.java diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java index 03eb55caf9..083e27092d 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java @@ -388,7 +388,8 @@ public final class MqttEncoder extends MessageToMessageEncoder { message.idAndPropertiesVariableHeader().properties()); try { int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes(); - int payloadBufferSize = message.payload().unsubscribeReasonCodes().size(); + MqttUnsubAckPayload payload = message.payload(); + int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size(); int variablePartSize = variableHeaderBufferSize + payloadBufferSize; int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize); @@ -397,8 +398,10 @@ public final class MqttEncoder extends MessageToMessageEncoder { buf.writeShort(message.variableHeader().messageId()); buf.writeBytes(propertiesBuf); - for (Short reasonCode : message.payload().unsubscribeReasonCodes()) { - buf.writeByte(reasonCode); + if (payload != null) { + for (Short reasonCode : payload.unsubscribeReasonCodes()) { + buf.writeByte(reasonCode); + } } return buf; diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java index c65df17e4e..61a2c69f86 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java @@ -38,7 +38,7 @@ public final class MqttMessageFactory { case SUBSCRIBE: return new MqttSubscribeMessage( mqttFixedHeader, - (MqttMessageIdAndPropertiesVariableHeader) variableHeader, + (MqttMessageIdVariableHeader) variableHeader, (MqttSubscribePayload) payload); case SUBACK: @@ -56,7 +56,7 @@ public final class MqttMessageFactory { case UNSUBSCRIBE: return new MqttUnsubscribeMessage( mqttFixedHeader, - (MqttMessageIdAndPropertiesVariableHeader) variableHeader, + (MqttMessageIdVariableHeader) variableHeader, (MqttUnsubscribePayload) payload); case PUBLISH: diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdAndPropertiesVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdAndPropertiesVariableHeader.java index b2a32aaf5b..0c42af5e4a 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdAndPropertiesVariableHeader.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdAndPropertiesVariableHeader.java @@ -44,4 +44,9 @@ public final class MqttMessageIdAndPropertiesVariableHeader extends MqttMessageI ", properties=" + properties + ']'; } + + @Override + MqttMessageIdAndPropertiesVariableHeader withDefaultEmptyProperties() { + return this; + } } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java index e3b9c6e01a..fc400bc20b 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java @@ -53,4 +53,8 @@ public class MqttMessageIdVariableHeader { public MqttMessageIdAndPropertiesVariableHeader withEmptyProperties() { return new MqttMessageIdAndPropertiesVariableHeader(messageId, MqttProperties.NO_PROPERTIES); } + + MqttMessageIdAndPropertiesVariableHeader withDefaultEmptyProperties() { + return withEmptyProperties(); + } } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java index f0c7719306..109b028456 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java @@ -32,7 +32,7 @@ public final class MqttSubAckMessage extends MqttMessage { MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader, MqttSubAckPayload payload) { - this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload); + this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload); } @Override diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java index d02f3da038..51ae8a19b0 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java @@ -33,7 +33,7 @@ public final class MqttSubscribeMessage extends MqttMessage { MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader, MqttSubscribePayload payload) { - this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload); + this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload); } @Override diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java index 54e61957f9..3469a87fb9 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java @@ -24,7 +24,7 @@ public final class MqttUnsubAckMessage extends MqttMessage { public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, MqttMessageIdAndPropertiesVariableHeader variableHeader, MqttUnsubAckPayload payload) { - super(mqttFixedHeader, variableHeader, payload); + super(mqttFixedHeader, variableHeader, MqttUnsubAckPayload.withEmptyDefaults(payload)); } public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckPayload.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckPayload.java index 2ab1d42b11..e330f12799 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckPayload.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckPayload.java @@ -29,6 +29,16 @@ public final class MqttUnsubAckPayload { private final List unsubscribeReasonCodes; + private static final MqttUnsubAckPayload EMPTY = new MqttUnsubAckPayload(); + + public static MqttUnsubAckPayload withEmptyDefaults(MqttUnsubAckPayload payload) { + if (payload == null) { + return EMPTY; + } else { + return payload; + } + } + public MqttUnsubAckPayload(short... unsubscribeReasonCodes) { ObjectUtil.checkNotNull(unsubscribeReasonCodes, "unsubscribeReasonCodes"); diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java index 4523e389ae..c5b4786fa1 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java @@ -33,7 +33,7 @@ public final class MqttUnsubscribeMessage extends MqttMessage { MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader, MqttUnsubscribePayload payload) { - this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload); + this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload); } @Override diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java index 8ceb404aa9..3bbd252f9a 100644 --- a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java @@ -18,7 +18,6 @@ package io.netty.handler.codec.mqtt; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufUtil; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -30,13 +29,14 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.w3c.dom.Attr; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; +import static io.netty.handler.codec.mqtt.MqttTestUtils.validateProperties; +import static io.netty.handler.codec.mqtt.MqttTestUtils.validateSubscribePayload; +import static io.netty.handler.codec.mqtt.MqttTestUtils.validateUnsubscribePayload; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.*; import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS; @@ -984,33 +984,6 @@ public class MqttCodecTest { assertEquals("MqttMessageIdVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId()); } - private static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) { - List expectedTopicSubscriptions = expected.topicSubscriptions(); - List actualTopicSubscriptions = actual.topicSubscriptions(); - - assertEquals( - "MqttSubscribePayload TopicSubscriptionList size mismatch ", - expectedTopicSubscriptions.size(), - actualTopicSubscriptions.size()); - for (int i = 0; i < expectedTopicSubscriptions.size(); i++) { - validateTopicSubscription(expectedTopicSubscriptions.get(i), actualTopicSubscriptions.get(i)); - } - } - - private static void validateTopicSubscription( - MqttTopicSubscription expected, - MqttTopicSubscription actual) { - assertEquals("MqttTopicSubscription TopicName mismatch ", expected.topicName(), actual.topicName()); - assertEquals( - "MqttTopicSubscription Qos mismatch ", - expected.qualityOfService(), - actual.qualityOfService()); - assertEquals( - "MqttTopicSubscription options mismatch ", - expected.option(), - actual.option()); - } - private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) { assertArrayEquals( "MqttSubAckPayload GrantedQosLevels mismatch ", @@ -1018,14 +991,7 @@ public class MqttCodecTest { actual.grantedQoSLevels().toArray()); } - private static void validateUnsubscribePayload(MqttUnsubscribePayload expected, MqttUnsubscribePayload actual) { - assertArrayEquals( - "MqttUnsubscribePayload TopicList mismatch ", - expected.topics().toArray(), - actual.topics().toArray()); - } - - private static void validateDecoderExceptionTooLargeMessage(MqttMessage message) { + private static void validateDecoderExceptionTooLargeMessage(MqttMessage message) { assertNull("MqttMessage payload expected null ", message.payload()); assertTrue(message.decoderResult().isFailure()); Throwable cause = message.decoderResult().cause(); @@ -1065,90 +1031,4 @@ public class MqttCodecTest { final MqttProperties actualProps = actual.properties(); validateProperties(expectedProps, actualProps); } - - private static void validateProperties(MqttProperties expected, MqttProperties actual) { - for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) { - MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId); - switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) { - // one byte value integer property - case PAYLOAD_FORMAT_INDICATOR: - case REQUEST_PROBLEM_INFORMATION: - case REQUEST_RESPONSE_INFORMATION: - case MAXIMUM_QOS: - case RETAIN_AVAILABLE: - case WILDCARD_SUBSCRIPTION_AVAILABLE: - case SUBSCRIPTION_IDENTIFIER_AVAILABLE: - case SHARED_SUBSCRIPTION_AVAILABLE: { - final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; - final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; - assertEquals("one byte property doesn't match", expectedValue, actualValue); - break; - } - // two byte value integer property - case SERVER_KEEP_ALIVE: - case RECEIVE_MAXIMUM: - case TOPIC_ALIAS_MAXIMUM: - case TOPIC_ALIAS: { - final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; - final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; - assertEquals("two byte property doesn't match", expectedValue, actualValue); - break; - } - // four byte value integer property - case PUBLICATION_EXPIRY_INTERVAL: - case SESSION_EXPIRY_INTERVAL: - case WILL_DELAY_INTERVAL: - case MAXIMUM_PACKET_SIZE: { - final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; - final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; - assertEquals("four byte property doesn't match", expectedValue, actualValue); - break; - } - // four byte value integer property - case SUBSCRIPTION_IDENTIFIER: { - final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; - final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; - assertEquals("variable byte integer property doesn't match", expectedValue, actualValue); - break; - } - // UTF-8 string value integer property - case CONTENT_TYPE: - case RESPONSE_TOPIC: - case ASSIGNED_CLIENT_IDENTIFIER: - case AUTHENTICATION_METHOD: - case RESPONSE_INFORMATION: - case SERVER_REFERENCE: - case REASON_STRING: { - final String expectedValue = ((MqttProperties.StringProperty) expectedProperty).value; - final String actualValue = ((MqttProperties.StringProperty) actualProperty).value; - assertEquals("String property doesn't match", expectedValue, actualValue); - break; - } - // User property - case USER_PROPERTY: { - final List expectedPairs = - ((MqttProperties.UserProperties) expectedProperty).value; - final List actualPairs = - ((MqttProperties.UserProperties) actualProperty).value; - assertEquals("User properties count doesn't match", expectedPairs, actualPairs); - for (int i = 0; i < expectedPairs.size(); i++) { - assertEquals("User property mismatch", expectedPairs.get(i), actualPairs.get(i)); - } - break; - } - // byte[] property - case CORRELATION_DATA: - case AUTHENTICATION_DATA: { - final byte[] expectedValue = ((MqttProperties.BinaryProperty) expectedProperty).value; - final byte[] actualValue = ((MqttProperties.BinaryProperty) actualProperty).value; - final String expectedHexDump = ByteBufUtil.hexDump(expectedValue); - final String actualHexDump = ByteBufUtil.hexDump(actualValue); - assertEquals("byte[] property doesn't match", expectedHexDump, actualHexDump); - break; - } - default: - fail("Property Id not recognized " + Integer.toHexString(expectedProperty.propertyId)); - } - } - } } diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttMessageFactoryTest.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttMessageFactoryTest.java new file mode 100644 index 0000000000..e8db260d9c --- /dev/null +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttMessageFactoryTest.java @@ -0,0 +1,169 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static io.netty.handler.codec.mqtt.MqttTestUtils.validateProperties; +import static io.netty.handler.codec.mqtt.MqttTestUtils.validateSubscribePayload; +import static io.netty.handler.codec.mqtt.MqttTestUtils.validateUnsubscribePayload; + +public class MqttMessageFactoryTest { + private static final String SAMPLE_TOPIC = "a/b/c"; + private static final int SAMPLE_MESSAGE_ID = 123; + + @Test + public void createUnsubAckV3() { + MqttFixedHeader fixedHeader = + new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdVariableHeader variableHeader = + MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID); + + MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null); + + assertEquals("Message type mismatch", MqttMessageType.UNSUBACK, unsuback.fixedHeader().messageType()); + MqttMessageIdAndPropertiesVariableHeader actualVariableHeader = + (MqttMessageIdAndPropertiesVariableHeader) unsuback.variableHeader(); + assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId()); + validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties()); + MqttUnsubAckPayload actualPayload = (MqttUnsubAckPayload) unsuback.payload(); + assertNotNull("payload", actualPayload); + assertEquals("reason codes size", 0, actualPayload.unsubscribeReasonCodes().size()); + } + + @Test + public void createUnsubAckV5() { + MqttFixedHeader fixedHeader = + new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttProperties properties = new MqttProperties(); + String reasonString = "All right"; + properties.add(new MqttProperties.StringProperty( + MqttProperties.MqttPropertyType.REASON_STRING.value(), + reasonString)); + MqttMessageIdAndPropertiesVariableHeader variableHeader = + new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties); + MqttUnsubAckPayload payload = new MqttUnsubAckPayload((short) 0x80 /*unspecified error*/); + + MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + + assertEquals("Message type mismatch", MqttMessageType.UNSUBACK, unsuback.fixedHeader().messageType()); + MqttMessageIdAndPropertiesVariableHeader actualVariableHeader = + (MqttMessageIdAndPropertiesVariableHeader) unsuback.variableHeader(); + assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId()); + validateProperties(properties, actualVariableHeader.properties()); + MqttUnsubAckPayload actualPayload = (MqttUnsubAckPayload) unsuback.payload(); + assertEquals("Reason code list doesn't match", + payload.unsubscribeReasonCodes(), + actualPayload.unsubscribeReasonCodes()); + } + + @Test + public void createSubscribeV3() { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, AT_LEAST_ONCE, false, 0); + + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID); + List subscriptions = new ArrayList(); + subscriptions.add(new MqttTopicSubscription(SAMPLE_TOPIC, MqttQoS.AT_MOST_ONCE)); + + MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions); + + MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + + assertEquals("Message type mismatch", MqttMessageType.SUBSCRIBE, subscribe.fixedHeader().messageType()); + MqttMessageIdAndPropertiesVariableHeader actualVariableHeader = + (MqttMessageIdAndPropertiesVariableHeader) subscribe.variableHeader(); + assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId()); + validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties()); + MqttSubscribePayload actualPayload = (MqttSubscribePayload) subscribe.payload(); + validateSubscribePayload(payload, actualPayload); + } + + @Test + public void createSubscribeV5() { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, AT_LEAST_ONCE, false, 0); + + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.UserProperty("correlationId", "111222")); + MqttMessageIdAndPropertiesVariableHeader variableHeader = + new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties); + List subscriptions = new ArrayList(); + subscriptions.add(new MqttTopicSubscription(SAMPLE_TOPIC, MqttQoS.AT_MOST_ONCE)); + + MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions); + + MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + + assertEquals("Message type mismatch", MqttMessageType.SUBSCRIBE, subscribe.fixedHeader().messageType()); + MqttMessageIdAndPropertiesVariableHeader actualVariableHeader = + (MqttMessageIdAndPropertiesVariableHeader) subscribe.variableHeader(); + assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId()); + validateProperties(properties, actualVariableHeader.properties()); + MqttSubscribePayload actualPayload = (MqttSubscribePayload) subscribe.payload(); + validateSubscribePayload(payload, actualPayload); + } + + @Test + public void createUnsubscribeV3() { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, AT_LEAST_ONCE, false, 0); + + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID); + + List topics = new ArrayList(); + topics.add(SAMPLE_TOPIC); + MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics); + + MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + + assertEquals("Message type mismatch", MqttMessageType.UNSUBSCRIBE, unsubscribe.fixedHeader().messageType()); + MqttMessageIdAndPropertiesVariableHeader actualVariableHeader = + (MqttMessageIdAndPropertiesVariableHeader) unsubscribe.variableHeader(); + assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId()); + validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties()); + MqttUnsubscribePayload actualPayload = (MqttUnsubscribePayload) unsubscribe.payload(); + validateUnsubscribePayload(payload, actualPayload); + } + + @Test + public void createUnsubscribeV5() { + MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, AT_LEAST_ONCE, false, 0); + + MqttProperties properties = new MqttProperties(); + properties.add(new MqttProperties.UserProperty("correlationId", "111222")); + MqttMessageIdAndPropertiesVariableHeader variableHeader = + new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties); + + List topics = new ArrayList(); + topics.add(SAMPLE_TOPIC); + MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics); + + MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload); + + assertEquals("Message type mismatch", MqttMessageType.UNSUBSCRIBE, unsubscribe.fixedHeader().messageType()); + MqttMessageIdAndPropertiesVariableHeader actualVariableHeader = + (MqttMessageIdAndPropertiesVariableHeader) unsubscribe.variableHeader(); + assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId()); + validateProperties(properties, actualVariableHeader.properties()); + MqttUnsubscribePayload actualPayload = (MqttUnsubscribePayload) unsubscribe.payload(); + validateUnsubscribePayload(payload, actualPayload); + } +} diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttTestUtils.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttTestUtils.java new file mode 100644 index 0000000000..1eec83b4bb --- /dev/null +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttTestUtils.java @@ -0,0 +1,155 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.buffer.ByteBufUtil; + +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public final class MqttTestUtils { + private MqttTestUtils() { + } + + public static void validateProperties(MqttProperties expected, MqttProperties actual) { + for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) { + MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId); + switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) { + // one byte value integer property + case PAYLOAD_FORMAT_INDICATOR: + case REQUEST_PROBLEM_INFORMATION: + case REQUEST_RESPONSE_INFORMATION: + case MAXIMUM_QOS: + case RETAIN_AVAILABLE: + case WILDCARD_SUBSCRIPTION_AVAILABLE: + case SUBSCRIPTION_IDENTIFIER_AVAILABLE: + case SHARED_SUBSCRIPTION_AVAILABLE: { + final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; + final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; + assertEquals("one byte property doesn't match", expectedValue, actualValue); + break; + } + // two byte value integer property + case SERVER_KEEP_ALIVE: + case RECEIVE_MAXIMUM: + case TOPIC_ALIAS_MAXIMUM: + case TOPIC_ALIAS: { + final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; + final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; + assertEquals("two byte property doesn't match", expectedValue, actualValue); + break; + } + // four byte value integer property + case PUBLICATION_EXPIRY_INTERVAL: + case SESSION_EXPIRY_INTERVAL: + case WILL_DELAY_INTERVAL: + case MAXIMUM_PACKET_SIZE: { + final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; + final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; + assertEquals("four byte property doesn't match", expectedValue, actualValue); + break; + } + // four byte value integer property + case SUBSCRIPTION_IDENTIFIER: { + final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; + final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; + assertEquals("variable byte integer property doesn't match", expectedValue, actualValue); + break; + } + // UTF-8 string value integer property + case CONTENT_TYPE: + case RESPONSE_TOPIC: + case ASSIGNED_CLIENT_IDENTIFIER: + case AUTHENTICATION_METHOD: + case RESPONSE_INFORMATION: + case SERVER_REFERENCE: + case REASON_STRING: { + final String expectedValue = ((MqttProperties.StringProperty) expectedProperty).value; + final String actualValue = ((MqttProperties.StringProperty) actualProperty).value; + assertEquals("String property doesn't match", expectedValue, actualValue); + break; + } + // User property + case USER_PROPERTY: { + final List expectedPairs = + ((MqttProperties.UserProperties) expectedProperty).value; + final List actualPairs = + ((MqttProperties.UserProperties) actualProperty).value; + assertEquals("User properties count doesn't match", expectedPairs, actualPairs); + for (int i = 0; i < expectedPairs.size(); i++) { + assertEquals("User property mismatch", expectedPairs.get(i), actualPairs.get(i)); + } + break; + } + // byte[] property + case CORRELATION_DATA: + case AUTHENTICATION_DATA: { + final byte[] expectedValue = ((MqttProperties.BinaryProperty) expectedProperty).value; + final byte[] actualValue = ((MqttProperties.BinaryProperty) actualProperty).value; + final String expectedHexDump = ByteBufUtil.hexDump(expectedValue); + final String actualHexDump = ByteBufUtil.hexDump(actualValue); + assertEquals("byte[] property doesn't match", expectedHexDump, actualHexDump); + break; + } + default: + fail("Property Id not recognized " + Integer.toHexString(expectedProperty.propertyId)); + } + } + for (MqttProperties.MqttProperty actualProperty : actual.listAll()) { + MqttProperties.MqttProperty expectedProperty = expected.getProperty(actualProperty.propertyId); + assertNotNull("Property " + actualProperty.propertyId + " not expected", expectedProperty); + } + } + + public static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) { + List expectedTopicSubscriptions = expected.topicSubscriptions(); + List actualTopicSubscriptions = actual.topicSubscriptions(); + + assertEquals( + "MqttSubscribePayload TopicSubscriptionList size mismatch ", + expectedTopicSubscriptions.size(), + actualTopicSubscriptions.size()); + for (int i = 0; i < expectedTopicSubscriptions.size(); i++) { + validateTopicSubscription(expectedTopicSubscriptions.get(i), actualTopicSubscriptions.get(i)); + } + } + + public static void validateTopicSubscription( + MqttTopicSubscription expected, + MqttTopicSubscription actual) { + assertEquals("MqttTopicSubscription TopicName mismatch ", expected.topicName(), actual.topicName()); + assertEquals( + "MqttTopicSubscription Qos mismatch ", + expected.qualityOfService(), + actual.qualityOfService()); + assertEquals( + "MqttTopicSubscription options mismatch ", + expected.option(), + actual.option()); + } + + public static void validateUnsubscribePayload(MqttUnsubscribePayload expected, MqttUnsubscribePayload actual) { + assertArrayEquals( + "MqttUnsubscribePayload TopicList mismatch ", + expected.topics().toArray(), + actual.topics().toArray()); + } +}