MQTT5: support multiple Subscription ID properties (#10734)
Motivation: Subscription ID property of the PUBLISH message may be repeated multiple times, which wasn't taken into account when developing `MqttProperties` API. Modification: Store Subscription ID properties separately from others - in `MqttProperties.subscriptionIds`. Add `MqttProperties.getProperties` method to retrieve properties that may be repeated. Change internal representation of User Properties for uniformity with Subscription ID - now they're stored in `MqttProperties.userProperties` rather than the common hash map. Result: Multiple Subscription ID properties can be set or retrieved.
This commit is contained in:
parent
7579054a8f
commit
30e9b6846d
@ -139,6 +139,23 @@ public final class MqttProperties {
|
||||
public int propertyId() {
|
||||
return propertyId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return propertyId + 31 * value.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
MqttProperty that = (MqttProperty) obj;
|
||||
return this.propertyId == that.propertyId && this.value.equals(that.value);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class IntegerProperty extends MqttProperty<Integer> {
|
||||
@ -146,6 +163,11 @@ public final class MqttProperties {
|
||||
public IntegerProperty(int propertyId, Integer value) {
|
||||
super(propertyId, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "IntegerProperty(" + propertyId + ", " + value + ")";
|
||||
}
|
||||
}
|
||||
|
||||
public static final class StringProperty extends MqttProperty<String> {
|
||||
@ -153,6 +175,11 @@ public final class MqttProperties {
|
||||
public StringProperty(int propertyId, String value) {
|
||||
super(propertyId, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "StringProperty(" + propertyId + ", " + value + ")";
|
||||
}
|
||||
}
|
||||
|
||||
public static final class StringPair {
|
||||
@ -200,6 +227,14 @@ public final class MqttProperties {
|
||||
this.value.addAll(values);
|
||||
}
|
||||
|
||||
private static UserProperties fromUserPropertyCollection(Collection<UserProperty> properties) {
|
||||
UserProperties userProperties = new UserProperties();
|
||||
for (UserProperty property: properties) {
|
||||
userProperties.add(new StringPair(property.value.key, property.value.value));
|
||||
}
|
||||
return userProperties;
|
||||
}
|
||||
|
||||
public void add(StringPair pair) {
|
||||
this.value.add(pair);
|
||||
}
|
||||
@ -207,12 +242,32 @@ public final class MqttProperties {
|
||||
public void add(String key, String value) {
|
||||
this.value.add(new StringPair(key, value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder("UserProperties(");
|
||||
boolean first = true;
|
||||
for (StringPair pair: value) {
|
||||
if (!first) {
|
||||
builder.append(", ");
|
||||
}
|
||||
builder.append(pair.key + "->" + pair.value);
|
||||
first = false;
|
||||
}
|
||||
builder.append(")");
|
||||
return builder.toString();
|
||||
}
|
||||
}
|
||||
|
||||
public static final class UserProperty extends MqttProperty<StringPair> {
|
||||
public UserProperty(String key, String value) {
|
||||
super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "UserProperty(" + value.key + ", " + value.value + ")";
|
||||
}
|
||||
}
|
||||
|
||||
public static final class BinaryProperty extends MqttProperty<byte[]> {
|
||||
@ -220,6 +275,11 @@ public final class MqttProperties {
|
||||
public BinaryProperty(int propertyId, byte[] value) {
|
||||
super(propertyId, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "BinaryProperty(" + propertyId + ", " + value.length + " bytes)";
|
||||
}
|
||||
}
|
||||
|
||||
public MqttProperties() {
|
||||
@ -231,6 +291,8 @@ public final class MqttProperties {
|
||||
}
|
||||
|
||||
private IntObjectHashMap<MqttProperty> props;
|
||||
private List<UserProperty> userProperties;
|
||||
private List<IntegerProperty> subscriptionIds;
|
||||
private final boolean canModify;
|
||||
|
||||
public void add(MqttProperty property) {
|
||||
@ -239,21 +301,30 @@ public final class MqttProperties {
|
||||
}
|
||||
IntObjectHashMap<MqttProperty> props = this.props;
|
||||
if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) {
|
||||
UserProperties userProps = (UserProperties) (props != null? props.get(property.propertyId) : null);
|
||||
if (userProps == null) {
|
||||
userProps = new UserProperties();
|
||||
if (props == null) {
|
||||
props = new IntObjectHashMap<MqttProperty>();
|
||||
this.props = props;
|
||||
}
|
||||
props.put(property.propertyId, userProps);
|
||||
List<UserProperty> userProperties = this.userProperties;
|
||||
if (userProperties == null) {
|
||||
userProperties = new ArrayList<UserProperty>(1);
|
||||
this.userProperties = userProperties;
|
||||
}
|
||||
if (property instanceof UserProperty) {
|
||||
userProps.add(((UserProperty) property).value);
|
||||
} else {
|
||||
userProperties.add((UserProperty) property);
|
||||
} else if (property instanceof UserProperties) {
|
||||
for (StringPair pair: ((UserProperties) property).value) {
|
||||
userProps.add(pair);
|
||||
userProperties.add(new UserProperty(pair.key, pair.value));
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException("User property must be of UserProperty or UserProperties type");
|
||||
}
|
||||
} else if (property.propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
|
||||
List<IntegerProperty> subscriptionIds = this.subscriptionIds;
|
||||
if (subscriptionIds == null) {
|
||||
subscriptionIds = new ArrayList<IntegerProperty>(1);
|
||||
this.subscriptionIds = subscriptionIds;
|
||||
}
|
||||
if (property instanceof IntegerProperty) {
|
||||
subscriptionIds.add((IntegerProperty) property);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Subscription ID must be an integer property");
|
||||
}
|
||||
} else {
|
||||
if (props == null) {
|
||||
@ -266,7 +337,26 @@ public final class MqttProperties {
|
||||
|
||||
public Collection<? extends MqttProperty> listAll() {
|
||||
IntObjectHashMap<MqttProperty> props = this.props;
|
||||
return props == null? Collections.<MqttProperty>emptyList() : props.values();
|
||||
if (props == null && subscriptionIds == null && userProperties == null) {
|
||||
return Collections.<MqttProperty>emptyList();
|
||||
}
|
||||
if (subscriptionIds == null && userProperties == null) {
|
||||
return props.values();
|
||||
}
|
||||
if (props == null && userProperties == null) {
|
||||
return subscriptionIds;
|
||||
}
|
||||
List<MqttProperty> propValues = new ArrayList<MqttProperty>(props != null ? props.size() : 1);
|
||||
if (props != null) {
|
||||
propValues.addAll(props.values());
|
||||
}
|
||||
if (subscriptionIds != null) {
|
||||
propValues.addAll(subscriptionIds);
|
||||
}
|
||||
if (userProperties != null) {
|
||||
propValues.add(UserProperties.fromUserPropertyCollection(userProperties));
|
||||
}
|
||||
return propValues;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
@ -274,8 +364,51 @@ public final class MqttProperties {
|
||||
return props == null || props.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get property by ID. If there are multiple properties of this type (can be with Subscription ID)
|
||||
* then return the first one.
|
||||
*
|
||||
* @param propertyId ID of the property
|
||||
* @return a property if it is set, null otherwise
|
||||
*/
|
||||
public MqttProperty getProperty(int propertyId) {
|
||||
if (propertyId == MqttPropertyType.USER_PROPERTY.value) {
|
||||
//special handling to keep compatibility with earlier versions
|
||||
List<UserProperty> userProperties = this.userProperties;
|
||||
if (userProperties == null) {
|
||||
return null;
|
||||
}
|
||||
return UserProperties.fromUserPropertyCollection(userProperties);
|
||||
}
|
||||
if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
|
||||
List<IntegerProperty> subscriptionIds = this.subscriptionIds;
|
||||
if (subscriptionIds == null || subscriptionIds.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
return subscriptionIds.get(0);
|
||||
}
|
||||
IntObjectHashMap<MqttProperty> props = this.props;
|
||||
return props == null? null : props.get(propertyId);
|
||||
return props == null ? null : props.get(propertyId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get properties by ID.
|
||||
* Some properties (Subscription ID and User Properties) may occur multiple times,
|
||||
* this method returns all their values in order.
|
||||
*
|
||||
* @param propertyId ID of the property
|
||||
* @return all properties having specified ID
|
||||
*/
|
||||
public List<? extends MqttProperty> getProperties(int propertyId) {
|
||||
if (propertyId == MqttPropertyType.USER_PROPERTY.value) {
|
||||
return userProperties == null ? Collections.<MqttProperty>emptyList() : userProperties;
|
||||
}
|
||||
if (propertyId == MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value) {
|
||||
return subscriptionIds == null ? Collections.<MqttProperty>emptyList() : subscriptionIds;
|
||||
}
|
||||
IntObjectHashMap<MqttProperty> props = this.props;
|
||||
return (props == null || !props.containsKey(propertyId)) ?
|
||||
Collections.<MqttProperty>emptyList() :
|
||||
Collections.singletonList(props.get(propertyId));
|
||||
}
|
||||
}
|
||||
|
@ -502,11 +502,18 @@ public class MqttCodecTest {
|
||||
public void testPublishMessageForMqtt5() throws Exception {
|
||||
when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5);
|
||||
MqttProperties props = new MqttProperties();
|
||||
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 10));
|
||||
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 20));
|
||||
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
|
||||
props.add(new MqttProperties.UserProperty("isSecret", "true"));
|
||||
props.add(new MqttProperties.UserProperty("isUrgent", "false"));
|
||||
assertEquals("User properties count mismatch",
|
||||
((MqttProperties.UserProperties) props.getProperty(USER_PROPERTY.value())).value.size(), 2);
|
||||
props.add(new MqttProperties.UserProperty("tag", "firstTag"));
|
||||
props.add(new MqttProperties.UserProperty("tag", "secondTag"));
|
||||
assertEquals("Subscription IDs count mismatch", 2,
|
||||
(props.getProperties(SUBSCRIPTION_IDENTIFIER.value())).size());
|
||||
assertEquals("User properties count mismatch", 3,
|
||||
(props.getProperties(USER_PROPERTY.value())).size());
|
||||
assertEquals("UserProperties count mismatch", 3,
|
||||
((MqttProperties.UserProperties) props.getProperty(USER_PROPERTY.value())).value.size());
|
||||
final MqttPublishMessage message = createPublishMessage(props);
|
||||
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
|
||||
|
||||
|
@ -0,0 +1,112 @@
|
||||
/*
|
||||
* Copyright 2020 The Netty Project
|
||||
*
|
||||
* The Netty Project licenses this file to you under the Apache License,
|
||||
* version 2.0 (the "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at:
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.netty.handler.codec.mqtt;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
|
||||
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.PAYLOAD_FORMAT_INDICATOR;
|
||||
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;
|
||||
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.USER_PROPERTY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class MqttPropertiesTest {
|
||||
|
||||
private MqttProperties createSampleProperties() {
|
||||
MqttProperties props = new MqttProperties();
|
||||
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 10));
|
||||
props.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 20));
|
||||
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
|
||||
props.add(new MqttProperties.StringProperty(CONTENT_TYPE.value(), "text/plain"));
|
||||
props.add(new MqttProperties.UserProperty("isSecret", "true"));
|
||||
props.add(new MqttProperties.UserProperty("tag", "firstTag"));
|
||||
props.add(new MqttProperties.UserProperty("tag", "secondTag"));
|
||||
return props;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetProperty() {
|
||||
MqttProperties props = createSampleProperties();
|
||||
|
||||
assertEquals("getProperty Content Type",
|
||||
"text/plain",
|
||||
((MqttProperties.StringProperty) props.getProperty(CONTENT_TYPE.value())).value);
|
||||
assertEquals("getProperty Subscription ID",
|
||||
10,
|
||||
((MqttProperties.IntegerProperty) props.getProperty(SUBSCRIPTION_IDENTIFIER.value())).value.intValue());
|
||||
|
||||
List<MqttProperties.StringPair> expectedUserProps = new ArrayList<MqttProperties.StringPair>();
|
||||
expectedUserProps.add(new MqttProperties.StringPair("isSecret", "true"));
|
||||
expectedUserProps.add(new MqttProperties.StringPair("tag", "firstTag"));
|
||||
expectedUserProps.add(new MqttProperties.StringPair("tag", "secondTag"));
|
||||
List<MqttProperties.StringPair> actualUserProps =
|
||||
((MqttProperties.UserProperties) props.getProperty(USER_PROPERTY.value())).value;
|
||||
assertEquals("getProperty User Properties", expectedUserProps, actualUserProps);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetProperties() {
|
||||
MqttProperties props = createSampleProperties();
|
||||
|
||||
assertEquals("getProperties Content Type",
|
||||
Collections.singletonList(new MqttProperties.StringProperty(CONTENT_TYPE.value(), "text/plain")),
|
||||
props.getProperties(CONTENT_TYPE.value()));
|
||||
|
||||
List<MqttProperties.IntegerProperty> expectedSubscriptionIds = new ArrayList<MqttProperties.IntegerProperty>();
|
||||
expectedSubscriptionIds.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 10));
|
||||
expectedSubscriptionIds.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 20));
|
||||
assertEquals("getProperties Subscription ID",
|
||||
expectedSubscriptionIds,
|
||||
props.getProperties(SUBSCRIPTION_IDENTIFIER.value()));
|
||||
|
||||
List<MqttProperties.UserProperty> expectedUserProps = new ArrayList<MqttProperties.UserProperty>();
|
||||
expectedUserProps.add(new MqttProperties.UserProperty("isSecret", "true"));
|
||||
expectedUserProps.add(new MqttProperties.UserProperty("tag", "firstTag"));
|
||||
expectedUserProps.add(new MqttProperties.UserProperty("tag", "secondTag"));
|
||||
List<MqttProperties.UserProperty> actualUserProps =
|
||||
(List<MqttProperties.UserProperty>) props.getProperties(USER_PROPERTY.value());
|
||||
assertEquals("getProperty User Properties", expectedUserProps, actualUserProps);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListAll() {
|
||||
MqttProperties props = createSampleProperties();
|
||||
|
||||
List<MqttProperties.MqttProperty> expectedProperties = new ArrayList<MqttProperties.MqttProperty>();
|
||||
expectedProperties.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
|
||||
expectedProperties.add(new MqttProperties.StringProperty(CONTENT_TYPE.value(), "text/plain"));
|
||||
|
||||
expectedProperties.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 10));
|
||||
expectedProperties.add(new MqttProperties.IntegerProperty(SUBSCRIPTION_IDENTIFIER.value(), 20));
|
||||
|
||||
MqttProperties.UserProperties expectedUserProperties = new MqttProperties.UserProperties();
|
||||
expectedUserProperties.add(new MqttProperties.StringPair("isSecret", "true"));
|
||||
expectedUserProperties.add(new MqttProperties.StringPair("tag", "firstTag"));
|
||||
expectedUserProperties.add(new MqttProperties.StringPair("tag", "secondTag"));
|
||||
|
||||
expectedProperties.add(expectedUserProperties);
|
||||
|
||||
assertEquals("listAll",
|
||||
expectedProperties,
|
||||
props.listAll());
|
||||
}
|
||||
|
||||
}
|
@ -19,6 +19,8 @@ package io.netty.handler.codec.mqtt;
|
||||
import io.netty.buffer.ByteBufUtil;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@ -32,6 +34,8 @@ public final class MqttTestUtils {
|
||||
public static void validateProperties(MqttProperties expected, MqttProperties actual) {
|
||||
for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) {
|
||||
MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId);
|
||||
List<? extends MqttProperties.MqttProperty> actualProperties =
|
||||
actual.getProperties(expectedProperty.propertyId);
|
||||
switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) {
|
||||
// one byte value integer property
|
||||
case PAYLOAD_FORMAT_INDICATOR:
|
||||
@ -70,8 +74,7 @@ public final class MqttTestUtils {
|
||||
// four byte value integer property
|
||||
case SUBSCRIPTION_IDENTIFIER: {
|
||||
final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value;
|
||||
final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value;
|
||||
assertEquals("variable byte integer property doesn't match", expectedValue, actualValue);
|
||||
assertContainsValue("Subscription ID doesn't match", expectedValue, actualProperties);
|
||||
break;
|
||||
}
|
||||
// UTF-8 string value integer property
|
||||
@ -119,6 +122,18 @@ public final class MqttTestUtils {
|
||||
}
|
||||
}
|
||||
|
||||
private static void assertContainsValue(String message,
|
||||
Integer expectedValue,
|
||||
List<? extends MqttProperties.MqttProperty> properties) {
|
||||
for (MqttProperties.MqttProperty property: properties) {
|
||||
if (property instanceof MqttProperties.IntegerProperty &&
|
||||
((MqttProperties.IntegerProperty) property).value == expectedValue) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
fail(message + " - properties didn't contain expected integer value " + expectedValue + ": " + properties);
|
||||
}
|
||||
|
||||
public static void validateSubscribePayload(MqttSubscribePayload expected, MqttSubscribePayload actual) {
|
||||
List<MqttTopicSubscription> expectedTopicSubscriptions = expected.topicSubscriptions();
|
||||
List<MqttTopicSubscription> actualTopicSubscriptions = actual.topicSubscriptions();
|
||||
|
Loading…
Reference in New Issue
Block a user