MQTT5: support multiple Subscription ID properties (#10734)

Motivation:

Subscription ID property of the PUBLISH message may be repeated multiple times, which wasn't taken into account when developing `MqttProperties` API.

Modification:

Store Subscription ID properties separately from others - in `MqttProperties.subscriptionIds`. 
Add `MqttProperties.getProperties` method to retrieve properties that may be repeated.
Change internal representation of User Properties for uniformity with Subscription ID - now they're stored in `MqttProperties.userProperties` rather than the common hash map.

Result:

Multiple Subscription ID properties can be set or retrieved.
This commit is contained in:
Paul Lysak 2020-10-30 12:17:46 +02:00 committed by GitHub
parent dab9a068a9
commit 7b736a3ae4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 285 additions and 18 deletions

View File

@ -139,6 +139,23 @@ public final class MqttProperties {
public int propertyId() { public int propertyId() {
return propertyId; return propertyId;
} }
@Override
public int hashCode() {
return propertyId + 31 * value.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
MqttProperty that = (MqttProperty) obj;
return this.propertyId == that.propertyId && this.value.equals(that.value);
}
} }
public static final class IntegerProperty extends MqttProperty<Integer> { public static final class IntegerProperty extends MqttProperty<Integer> {
@ -146,6 +163,11 @@ public final class MqttProperties {
public IntegerProperty(int propertyId, Integer value) { public IntegerProperty(int propertyId, Integer value) {
super(propertyId, value); super(propertyId, value);
} }
@Override
public String toString() {
return "IntegerProperty(" + propertyId + ", " + value + ")";
}
} }
public static final class StringProperty extends MqttProperty<String> { public static final class StringProperty extends MqttProperty<String> {
@ -153,6 +175,11 @@ public final class MqttProperties {
public StringProperty(int propertyId, String value) { public StringProperty(int propertyId, String value) {
super(propertyId, value); super(propertyId, value);
} }
@Override
public String toString() {
return "StringProperty(" + propertyId + ", " + value + ")";
}
} }
public static final class StringPair { public static final class StringPair {
@ -200,6 +227,14 @@ public final class MqttProperties {
this.value.addAll(values); this.value.addAll(values);
} }
private static UserProperties fromUserPropertyCollection(Collection<UserProperty> properties) {
UserProperties userProperties = new UserProperties();
for (UserProperty property: properties) {
userProperties.add(new StringPair(property.value.key, property.value.value));
}
return userProperties;
}
public void add(StringPair pair) { public void add(StringPair pair) {
this.value.add(pair); this.value.add(pair);
} }
@ -207,12 +242,32 @@ public final class MqttProperties {
public void add(String key, String value) { public void add(String key, String value) {
this.value.add(new StringPair(key, value)); this.value.add(new StringPair(key, value));
} }
@Override
public String toString() {
StringBuilder builder = new StringBuilder("UserProperties(");
boolean first = true;
for (StringPair pair: value) {
if (!first) {
builder.append(", ");
}
builder.append(pair.key + "->" + pair.value);
first = false;
}
builder.append(")");
return builder.toString();
}
} }
public static final class UserProperty extends MqttProperty<StringPair> { public static final class UserProperty extends MqttProperty<StringPair> {
public UserProperty(String key, String value) { public UserProperty(String key, String value) {
super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value)); super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value));
} }
@Override
public String toString() {
return "UserProperty(" + value.key + ", " + value.value + ")";
}
} }
public static final class BinaryProperty extends MqttProperty<byte[]> { public static final class BinaryProperty extends MqttProperty<byte[]> {
@ -220,6 +275,11 @@ public final class MqttProperties {
public BinaryProperty(int propertyId, byte[] value) { public BinaryProperty(int propertyId, byte[] value) {
super(propertyId, value); super(propertyId, value);
} }
@Override
public String toString() {
return "BinaryProperty(" + propertyId + ", " + value.length + " bytes)";
}
} }
public MqttProperties() { public MqttProperties() {
@ -231,6 +291,8 @@ public final class MqttProperties {
} }
private IntObjectHashMap<MqttProperty> props; private IntObjectHashMap<MqttProperty> props;
private List<UserProperty> userProperties;
private List<IntegerProperty> subscriptionIds;
private final boolean canModify; private final boolean canModify;
public void add(MqttProperty property) { public void add(MqttProperty property) {
@ -239,21 +301,30 @@ public final class MqttProperties {
} }
IntObjectHashMap<MqttProperty> props = this.props; IntObjectHashMap<MqttProperty> props = this.props;
if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) { if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) {
UserProperties userProps = (UserProperties) (props != null? props.get(property.propertyId) : null); List<UserProperty> userProperties = this.userProperties;
if (userProps == null) { if (userProperties == null) {
userProps = new UserProperties(); userProperties = new ArrayList<UserProperty>(1);
if (props == null) { this.userProperties = userProperties;
props = new IntObjectHashMap<MqttProperty>();
this.props = props;
}
props.put(property.propertyId, userProps);
} }
if (property instanceof UserProperty) { if (property instanceof UserProperty) {
userProps.add(((UserProperty) property).value); userProperties.add((UserProperty) property);
} else { } else if (property instanceof UserProperties) {
for (StringPair pair: ((UserProperties) property).value) { for (StringPair pair: ((UserProperties) property).value) {
userProps.add(pair); userProperties.add(new UserProperty(pair.key, pair.value));
} }
} else {
throw new IllegalArgumentException("User property must be of UserProperty or UserProperties type");
}
} else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
List<IntegerProperty> subscriptionIds = this.subscriptionIds;
if (subscriptionIds == null) {
subscriptionIds = new ArrayList<IntegerProperty>(1);
this.subscriptionIds = subscriptionIds;
}
if (property instanceof IntegerProperty) {
subscriptionIds.add((IntegerProperty) property);
} else {
throw new IllegalArgumentException("Subscription ID must be an integer property");
} }
} else { } else {
if (props == null) { if (props == null) {
@ -266,7 +337,26 @@ public final class MqttProperties {
public Collection<? extends MqttProperty> listAll() { public Collection<? extends MqttProperty> listAll() {
IntObjectHashMap<MqttProperty> props = this.props; IntObjectHashMap<MqttProperty> props = this.props;
return props == null? Collections.<MqttProperty>emptyList() : props.values(); if (props == null && subscriptionIds == null && userProperties == null) {
return Collections.<MqttProperty>emptyList();
}
if (subscriptionIds == null && userProperties == null) {
return props.values();
}
if (props == null && userProperties == null) {
return subscriptionIds;
}
List<MqttProperty> propValues = new ArrayList<MqttProperty>(props != null ? props.size() : 1);
if (props != null) {
propValues.addAll(props.values());
}
if (subscriptionIds != null) {
propValues.addAll(subscriptionIds);
}
if (userProperties != null) {
propValues.add(UserProperties.fromUserPropertyCollection(userProperties));
}
return propValues;
} }
public boolean isEmpty() { public boolean isEmpty() {
@ -274,8 +364,51 @@ public final class MqttProperties {
return props == null || props.isEmpty(); return props == null || props.isEmpty();
} }
/**
* Get property by ID. If there are multiple properties of this type (can be with Subscription ID)
* then return the first one.
*
* @param propertyId ID of the property
* @return a property if it is set, null otherwise
*/
public MqttProperty getProperty(int propertyId) { public MqttProperty getProperty(int propertyId) {
if (propertyId == MqttPropertyType.USER_PROPERTY.value) {
//special handling to keep compatibility with earlier versions
List<UserProperty> userProperties = this.userProperties;
if (userProperties == null) {
return null;
}
return UserProperties.fromUserPropertyCollection(userProperties);
}
if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
List<IntegerProperty> subscriptionIds = this.subscriptionIds;
if (subscriptionIds == null || subscriptionIds.isEmpty()) {
return null;
}
return subscriptionIds.get(0);
}
IntObjectHashMap<MqttProperty> props = this.props; IntObjectHashMap<MqttProperty> props = this.props;
return props == null ? null : props.get(propertyId); return props == null ? null : props.get(propertyId);
} }
/**
* Get properties by ID.
* Some properties (Subscription ID and User Properties) may occur multiple times,
* this method returns all their values in order.
*
* @param propertyId ID of the property
* @return all properties having specified ID
*/
public List<? extends MqttProperty> getProperties(int propertyId) {
if (propertyId == MqttPropertyType.USER_PROPERTY.value) {
return userProperties == null ? Collections.<MqttProperty>emptyList() : userProperties;
}
if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
return subscriptionIds == null ? Collections.<MqttProperty>emptyList() : subscriptionIds;
}
IntObjectHashMap<MqttProperty> props = this.props;
return (props == null || !props.containsKey(propertyId)) ?
Collections.<MqttProperty>emptyList() :
Collections.singletonList(props.get(propertyId));
}
} }

View File

@ -518,11 +518,18 @@ public class MqttCodecTest {
public void testPublishMessageForMqtt5() throws Exception { public void testPublishMessageForMqtt5() throws Exception {
when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5); when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);
MqttProperties props = new MqttProperties(); MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 10));
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 20));
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
props.add(new MqttProperties.UserProperty("isSecret", "true")); props.add(new MqttProperties.UserProperty("isSecret", "true"));
props.add(new MqttProperties.UserProperty("isUrgent", "false")); props.add(new MqttProperties.UserProperty("tag", "firstTag"));
assertEquals("User properties count mismatch", props.add(new MqttProperties.UserProperty("tag", "secondTag"));
((MqttProperties.UserProperties) props.getProperty(USER_PROPERTY.value())).value.size(), 2); assertEquals("Subscription IDs count mismatch", 2,
(props.getProperties(SUBSCRIPTION_IDENTIFIER.value())).size());
assertEquals("User properties count mismatch", 3,
(props.getProperties(USER_PROPERTY.value())).size());
assertEquals("UserProperties count mismatch", 3,
((MqttProperties.UserProperties) props.getProperty(USER_PROPERTY.value())).value.size());
final MqttPublishMessage message = createPublishMessage(props); final MqttPublishMessage message = createPublishMessage(props);
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);

