Fix NPE with MqttUnsubAckMessage - regression of MQTT5 support (#10557)

Recent changes for MQTT5 may cause NPE if UNSUBACK message is created with `MqttMessageFactory`
and client code isn't up-to-date.

Modifications:

Added default body in `MqttUnsubAckMessage` constructor if null body is passed,
added null checks in `encodeUnsubAckMessage`

Result:

`MqttUnsubAckMessage` created with `MqttMessageFactory` doesn't cause NPE
even if null body is supplied.
This commit is contained in:
Paul Lysak 2020-09-14 13:04:55 +03:00 committed by GitHub
parent d889397c1e
commit 3b47427cf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 359 additions and 133 deletions

View File

@ -388,7 +388,8 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
message.idAndPropertiesVariableHeader().properties());
try {
int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
int payloadBufferSize = message.payload().unsubscribeReasonCodes().size();
MqttUnsubAckPayload payload = message.payload();
int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
@ -397,8 +398,10 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
buf.writeShort(message.variableHeader().messageId());
buf.writeBytes(propertiesBuf);
for (Short reasonCode : message.payload().unsubscribeReasonCodes()) {
buf.writeByte(reasonCode);
if (payload != null) {
for (Short reasonCode : payload.unsubscribeReasonCodes()) {
buf.writeByte(reasonCode);
}
}
return buf;

View File

@ -38,7 +38,7 @@ public final class MqttMessageFactory {
case SUBSCRIBE:
return new MqttSubscribeMessage(
mqttFixedHeader,
(MqttMessageIdAndPropertiesVariableHeader) variableHeader,
(MqttMessageIdVariableHeader) variableHeader,
(MqttSubscribePayload) payload);
case SUBACK:
@ -56,7 +56,7 @@ public final class MqttMessageFactory {
case UNSUBSCRIBE:
return new MqttUnsubscribeMessage(
mqttFixedHeader,
(MqttMessageIdAndPropertiesVariableHeader) variableHeader,
(MqttMessageIdVariableHeader) variableHeader,
(MqttUnsubscribePayload) payload);
case PUBLISH:

View File

@ -44,4 +44,9 @@ public final class MqttMessageIdAndPropertiesVariableHeader extends MqttMessageI
", properties=" + properties +
']';
}
@Override
MqttMessageIdAndPropertiesVariableHeader withDefaultEmptyProperties() {
return this;
}
}

View File

@ -53,4 +53,8 @@ public class MqttMessageIdVariableHeader {
public MqttMessageIdAndPropertiesVariableHeader withEmptyProperties() {
return new MqttMessageIdAndPropertiesVariableHeader(messageId, MqttProperties.NO_PROPERTIES);
}
MqttMessageIdAndPropertiesVariableHeader withDefaultEmptyProperties() {
return withEmptyProperties();
}
}

View File

@ -32,7 +32,7 @@ public final class MqttSubAckMessage extends MqttMessage {
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttSubAckPayload payload) {
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload);
}
@Override

View File

@ -33,7 +33,7 @@ public final class MqttSubscribeMessage extends MqttMessage {
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttSubscribePayload payload) {
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload);
}
@Override

View File

@ -24,7 +24,7 @@ public final class MqttUnsubAckMessage extends MqttMessage {
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader,
MqttMessageIdAndPropertiesVariableHeader variableHeader,
MqttUnsubAckPayload payload) {
super(mqttFixedHeader, variableHeader, payload);
super(mqttFixedHeader, variableHeader, MqttUnsubAckPayload.withEmptyDefaults(payload));
}
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader,

View File

