Fix NPE with MqttUnsubAckMessage - regression of MQTT5 support (#10557)
Recent changes for MQTT5 may cause NPE if UNSUBACK message is created with `MqttMessageFactory` and client code isn't up-to-date. Modifications: Added default body in `MqttUnsubAckMessage` constructor if null body is passed, added null checks in `encodeUnsubAckMessage` Result: `MqttUnsubAckMessage` created with `MqttMessageFactory` doesn't cause NPE even if null body is supplied.
This commit is contained in:
parent
379be5085f
commit
575c2d16de
@ -388,7 +388,8 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
|
||||
message.idAndPropertiesVariableHeader().properties());
|
||||
try {
|
||||
int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes();
|
||||
int payloadBufferSize = message.payload().unsubscribeReasonCodes().size();
|
||||
MqttUnsubAckPayload payload = message.payload();
|
||||
int payloadBufferSize = payload == null ? 0 : payload.unsubscribeReasonCodes().size();
|
||||
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
|
||||
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
|
||||
ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize);
|
||||
@ -397,9 +398,11 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
|
||||
buf.writeShort(message.variableHeader().messageId());
|
||||
buf.writeBytes(propertiesBuf);
|
||||
|
||||
for (Short reasonCode : message.payload().unsubscribeReasonCodes()) {
|
||||
if (payload != null) {
|
||||
for (Short reasonCode : payload.unsubscribeReasonCodes()) {
|
||||
buf.writeByte(reasonCode);
|
||||
}
|
||||
}
|
||||
|
||||
return buf;
|
||||
} finally {
|
||||
|
@ -38,7 +38,7 @@ public final class MqttMessageFactory {
|
||||
case SUBSCRIBE:
|
||||
return new MqttSubscribeMessage(
|
||||
mqttFixedHeader,
|
||||
(MqttMessageIdAndPropertiesVariableHeader) variableHeader,
|
||||
(MqttMessageIdVariableHeader) variableHeader,
|
||||
(MqttSubscribePayload) payload);
|
||||
|
||||
case SUBACK:
|
||||
@ -56,7 +56,7 @@ public final class MqttMessageFactory {
|
||||
case UNSUBSCRIBE:
|
||||
return new MqttUnsubscribeMessage(
|
||||
mqttFixedHeader,
|
||||
(MqttMessageIdAndPropertiesVariableHeader) variableHeader,
|
||||
(MqttMessageIdVariableHeader) variableHeader,
|
||||
(MqttUnsubscribePayload) payload);
|
||||
|
||||
case PUBLISH:
|
||||
|
@ -44,4 +44,9 @@ public final class MqttMessageIdAndPropertiesVariableHeader extends MqttMessageI
|
||||
", properties=" + properties +
|
||||
']';
|
||||
}
|
||||
|
||||
@Override
|
||||
MqttMessageIdAndPropertiesVariableHeader withDefaultEmptyProperties() {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
@ -53,4 +53,8 @@ public class MqttMessageIdVariableHeader {
|
||||
public MqttMessageIdAndPropertiesVariableHeader withEmptyProperties() {
|
||||
return new MqttMessageIdAndPropertiesVariableHeader(messageId, MqttProperties.NO_PROPERTIES);
|
||||
}
|
||||
|
||||
MqttMessageIdAndPropertiesVariableHeader withDefaultEmptyProperties() {
|
||||
return withEmptyProperties();
|
||||
}
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ public final class MqttSubAckMessage extends MqttMessage {
|
||||
MqttFixedHeader mqttFixedHeader,
|
||||
MqttMessageIdVariableHeader variableHeader,
|
||||
MqttSubAckPayload payload) {
|
||||
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
|
||||
this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -33,7 +33,7 @@ public final class MqttSubscribeMessage extends MqttMessage {
|
||||
MqttFixedHeader mqttFixedHeader,
|
||||
MqttMessageIdVariableHeader variableHeader,
|
||||
MqttSubscribePayload payload) {
|
||||
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
|
||||
this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -24,7 +24,7 @@ public final class MqttUnsubAckMessage extends MqttMessage {
|
||||
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader,
|
||||
MqttMessageIdAndPropertiesVariableHeader variableHeader,
|
||||
MqttUnsubAckPayload payload) {
|
||||
super(mqttFixedHeader, variableHeader, payload);
|
||||
super(mqttFixedHeader, variableHeader, MqttUnsubAckPayload.withEmptyDefaults(payload));
|
||||
}
|
||||
|
||||
public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader,
|
||||
|
@ -29,6 +29,16 @@ public final class MqttUnsubAckPayload {
|
||||
|
||||
private final List<Short> unsubscribeReasonCodes;
|
||||
|
||||
private static final MqttUnsubAckPayload EMPTY = new MqttUnsubAckPayload();
|
||||
|
||||
public static MqttUnsubAckPayload withEmptyDefaults(MqttUnsubAckPayload payload) {
|
||||
if (payload == null) {
|
||||
return EMPTY;
|
||||
} else {
|
||||
return payload;
|
||||
}
|
||||
}
|
||||
|
||||
public MqttUnsubAckPayload(short... unsubscribeReasonCodes) {
|
||||
ObjectUtil.checkNotNull(unsubscribeReasonCodes, "unsubscribeReasonCodes");
|
||||
|
||||
|
@ -33,7 +33,7 @@ public final class MqttUnsubscribeMessage extends MqttMessage {
|
||||
MqttFixedHeader mqttFixedHeader,
|
||||
MqttMessageIdVariableHeader variableHeader,
|
||||
MqttUnsubscribePayload payload) {
|
||||
this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload);
|
||||
this(mqttFixedHeader, variableHeader.withDefaultEmptyProperties(), payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,7 +18,6 @@ package io.netty.handler.codec.mqtt;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.ByteBufAllocator;
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
@ -36,6 +35,9 @@ import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateProperties;
|
||||
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateSubscribePayload;
|
||||
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateUnsubscribePayload;
|
||||
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.*;
|
||||
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
|
||||
import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS;
|
||||
@ -957,33 +959,6 @@ public class MqttCodecTest {
|
||||
assertEquals("MqttMessageIdVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId());
|
||||
}
|
||||
|
||||
private static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) {
|
||||
List<MqttTopicSubscription> expectedTopicSubscriptions = expected.topicSubscriptions();
|
||||
List<MqttTopicSubscription> actualTopicSubscriptions = actual.topicSubscriptions();
|
||||
|
||||
assertEquals(
|
||||
"MqttSubscribePayload TopicSubscriptionList size mismatch ",
|
||||
expectedTopicSubscriptions.size(),
|
||||
actualTopicSubscriptions.size());
|
||||
for (int i = 0; i < expectedTopicSubscriptions.size(); i++) {
|
||||
validateTopicSubscription(expectedTopicSubscriptions.get(i), actualTopicSubscriptions.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
private static void validateTopicSubscription(
|
||||
MqttTopicSubscription expected,
|
||||
MqttTopicSubscription actual) {
|
||||
assertEquals("MqttTopicSubscription TopicName mismatch ", expected.topicName(), actual.topicName());
|
||||
assertEquals(
|
||||
"MqttTopicSubscription Qos mismatch ",
|
||||
expected.qualityOfService(),
|
||||
actual.qualityOfService());
|
||||
assertEquals(
|
||||
"MqttTopicSubscription options mismatch ",
|
||||
expected.option(),
|
||||
actual.option());
|
||||
}
|
||||
|
||||
private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) {
|
||||
assertArrayEquals(
|
||||
"MqttSubAckPayload GrantedQosLevels mismatch ",
|
||||
@ -991,13 +966,6 @@ public class MqttCodecTest {
|
||||
actual.grantedQoSLevels().toArray());
|
||||
}
|
||||
|
||||
private static void validateUnsubscribePayload(MqttUnsubscribePayload expected, MqttUnsubscribePayload actual) {
|
||||
assertArrayEquals(
|
||||
"MqttUnsubscribePayload TopicList mismatch ",
|
||||
expected.topics().toArray(),
|
||||
actual.topics().toArray());
|
||||
}
|
||||
|
||||
private static void validateDecoderExceptionTooLargeMessage(MqttMessage message) {
|
||||
assertNull("MqttMessage payload expected null ", message.payload());
|
||||
assertTrue(message.decoderResult().isFailure());
|
||||
@ -1038,90 +1006,4 @@ public class MqttCodecTest {
|
||||
final MqttProperties actualProps = actual.properties();
|
||||
validateProperties(expectedProps, actualProps);
|
||||
}
|
||||
|
||||
private static void validateProperties(MqttProperties expected, MqttProperties actual) {
|
||||
for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) {
|
||||
MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId);
|
||||
switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) {
|
||||
// one byte value integer property
|
||||
case PAYLOAD_FORMAT_INDICATOR:
|
||||
case REQUEST_PROBLEM_INFORMATION:
|
||||
case REQUEST_RESPONSE_INFORMATION:
|
||||
case MAXIMUM_QOS:
|
||||
case RETAIN_AVAILABLE:
|
||||
case WILDCARD_SUBSCRIPTION_AVAILABLE:
|
||||
case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
|
||||
case SHARED_SUBSCRIPTION_AVAILABLE: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("one byte property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// two byte value integer property
|
||||
case SERVER_KEEP_ALIVE:
|
||||
case RECEIVE_MAXIMUM:
|
||||
case TOPIC_ALIAS_MAXIMUM:
|
||||
case TOPIC_ALIAS: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("two byte property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// four byte value integer property
|
||||
case PUBLICATION_EXPIRY_INTERVAL:
|
||||
case SESSION_EXPIRY_INTERVAL:
|
||||
case WILL_DELAY_INTERVAL:
|
||||
case MAXIMUM_PACKET_SIZE: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("four byte property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// four byte value integer property
|
||||
case SUBSCRIPTION_IDENTIFIER: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("variable byte integer property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// UTF-8 string value integer property
|
||||
case CONTENT_TYPE:
|
||||
case RESPONSE_TOPIC:
|
||||
case ASSIGNED_CLIENT_IDENTIFIER:
|
||||
case AUTHENTICATION_METHOD:
|
||||
case RESPONSE_INFORMATION:
|
||||
case SERVER_REFERENCE:
|
||||
case REASON_STRING: {
|
||||
final String expectedValue = ((MqttProperties.StringProperty) expectedProperty).value;
|
||||
final String actualValue = ((MqttProperties.StringProperty) actualProperty).value;
|
||||
assertEquals("String property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// User property
|
||||
case USER_PROPERTY: {
|
||||
final List<MqttProperties.StringPair> expectedPairs =
|
||||
((MqttProperties.UserProperties) expectedProperty).value;
|
||||
final List<MqttProperties.StringPair> actualPairs =
|
||||
((MqttProperties.UserProperties) actualProperty).value;
|
||||
assertEquals("User properties count doesn't match", expectedPairs, actualPairs);
|
||||
for (int i = 0; i < expectedPairs.size(); i++) {
|
||||
assertEquals("User property mismatch", expectedPairs.get(i), actualPairs.get(i));
|
||||
}
|
||||
break;
|
||||
}
|
||||
// byte[] property
|
||||
case CORRELATION_DATA:
|
||||
case AUTHENTICATION_DATA: {
|
||||
final byte[] expectedValue = ((MqttProperties.BinaryProperty) expectedProperty).value;
|
||||
final byte[] actualValue = ((MqttProperties.BinaryProperty) actualProperty).value;
|
||||
final String expectedHexDump = ByteBufUtil.hexDump(expectedValue);
|
||||
final String actualHexDump = ByteBufUtil.hexDump(actualValue);
|
||||
assertEquals("byte[] property doesn't match", expectedHexDump, actualHexDump);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
fail("Property Id not recognized " + Integer.toHexString(expectedProperty.propertyId));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,169 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.handler.codec.mqtt;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateProperties;
|
||||
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateSubscribePayload;
|
||||
import static io.netty.handler.codec.mqtt.MqttTestUtils.validateUnsubscribePayload;
|
||||
|
||||
public class MqttMessageFactoryTest {
|
||||
private static final String SAMPLE_TOPIC = "a/b/c";
|
||||
private static final int SAMPLE_MESSAGE_ID = 123;
|
||||
|
||||
@Test
|
||||
public void createUnsubAckV3() {
|
||||
MqttFixedHeader fixedHeader =
|
||||
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttMessageIdVariableHeader variableHeader =
|
||||
MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID);
|
||||
|
||||
MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
|
||||
|
||||
assertEquals("Message type mismatch", MqttMessageType.UNSUBACK, unsuback.fixedHeader().messageType());
|
||||
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
|
||||
(MqttMessageIdAndPropertiesVariableHeader) unsuback.variableHeader();
|
||||
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
|
||||
validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties());
|
||||
MqttUnsubAckPayload actualPayload = (MqttUnsubAckPayload) unsuback.payload();
|
||||
assertNotNull("payload", actualPayload);
|
||||
assertEquals("reason codes size", 0, actualPayload.unsubscribeReasonCodes().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createUnsubAckV5() {
|
||||
MqttFixedHeader fixedHeader =
|
||||
new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttProperties properties = new MqttProperties();
|
||||
String reasonString = "All right";
|
||||
properties.add(new MqttProperties.StringProperty(
|
||||
MqttProperties.MqttPropertyType.REASON_STRING.value(),
|
||||
reasonString));
|
||||
MqttMessageIdAndPropertiesVariableHeader variableHeader =
|
||||
new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties);
|
||||
MqttUnsubAckPayload payload = new MqttUnsubAckPayload((short) 0x80 /*unspecified error*/);
|
||||
|
||||
MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
|
||||
|
||||
assertEquals("Message type mismatch", MqttMessageType.UNSUBACK, unsuback.fixedHeader().messageType());
|
||||
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
|
||||
(MqttMessageIdAndPropertiesVariableHeader) unsuback.variableHeader();
|
||||
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
|
||||
validateProperties(properties, actualVariableHeader.properties());
|
||||
MqttUnsubAckPayload actualPayload = (MqttUnsubAckPayload) unsuback.payload();
|
||||
assertEquals("Reason code list doesn't match",
|
||||
payload.unsubscribeReasonCodes(),
|
||||
actualPayload.unsubscribeReasonCodes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createSubscribeV3() {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
|
||||
|
||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID);
|
||||
List<MqttTopicSubscription> subscriptions = new ArrayList<MqttTopicSubscription>();
|
||||
subscriptions.add(new MqttTopicSubscription(SAMPLE_TOPIC, MqttQoS.AT_MOST_ONCE));
|
||||
|
||||
MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);
|
||||
|
||||
MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
|
||||
|
||||
assertEquals("Message type mismatch", MqttMessageType.SUBSCRIBE, subscribe.fixedHeader().messageType());
|
||||
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
|
||||
(MqttMessageIdAndPropertiesVariableHeader) subscribe.variableHeader();
|
||||
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
|
||||
validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties());
|
||||
MqttSubscribePayload actualPayload = (MqttSubscribePayload) subscribe.payload();
|
||||
validateSubscribePayload(payload, actualPayload);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createSubscribeV5() {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
|
||||
|
||||
MqttProperties properties = new MqttProperties();
|
||||
properties.add(new MqttProperties.UserProperty("correlationId", "111222"));
|
||||
MqttMessageIdAndPropertiesVariableHeader variableHeader =
|
||||
new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties);
|
||||
List<MqttTopicSubscription> subscriptions = new ArrayList<MqttTopicSubscription>();
|
||||
subscriptions.add(new MqttTopicSubscription(SAMPLE_TOPIC, MqttQoS.AT_MOST_ONCE));
|
||||
|
||||
MqttSubscribePayload payload = new MqttSubscribePayload(subscriptions);
|
||||
|
||||
MqttMessage subscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
|
||||
|
||||
assertEquals("Message type mismatch", MqttMessageType.SUBSCRIBE, subscribe.fixedHeader().messageType());
|
||||
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
|
||||
(MqttMessageIdAndPropertiesVariableHeader) subscribe.variableHeader();
|
||||
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
|
||||
validateProperties(properties, actualVariableHeader.properties());
|
||||
MqttSubscribePayload actualPayload = (MqttSubscribePayload) subscribe.payload();
|
||||
validateSubscribePayload(payload, actualPayload);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createUnsubscribeV3() {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
|
||||
|
||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(SAMPLE_MESSAGE_ID);
|
||||
|
||||
List<String> topics = new ArrayList<String>();
|
||||
topics.add(SAMPLE_TOPIC);
|
||||
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics);
|
||||
|
||||
MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
|
||||
|
||||
assertEquals("Message type mismatch", MqttMessageType.UNSUBSCRIBE, unsubscribe.fixedHeader().messageType());
|
||||
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
|
||||
(MqttMessageIdAndPropertiesVariableHeader) unsubscribe.variableHeader();
|
||||
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
|
||||
validateProperties(MqttProperties.NO_PROPERTIES, actualVariableHeader.properties());
|
||||
MqttUnsubscribePayload actualPayload = (MqttUnsubscribePayload) unsubscribe.payload();
|
||||
validateUnsubscribePayload(payload, actualPayload);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createUnsubscribeV5() {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, AT_LEAST_ONCE, false, 0);
|
||||
|
||||
MqttProperties properties = new MqttProperties();
|
||||
properties.add(new MqttProperties.UserProperty("correlationId", "111222"));
|
||||
MqttMessageIdAndPropertiesVariableHeader variableHeader =
|
||||
new MqttMessageIdAndPropertiesVariableHeader(SAMPLE_MESSAGE_ID, properties);
|
||||
|
||||
List<String> topics = new ArrayList<String>();
|
||||
topics.add(SAMPLE_TOPIC);
|
||||
MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics);
|
||||
|
||||
MqttMessage unsubscribe = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
|
||||
|
||||
assertEquals("Message type mismatch", MqttMessageType.UNSUBSCRIBE, unsubscribe.fixedHeader().messageType());
|
||||
MqttMessageIdAndPropertiesVariableHeader actualVariableHeader =
|
||||
(MqttMessageIdAndPropertiesVariableHeader) unsubscribe.variableHeader();
|
||||
assertEquals("MessageId mismatch", SAMPLE_MESSAGE_ID, actualVariableHeader.messageId());
|
||||
validateProperties(properties, actualVariableHeader.properties());
|
||||
MqttUnsubscribePayload actualPayload = (MqttUnsubscribePayload) unsubscribe.payload();
|
||||
validateUnsubscribePayload(payload, actualPayload);
|
||||
}
|
||||
}
|
@ -0,0 +1,155 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.handler.codec.mqtt;
|
||||
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public final class MqttTestUtils {
|
||||
private MqttTestUtils() {
|
||||
}
|
||||
|
||||
public static void validateProperties(MqttProperties expected, MqttProperties actual) {
|
||||
for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) {
|
||||
MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId);
|
||||
switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) {
|
||||
// one byte value integer property
|
||||
case PAYLOAD_FORMAT_INDICATOR:
|
||||
case REQUEST_PROBLEM_INFORMATION:
|
||||
case REQUEST_RESPONSE_INFORMATION:
|
||||
case MAXIMUM_QOS:
|
||||
case RETAIN_AVAILABLE:
|
||||
case WILDCARD_SUBSCRIPTION_AVAILABLE:
|
||||
case SUBSCRIPTION_IDENTIFIER_AVAILABLE:
|
||||
case SHARED_SUBSCRIPTION_AVAILABLE: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("one byte property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// two byte value integer property
|
||||
case SERVER_KEEP_ALIVE:
|
||||
case RECEIVE_MAXIMUM:
|
||||
case TOPIC_ALIAS_MAXIMUM:
|
||||
case TOPIC_ALIAS: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("two byte property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// four byte value integer property
|
||||
case PUBLICATION_EXPIRY_INTERVAL:
|
||||
case SESSION_EXPIRY_INTERVAL:
|
||||
case WILL_DELAY_INTERVAL:
|
||||
case MAXIMUM_PACKET_SIZE: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("four byte property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// four byte value integer property
|
||||
case SUBSCRIPTION_IDENTIFIER: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("variable byte integer property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// UTF-8 string value integer property
|
||||
case CONTENT_TYPE:
|
||||
case RESPONSE_TOPIC:
|
||||
case ASSIGNED_CLIENT_IDENTIFIER:
|
||||
case AUTHENTICATION_METHOD:
|
||||
case RESPONSE_INFORMATION:
|
||||
case SERVER_REFERENCE:
|
||||
case REASON_STRING: {
|
||||
final String expectedValue = ((MqttProperties.StringProperty) expectedProperty).value;
|
||||
final String actualValue = ((MqttProperties.StringProperty) actualProperty).value;
|
||||
assertEquals("String property doesn't match", expectedValue, actualValue);
|
||||
break;
|
||||
}
|
||||
// User property
|
||||
case USER_PROPERTY: {
|
||||
final List<MqttProperties.StringPair> expectedPairs =
|
||||
((MqttProperties.UserProperties) expectedProperty).value;
|
||||
final List<MqttProperties.StringPair> actualPairs =
|
||||
((MqttProperties.UserProperties) actualProperty).value;
|
||||
assertEquals("User properties count doesn't match", expectedPairs, actualPairs);
|
||||
for (int i = 0; i < expectedPairs.size(); i++) {
|
||||
assertEquals("User property mismatch", expectedPairs.get(i), actualPairs.get(i));
|
||||
}
|
||||
break;
|
||||
}
|
||||
// byte[] property
|
||||
case CORRELATION_DATA:
|
||||
case AUTHENTICATION_DATA: {
|
||||
final byte[] expectedValue = ((MqttProperties.BinaryProperty) expectedProperty).value;
|
||||
final byte[] actualValue = ((MqttProperties.BinaryProperty) actualProperty).value;
|
||||
final String expectedHexDump = ByteBufUtil.hexDump(expectedValue);
|
||||
final String actualHexDump = ByteBufUtil.hexDump(actualValue);
|
||||
assertEquals("byte[] property doesn't match", expectedHexDump, actualHexDump);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
fail("Property Id not recognized " + Integer.toHexString(expectedProperty.propertyId));
|
||||
}
|
||||
}
|
||||
for (MqttProperties.MqttProperty actualProperty : actual.listAll()) {
|
||||
MqttProperties.MqttProperty expectedProperty = expected.getProperty(actualProperty.propertyId);
|
||||
assertNotNull("Property " + actualProperty.propertyId + " not expected", expectedProperty);
|
||||
}
|
||||
}
|
||||
|
||||
public static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) {
|
||||
List<MqttTopicSubscription> expectedTopicSubscriptions = expected.topicSubscriptions();
|
||||
List<MqttTopicSubscription> actualTopicSubscriptions = actual.topicSubscriptions();
|
||||
|
||||
assertEquals(
|
||||
"MqttSubscribePayload TopicSubscriptionList size mismatch ",
|
||||
expectedTopicSubscriptions.size(),
|
||||
actualTopicSubscriptions.size());
|
||||
for (int i = 0; i < expectedTopicSubscriptions.size(); i++) {
|
||||
validateTopicSubscription(expectedTopicSubscriptions.get(i), actualTopicSubscriptions.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
public static void validateTopicSubscription(
|
||||
MqttTopicSubscription expected,
|
||||
MqttTopicSubscription actual) {
|
||||
assertEquals("MqttTopicSubscription TopicName mismatch ", expected.topicName(), actual.topicName());
|
||||
assertEquals(
|
||||
"MqttTopicSubscription Qos mismatch ",
|
||||
expected.qualityOfService(),
|
||||
actual.qualityOfService());
|
||||
assertEquals(
|
||||
"MqttTopicSubscription options mismatch ",
|
||||
expected.option(),
|
||||
actual.option());
|
||||
}
|
||||
|
||||
public static void validateUnsubscribePayload(MqttUnsubscribePayload expected, MqttUnsubscribePayload actual) {
|
||||
assertArrayEquals(
|
||||
"MqttUnsubscribePayload TopicList mismatch ",
|
||||
expected.topics().toArray(),
|
||||
actual.topics().toArray());
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user