View File

@ -0,0 +1,112 @@
/*
* Copyright 2020 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.codec.mqtt;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.USER_PROPERTY;
import static org.junit.Assert.assertEquals;
public class MqttPropertiesTest {
private MqttProperties createSampleProperties() {
MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 10));
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 20));
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
props.add(new MqttProperties.StringProperty(CONTENT_TYPE.value(), "text/plain"));
props.add(new MqttProperties.UserProperty("isSecret", "true"));
props.add(new MqttProperties.UserProperty("tag", "firstTag"));
props.add(new MqttProperties.UserProperty("tag", "secondTag"));
return props;
}
@Test
public void testGetProperty() {
MqttProperties props = createSampleProperties();
assertEquals("getProperty Content Type",
"text/plain",
((MqttProperties.StringProperty) props.getProperty(CONTENT_TYPE.value())).value);
assertEquals("getProperty Subscription ID",
10,
((MqttProperties.IntegerProperty) props.getProperty(SUBSCRIPTION_IDENTIFIER.value())).value.intValue());
List<MqttProperties.StringPair> expectedUserProps = new ArrayList<MqttProperties.StringPair>();
expectedUserProps.add(new MqttProperties.StringPair("isSecret", "true"));
expectedUserProps.add(new MqttProperties.StringPair("tag", "firstTag"));
expectedUserProps.add(new MqttProperties.StringPair("tag", "secondTag"));
List<MqttProperties.StringPair> actualUserProps =
((MqttProperties.UserProperties) props.getProperty(USER_PROPERTY.value())).value;
assertEquals("getProperty User Properties", expectedUserProps, actualUserProps);
}
@Test
public void testGetProperties() {
MqttProperties props = createSampleProperties();
assertEquals("getProperties Content Type",
Collections.singletonList(new MqttProperties.StringProperty(CONTENT_TYPE.value(), "text/plain")),
props.getProperties(CONTENT_TYPE.value()));
List<MqttProperties.IntegerProperty> expectedSubscriptionIds = new ArrayList<MqttProperties.IntegerProperty>();
expectedSubscriptionIds.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 10));
expectedSubscriptionIds.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 20));
assertEquals("getProperties Subscription ID",
expectedSubscriptionIds,
props.getProperties(SUBSCRIPTION_IDENTIFIER.value()));
List<MqttProperties.UserProperty> expectedUserProps = new ArrayList<MqttProperties.UserProperty>();
expectedUserProps.add(new MqttProperties.UserProperty("isSecret", "true"));
expectedUserProps.add(new MqttProperties.UserProperty("tag", "firstTag"));
expectedUserProps.add(new MqttProperties.UserProperty("tag", "secondTag"));
List<MqttProperties.UserProperty> actualUserProps =
(List<MqttProperties.UserProperty>) props.getProperties(USER_PROPERTY.value());
assertEquals("getProperty User Properties", expectedUserProps, actualUserProps);
}
@Test
public void testListAll() {
MqttProperties props = createSampleProperties();
List<MqttProperties.MqttProperty> expectedProperties = new ArrayList<MqttProperties.MqttProperty>();
expectedProperties.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
expectedProperties.add(new MqttProperties.StringProperty(CONTENT_TYPE.value(), "text/plain"));
expectedProperties.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 10));
expectedProperties.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 20));
MqttProperties.UserProperties expectedUserProperties = new MqttProperties.UserProperties();
expectedUserProperties.add(new MqttProperties.StringPair("isSecret", "true"));
expectedUserProperties.add(new MqttProperties.StringPair("tag", "firstTag"));
expectedUserProperties.add(new MqttProperties.StringPair("tag", "secondTag"));
expectedProperties.add(expectedUserProperties);
assertEquals("listAll",
expectedProperties,
props.listAll());
}
}

View File

@ -19,6 +19,8 @@ package io.netty.handler.codec.mqtt;
import io.netty.buffer.ByteBufUtil; import io.netty.buffer.ByteBufUtil;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -32,6 +34,8 @@ public final class MqttTestUtils {
public static void validateProperties(MqttProperties expected, MqttProperties actual) { public static void validateProperties(MqttProperties expected, MqttProperties actual) {
for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) { for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) {
MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId); MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId);
List<? extends MqttProperties.MqttProperty> actualProperties =
actual.getProperties(expectedProperty.propertyId);
switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) { switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) {
// one byte value integer property // one byte value integer property
case PAYLOAD_FORMAT_INDICATOR: case PAYLOAD_FORMAT_INDICATOR:
@ -70,8 +74,7 @@ public final class MqttTestUtils {
// four byte value integer property // four byte value integer property
case SUBSCRIPTION_IDENTIFIER: { case SUBSCRIPTION_IDENTIFIER: {
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; assertContainsValue("Subscription ID doesn't match", expectedValue, actualProperties);
assertEquals("variable byte integer property doesn't match", expectedValue, actualValue);
break; break;
} }
// UTF-8 string value integer property // UTF-8 string value integer property
@ -119,6 +122,18 @@ public final class MqttTestUtils {
} }
} }
private static void assertContainsValue(String message,
Integer expectedValue,
List<? extends MqttProperties.MqttProperty> properties) {
for (MqttProperties.MqttProperty property: properties) {
if (property instanceof MqttProperties.IntegerProperty &&
((MqttProperties.IntegerProperty) property).value == expectedValue) {
return;
}
}
fail(message + " - properties didn't contain expected integer value " + expectedValue + ": " + properties);
}
public static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) { public static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) {
List<MqttTopicSubscription> expectedTopicSubscriptions = expected.topicSubscriptions(); List<MqttTopicSubscription> expectedTopicSubscriptions = expected.topicSubscriptions();
List<MqttTopicSubscription> actualTopicSubscriptions = actual.topicSubscriptions(); List<MqttTopicSubscription> actualTopicSubscriptions = actual.topicSubscriptions();