Not truncate MQTT SUBACK reason codes (#10584)

Motivation:

Reason code for MQTT SUBACK messages is truncated, thus not allowing to get new codes supported by MQTT v.5

Modification:

Changed `MqttCodecTest.testUnsubAckMessageForMqtt5` to catch it then, removed truncation in `MqttDecoder.decodeSubackPayload` to make it pass.

Result:

Codec users can get all reason codes supported by MQTT5 now.
This commit is contained in:
Paul Lysak 2020-09-22 09:58:49 +03:00 committed by GitHub
parent 71d034593f
commit b112483799
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 25 deletions

View File

@ -543,15 +543,12 @@ public final class MqttDecoder extends ReplayingDecoder<DecoderState> {
private static Result<MqttSubAckPayload> decodeSubackPayload( private static Result<MqttSubAckPayload> decodeSubackPayload(
ByteBuf buffer, ByteBuf buffer,
int bytesRemainingInVariablePart) { int bytesRemainingInVariablePart) {
final List<Integer> grantedQos = new ArrayList<Integer>(); final List<Integer> grantedQos = new ArrayList<Integer>(bytesRemainingInVariablePart);
int numberOfBytesConsumed = 0; int numberOfBytesConsumed = 0;
while (numberOfBytesConsumed < bytesRemainingInVariablePart) { while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
int qos = buffer.readUnsignedByte(); int reasonCode = buffer.readUnsignedByte();
if (qos != MqttQoS.FAILURE.value()) {
qos &= 0x03;
}
numberOfBytesConsumed++; numberOfBytesConsumed++;
grantedQos.add(qos); grantedQos.add(reasonCode);
} }
return new Result<MqttSubAckPayload>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed); return new Result<MqttSubAckPayload>(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
} }

View File

@ -368,8 +368,8 @@ public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
writeVariableLengthInt(buf, variablePartSize); writeVariableLengthInt(buf, variablePartSize);
buf.writeShort(message.variableHeader().messageId()); buf.writeShort(message.variableHeader().messageId());
buf.writeBytes(propertiesBuf); buf.writeBytes(propertiesBuf);
for (int qos : message.payload().grantedQoSLevels()) { for (int code: message.payload().reasonCodes()) {
buf.writeByte(qos); buf.writeByte(code);
} }
return buf; return buf;

View File

@ -28,39 +28,51 @@ import java.util.List;
*/ */
public class MqttSubAckPayload { public class MqttSubAckPayload {
private final List<Integer> grantedQoSLevels; private final List<Integer> reasonCodes;
public MqttSubAckPayload(int... grantedQoSLevels) { public MqttSubAckPayload(int... reasonCodes) {
ObjectUtil.checkNotNull(grantedQoSLevels, "grantedQoSLevels"); ObjectUtil.checkNotNull(reasonCodes, "reasonCodes");
List<Integer> list = new ArrayList<Integer>(grantedQoSLevels.length); List<Integer> list = new ArrayList<Integer>(reasonCodes.length);
for (int v: grantedQoSLevels) { for (int v: reasonCodes) {
list.add(v); list.add(v);
} }
this.grantedQoSLevels = Collections.unmodifiableList(list); this.reasonCodes = Collections.unmodifiableList(list);
} }
public MqttSubAckPayload(Iterable<Integer> grantedQoSLevels) { public MqttSubAckPayload(Iterable<Integer> reasonCodes) {
ObjectUtil.checkNotNull(grantedQoSLevels, "grantedQoSLevels"); ObjectUtil.checkNotNull(reasonCodes, "reasonCodes");
List<Integer> list = new ArrayList<Integer>(); List<Integer> list = new ArrayList<Integer>();
for (Integer v: grantedQoSLevels) { for (Integer v: reasonCodes) {
if (v == null) { if (v == null) {
break; break;
} }
list.add(v); list.add(v);
} }
this.grantedQoSLevels = Collections.unmodifiableList(list); this.reasonCodes = Collections.unmodifiableList(list);
} }
public List<Integer> grantedQoSLevels() { public List<Integer> grantedQoSLevels() {
return grantedQoSLevels; List<Integer> qosLevels = new ArrayList<Integer>(reasonCodes.size());
for (int code: reasonCodes) {
if (code > MqttQoS.EXACTLY_ONCE.value()) {
qosLevels.add(MqttQoS.FAILURE.value());
} else {
qosLevels.add(code);
}
}
return qosLevels;
}
public List<Integer> reasonCodes() {
return reasonCodes;
} }
@Override @Override
public String toString() { public String toString() {
return new StringBuilder(StringUtil.simpleClassName(this)) return new StringBuilder(StringUtil.simpleClassName(this))
.append('[') .append('[')
.append("grantedQoSLevels=").append(grantedQoSLevels) .append("reasonCodes=").append(reasonCodes)
.append(']') .append(']')
.toString(); .toString();
} }

View File

@ -581,7 +581,7 @@ public class MqttCodecTest {
public void testSubAckMessageForMqtt5() throws Exception { public void testSubAckMessageForMqtt5() throws Exception {
MqttProperties props = new MqttProperties(); MqttProperties props = new MqttProperties();
props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6));
final MqttSubAckMessage message = createSubAckMessage(props); final MqttSubAckMessage message = createSubAckMessage(props, new int[] {1, 2, 0, 0x87 /* not authorized */});
ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message);
final List<Object> out = new LinkedList<Object>(); final List<Object> out = new LinkedList<Object>();
@ -595,6 +595,11 @@ public class MqttCodecTest {
validatePacketIdAndPropertiesVariableHeader( validatePacketIdAndPropertiesVariableHeader(
(MqttMessageIdAndPropertiesVariableHeader) message.variableHeader(), (MqttMessageIdAndPropertiesVariableHeader) message.variableHeader(),
(MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader()); (MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader());
validateSubAckPayload(message.payload(), decodedMessage.payload());
assertArrayEquals(
"MqttSubAckPayload QoS mismatch ",
new Integer[] {1, 2, 0, 0x80},
decodedMessage.payload().grantedQoSLevels().toArray());
} }
@Test @Test
@ -872,14 +877,14 @@ public class MqttCodecTest {
} }
private static MqttSubAckMessage createSubAckMessage() { private static MqttSubAckMessage createSubAckMessage() {
return createSubAckMessage(MqttProperties.NO_PROPERTIES); return createSubAckMessage(MqttProperties.NO_PROPERTIES, new int[] {1, 2, 0});
} }
private static MqttSubAckMessage createSubAckMessage(MqttProperties properties) { private static MqttSubAckMessage createSubAckMessage(MqttProperties properties, int[] reasonCodes) {
MqttFixedHeader mqttFixedHeader = MqttFixedHeader mqttFixedHeader =
new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345); MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345);
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(1, 2, 0); MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(reasonCodes);
return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload); return new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload);
} }
@ -986,7 +991,11 @@ public class MqttCodecTest {
private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) { private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) {
assertArrayEquals( assertArrayEquals(
"MqttSubAckPayload GrantedQosLevels mismatch ", "MqttSubAckPayload reason codes mismatch ",
expected.reasonCodes().toArray(),
actual.reasonCodes().toArray());
assertArrayEquals(
"MqttSubAckPayload QoS level mismatch ",
expected.grantedQoSLevels().toArray(), expected.grantedQoSLevels().toArray(),
actual.grantedQoSLevels().toArray()); actual.grantedQoSLevels().toArray());
} }