@ -29,6 +29,16 @@ public final class MqttUnsubAckPayload {
private final List<Short> unsubscribeReasonCodes;
private static final MqttUnsubAckPayload EMPTY = new MqttUnsubAckPayload();
public static MqttUnsubAckPayload withEmptyDefaults(MqttUnsubAckPayload payload) {
if (payload == null) {
return EMPTY;
} else {
return payload;
}
}
public MqttUnsubAckPayload(short... unsubscribeReasonCodes) {
ObjectUtil.checkNotNull(unsubscribeReasonCodes, "unsubscribeReasonCodes");

View File

@ -33,7 +33,7 @@ public final class MqttUnsubscribeMessage extends MqttMessage {
MqttFixedHeader mqttFixedHeader,
MqttMessageIdVariableHeader variableHeader,
MqttUnsubscribePayload payload) {
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload);
}
@Override

View File

@ -18,7 +18,6 @@ package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
@ -30,13 +29,14 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.w3c.dom.Attr;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateProperties;
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateSubscribePayload;
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateUnsubscribePayload;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.*;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS;
@ -984,33 +984,6 @@ public class MqttCodecTest {
assertEquals("MqttMessageIdVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId());
}
private static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) {
List<MqttTopicSubscription> expectedTopicSubscriptions = expected.topicSubscriptions();
List<MqttTopicSubscription> actualTopicSubscriptions = actual.topicSubscriptions();
assertEquals(
"MqttSubscribePayload TopicSubscriptionList size mismatch ",
expectedTopicSubscriptions.size(),
actualTopicSubscriptions.size());
for (int i = 0; i < expectedTopicSubscriptions.size(); i++) {
validateTopicSubscription(expectedTopicSubscriptions.get(i), actualTopicSubscriptions.get(i));
}
}
private static void validateTopicSubscription(
MqttTopicSubscription expected,
MqttTopicSubscription actual) {
assertEquals("MqttTopicSubscription TopicName mismatch ", expected.topicName(), actual.topicName());
assertEquals(
"MqttTopicSubscription Qos mismatch ",
expected.qualityOfService(),
actual.qualityOfService());
assertEquals(
"MqttTopicSubscription options mismatch ",
expected.option(),
actual.option());
}
private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) {
assertArrayEquals(
"MqttSubAckPayload GrantedQosLevels mismatch ",
@ -1018,14 +991,7 @@ public class MqttCodecTest {
actual.grantedQoSLevels().toArray());
}
private static void validateUnsubscribePayload(MqttUnsubscribePayload expected, MqttUnsubscribePayload actual) {
assertArrayEquals(
"MqttUnsubscribePayload TopicList mismatch ",
expected.topics().toArray(),
actual.topics().toArray());
}
private static void validateDecoderExceptionTooLargeMessage(MqttMessage message) {
private static void validateDecoderExceptionTooLargeMessage(MqttMessage message) {
assertNull("MqttMessage payload expected null ", message.payload());
assertTrue(message.decoderResult().isFailure());
Throwable cause = message.decoderResult().cause();
@ -1065,90 +1031,4 @@ public class MqttCodecTest {
final MqttProperties actualProps = actual.properties();
validateProperties(expectedProps, actualProps);
}
private static void validateProperties(MqttProperties expected, MqttProperties actual) {
for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) {
MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId);
switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) {
// one byte value integer property
case PAYLOAD_FORMAT_INDICATOR:
case REQUEST_PROBLEM_INFORMATION:
case REQUEST_RESPONSE_INFORMATION:
case MAXIMUM_QOS:
case RETAIN_AVAILABLE:
case WILDCARD_SUBSCRIPTION_AVAILABLE:
case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
case SHARED_SUBSCRIPTION_AVAILABLE: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("one byte property doesn't match", expectedValue, actualValue);
break;
}
// two byte value integer property
case SERVER_KEEP_ALIVE:
case RECEIVE_MAXIMUM:
case TOPIC_ALIAS_MAXIMUM:
case TOPIC_ALIAS: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("two byte property doesn't match", expectedValue, actualValue);
break;
}
// four byte value integer property
case PUBLICATION_EXPIRY_INTERVAL:
case SESSION_EXPIRY_INTERVAL:
case WILL_DELAY_INTERVAL:
case MAXIMUM_PACKET_SIZE: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("four byte property doesn't match", expectedValue, actualValue);
break;
}
// four byte value integer property
case SUBSCRIPTION_IDENTIFIER: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("variable byte integer property doesn't match", expectedValue, actualValue);
break;
}
// UTF-8 string value integer property
case CONTENT_TYPE:
case RESPONSE_TOPIC:
case ASSIGNED_CLIENT_IDENTIFIER:
case AUTHENTICATION_METHOD:
case RESPONSE_INFORMATION:
case SERVER_REFERENCE:
case REASON_STRING: {
final String expectedValue = ((MqttProperties.StringProperty) expectedProperty).value;
final String actualValue = ((MqttProperties.StringProperty) actualProperty).value;
assertEquals("String property doesn't match", expectedValue, actualValue);
break;
}
// User property
case USER_PROPERTY: {
final List<MqttProperties.StringPair> expectedPairs =
((MqttProperties.UserProperties) expectedProperty).value;
final List<MqttProperties.StringPair> actualPairs =
((MqttProperties.UserProperties) actualProperty).value;
assertEquals("User properties count doesn't match", expectedPairs, actualPairs);
for (int i = 0; i < expectedPairs.size(); i++) {
assertEquals("User property mismatch", expectedPairs.get(i), actualPairs.get(i));
}
break;
}
// byte[] property
case CORRELATION_DATA:
case AUTHENTICATION_DATA: {
final byte[] expectedValue = ((MqttProperties.BinaryProperty) expectedProperty).value;
final byte[] actualValue = ((MqttProperties.BinaryProperty) actualProperty).value;
final String expectedHexDump = ByteBufUtil.hexDump(expectedValue);
final String actualHexDump = ByteBufUtil.hexDump(actualValue);
assertEquals("byte[] property doesn't match", expectedHexDump, actualHexDump);
break;
}
default:
fail("Property Id not recognized " + Integer.toHexString(expectedProperty.propertyId));
}
}
}
}

View File

@ -0,0 +1,169 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateProperties;
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateSubscribePayload;
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateUnsubscribePayload;
public class MqttMessageFactoryTest {
private static final String SAMPLE_TOPIC = "a/b/c";
private static final int SAMPLE_MESSAGE_ID = 123;
@Test
public void createUnsubAckV3() {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader =
MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID);
MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
assertEquals("Message type mismatch", MqttMessageType.UNSUBACK, unsuback.fixedHeader().messageType());
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
(MqttMessageIdAndPropertiesVariableHeader) unsuback.variableHeader();
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties());
MqttUnsubAckPayload actualPayload = (MqttUnsubAckPayload) unsuback.payload();
assertNotNull("payload", actualPayload);
assertEquals("reason codes size", 0, actualPayload.unsubscribeReasonCodes().size());
}
@Test
public void createUnsubAckV5() {
MqttFixedHeader fixedHeader =
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttProperties properties = new MqttProperties();
String reasonString = "All right";
properties.add(new MqttProperties.StringProperty(
MqttProperties.MqttPropertyType.REASON_STRING.value(),
reasonString));
MqttMessageIdAndPropertiesVariableHeader variableHeader =
new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties);
MqttUnsubAckPayload payload = new MqttUnsubAckPayload((short) 0x80 /*unspecified error*/);
MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
assertEquals("Message type mismatch", MqttMessageType.UNSUBACK, unsuback.fixedHeader().messageType());
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
(MqttMessageIdAndPropertiesVariableHeader) unsuback.variableHeader();
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
validateProperties(properties, actualVariableHeader.properties());
MqttUnsubAckPayload actualPayload = (MqttUnsubAckPayload) unsuback.payload();
assertEquals("Reason code list doesn't match",
payload.unsubscribeReasonCodes(),
actualPayload.unsubscribeReasonCodes());
}
@Test
public void createSubscribeV3() {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID);
List<MqttTopicSubscription> subscriptions = new ArrayList<MqttTopicSubscription>();
subscriptions.add(new MqttTopicSubscription(SAMPLE_TOPIC, MqttQoS.AT_MOST_ONCE));
MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);
MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
assertEquals("Message type mismatch", MqttMessageType.SUBSCRIBE, subscribe.fixedHeader().messageType());
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
(MqttMessageIdAndPropertiesVariableHeader) subscribe.variableHeader();
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties());
MqttSubscribePayload actualPayload = (MqttSubscribePayload) subscribe.payload();
validateSubscribePayload(payload, actualPayload);
}
@Test
public void createSubscribeV5() {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
MqttProperties properties = new MqttProperties();
properties.add(new MqttProperties.UserProperty("correlationId", "111222"));
MqttMessageIdAndPropertiesVariableHeader variableHeader =
new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties);
List<MqttTopicSubscription> subscriptions = new ArrayList<MqttTopicSubscription>();
subscriptions.add(new MqttTopicSubscription(SAMPLE_TOPIC, MqttQoS.AT_MOST_ONCE));
MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);
MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
assertEquals("Message type mismatch", MqttMessageType.SUBSCRIBE, subscribe.fixedHeader().messageType());
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
(MqttMessageIdAndPropertiesVariableHeader) subscribe.variableHeader();
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
validateProperties(properties, actualVariableHeader.properties());
MqttSubscribePayload actualPayload = (MqttSubscribePayload) subscribe.payload();
validateSubscribePayload(payload, actualPayload);
}
@Test
public void createUnsubscribeV3() {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID);
List<String> topics = new ArrayList<String>();
topics.add(SAMPLE_TOPIC);
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics);
MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
assertEquals("Message type mismatch", MqttMessageType.UNSUBSCRIBE, unsubscribe.fixedHeader().messageType());
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
(MqttMessageIdAndPropertiesVariableHeader) unsubscribe.variableHeader();
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties());
MqttUnsubscribePayload actualPayload = (MqttUnsubscribePayload) unsubscribe.payload();
validateUnsubscribePayload(payload, actualPayload);
}
@Test
public void createUnsubscribeV5() {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
MqttProperties properties = new MqttProperties();
properties.add(new MqttProperties.UserProperty("correlationId", "111222"));
MqttMessageIdAndPropertiesVariableHeader variableHeader =
new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties);
List<String> topics = new ArrayList<String>();
topics.add(SAMPLE_TOPIC);
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics);
MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
assertEquals("Message type mismatch", MqttMessageType.UNSUBSCRIBE, unsubscribe.fixedHeader().messageType());
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
(MqttMessageIdAndPropertiesVariableHeader) unsubscribe.variableHeader();
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
validateProperties(properties, actualVariableHeader.properties());
MqttUnsubscribePayload actualPayload = (MqttUnsubscribePayload) unsubscribe.payload();
validateUnsubscribePayload(payload, actualPayload);
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBufUtil;
import java.util.List;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
public final class MqttTestUtils {
private MqttTestUtils() {
}
public static void validateProperties(MqttProperties expected, MqttProperties actual) {
for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) {
MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId);
switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) {
// one byte value integer property
case PAYLOAD_FORMAT_INDICATOR:
case REQUEST_PROBLEM_INFORMATION:
case REQUEST_RESPONSE_INFORMATION:
case MAXIMUM_QOS:
case RETAIN_AVAILABLE:
case WILDCARD_SUBSCRIPTION_AVAILABLE:
case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
case SHARED_SUBSCRIPTION_AVAILABLE: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("one byte property doesn't match", expectedValue, actualValue);
break;
}
// two byte value integer property
case SERVER_KEEP_ALIVE:
case RECEIVE_MAXIMUM:
case TOPIC_ALIAS_MAXIMUM:
case TOPIC_ALIAS: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("two byte property doesn't match", expectedValue, actualValue);
break;
}
// four byte value integer property
case PUBLICATION_EXPIRY_INTERVAL:
case SESSION_EXPIRY_INTERVAL:
case WILL_DELAY_INTERVAL:
case MAXIMUM_PACKET_SIZE: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("four byte property doesn't match", expectedValue, actualValue);
break;
}
// four byte value integer property
case SUBSCRIPTION_IDENTIFIER: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
assertEquals("variable byte integer property doesn't match", expectedValue, actualValue);
break;
}
// UTF-8 string value integer property
case CONTENT_TYPE:
case RESPONSE_TOPIC:
case ASSIGNED_CLIENT_IDENTIFIER:
case AUTHENTICATION_METHOD:
case RESPONSE_INFORMATION:
case SERVER_REFERENCE:
case REASON_STRING: {
final String expectedValue = ((MqttProperties.StringProperty) expectedProperty).value;
final String actualValue = ((MqttProperties.StringProperty) actualProperty).value;
assertEquals("String property doesn't match", expectedValue, actualValue);
break;
}
// User property
case USER_PROPERTY: {
final List<MqttProperties.StringPair> expectedPairs =
((MqttProperties.UserProperties) expectedProperty).value;
final List<MqttProperties.StringPair> actualPairs =
((MqttProperties.UserProperties) actualProperty).value;
assertEquals("User properties count doesn't match", expectedPairs, actualPairs);
for (int i = 0; i < expectedPairs.size(); i++) {
assertEquals("User property mismatch", expectedPairs.get(i), actualPairs.get(i));
}
break;
}
// byte[] property
case CORRELATION_DATA:
case AUTHENTICATION_DATA: {
final byte[] expectedValue = ((MqttProperties.BinaryProperty) expectedProperty).value;
final byte[] actualValue = ((MqttProperties.BinaryProperty) actualProperty).value;
final String expectedHexDump = ByteBufUtil.hexDump(expectedValue);
final String actualHexDump = ByteBufUtil.hexDump(actualValue);
assertEquals("byte[] property doesn't match", expectedHexDump, actualHexDump);
break;
}
default:
fail("Property Id not recognized " + Integer.toHexString(expectedProperty.propertyId));
}
}
for (MqttProperties.MqttProperty actualProperty : actual.listAll()) {
MqttProperties.MqttProperty expectedProperty = expected.getProperty(actualProperty.propertyId);
assertNotNull("Property " + actualProperty.propertyId + " not expected", expectedProperty);
}
}
public static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) {
List<MqttTopicSubscription> expectedTopicSubscriptions = expected.topicSubscriptions();
List<MqttTopicSubscription> actualTopicSubscriptions = actual.topicSubscriptions();
assertEquals(
"MqttSubscribePayload TopicSubscriptionList size mismatch ",
expectedTopicSubscriptions.size(),
actualTopicSubscriptions.size());
for (int i = 0; i < expectedTopicSubscriptions.size(); i++) {
validateTopicSubscription(expectedTopicSubscriptions.get(i), actualTopicSubscriptions.get(i));
}
}
public static void validateTopicSubscription(
MqttTopicSubscription expected,
MqttTopicSubscription actual) {
assertEquals("MqttTopicSubscription TopicName mismatch ", expected.topicName(), actual.topicName());
assertEquals(
"MqttTopicSubscription Qos mismatch ",
expected.qualityOfService(),
actual.qualityOfService());
assertEquals(
"MqttTopicSubscription options mismatch ",
expected.option(),
actual.option());
}
public static void validateUnsubscribePayload(MqttUnsubscribePayload expected, MqttUnsubscribePayload actual) {
assertArrayEquals(
"MqttUnsubscribePayload TopicList mismatch ",
expected.topics().toArray(),
actual.topics().toArray());
}
}