From be2cd68443948f9d7a8c77d3ae286899146bfee8 Mon Sep 17 00:00:00 2001 From: Paul Lysak Date: Mon, 31 Aug 2020 10:16:40 +0300 Subject: [PATCH] MQTT5 support for netty-codec-mqtt (#10483) Motivation: MQTT Specification version 5 was released over a year ago, netty-codec-mqtt should be changed to support it. Modifications: Added more message and header types in `io.netty.handler.codec.mqtt` package in `netty-coded-mqtt` subproject, changed `MqttEncoder` and `MqttDecoder` to handle them properly, added attribute `NETTY_CODEC_MQTT_VERSION` to track protocol version Result: `netty-codec-mqtt` supports both MQTT5 and MQTT3 now. --- .../handler/codec/mqtt/MqttCodecUtil.java | 31 +- .../codec/mqtt/MqttConnAckVariableHeader.java | 12 + .../codec/mqtt/MqttConnectPayload.java | 25 +- .../codec/mqtt/MqttConnectReturnCode.java | 25 +- .../codec/mqtt/MqttConnectVariableHeader.java | 29 + .../netty/handler/codec/mqtt/MqttDecoder.java | 330 +++++++++- .../netty/handler/codec/mqtt/MqttEncoder.java | 595 +++++++++++++----- .../codec/mqtt/MqttMessageBuilders.java | 265 +++++++- .../codec/mqtt/MqttMessageFactory.java | 16 +- ...tMessageIdAndPropertiesVariableHeader.java | 47 ++ .../mqtt/MqttMessageIdVariableHeader.java | 8 +- .../handler/codec/mqtt/MqttMessageType.java | 3 +- .../handler/codec/mqtt/MqttProperties.java | 232 +++++++ .../MqttPubReplyMessageVariableHeader.java | 56 ++ .../codec/mqtt/MqttPublishVariableHeader.java | 10 + ...ReasonCodeAndPropertiesVariableHeader.java | 54 ++ .../handler/codec/mqtt/MqttSubAckMessage.java | 13 +- .../codec/mqtt/MqttSubscribeMessage.java | 13 +- .../codec/mqtt/MqttSubscriptionOption.java | 124 ++++ .../codec/mqtt/MqttTopicSubscription.java | 17 +- .../codec/mqtt/MqttUnsubAckMessage.java | 33 +- .../codec/mqtt/MqttUnsubAckPayload.java | 65 ++ .../codec/mqtt/MqttUnsubscribeMessage.java | 13 +- .../netty/handler/codec/mqtt/MqttVersion.java | 7 +- .../handler/codec/mqtt/MqttCodecTest.java | 519 ++++++++++++++- .../codec/mqtt/MqttConnectPayloadTest.java | 16 +- .../heartBeat/MqttHeartBeatClientHandler.java | 10 +- 27 files changed, 2330 insertions(+), 238 deletions(-) create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdAndPropertiesVariableHeader.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttProperties.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPubReplyMessageVariableHeader.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttReasonCodeAndPropertiesVariableHeader.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscriptionOption.java create mode 100644 codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckPayload.java diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttCodecUtil.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttCodecUtil.java index b0bcb3f001..f3552a2e27 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttCodecUtil.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttCodecUtil.java @@ -16,7 +16,10 @@ package io.netty.handler.codec.mqtt; +import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; final class MqttCodecUtil { @@ -24,6 +27,22 @@ final class MqttCodecUtil { private static final int MIN_CLIENT_ID_LENGTH = 1; private static final int MAX_CLIENT_ID_LENGTH = 23; + static final AttributeKey MQTT_VERSION_KEY = AttributeKey.valueOf("NETTY_CODEC_MQTT_VERSION"); + + static MqttVersion getMqttVersion(ChannelHandlerContext ctx) { + Attribute attr = ctx.channel().attr(MQTT_VERSION_KEY); + MqttVersion version = attr.get(); + if (version == null) { + return MqttVersion.MQTT_3_1_1; + } + return version; + } + + static void setMqttVersion(ChannelHandlerContext ctx, MqttVersion version) { + Attribute attr = ctx.channel().attr(MQTT_VERSION_KEY); + attr.set(version); + } + static boolean isValidPublishTopicName(String topicName) { // publish topic name must not contain any wildcard for (char c : TOPIC_WILDCARDS) { @@ -43,15 +62,15 @@ final class MqttCodecUtil { return clientId != null && clientId.length() >= MIN_CLIENT_ID_LENGTH && clientId.length() <= MAX_CLIENT_ID_LENGTH; } - if (mqttVersion == MqttVersion.MQTT_3_1_1) { - // In 3.1.3.1 Client Identifier of MQTT 3.1.1 specification, The Server MAY allow ClientId’s + if (mqttVersion == MqttVersion.MQTT_3_1_1 || mqttVersion == MqttVersion.MQTT_5) { + // In 3.1.3.1 Client Identifier of MQTT 3.1.1 and 5.0 specifications, The Server MAY allow ClientId’s // that contain more than 23 encoded bytes. And, The Server MAY allow zero-length ClientId. return clientId != null; } throw new IllegalArgumentException(mqttVersion + " is unknown mqtt version"); } - static MqttFixedHeader validateFixedHeader(MqttFixedHeader mqttFixedHeader) { + static MqttFixedHeader validateFixedHeader(ChannelHandlerContext ctx, MqttFixedHeader mqttFixedHeader) { switch (mqttFixedHeader.messageType()) { case PUBREL: case SUBSCRIBE: @@ -59,6 +78,12 @@ final class MqttCodecUtil { if (mqttFixedHeader.qosLevel() != MqttQoS.AT_LEAST_ONCE) { throw new DecoderException(mqttFixedHeader.messageType().name() + " message must have QoS 1"); } + return mqttFixedHeader; + case AUTH: + if (MqttCodecUtil.getMqttVersion(ctx) != MqttVersion.MQTT_5) { + throw new DecoderException("AUTH message requires at least MQTT 5"); + } + return mqttFixedHeader; default: return mqttFixedHeader; } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckVariableHeader.java index b84c21d522..cf450a6e1f 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckVariableHeader.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnAckVariableHeader.java @@ -27,9 +27,17 @@ public final class MqttConnAckVariableHeader { private final boolean sessionPresent; + private final MqttProperties properties; + public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode, boolean sessionPresent) { + this(connectReturnCode, sessionPresent, MqttProperties.NO_PROPERTIES); + } + + public MqttConnAckVariableHeader(MqttConnectReturnCode connectReturnCode, boolean sessionPresent, + MqttProperties properties) { this.connectReturnCode = connectReturnCode; this.sessionPresent = sessionPresent; + this.properties = MqttProperties.withEmptyDefaults(properties); } public MqttConnectReturnCode connectReturnCode() { @@ -40,6 +48,10 @@ public final class MqttConnAckVariableHeader { return sessionPresent; } + public MqttProperties properties() { + return properties; + } + @Override public String toString() { return new StringBuilder(StringUtil.simpleClassName(this)) diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectPayload.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectPayload.java index 93f63e507b..6d4619e2c2 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectPayload.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectPayload.java @@ -27,13 +27,15 @@ import io.netty.util.internal.StringUtil; public final class MqttConnectPayload { private final String clientIdentifier; + private final MqttProperties willProperties; private final String willTopic; private final byte[] willMessage; private final String userName; private final byte[] password; /** - * @deprecated use {@link MqttConnectPayload#MqttConnectPayload(String, String, byte[], String, byte[])} instead + * @deprecated use {@link MqttConnectPayload#MqttConnectPayload(String, + * MqttProperties, String, byte[], String, byte[])} instead */ @Deprecated public MqttConnectPayload( @@ -44,6 +46,7 @@ public final class MqttConnectPayload { String password) { this( clientIdentifier, + MqttProperties.NO_PROPERTIES, willTopic, willMessage.getBytes(CharsetUtil.UTF_8), userName, @@ -56,7 +59,23 @@ public final class MqttConnectPayload { byte[] willMessage, String userName, byte[] password) { + this(clientIdentifier, + MqttProperties.NO_PROPERTIES, + willTopic, + willMessage, + userName, + password); + } + + public MqttConnectPayload( + String clientIdentifier, + MqttProperties willProperties, + String willTopic, + byte[] willMessage, + String userName, + byte[] password) { this.clientIdentifier = clientIdentifier; + this.willProperties = MqttProperties.withEmptyDefaults(willProperties); this.willTopic = willTopic; this.willMessage = willMessage; this.userName = userName; @@ -67,6 +86,10 @@ public final class MqttConnectPayload { return clientIdentifier; } + public MqttProperties willProperties() { + return willProperties; + } + public String willTopic() { return willTopic; } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectReturnCode.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectReturnCode.java index 802dc196f9..be5a091a77 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectReturnCode.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectReturnCode.java @@ -25,11 +25,34 @@ import java.util.Map; */ public enum MqttConnectReturnCode { CONNECTION_ACCEPTED((byte) 0x00), + //MQTT 3 codes CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION((byte) 0X01), CONNECTION_REFUSED_IDENTIFIER_REJECTED((byte) 0x02), CONNECTION_REFUSED_SERVER_UNAVAILABLE((byte) 0x03), CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD((byte) 0x04), - CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05); + CONNECTION_REFUSED_NOT_AUTHORIZED((byte) 0x05), + //MQTT 5 codes + CONNECTION_REFUSED_UNSPECIFIED_ERROR((byte) 0x80), + CONNECTION_REFUSED_MALFORMED_PACKET((byte) 0x81), + CONNECTION_REFUSED_PROTOCOL_ERROR((byte) 0x82), + CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC((byte) 0x83), + CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION((byte) 0x84), + CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID((byte) 0x85), + CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD((byte) 0x86), + CONNECTION_REFUSED_NOT_AUTHORIZED_5((byte) 0x87), + CONNECTION_REFUSED_SERVER_UNAVAILABLE_5((byte) 0x88), + CONNECTION_REFUSED_SERVER_BUSY((byte) 0x89), + CONNECTION_REFUSED_BANNED((byte) 0x8A), + CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD((byte) 0x8C), + CONNECTION_REFUSED_TOPIC_NAME_INVALID((byte) 0x90), + CONNECTION_REFUSED_PACKET_TOO_LARGE((byte) 0x95), + CONNECTION_REFUSED_QUOTA_EXCEEDED((byte) 0x97), + CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID((byte) 0x99), + CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED((byte) 0x9A), + CONNECTION_REFUSED_QOS_NOT_SUPPORTED((byte) 0x9B), + CONNECTION_REFUSED_USE_ANOTHER_SERVER((byte) 0x9C), + CONNECTION_REFUSED_SERVER_MOVED((byte) 0x9D), + CONNECTION_REFUSED_CONNECTION_RATE_EXCEEDED((byte) 0x9F); private static final Map VALUE_TO_CODE_MAP; diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectVariableHeader.java index 2a1ca2f850..16e9078c48 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectVariableHeader.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttConnectVariableHeader.java @@ -32,6 +32,7 @@ public final class MqttConnectVariableHeader { private final boolean isWillFlag; private final boolean isCleanSession; private final int keepAliveTimeSeconds; + private final MqttProperties properties; public MqttConnectVariableHeader( String name, @@ -43,6 +44,29 @@ public final class MqttConnectVariableHeader { boolean isWillFlag, boolean isCleanSession, int keepAliveTimeSeconds) { + this(name, + version, + hasUserName, + hasPassword, + isWillRetain, + willQos, + isWillFlag, + isCleanSession, + keepAliveTimeSeconds, + MqttProperties.NO_PROPERTIES); + } + + public MqttConnectVariableHeader( + String name, + int version, + boolean hasUserName, + boolean hasPassword, + boolean isWillRetain, + int willQos, + boolean isWillFlag, + boolean isCleanSession, + int keepAliveTimeSeconds, + MqttProperties properties) { this.name = name; this.version = version; this.hasUserName = hasUserName; @@ -52,6 +76,7 @@ public final class MqttConnectVariableHeader { this.isWillFlag = isWillFlag; this.isCleanSession = isCleanSession; this.keepAliveTimeSeconds = keepAliveTimeSeconds; + this.properties = MqttProperties.withEmptyDefaults(properties); } public String name() { @@ -90,6 +115,10 @@ public final class MqttConnectVariableHeader { return keepAliveTimeSeconds; } + public MqttProperties properties() { + return properties; + } + @Override public String toString() { return new StringBuilder(StringUtil.simpleClassName(this)) diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java index e93d34bde1..fce56d4342 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttDecoder.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.ReplayingDecoder; +import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.codec.mqtt.MqttDecoder.DecoderState; import io.netty.util.CharsetUtil; @@ -31,11 +32,15 @@ import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidMessageId; import static io.netty.handler.codec.mqtt.MqttCodecUtil.isValidPublishTopicName; import static io.netty.handler.codec.mqtt.MqttCodecUtil.resetUnusedFields; import static io.netty.handler.codec.mqtt.MqttCodecUtil.validateFixedHeader; +import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy; /** * Decodes Mqtt messages from bytes, following - * - * the MQTT protocol specification v3.1 + * the MQTT protocol specification + * v3.1 + * or + * v5.0, depending on the + * version specified in the CONNECT message that first goes through the channel. */ public final class MqttDecoder extends ReplayingDecoder { @@ -72,7 +77,7 @@ public final class MqttDecoder extends ReplayingDecoder { protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { switch (state()) { case READ_FIXED_HEADER: try { - mqttFixedHeader = decodeFixedHeader(buffer); + mqttFixedHeader = decodeFixedHeader(ctx, buffer); bytesRemainingInVariablePart = mqttFixedHeader.remainingLength(); checkpoint(DecoderState.READ_VARIABLE_HEADER); // fall through @@ -82,10 +87,10 @@ public final class MqttDecoder extends ReplayingDecoder { } case READ_VARIABLE_HEADER: try { - final Result decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader); + final Result decodedVariableHeader = decodeVariableHeader(ctx, buffer, mqttFixedHeader); variableHeader = decodedVariableHeader.value; if (bytesRemainingInVariablePart > maxBytesInMessage) { - throw new DecoderException("too large message: " + bytesRemainingInVariablePart + " bytes"); + throw new TooLongFrameException("too large message: " + bytesRemainingInVariablePart + " bytes"); } bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed; checkpoint(DecoderState.READ_PAYLOAD); @@ -98,6 +103,7 @@ public final class MqttDecoder extends ReplayingDecoder { case READ_PAYLOAD: try { final Result decodedPayload = decodePayload( + ctx, buffer, mqttFixedHeader.messageType(), bytesRemainingInVariablePart, @@ -142,7 +148,7 @@ public final class MqttDecoder extends ReplayingDecoder { * @param buffer the buffer to decode from * @return the fixed header */ - private static MqttFixedHeader decodeFixedHeader(ByteBuf buffer) { + private static MqttFixedHeader decodeFixedHeader(ChannelHandlerContext ctx, ByteBuf buffer) { short b1 = buffer.readUnsignedByte(); MqttMessageType messageType = MqttMessageType.valueOf(b1 >> 4); @@ -167,7 +173,7 @@ public final class MqttDecoder extends ReplayingDecoder { } MqttFixedHeader decodedFixedHeader = new MqttFixedHeader(messageType, dupFlag, MqttQoS.valueOf(qosLevel), retain, remainingLength); - return validateFixedHeader(resetUnusedFields(decodedFixedHeader)); + return validateFixedHeader(ctx, resetUnusedFields(decodedFixedHeader)); } /** @@ -176,44 +182,54 @@ public final class MqttDecoder extends ReplayingDecoder { * @param mqttFixedHeader MqttFixedHeader of the same message * @return the variable header */ - private static Result decodeVariableHeader(ByteBuf buffer, MqttFixedHeader mqttFixedHeader) { + private Result decodeVariableHeader(ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) { switch (mqttFixedHeader.messageType()) { case CONNECT: - return decodeConnectionVariableHeader(buffer); + return decodeConnectionVariableHeader(ctx, buffer); case CONNACK: - return decodeConnAckVariableHeader(buffer); + return decodeConnAckVariableHeader(ctx, buffer); - case SUBSCRIBE: case UNSUBSCRIBE: + case SUBSCRIBE: case SUBACK: case UNSUBACK: + return decodeMessageIdAndPropertiesVariableHeader(ctx, buffer); + case PUBACK: case PUBREC: case PUBCOMP: case PUBREL: - return decodeMessageIdVariableHeader(buffer); + return decodePubReplyMessage(buffer); case PUBLISH: - return decodePublishVariableHeader(buffer, mqttFixedHeader); + return decodePublishVariableHeader(ctx, buffer, mqttFixedHeader); + + case DISCONNECT: + case AUTH: + return decodeReasonCodeAndPropertiesVariableHeader(buffer); case PINGREQ: case PINGRESP: - case DISCONNECT: // Empty variable header return new Result(null, 0); + default: + //shouldn't reach here + throw new DecoderException("Unknown message type: " + mqttFixedHeader.messageType()); } - return new Result(null, 0); //should never reach here } - private static Result decodeConnectionVariableHeader(ByteBuf buffer) { + private static Result decodeConnectionVariableHeader( + ChannelHandlerContext ctx, + ByteBuf buffer) { final Result protoString = decodeString(buffer); int numberOfBytesConsumed = protoString.numberOfBytesConsumed; final byte protocolLevel = buffer.readByte(); numberOfBytesConsumed += 1; - final MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel); + MqttVersion version = MqttVersion.fromProtocolNameAndLevel(protoString.value, protocolLevel); + MqttCodecUtil.setMqttVersion(ctx, version); final int b1 = buffer.readUnsignedByte(); numberOfBytesConsumed += 1; @@ -227,7 +243,7 @@ public final class MqttDecoder extends ReplayingDecoder { final int willQos = (b1 & 0x18) >> 3; final boolean willFlag = (b1 & 0x04) == 0x04; final boolean cleanSession = (b1 & 0x02) == 0x02; - if (mqttVersion == MqttVersion.MQTT_3_1_1) { + if (version == MqttVersion.MQTT_3_1_1 || version == MqttVersion.MQTT_5) { final boolean zeroReservedFlag = (b1 & 0x01) == 0x0; if (!zeroReservedFlag) { // MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is @@ -237,25 +253,48 @@ public final class MqttDecoder extends ReplayingDecoder { } } + final MqttProperties properties; + if (version == MqttVersion.MQTT_5) { + final Result propertiesResult = decodeProperties(buffer); + properties = propertiesResult.value; + numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed; + } else { + properties = MqttProperties.NO_PROPERTIES; + } + final MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader( - mqttVersion.protocolName(), - mqttVersion.protocolLevel(), + version.protocolName(), + version.protocolLevel(), hasUserName, hasPassword, willRetain, willQos, willFlag, cleanSession, - keepAlive.value); + keepAlive.value, + properties); return new Result(mqttConnectVariableHeader, numberOfBytesConsumed); } - private static Result decodeConnAckVariableHeader(ByteBuf buffer) { + private static Result decodeConnAckVariableHeader( + ChannelHandlerContext ctx, + ByteBuf buffer) { + final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); final boolean sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01; byte returnCode = buffer.readByte(); - final int numberOfBytesConsumed = 2; + int numberOfBytesConsumed = 2; + + final MqttProperties properties; + if (mqttVersion == MqttVersion.MQTT_5) { + final Result propertiesResult = decodeProperties(buffer); + properties = propertiesResult.value; + numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed; + } else { + properties = MqttProperties.NO_PROPERTIES; + } + final MqttConnAckVariableHeader mqttConnAckVariableHeader = - new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent); + new MqttConnAckVariableHeader(MqttConnectReturnCode.valueOf(returnCode), sessionPresent, properties); return new Result(mqttConnAckVariableHeader, numberOfBytesConsumed); } @@ -266,9 +305,89 @@ public final class MqttDecoder extends ReplayingDecoder { messageId.numberOfBytesConsumed); } - private static Result decodePublishVariableHeader( + private static Result decodeMessageIdAndPropertiesVariableHeader( + ChannelHandlerContext ctx, + ByteBuf buffer) { + final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); + final Result packetId = decodeMessageId(buffer); + + final MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader; + final int mqtt5Consumed; + + if (mqttVersion == MqttVersion.MQTT_5) { + final Result properties = decodeProperties(buffer); + mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId.value, properties.value); + mqtt5Consumed = properties.numberOfBytesConsumed; + } else { + mqttVariableHeader = new MqttMessageIdAndPropertiesVariableHeader(packetId.value, + MqttProperties.NO_PROPERTIES); + mqtt5Consumed = 0; + } + + return new Result(mqttVariableHeader, + packetId.numberOfBytesConsumed + mqtt5Consumed); + } + + private Result decodePubReplyMessage(ByteBuf buffer) { + final Result packetId = decodeMessageId(buffer); + + final MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader; + final int consumed; + if (bytesRemainingInVariablePart > 3) { + final byte reasonCode = buffer.readByte(); + final Result properties = decodeProperties(buffer); + mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value, + reasonCode, + properties.value); + consumed = packetId.numberOfBytesConsumed + 1 + properties.numberOfBytesConsumed; + } else if (bytesRemainingInVariablePart > 2) { + final byte reasonCode = buffer.readByte(); + mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value, + reasonCode, + MqttProperties.NO_PROPERTIES); + consumed = packetId.numberOfBytesConsumed + 1; + } else { + mqttPubAckVariableHeader = new MqttPubReplyMessageVariableHeader(packetId.value, + (byte) 0, + MqttProperties.NO_PROPERTIES); + consumed = packetId.numberOfBytesConsumed; + } + + return new Result(mqttPubAckVariableHeader, consumed); + } + + private Result decodeReasonCodeAndPropertiesVariableHeader( + ByteBuf buffer) { + final byte reasonCode; + final MqttProperties properties; + final int consumed; + if (bytesRemainingInVariablePart > 1) { + reasonCode = buffer.readByte(); + final Result propertiesResult = decodeProperties(buffer); + properties = propertiesResult.value; + consumed = 1 + propertiesResult.numberOfBytesConsumed; + } else if (bytesRemainingInVariablePart > 0) { + reasonCode = buffer.readByte(); + properties = MqttProperties.NO_PROPERTIES; + consumed = 1; + } else { + reasonCode = 0; + properties = MqttProperties.NO_PROPERTIES; + consumed = 0; + } + final MqttReasonCodeAndPropertiesVariableHeader mqttReasonAndPropsVariableHeader = + new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties); + + return new Result( + mqttReasonAndPropsVariableHeader, + consumed); + } + + private Result decodePublishVariableHeader( + ChannelHandlerContext ctx, ByteBuf buffer, MqttFixedHeader mqttFixedHeader) { + final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); final Result decodedTopic = decodeString(buffer); if (!isValidPublishTopicName(decodedTopic.value)) { throw new DecoderException("invalid publish topic name: " + decodedTopic.value + " (contains wildcards)"); @@ -281,8 +400,18 @@ public final class MqttDecoder extends ReplayingDecoder { messageId = decodedMessageId.value; numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed; } + + final MqttProperties properties; + if (mqttVersion == MqttVersion.MQTT_5) { + final Result propertiesResult = decodeProperties(buffer); + properties = propertiesResult.value; + numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed; + } else { + properties = MqttProperties.NO_PROPERTIES; + } + final MqttPublishVariableHeader mqttPublishVariableHeader = - new MqttPublishVariableHeader(decodedTopic.value, messageId); + new MqttPublishVariableHeader(decodedTopic.value, messageId, properties); return new Result(mqttPublishVariableHeader, numberOfBytesConsumed); } @@ -304,6 +433,7 @@ public final class MqttDecoder extends ReplayingDecoder { * @return the payload */ private static Result decodePayload( + ChannelHandlerContext ctx, ByteBuf buffer, MqttMessageType messageType, int bytesRemainingInVariablePart, @@ -321,6 +451,9 @@ public final class MqttDecoder extends ReplayingDecoder { case UNSUBSCRIBE: return decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart); + case UNSUBACK: + return decodeUnsubAckPayload(ctx, buffer, bytesRemainingInVariablePart); + case PUBLISH: return decodePublishPayload(buffer, bytesRemainingInVariablePart); @@ -344,11 +477,22 @@ public final class MqttDecoder extends ReplayingDecoder { Result decodedWillTopic = null; Result decodedWillMessage = null; + + final MqttProperties willProperties; if (mqttConnectVariableHeader.isWillFlag()) { + if (mqttVersion == MqttVersion.MQTT_5) { + final Result propertiesResult = decodeProperties(buffer); + willProperties = propertiesResult.value; + numberOfBytesConsumed += propertiesResult.numberOfBytesConsumed; + } else { + willProperties = MqttProperties.NO_PROPERTIES; + } decodedWillTopic = decodeString(buffer, 0, 32767); numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed; decodedWillMessage = decodeByteArray(buffer); numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed; + } else { + willProperties = MqttProperties.NO_PROPERTIES; } Result decodedUserName = null; Result decodedPassword = null; @@ -364,6 +508,7 @@ public final class MqttDecoder extends ReplayingDecoder { final MqttConnectPayload mqttConnectPayload = new MqttConnectPayload( decodedClientId.value, + willProperties, decodedWillTopic != null ? decodedWillTopic.value : null, decodedWillMessage != null ? decodedWillMessage.value : null, decodedUserName != null ? decodedUserName.value : null, @@ -379,9 +524,21 @@ public final class MqttDecoder extends ReplayingDecoder { while (numberOfBytesConsumed < bytesRemainingInVariablePart) { final Result decodedTopicName = decodeString(buffer); numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed; - int qos = buffer.readUnsignedByte() & 0x03; + //See 3.8.3.1 Subscription Options of MQTT 5.0 specification for optionByte details + final short optionByte = buffer.readUnsignedByte(); + + MqttQoS qos = MqttQoS.valueOf(optionByte & 0x03); + boolean noLocal = ((optionByte & 0x04) >> 2) == 1; + boolean retainAsPublished = ((optionByte & 0x08) >> 3) == 1; + RetainedHandlingPolicy retainHandling = RetainedHandlingPolicy.valueOf((optionByte & 0x30) >> 4); + + final MqttSubscriptionOption subscriptionOption = new MqttSubscriptionOption(qos, + noLocal, + retainAsPublished, + retainHandling); + numberOfBytesConsumed++; - subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, MqttQoS.valueOf(qos))); + subscribeTopics.add(new MqttTopicSubscription(decodedTopicName.value, subscriptionOption)); } return new Result(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed); } @@ -402,6 +559,20 @@ public final class MqttDecoder extends ReplayingDecoder { return new Result(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed); } + private static Result decodeUnsubAckPayload( + ChannelHandlerContext ctx, + ByteBuf buffer, + int bytesRemainingInVariablePart) { + final List reasonCodes = new ArrayList(bytesRemainingInVariablePart); + int numberOfBytesConsumed = 0; + while (numberOfBytesConsumed < bytesRemainingInVariablePart) { + short reasonCode = buffer.readUnsignedByte(); + numberOfBytesConsumed++; + reasonCodes.add(reasonCode); + } + return new Result(new MqttUnsubAckPayload(reasonCodes), numberOfBytesConsumed); + } + private static Result decodeUnsubscribePayload( ByteBuf buffer, int bytesRemainingInVariablePart) { @@ -464,6 +635,31 @@ public final class MqttDecoder extends ReplayingDecoder { return new Result(result, numberOfBytesConsumed); } + /** + * See 1.5.5 Variable Byte Integer section of MQTT 5.0 specification for encoding/decoding rules + * + * @param buffer the buffer to decode from + * @return decoded integer + */ + private static Result decodeVariableByteInteger(ByteBuf buffer) { + int remainingLength = 0; + int multiplier = 1; + short digit; + int loops = 0; + do { + digit = buffer.readUnsignedByte(); + remainingLength += (digit & 127) * multiplier; + multiplier *= 128; + loops++; + } while ((digit & 128) != 0 && loops < 4); + + // MQTT protocol limits Remaining Length to 4 bytes + if (loops == 4 && (digit & 128) != 0) { + return null; + } + return new Result(remainingLength, loops); + } + private static final class Result { private final T value; @@ -474,4 +670,82 @@ public final class MqttDecoder extends ReplayingDecoder { this.numberOfBytesConsumed = numberOfBytesConsumed; } } + + private static Result decodeProperties(ByteBuf buffer) { + final Result propertiesLength = decodeVariableByteInteger(buffer); + int totalPropertiesLength = propertiesLength.value; + int numberOfBytesConsumed = propertiesLength.numberOfBytesConsumed; + + MqttProperties decodedProperties = new MqttProperties(); + while (numberOfBytesConsumed < totalPropertiesLength) { + Result propertyId = decodeVariableByteInteger(buffer); + numberOfBytesConsumed += propertyId.numberOfBytesConsumed; + + MqttProperties.MqttPropertyType propertyType = MqttProperties.MqttPropertyType.valueOf(propertyId.value); + switch (propertyType) { + case PAYLOAD_FORMAT_INDICATOR: + case REQUEST_PROBLEM_INFORMATION: + case REQUEST_RESPONSE_INFORMATION: + case MAXIMUM_QOS: + case RETAIN_AVAILABLE: + case WILDCARD_SUBSCRIPTION_AVAILABLE: + case SUBSCRIPTION_IDENTIFIER_AVAILABLE: + case SHARED_SUBSCRIPTION_AVAILABLE: + final int b1 = buffer.readUnsignedByte(); + numberOfBytesConsumed++; + decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, b1)); + break; + case SERVER_KEEP_ALIVE: + case RECEIVE_MAXIMUM: + case TOPIC_ALIAS_MAXIMUM: + case TOPIC_ALIAS: + final Result int2BytesResult = decodeMsbLsb(buffer); + numberOfBytesConsumed += int2BytesResult.numberOfBytesConsumed; + decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, int2BytesResult.value)); + break; + case PUBLICATION_EXPIRY_INTERVAL: + case SESSION_EXPIRY_INTERVAL: + case WILL_DELAY_INTERVAL: + case MAXIMUM_PACKET_SIZE: + final int maxPacketSize = buffer.readInt(); + numberOfBytesConsumed += 4; + decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, maxPacketSize)); + break; + case SUBSCRIPTION_IDENTIFIER: + Result vbIntegerResult = decodeVariableByteInteger(buffer); + numberOfBytesConsumed += vbIntegerResult.numberOfBytesConsumed; + decodedProperties.add(new MqttProperties.IntegerProperty(propertyId.value, vbIntegerResult.value)); + break; + case CONTENT_TYPE: + case RESPONSE_TOPIC: + case ASSIGNED_CLIENT_IDENTIFIER: + case AUTHENTICATION_METHOD: + case RESPONSE_INFORMATION: + case SERVER_REFERENCE: + case REASON_STRING: + final Result stringResult = decodeString(buffer); + numberOfBytesConsumed += stringResult.numberOfBytesConsumed; + decodedProperties.add(new MqttProperties.StringProperty(propertyId.value, stringResult.value)); + break; + case USER_PROPERTY: + final Result keyResult = decodeString(buffer); + final Result valueResult = decodeString(buffer); + numberOfBytesConsumed += keyResult.numberOfBytesConsumed; + numberOfBytesConsumed += valueResult.numberOfBytesConsumed; + decodedProperties.add(new MqttProperties.UserProperty(keyResult.value, valueResult.value)); + break; + case CORRELATION_DATA: + case AUTHENTICATION_DATA: + final Result binaryDataResult = decodeByteArray(buffer); + numberOfBytesConsumed += binaryDataResult.numberOfBytesConsumed; + decodedProperties.add(new MqttProperties.BinaryProperty(propertyId.value, binaryDataResult.value)); + break; + default: + //shouldn't reach here + throw new DecoderException("Unknown property type: " + propertyType); + } + } + + return new Result(decodedProperties, numberOfBytesConsumed); + } } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java index 29eeb6cc29..417655717b 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttEncoder.java @@ -18,9 +18,10 @@ package io.netty.handler.codec.mqtt; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.EncoderException; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.CharsetUtil; import io.netty.util.internal.EmptyArrays; @@ -32,6 +33,8 @@ import static io.netty.handler.codec.mqtt.MqttCodecUtil.*; /** * Encodes Mqtt messages into bytes following the protocol specification v3.1 * as described here MQTTV3.1 + * or v5.0 as described here MQTTv5.0 - + * depending on the version specified in the first CONNECT message that goes through the channel. */ @ChannelHandler.Sharable public final class MqttEncoder extends MessageToMessageEncoder { @@ -42,49 +45,57 @@ public final class MqttEncoder extends MessageToMessageEncoder { @Override protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List out) throws Exception { - out.add(doEncode(ctx.alloc(), msg)); + out.add(doEncode(ctx, msg)); } /** * This is the main encoding method. * It's only visible for testing. * - * @param byteBufAllocator Allocates ByteBuf * @param message MQTT message to encode * @return ByteBuf with encoded bytes */ - static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) { + static ByteBuf doEncode(ChannelHandlerContext ctx, + MqttMessage message) { switch (message.fixedHeader().messageType()) { case CONNECT: - return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message); + return encodeConnectMessage(ctx, (MqttConnectMessage) message); case CONNACK: - return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message); + return encodeConnAckMessage(ctx, (MqttConnAckMessage) message); case PUBLISH: - return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message); + return encodePublishMessage(ctx, (MqttPublishMessage) message); case SUBSCRIBE: - return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message); + return encodeSubscribeMessage(ctx, (MqttSubscribeMessage) message); case UNSUBSCRIBE: - return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message); + return encodeUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message); case SUBACK: - return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message); + return encodeSubAckMessage(ctx, (MqttSubAckMessage) message); case UNSUBACK: + if (message instanceof MqttUnsubAckMessage) { + return encodeUnsubAckMessage(ctx, (MqttUnsubAckMessage) message); + } + return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message); + case PUBACK: case PUBREC: case PUBREL: case PUBCOMP: - return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message); + return encodePubReplyMessage(ctx, message); + + case DISCONNECT: + case AUTH: + return encodeReasonCodePlusPropertiesMessage(ctx, message); case PINGREQ: case PINGRESP: - case DISCONNECT: - return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message); + return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message); default: throw new IllegalArgumentException( @@ -93,7 +104,7 @@ public final class MqttEncoder extends MessageToMessageEncoder { } private static ByteBuf encodeConnectMessage( - ByteBufAllocator byteBufAllocator, + ChannelHandlerContext ctx, MqttConnectMessage message) { int payloadBufferSize = 0; @@ -102,10 +113,11 @@ public final class MqttEncoder extends MessageToMessageEncoder { MqttConnectPayload payload = message.payload(); MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(), (byte) variableHeader.version()); + MqttCodecUtil.setMqttVersion(ctx, mqttVersion); // as MQTT 3.1 & 3.1.1 spec, If the User Name Flag is set to 0, the Password Flag MUST be set to 0 if (!variableHeader.hasUserName() && variableHeader.hasPassword()) { - throw new DecoderException("Without a username, the password MUST be not set"); + throw new EncoderException("Without a username, the password MUST be not set"); } // Client id @@ -138,40 +150,62 @@ public final class MqttEncoder extends MessageToMessageEncoder { payloadBufferSize += 2 + passwordBytes.length; } - // Fixed header + // Fixed and variable header byte[] protocolNameBytes = mqttVersion.protocolNameBytes(); - int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4; - int variablePartSize = variableHeaderBufferSize + payloadBufferSize; - int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); - ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); - buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); - writeVariableLengthInt(buf, variablePartSize); + ByteBuf propertiesBuf = encodePropertiesIfNeeded( + mqttVersion, + ctx.alloc(), + message.variableHeader().properties()); + try { + final ByteBuf willPropertiesBuf; + if (variableHeader.isWillFlag()) { + willPropertiesBuf = encodePropertiesIfNeeded(mqttVersion, ctx.alloc(), payload.willProperties()); + payloadBufferSize += willPropertiesBuf.readableBytes(); + } else { + willPropertiesBuf = Unpooled.EMPTY_BUFFER; + } + try { + int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4 + propertiesBuf.readableBytes(); - buf.writeShort(protocolNameBytes.length); - buf.writeBytes(protocolNameBytes); + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variablePartSize); - buf.writeByte(variableHeader.version()); - buf.writeByte(getConnVariableHeaderFlag(variableHeader)); - buf.writeShort(variableHeader.keepAliveTimeSeconds()); + buf.writeShort(protocolNameBytes.length); + buf.writeBytes(protocolNameBytes); - // Payload - buf.writeShort(clientIdentifierBytes.length); - buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length); - if (variableHeader.isWillFlag()) { - buf.writeShort(willTopicBytes.length); - buf.writeBytes(willTopicBytes, 0, willTopicBytes.length); - buf.writeShort(willMessageBytes.length); - buf.writeBytes(willMessageBytes, 0, willMessageBytes.length); + buf.writeByte(variableHeader.version()); + buf.writeByte(getConnVariableHeaderFlag(variableHeader)); + buf.writeShort(variableHeader.keepAliveTimeSeconds()); + buf.writeBytes(propertiesBuf); + + // Payload + buf.writeShort(clientIdentifierBytes.length); + buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length); + if (variableHeader.isWillFlag()) { + buf.writeBytes(willPropertiesBuf); + buf.writeShort(willTopicBytes.length); + buf.writeBytes(willTopicBytes, 0, willTopicBytes.length); + buf.writeShort(willMessageBytes.length); + buf.writeBytes(willMessageBytes, 0, willMessageBytes.length); + } + if (variableHeader.hasUserName()) { + buf.writeShort(userNameBytes.length); + buf.writeBytes(userNameBytes, 0, userNameBytes.length); + } + if (variableHeader.hasPassword()) { + buf.writeShort(passwordBytes.length); + buf.writeBytes(passwordBytes, 0, passwordBytes.length); + } + return buf; + } finally { + willPropertiesBuf.release(); + } + } finally { + propertiesBuf.release(); } - if (variableHeader.hasUserName()) { - buf.writeShort(userNameBytes.length); - buf.writeBytes(userNameBytes, 0, userNameBytes.length); - } - if (variableHeader.hasPassword()) { - buf.writeShort(passwordBytes.length); - buf.writeBytes(passwordBytes, 0, passwordBytes.length); - } - return buf; } private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) { @@ -196,114 +230,193 @@ public final class MqttEncoder extends MessageToMessageEncoder { } private static ByteBuf encodeConnAckMessage( - ByteBufAllocator byteBufAllocator, + ChannelHandlerContext ctx, MqttConnAckMessage message) { - ByteBuf buf = byteBufAllocator.buffer(4); - buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); - buf.writeByte(2); - buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00); - buf.writeByte(message.variableHeader().connectReturnCode().byteValue()); + final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); + ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, + ctx.alloc(), + message.variableHeader().properties()); - return buf; + try { + ByteBuf buf = ctx.alloc().buffer(4 + propertiesBuf.readableBytes()); + buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); + writeVariableLengthInt(buf, 2 + propertiesBuf.readableBytes()); + buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00); + buf.writeByte(message.variableHeader().connectReturnCode().byteValue()); + buf.writeBytes(propertiesBuf); + return buf; + } finally { + propertiesBuf.release(); + } } private static ByteBuf encodeSubscribeMessage( - ByteBufAllocator byteBufAllocator, + ChannelHandlerContext ctx, MqttSubscribeMessage message) { - int variableHeaderBufferSize = 2; - int payloadBufferSize = 0; + MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); + ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, + ctx.alloc(), + message.idAndPropertiesVariableHeader().properties()); - MqttFixedHeader mqttFixedHeader = message.fixedHeader(); - MqttMessageIdVariableHeader variableHeader = message.variableHeader(); - MqttSubscribePayload payload = message.payload(); + try { + final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes(); + int payloadBufferSize = 0; - for (MqttTopicSubscription topic : payload.topicSubscriptions()) { - String topicName = topic.topicName(); - byte[] topicNameBytes = encodeStringUtf8(topicName); - payloadBufferSize += 2 + topicNameBytes.length; - payloadBufferSize += 1; + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttMessageIdVariableHeader variableHeader = message.variableHeader(); + MqttSubscribePayload payload = message.payload(); + + for (MqttTopicSubscription topic : payload.topicSubscriptions()) { + String topicName = topic.topicName(); + byte[] topicNameBytes = encodeStringUtf8(topicName); + payloadBufferSize += 2 + topicNameBytes.length; + payloadBufferSize += 1; + } + + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + + ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variablePartSize); + + // Variable Header + int messageId = variableHeader.messageId(); + buf.writeShort(messageId); + buf.writeBytes(propertiesBuf); + + // Payload + for (MqttTopicSubscription topic : payload.topicSubscriptions()) { + writeUTF8String(buf, topic.topicName()); + final MqttSubscriptionOption option = topic.option(); + + int optionEncoded = option.retainHandling().value() << 4; + if (option.isRetainAsPublished()) { + optionEncoded |= 0x08; + } + if (option.isNoLocal()) { + optionEncoded |= 0x04; + } + optionEncoded |= option.qos().value(); + + buf.writeByte(optionEncoded); + } + + return buf; + } finally { + propertiesBuf.release(); } - - int variablePartSize = variableHeaderBufferSize + payloadBufferSize; - int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); - - ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); - buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); - writeVariableLengthInt(buf, variablePartSize); - - // Variable Header - int messageId = variableHeader.messageId(); - buf.writeShort(messageId); - - // Payload - for (MqttTopicSubscription topic : payload.topicSubscriptions()) { - String topicName = topic.topicName(); - byte[] topicNameBytes = encodeStringUtf8(topicName); - buf.writeShort(topicNameBytes.length); - buf.writeBytes(topicNameBytes, 0, topicNameBytes.length); - buf.writeByte(topic.qualityOfService().value()); - } - - return buf; } private static ByteBuf encodeUnsubscribeMessage( - ByteBufAllocator byteBufAllocator, + ChannelHandlerContext ctx, MqttUnsubscribeMessage message) { - int variableHeaderBufferSize = 2; - int payloadBufferSize = 0; + MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); + ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, + ctx.alloc(), + message.idAndPropertiesVariableHeader().properties()); - MqttFixedHeader mqttFixedHeader = message.fixedHeader(); - MqttMessageIdVariableHeader variableHeader = message.variableHeader(); - MqttUnsubscribePayload payload = message.payload(); + try { + final int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes(); + int payloadBufferSize = 0; - for (String topicName : payload.topics()) { - byte[] topicNameBytes = encodeStringUtf8(topicName); - payloadBufferSize += 2 + topicNameBytes.length; + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttMessageIdVariableHeader variableHeader = message.variableHeader(); + MqttUnsubscribePayload payload = message.payload(); + + for (String topicName : payload.topics()) { + byte[] topicNameBytes = encodeStringUtf8(topicName); + payloadBufferSize += 2 + topicNameBytes.length; + } + + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + + ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variablePartSize); + + // Variable Header + int messageId = variableHeader.messageId(); + buf.writeShort(messageId); + buf.writeBytes(propertiesBuf); + + // Payload + for (String topicName : payload.topics()) { + byte[] topicNameBytes = encodeStringUtf8(topicName); + buf.writeShort(topicNameBytes.length); + buf.writeBytes(topicNameBytes, 0, topicNameBytes.length); + } + + return buf; + } finally { + propertiesBuf.release(); } - - int variablePartSize = variableHeaderBufferSize + payloadBufferSize; - int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); - - ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); - buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); - writeVariableLengthInt(buf, variablePartSize); - - // Variable Header - int messageId = variableHeader.messageId(); - buf.writeShort(messageId); - - // Payload - for (String topicName : payload.topics()) { - byte[] topicNameBytes = encodeStringUtf8(topicName); - buf.writeShort(topicNameBytes.length); - buf.writeBytes(topicNameBytes, 0, topicNameBytes.length); - } - - return buf; } private static ByteBuf encodeSubAckMessage( - ByteBufAllocator byteBufAllocator, + ChannelHandlerContext ctx, MqttSubAckMessage message) { - int variableHeaderBufferSize = 2; - int payloadBufferSize = message.payload().grantedQoSLevels().size(); - int variablePartSize = variableHeaderBufferSize + payloadBufferSize; - int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); - ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); - buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); - writeVariableLengthInt(buf, variablePartSize); - buf.writeShort(message.variableHeader().messageId()); - for (int qos : message.payload().grantedQoSLevels()) { - buf.writeByte(qos); - } + MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); + ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, + ctx.alloc(), + message.idAndPropertiesVariableHeader().properties()); + try { + int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes(); + int payloadBufferSize = message.payload().grantedQoSLevels().size(); + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); + writeVariableLengthInt(buf, variablePartSize); + buf.writeShort(message.variableHeader().messageId()); + buf.writeBytes(propertiesBuf); + for (int qos : message.payload().grantedQoSLevels()) { + buf.writeByte(qos); + } - return buf; + return buf; + } finally { + propertiesBuf.release(); + } + } + + private static ByteBuf encodeUnsubAckMessage( + ChannelHandlerContext ctx, + MqttUnsubAckMessage message) { + if (message.variableHeader() instanceof MqttMessageIdAndPropertiesVariableHeader) { + MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); + ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, + ctx.alloc(), + message.idAndPropertiesVariableHeader().properties()); + try { + int variableHeaderBufferSize = 2 + propertiesBuf.readableBytes(); + int payloadBufferSize = message.payload().unsubscribeReasonCodes().size(); + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); + writeVariableLengthInt(buf, variablePartSize); + buf.writeShort(message.variableHeader().messageId()); + buf.writeBytes(propertiesBuf); + + for (Short reasonCode : message.payload().unsubscribeReasonCodes()) { + buf.writeByte(reasonCode); + } + + return buf; + } finally { + propertiesBuf.release(); + } + } else { + return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message); + } } private static ByteBuf encodePublishMessage( - ByteBufAllocator byteBufAllocator, + ChannelHandlerContext ctx, MqttPublishMessage message) { + MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttPublishVariableHeader variableHeader = message.variableHeader(); ByteBuf payload = message.payload().duplicate(); @@ -311,23 +424,76 @@ public final class MqttEncoder extends MessageToMessageEncoder { String topicName = variableHeader.topicName(); byte[] topicNameBytes = encodeStringUtf8(topicName); - int variableHeaderBufferSize = 2 + topicNameBytes.length + - (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0); - int payloadBufferSize = payload.readableBytes(); - int variablePartSize = variableHeaderBufferSize + payloadBufferSize; - int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + ByteBuf propertiesBuf = encodePropertiesIfNeeded(mqttVersion, + ctx.alloc(), + message.variableHeader().properties()); - ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); - buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); - writeVariableLengthInt(buf, variablePartSize); - buf.writeShort(topicNameBytes.length); - buf.writeBytes(topicNameBytes); - if (mqttFixedHeader.qosLevel().value() > 0) { - buf.writeShort(variableHeader.packetId()); + try { + int variableHeaderBufferSize = 2 + topicNameBytes.length + + (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0) + propertiesBuf.readableBytes(); + int payloadBufferSize = payload.readableBytes(); + int variablePartSize = variableHeaderBufferSize + payloadBufferSize; + int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); + + ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variablePartSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variablePartSize); + buf.writeShort(topicNameBytes.length); + buf.writeBytes(topicNameBytes); + if (mqttFixedHeader.qosLevel().value() > 0) { + buf.writeShort(variableHeader.packetId()); + } + buf.writeBytes(propertiesBuf); + buf.writeBytes(payload); + + return buf; + } finally { + propertiesBuf.release(); } - buf.writeBytes(payload); + } - return buf; + private static ByteBuf encodePubReplyMessage(ChannelHandlerContext ctx, + MqttMessage message) { + if (message.variableHeader() instanceof MqttPubReplyMessageVariableHeader) { + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttPubReplyMessageVariableHeader variableHeader = + (MqttPubReplyMessageVariableHeader) message.variableHeader(); + int msgId = variableHeader.messageId(); + + final ByteBuf propertiesBuf; + final boolean includeReasonCode; + final int variableHeaderBufferSize; + final MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); + if (mqttVersion == MqttVersion.MQTT_5 && + (variableHeader.reasonCode() != MqttPubReplyMessageVariableHeader.REASON_CODE_OK || + !variableHeader.properties().isEmpty())) { + propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties()); + includeReasonCode = true; + variableHeaderBufferSize = 3 + propertiesBuf.readableBytes(); + } else { + propertiesBuf = Unpooled.EMPTY_BUFFER; + includeReasonCode = false; + variableHeaderBufferSize = 2; + } + + try { + final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize); + ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variableHeaderBufferSize); + buf.writeShort(msgId); + if (includeReasonCode) { + buf.writeByte(variableHeader.reasonCode()); + } + buf.writeBytes(propertiesBuf); + + return buf; + } finally { + propertiesBuf.release(); + } + } else { + return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(ctx.alloc(), message); + } } private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId( @@ -347,6 +513,49 @@ public final class MqttEncoder extends MessageToMessageEncoder { return buf; } + private static ByteBuf encodeReasonCodePlusPropertiesMessage( + ChannelHandlerContext ctx, + MqttMessage message) { + if (message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader) { + MqttVersion mqttVersion = MqttCodecUtil.getMqttVersion(ctx); + MqttFixedHeader mqttFixedHeader = message.fixedHeader(); + MqttReasonCodeAndPropertiesVariableHeader variableHeader = + (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader(); + + final ByteBuf propertiesBuf; + final boolean includeReasonCode; + final int variableHeaderBufferSize; + if (mqttVersion == MqttVersion.MQTT_5 && + (variableHeader.reasonCode() != MqttReasonCodeAndPropertiesVariableHeader.REASON_CODE_OK || + !variableHeader.properties().isEmpty())) { + propertiesBuf = encodeProperties(ctx.alloc(), variableHeader.properties()); + includeReasonCode = true; + variableHeaderBufferSize = 1 + propertiesBuf.readableBytes(); + } else { + propertiesBuf = Unpooled.EMPTY_BUFFER; + includeReasonCode = false; + variableHeaderBufferSize = 0; + } + + try { + final int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize); + ByteBuf buf = ctx.alloc().buffer(fixedHeaderBufferSize + variableHeaderBufferSize); + buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); + writeVariableLengthInt(buf, variableHeaderBufferSize); + if (includeReasonCode) { + buf.writeByte(variableHeader.reasonCode()); + } + buf.writeBytes(propertiesBuf); + + return buf; + } finally { + propertiesBuf.release(); + } + } else { + return encodeMessageWithOnlySingleByteFixedHeader(ctx.alloc(), message); + } + } + private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader( ByteBufAllocator byteBufAllocator, MqttMessage message) { @@ -358,6 +567,104 @@ public final class MqttEncoder extends MessageToMessageEncoder { return buf; } + private static ByteBuf encodePropertiesIfNeeded(MqttVersion mqttVersion, + ByteBufAllocator byteBufAllocator, + MqttProperties mqttProperties) { + if (mqttVersion == MqttVersion.MQTT_5) { + return encodeProperties(byteBufAllocator, mqttProperties); + } + return Unpooled.EMPTY_BUFFER; + } + + private static ByteBuf encodeProperties(ByteBufAllocator byteBufAllocator, + MqttProperties mqttProperties) { + ByteBuf propertiesHeaderBuf = byteBufAllocator.buffer(); + // encode also the Properties part + try { + ByteBuf propertiesBuf = byteBufAllocator.buffer(); + try { + for (MqttProperties.MqttProperty property : mqttProperties.listAll()) { + MqttProperties.MqttPropertyType propertyType = + MqttProperties.MqttPropertyType.valueOf(property.propertyId); + switch (propertyType) { + case PAYLOAD_FORMAT_INDICATOR: + case REQUEST_PROBLEM_INFORMATION: + case REQUEST_RESPONSE_INFORMATION: + case MAXIMUM_QOS: + case RETAIN_AVAILABLE: + case WILDCARD_SUBSCRIPTION_AVAILABLE: + case SUBSCRIPTION_IDENTIFIER_AVAILABLE: + case SHARED_SUBSCRIPTION_AVAILABLE: + writeVariableLengthInt(propertiesBuf, property.propertyId); + final byte bytePropValue = ((MqttProperties.IntegerProperty) property).value.byteValue(); + propertiesBuf.writeByte(bytePropValue); + break; + case SERVER_KEEP_ALIVE: + case RECEIVE_MAXIMUM: + case TOPIC_ALIAS_MAXIMUM: + case TOPIC_ALIAS: + writeVariableLengthInt(propertiesBuf, property.propertyId); + final short twoBytesInPropValue = + ((MqttProperties.IntegerProperty) property).value.shortValue(); + propertiesBuf.writeShort(twoBytesInPropValue); + break; + case PUBLICATION_EXPIRY_INTERVAL: + case SESSION_EXPIRY_INTERVAL: + case WILL_DELAY_INTERVAL: + case MAXIMUM_PACKET_SIZE: + writeVariableLengthInt(propertiesBuf, property.propertyId); + final int fourBytesIntPropValue = ((MqttProperties.IntegerProperty) property).value; + propertiesBuf.writeInt(fourBytesIntPropValue); + break; + case SUBSCRIPTION_IDENTIFIER: + writeVariableLengthInt(propertiesBuf, property.propertyId); + final int vbi = ((MqttProperties.IntegerProperty) property).value; + writeVariableLengthInt(propertiesBuf, vbi); + break; + case CONTENT_TYPE: + case RESPONSE_TOPIC: + case ASSIGNED_CLIENT_IDENTIFIER: + case AUTHENTICATION_METHOD: + case RESPONSE_INFORMATION: + case SERVER_REFERENCE: + case REASON_STRING: + writeVariableLengthInt(propertiesBuf, property.propertyId); + writeUTF8String(propertiesBuf, ((MqttProperties.StringProperty) property).value); + break; + case USER_PROPERTY: + final List pairs = + ((MqttProperties.UserProperties) property).value; + for (MqttProperties.StringPair pair : pairs) { + writeVariableLengthInt(propertiesBuf, property.propertyId); + writeUTF8String(propertiesBuf, pair.key); + writeUTF8String(propertiesBuf, pair.value); + } + break; + case CORRELATION_DATA: + case AUTHENTICATION_DATA: + writeVariableLengthInt(propertiesBuf, property.propertyId); + final byte[] binaryPropValue = ((MqttProperties.BinaryProperty) property).value; + propertiesBuf.writeShort(binaryPropValue.length); + propertiesBuf.writeBytes(binaryPropValue, 0, binaryPropValue.length); + break; + default: + //shouldn't reach here + throw new EncoderException("Unknown property type: " + propertyType); + } + } + writeVariableLengthInt(propertiesHeaderBuf, propertiesBuf.readableBytes()); + propertiesHeaderBuf.writeBytes(propertiesBuf); + + return propertiesHeaderBuf; + } finally { + propertiesBuf.release(); + } + } catch (RuntimeException e) { + propertiesHeaderBuf.release(); + throw e; + } + } + private static int getFixedHeaderByte1(MqttFixedHeader header) { int ret = 0; ret |= header.messageType().value() << 4; @@ -382,6 +689,12 @@ public final class MqttEncoder extends MessageToMessageEncoder { } while (num > 0); } + static void writeUTF8String(ByteBuf buf, String s) { + byte[] sBytes = encodeStringUtf8(s); + buf.writeShort(sBytes.length); + buf.writeBytes(sBytes, 0, sBytes.length); + } + private static int getVariableLengthInt(int num) { int count = 0; do { diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageBuilders.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageBuilders.java index 2bb0eaa3eb..46984c3f96 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageBuilders.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageBuilders.java @@ -20,6 +20,7 @@ import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public final class MqttMessageBuilders { @@ -30,6 +31,7 @@ public final class MqttMessageBuilders { private MqttQoS qos; private ByteBuf payload; private int messageId; + private MqttProperties mqttProperties; PublishBuilder() { } @@ -59,9 +61,15 @@ public final class MqttMessageBuilders { return this; } + public PublishBuilder properties(MqttProperties properties) { + this.mqttProperties = properties; + return this; + } + public MqttPublishMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, retained, 0); - MqttPublishVariableHeader mqttVariableHeader = new MqttPublishVariableHeader(topic, messageId); + MqttPublishVariableHeader mqttVariableHeader = + new MqttPublishVariableHeader(topic, messageId, mqttProperties); return new MqttPublishMessage(mqttFixedHeader, mqttVariableHeader, Unpooled.buffer().writeBytes(payload)); } } @@ -74,6 +82,7 @@ public final class MqttMessageBuilders { private boolean hasUser; private boolean hasPassword; private int keepAliveSecs; + private MqttProperties willProperties = MqttProperties.NO_PROPERTIES; private boolean willFlag; private boolean willRetain; private MqttQoS willQos = MqttQoS.AT_MOST_ONCE; @@ -81,6 +90,7 @@ public final class MqttMessageBuilders { private byte[] willMessage; private String username; private byte[] password; + private MqttProperties properties = MqttProperties.NO_PROPERTIES; ConnectBuilder() { } @@ -139,6 +149,11 @@ public final class MqttMessageBuilders { return this; } + public ConnectBuilder willProperties(MqttProperties willProperties) { + this.willProperties = willProperties; + return this; + } + public ConnectBuilder hasUser(boolean value) { this.hasUser = value; return this; @@ -170,6 +185,11 @@ public final class MqttMessageBuilders { return this; } + public ConnectBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + public MqttConnectMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0); @@ -183,9 +203,10 @@ public final class MqttMessageBuilders { willQos.value(), willFlag, cleanSession, - keepAliveSecs); + keepAliveSecs, + properties); MqttConnectPayload mqttConnectPayload = - new MqttConnectPayload(clientId, willTopic, willMessage, username, password); + new MqttConnectPayload(clientId, willProperties, willTopic, willMessage, username, password); return new MqttConnectMessage(mqttFixedHeader, mqttConnectVariableHeader, mqttConnectPayload); } } @@ -194,36 +215,54 @@ public final class MqttMessageBuilders { private List subscriptions; private int messageId; + private MqttProperties properties; SubscribeBuilder() { } public SubscribeBuilder addSubscription(MqttQoS qos, String topic) { - if (subscriptions == null) { - subscriptions = new ArrayList(5); - } + ensureSubscriptionsExist(); subscriptions.add(new MqttTopicSubscription(topic, qos)); return this; } + public SubscribeBuilder addSubscription(String topic, MqttSubscriptionOption option) { + ensureSubscriptionsExist(); + subscriptions.add(new MqttTopicSubscription(topic, option)); + return this; + } + public SubscribeBuilder messageId(int messageId) { this.messageId = messageId; return this; } + public SubscribeBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + public MqttSubscribeMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); - MqttMessageIdVariableHeader mqttVariableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader = + new MqttMessageIdAndPropertiesVariableHeader(messageId, properties); MqttSubscribePayload mqttSubscribePayload = new MqttSubscribePayload(subscriptions); return new MqttSubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload); } + + private void ensureSubscriptionsExist() { + if (subscriptions == null) { + subscriptions = new ArrayList(5); + } + } } public static final class UnsubscribeBuilder { private List topicFilters; private int messageId; + private MqttProperties properties; UnsubscribeBuilder() { } @@ -241,10 +280,16 @@ public final class MqttMessageBuilders { return this; } + public UnsubscribeBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + public MqttUnsubscribeMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0); - MqttMessageIdVariableHeader mqttVariableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttMessageIdAndPropertiesVariableHeader mqttVariableHeader = + new MqttMessageIdAndPropertiesVariableHeader(messageId, properties); MqttUnsubscribePayload mqttSubscribePayload = new MqttUnsubscribePayload(topicFilters); return new MqttUnsubscribeMessage(mqttFixedHeader, mqttVariableHeader, mqttSubscribePayload); } @@ -254,6 +299,7 @@ public final class MqttMessageBuilders { private MqttConnectReturnCode returnCode; private boolean sessionPresent; + private MqttProperties properties = MqttProperties.NO_PROPERTIES; ConnAckBuilder() { } @@ -268,15 +314,196 @@ public final class MqttMessageBuilders { return this; } + public ConnAckBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + public MqttConnAckMessage build() { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttConnAckVariableHeader mqttConnAckVariableHeader = - new MqttConnAckVariableHeader(returnCode, sessionPresent); + new MqttConnAckVariableHeader(returnCode, sessionPresent, properties); return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader); } } + public static final class PubAckBuilder { + + private short packetId; + private byte reasonCode; + private MqttProperties properties; + + PubAckBuilder() { + } + + public PubAckBuilder reasonCode(byte reasonCode) { + this.reasonCode = reasonCode; + return this; + } + + public PubAckBuilder packetId(short packetId) { + this.packetId = packetId; + return this; + } + + public PubAckBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + + public MqttMessage build() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttPubReplyMessageVariableHeader mqttPubAckVariableHeader = + new MqttPubReplyMessageVariableHeader(packetId, reasonCode, properties); + return new MqttMessage(mqttFixedHeader, mqttPubAckVariableHeader); + } + } + + public static final class SubAckBuilder { + + private short packetId; + private MqttProperties properties; + private final List grantedQoses = new ArrayList(); + + SubAckBuilder() { + } + + public SubAckBuilder packetId(short packetId) { + this.packetId = packetId; + return this; + } + + public SubAckBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + + public SubAckBuilder addGrantedQos(MqttQoS qos) { + this.grantedQoses.add(qos); + return this; + } + + public SubAckBuilder addGrantedQoses(MqttQoS... qoses) { + this.grantedQoses.addAll(Arrays.asList(qoses)); + return this; + } + + public MqttSubAckMessage build() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdAndPropertiesVariableHeader mqttSubAckVariableHeader = + new MqttMessageIdAndPropertiesVariableHeader(packetId, properties); + + //transform to primitive types + int[] grantedQoses = new int[this.grantedQoses.size()]; + int i = 0; + for (MqttQoS grantedQos : this.grantedQoses) { + grantedQoses[i++] = grantedQos.value(); + } + + MqttSubAckPayload subAckPayload = new MqttSubAckPayload(grantedQoses); + return new MqttSubAckMessage(mqttFixedHeader, mqttSubAckVariableHeader, subAckPayload); + } + } + + public static final class UnsubAckBuilder { + + private short packetId; + private MqttProperties properties; + private final List reasonCodes = new ArrayList(); + + UnsubAckBuilder() { + } + + public UnsubAckBuilder packetId(short packetId) { + this.packetId = packetId; + return this; + } + + public UnsubAckBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + + public UnsubAckBuilder addReasonCode(short reasonCode) { + this.reasonCodes.add(reasonCode); + return this; + } + + public UnsubAckBuilder addReasonCodes(Short... reasonCodes) { + this.reasonCodes.addAll(Arrays.asList(reasonCodes)); + return this; + } + + public MqttUnsubAckMessage build() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttMessageIdAndPropertiesVariableHeader mqttSubAckVariableHeader = + new MqttMessageIdAndPropertiesVariableHeader(packetId, properties); + + MqttUnsubAckPayload subAckPayload = new MqttUnsubAckPayload(reasonCodes); + return new MqttUnsubAckMessage(mqttFixedHeader, mqttSubAckVariableHeader, subAckPayload); + } + } + + public static final class DisconnectBuilder { + + private MqttProperties properties; + private byte reasonCode; + + DisconnectBuilder() { + } + + public DisconnectBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + + public DisconnectBuilder reasonCode(byte reasonCode) { + this.reasonCode = reasonCode; + return this; + } + + public MqttMessage build() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttReasonCodeAndPropertiesVariableHeader mqttDisconnectVariableHeader = + new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties); + + return new MqttMessage(mqttFixedHeader, mqttDisconnectVariableHeader); + } + } + + public static final class AuthBuilder { + + private MqttProperties properties; + private byte reasonCode; + + AuthBuilder() { + } + + public AuthBuilder properties(MqttProperties properties) { + this.properties = properties; + return this; + } + + public AuthBuilder reasonCode(byte reasonCode) { + this.reasonCode = reasonCode; + return this; + } + + public MqttMessage build() { + MqttFixedHeader mqttFixedHeader = + new MqttFixedHeader(MqttMessageType.AUTH, false, MqttQoS.AT_MOST_ONCE, false, 0); + MqttReasonCodeAndPropertiesVariableHeader mqttAuthVariableHeader = + new MqttReasonCodeAndPropertiesVariableHeader(reasonCode, properties); + + return new MqttMessage(mqttFixedHeader, mqttAuthVariableHeader); + } + } + public static ConnectBuilder connect() { return new ConnectBuilder(); } @@ -297,6 +524,26 @@ public final class MqttMessageBuilders { return new UnsubscribeBuilder(); } + public static PubAckBuilder pubAck() { + return new PubAckBuilder(); + } + + public static SubAckBuilder subAck() { + return new SubAckBuilder(); + } + + public static UnsubAckBuilder unsubAck() { + return new UnsubAckBuilder(); + } + + public static DisconnectBuilder disconnect() { + return new DisconnectBuilder(); + } + + public static AuthBuilder auth() { + return new AuthBuilder(); + } + private MqttMessageBuilders() { } } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java index 09f42698d7..c65df17e4e 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageFactory.java @@ -38,7 +38,7 @@ public final class MqttMessageFactory { case SUBSCRIBE: return new MqttSubscribeMessage( mqttFixedHeader, - (MqttMessageIdVariableHeader) variableHeader, + (MqttMessageIdAndPropertiesVariableHeader) variableHeader, (MqttSubscribePayload) payload); case SUBACK: @@ -50,12 +50,13 @@ public final class MqttMessageFactory { case UNSUBACK: return new MqttUnsubAckMessage( mqttFixedHeader, - (MqttMessageIdVariableHeader) variableHeader); + (MqttMessageIdVariableHeader) variableHeader, + (MqttUnsubAckPayload) payload); case UNSUBSCRIBE: return new MqttUnsubscribeMessage( mqttFixedHeader, - (MqttMessageIdVariableHeader) variableHeader, + (MqttMessageIdAndPropertiesVariableHeader) variableHeader, (MqttUnsubscribePayload) payload); case PUBLISH: @@ -65,17 +66,24 @@ public final class MqttMessageFactory { (ByteBuf) payload); case PUBACK: + //Having MqttPubReplyMessageVariableHeader or MqttMessageIdVariableHeader return new MqttPubAckMessage(mqttFixedHeader, (MqttMessageIdVariableHeader) variableHeader); case PUBREC: case PUBREL: case PUBCOMP: + //Having MqttPubReplyMessageVariableHeader or MqttMessageIdVariableHeader return new MqttMessage(mqttFixedHeader, variableHeader); case PINGREQ: case PINGRESP: - case DISCONNECT: return new MqttMessage(mqttFixedHeader); + case DISCONNECT: + case AUTH: + //Having MqttReasonCodeAndPropertiesVariableHeader + return new MqttMessage(mqttFixedHeader, + (MqttReasonCodeAndPropertiesVariableHeader) variableHeader); + default: throw new IllegalArgumentException("unknown message type: " + mqttFixedHeader.messageType()); } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdAndPropertiesVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdAndPropertiesVariableHeader.java new file mode 100644 index 0000000000..b2a32aaf5b --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdAndPropertiesVariableHeader.java @@ -0,0 +1,47 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Variable Header containing, Packet Id and Properties as in MQTT v5 spec. + */ +public final class MqttMessageIdAndPropertiesVariableHeader extends MqttMessageIdVariableHeader { + + private final MqttProperties properties; + + public MqttMessageIdAndPropertiesVariableHeader(int messageId, MqttProperties properties) { + super(messageId); + if (messageId < 1 || messageId > 0xffff) { + throw new IllegalArgumentException("messageId: " + messageId + " (expected: 1 ~ 65535)"); + } + this.properties = MqttProperties.withEmptyDefaults(properties); + } + + public MqttProperties properties() { + return properties; + } + + @Override + public String toString() { + return StringUtil.simpleClassName(this) + "[" + + "messageId=" + messageId() + + ", properties=" + properties + + ']'; + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java index c2e15935ca..e3b9c6e01a 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageIdVariableHeader.java @@ -22,7 +22,7 @@ import io.netty.util.internal.StringUtil; * Variable Header containing only Message Id * See MQTTV3.1/msg-id */ -public final class MqttMessageIdVariableHeader { +public class MqttMessageIdVariableHeader { private final int messageId; @@ -33,7 +33,7 @@ public final class MqttMessageIdVariableHeader { return new MqttMessageIdVariableHeader(messageId); } - private MqttMessageIdVariableHeader(int messageId) { + protected MqttMessageIdVariableHeader(int messageId) { this.messageId = messageId; } @@ -49,4 +49,8 @@ public final class MqttMessageIdVariableHeader { .append(']') .toString(); } + + public MqttMessageIdAndPropertiesVariableHeader withEmptyProperties() { + return new MqttMessageIdAndPropertiesVariableHeader(messageId, MqttProperties.NO_PROPERTIES); + } } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageType.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageType.java index 2db21d40b0..7f9c57b871 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageType.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttMessageType.java @@ -33,7 +33,8 @@ public enum MqttMessageType { UNSUBACK(11), PINGREQ(12), PINGRESP(13), - DISCONNECT(14); + DISCONNECT(14), + AUTH(15); private final int value; diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttProperties.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttProperties.java new file mode 100644 index 0000000000..d5fbea2468 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttProperties.java @@ -0,0 +1,232 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.mqtt; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.List; +import java.util.ArrayList; + +/** + * MQTT Properties container + * */ +public final class MqttProperties { + + public enum MqttPropertyType { + // single byte properties + PAYLOAD_FORMAT_INDICATOR(0x01), + REQUEST_PROBLEM_INFORMATION(0x17), + REQUEST_RESPONSE_INFORMATION(0x19), + MAXIMUM_QOS(0x24), + RETAIN_AVAILABLE(0x25), + WILDCARD_SUBSCRIPTION_AVAILABLE(0x28), + SUBSCRIPTION_IDENTIFIER_AVAILABLE(0x29), + SHARED_SUBSCRIPTION_AVAILABLE(0x2A), + + // two bytes properties + SERVER_KEEP_ALIVE(0x13), + RECEIVE_MAXIMUM(0x21), + TOPIC_ALIAS_MAXIMUM(0x22), + TOPIC_ALIAS(0x23), + + // four bytes properties + PUBLICATION_EXPIRY_INTERVAL(0x02), + SESSION_EXPIRY_INTERVAL(0x11), + WILL_DELAY_INTERVAL(0x18), + MAXIMUM_PACKET_SIZE(0x27), + + // Variable Byte Integer + SUBSCRIPTION_IDENTIFIER(0x0B), + + // UTF-8 Encoded String properties + CONTENT_TYPE(0x03), + RESPONSE_TOPIC(0x08), + ASSIGNED_CLIENT_IDENTIFIER(0x12), + AUTHENTICATION_METHOD(0x15), + RESPONSE_INFORMATION(0x1A), + SERVER_REFERENCE(0x1C), + REASON_STRING(0x1F), + USER_PROPERTY(0x26), + + // Binary Data + CORRELATION_DATA(0x09), + AUTHENTICATION_DATA(0x16); + + private final int value; + + MqttPropertyType(int value) { + this.value = value; + } + + public int value() { + return value; + } + + public static MqttPropertyType valueOf(int type) { + for (MqttPropertyType t : values()) { + if (t.value == type) { + return t; + } + } + throw new IllegalArgumentException("unknown property type: " + type); + } + } + + public static final MqttProperties NO_PROPERTIES = new MqttProperties( + Collections.unmodifiableMap(new HashMap()) + ); + + static MqttProperties withEmptyDefaults(MqttProperties properties) { + if (properties == null) { + return MqttProperties.NO_PROPERTIES; + } + return properties; + } + + public abstract static class MqttProperty { + final T value; + final int propertyId; + + protected MqttProperty(int propertyId, T value) { + this.propertyId = propertyId; + this.value = value; + } + } + + public static final class IntegerProperty extends MqttProperty { + + public IntegerProperty(int propertyId, Integer value) { + super(propertyId, value); + } + } + + public static final class StringProperty extends MqttProperty { + + public StringProperty(int propertyId, String value) { + super(propertyId, value); + } + } + + public static final class StringPair { + public final String key; + public final String value; + + public StringPair(String key, String value) { + this.key = key; + this.value = value; + } + + @Override + public int hashCode() { + return key.hashCode() + 31 * value.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + StringPair that = (StringPair) obj; + + return that.key.equals(this.key) && that.value.equals(this.value); + } + } + + //User properties are the only properties that may be included multiple times and + //are the only properties where ordering is required. Therefore, they need a special handling + public static final class UserProperties extends MqttProperty> { + public UserProperties() { + super(MqttPropertyType.USER_PROPERTY.value, new ArrayList()); + } + + /** + * Create user properties from the collection of the String pair values + * + * @param values string pairs. Collection entries are copied, collection itself isn't shared + */ + public UserProperties(Collection values) { + this(); + this.value.addAll(values); + } + + public void add(StringPair pair) { + this.value.add(pair); + } + + public void add(String key, String value) { + this.value.add(new StringPair(key, value)); + } + } + + public static final class UserProperty extends MqttProperty { + public UserProperty(String key, String value) { + super(MqttPropertyType.USER_PROPERTY.value, new StringPair(key, value)); + } + } + + public static final class BinaryProperty extends MqttProperty { + + public BinaryProperty(int propertyId, byte[] value) { + super(propertyId, value); + } + } + + public MqttProperties() { + this(new HashMap()); + } + + private MqttProperties(Map props) { + this.props = props; + } + + private final Map props; + + public void add(MqttProperty property) { + if (property.propertyId == MqttPropertyType.USER_PROPERTY.value) { + UserProperties userProps = (UserProperties) props.get(property.propertyId); + if (userProps == null) { + userProps = new UserProperties(); + props.put(property.propertyId, userProps); + } + if (property instanceof UserProperty) { + userProps.add(((UserProperty) property).value); + } else { + for (StringPair pair: ((UserProperties) property).value) { + userProps.add(pair); + } + } + } else { + props.put(property.propertyId, property); + } + } + + public Collection listAll() { + return props.values(); + } + + public boolean isEmpty() { + return props.isEmpty(); + } + + public MqttProperty getProperty(int propertyId) { + return props.get(propertyId); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPubReplyMessageVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPubReplyMessageVariableHeader.java new file mode 100644 index 0000000000..12db46f1f1 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPubReplyMessageVariableHeader.java @@ -0,0 +1,56 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Variable Header containing Packet Id, reason code and Properties as in MQTT v5 spec. + */ +public final class MqttPubReplyMessageVariableHeader extends MqttMessageIdVariableHeader { + + private final byte reasonCode; + private final MqttProperties properties; + + public static final byte REASON_CODE_OK = 0; + + public MqttPubReplyMessageVariableHeader(int messageId, byte reasonCode, MqttProperties properties) { + super(messageId); + if (messageId < 1 || messageId > 0xffff) { + throw new IllegalArgumentException("messageId: " + messageId + " (expected: 1 ~ 65535)"); + } + this.reasonCode = reasonCode; + this.properties = MqttProperties.withEmptyDefaults(properties); + } + + public byte reasonCode() { + return reasonCode; + } + + public MqttProperties properties() { + return properties; + } + + @Override + public String toString() { + return StringUtil.simpleClassName(this) + "[" + + "messageId=" + messageId() + + ", reasonCode=" + reasonCode + + ", properties=" + properties + + ']'; + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishVariableHeader.java index a0d0481b14..cc0e5875b5 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishVariableHeader.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttPublishVariableHeader.java @@ -25,10 +25,16 @@ public final class MqttPublishVariableHeader { private final String topicName; private final int packetId; + private final MqttProperties properties; public MqttPublishVariableHeader(String topicName, int packetId) { + this(topicName, packetId, MqttProperties.NO_PROPERTIES); + } + + public MqttPublishVariableHeader(String topicName, int packetId, MqttProperties properties) { this.topicName = topicName; this.packetId = packetId; + this.properties = MqttProperties.withEmptyDefaults(properties); } public String topicName() { @@ -47,6 +53,10 @@ public final class MqttPublishVariableHeader { return packetId; } + public MqttProperties properties() { + return properties; + } + @Override public String toString() { return new StringBuilder(StringUtil.simpleClassName(this)) diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttReasonCodeAndPropertiesVariableHeader.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttReasonCodeAndPropertiesVariableHeader.java new file mode 100644 index 0000000000..da6cec643a --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttReasonCodeAndPropertiesVariableHeader.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.StringUtil; + +/** + * Variable Header for AUTH and DISCONNECT messages represented by {@link MqttMessage} + */ +public final class MqttReasonCodeAndPropertiesVariableHeader { + + private final byte reasonCode; + private final MqttProperties properties; + + public static final byte REASON_CODE_OK = 0; + + public MqttReasonCodeAndPropertiesVariableHeader(byte reasonCode, + MqttProperties properties) { + this.reasonCode = reasonCode; + this.properties = MqttProperties.withEmptyDefaults(properties); + } + + public byte reasonCode() { + return reasonCode; + } + + public MqttProperties properties() { + return properties; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("reasonCode=").append(reasonCode) + .append(", properties=").append(properties) + .append(']') + .toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java index 0a1376f819..f0c7719306 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubAckMessage.java @@ -23,16 +23,27 @@ public final class MqttSubAckMessage extends MqttMessage { public MqttSubAckMessage( MqttFixedHeader mqttFixedHeader, - MqttMessageIdVariableHeader variableHeader, + MqttMessageIdAndPropertiesVariableHeader variableHeader, MqttSubAckPayload payload) { super(mqttFixedHeader, variableHeader, payload); } + public MqttSubAckMessage( + MqttFixedHeader mqttFixedHeader, + MqttMessageIdVariableHeader variableHeader, + MqttSubAckPayload payload) { + this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload); + } + @Override public MqttMessageIdVariableHeader variableHeader() { return (MqttMessageIdVariableHeader) super.variableHeader(); } + public MqttMessageIdAndPropertiesVariableHeader idAndPropertiesVariableHeader() { + return (MqttMessageIdAndPropertiesVariableHeader) super.variableHeader(); + } + @Override public MqttSubAckPayload payload() { return (MqttSubAckPayload) super.payload(); diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java index d56aaacdac..d02f3da038 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscribeMessage.java @@ -24,16 +24,27 @@ public final class MqttSubscribeMessage extends MqttMessage { public MqttSubscribeMessage( MqttFixedHeader mqttFixedHeader, - MqttMessageIdVariableHeader variableHeader, + MqttMessageIdAndPropertiesVariableHeader variableHeader, MqttSubscribePayload payload) { super(mqttFixedHeader, variableHeader, payload); } + public MqttSubscribeMessage( + MqttFixedHeader mqttFixedHeader, + MqttMessageIdVariableHeader variableHeader, + MqttSubscribePayload payload) { + this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload); + } + @Override public MqttMessageIdVariableHeader variableHeader() { return (MqttMessageIdVariableHeader) super.variableHeader(); } + public MqttMessageIdAndPropertiesVariableHeader idAndPropertiesVariableHeader() { + return (MqttMessageIdAndPropertiesVariableHeader) super.variableHeader(); + } + @Override public MqttSubscribePayload payload() { return (MqttSubscribePayload) super.payload(); diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscriptionOption.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscriptionOption.java new file mode 100644 index 0000000000..3da9501661 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttSubscriptionOption.java @@ -0,0 +1,124 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.mqtt; + +/** + * Model the SubscriptionOption used in Subscribe MQTT v5 packet + */ +public final class MqttSubscriptionOption { + + enum RetainedHandlingPolicy { + SEND_AT_SUBSCRIBE(0), + SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS(1), + DONT_SEND_AT_SUBSCRIBE(2); + + private final int value; + + RetainedHandlingPolicy(int value) { + this.value = value; + } + + public int value() { + return value; + } + + public static RetainedHandlingPolicy valueOf(int value) { + for (RetainedHandlingPolicy q: values()) { + if (q.value == value) { + return q; + } + } + throw new IllegalArgumentException("invalid RetainedHandlingPolicy: " + value); + } + } + + private final MqttQoS qos; + private final boolean noLocal; + private final boolean retainAsPublished; + private final RetainedHandlingPolicy retainHandling; + + public static MqttSubscriptionOption onlyFromQos(MqttQoS qos) { + return new MqttSubscriptionOption(qos, false, false, RetainedHandlingPolicy.SEND_AT_SUBSCRIBE); + } + + public MqttSubscriptionOption(MqttQoS qos, + boolean noLocal, + boolean retainAsPublished, + RetainedHandlingPolicy retainHandling) { + this.qos = qos; + this.noLocal = noLocal; + this.retainAsPublished = retainAsPublished; + this.retainHandling = retainHandling; + } + + public MqttQoS qos() { + return qos; + } + + public boolean isNoLocal() { + return noLocal; + } + + public boolean isRetainAsPublished() { + return retainAsPublished; + } + + public RetainedHandlingPolicy retainHandling() { + return retainHandling; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MqttSubscriptionOption that = (MqttSubscriptionOption) o; + + if (noLocal != that.noLocal) { + return false; + } + if (retainAsPublished != that.retainAsPublished) { + return false; + } + if (qos != that.qos) { + return false; + } + return retainHandling == that.retainHandling; + } + + @Override + public int hashCode() { + int result = qos.hashCode(); + result = 31 * result + (noLocal ? 1 : 0); + result = 31 * result + (retainAsPublished ? 1 : 0); + result = 31 * result + retainHandling.hashCode(); + return result; + } + + @Override + public String toString() { + return "SubscriptionOption[" + + "qos=" + qos + + ", noLocal=" + noLocal + + ", retainAsPublished=" + retainAsPublished + + ", retainHandling=" + retainHandling + + ']'; + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttTopicSubscription.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttTopicSubscription.java index acee7fdd27..da0bd8ac14 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttTopicSubscription.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttTopicSubscription.java @@ -25,11 +25,16 @@ import io.netty.util.internal.StringUtil; public final class MqttTopicSubscription { private final String topicFilter; - private final MqttQoS qualityOfService; + private final MqttSubscriptionOption option; public MqttTopicSubscription(String topicFilter, MqttQoS qualityOfService) { this.topicFilter = topicFilter; - this.qualityOfService = qualityOfService; + this.option = MqttSubscriptionOption.onlyFromQos(qualityOfService); + } + + public MqttTopicSubscription(String topicFilter, MqttSubscriptionOption option) { + this.topicFilter = topicFilter; + this.option = option; } public String topicName() { @@ -37,7 +42,11 @@ public final class MqttTopicSubscription { } public MqttQoS qualityOfService() { - return qualityOfService; + return option.qos(); + } + + public MqttSubscriptionOption option() { + return option; } @Override @@ -45,7 +54,7 @@ public final class MqttTopicSubscription { return new StringBuilder(StringUtil.simpleClassName(this)) .append('[') .append("topicFilter=").append(topicFilter) - .append(", qualityOfService=").append(qualityOfService) + .append(", option=").append(this.option) .append(']') .toString(); } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java index 8093ed336d..54e61957f9 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckMessage.java @@ -21,12 +21,41 @@ package io.netty.handler.codec.mqtt; */ public final class MqttUnsubAckMessage extends MqttMessage { - public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, MqttMessageIdVariableHeader variableHeader) { - super(mqttFixedHeader, variableHeader, null); + public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, + MqttMessageIdAndPropertiesVariableHeader variableHeader, + MqttUnsubAckPayload payload) { + super(mqttFixedHeader, variableHeader, payload); + } + + public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, + MqttMessageIdVariableHeader variableHeader, + MqttUnsubAckPayload payload) { + this(mqttFixedHeader, fallbackVariableHeader(variableHeader), payload); + } + public MqttUnsubAckMessage(MqttFixedHeader mqttFixedHeader, + MqttMessageIdVariableHeader variableHeader) { + this(mqttFixedHeader, variableHeader, null); + } + + private static MqttMessageIdAndPropertiesVariableHeader fallbackVariableHeader( + MqttMessageIdVariableHeader variableHeader) { + if (variableHeader instanceof MqttMessageIdAndPropertiesVariableHeader) { + return (MqttMessageIdAndPropertiesVariableHeader) variableHeader; + } + return new MqttMessageIdAndPropertiesVariableHeader(variableHeader.messageId(), MqttProperties.NO_PROPERTIES); } @Override public MqttMessageIdVariableHeader variableHeader() { return (MqttMessageIdVariableHeader) super.variableHeader(); } + + public MqttMessageIdAndPropertiesVariableHeader idAndPropertiesVariableHeader() { + return (MqttMessageIdAndPropertiesVariableHeader) super.variableHeader(); + } + + @Override + public MqttUnsubAckPayload payload() { + return (MqttUnsubAckPayload) super.payload(); + } } diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckPayload.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckPayload.java new file mode 100644 index 0000000000..2ab1d42b11 --- /dev/null +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubAckPayload.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.handler.codec.mqtt; + +import io.netty.util.internal.ObjectUtil; +import io.netty.util.internal.StringUtil; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Payload for MQTT unsuback message as in V5. + */ +public final class MqttUnsubAckPayload { + + private final List unsubscribeReasonCodes; + + public MqttUnsubAckPayload(short... unsubscribeReasonCodes) { + ObjectUtil.checkNotNull(unsubscribeReasonCodes, "unsubscribeReasonCodes"); + + List list = new ArrayList(unsubscribeReasonCodes.length); + for (Short v: unsubscribeReasonCodes) { + list.add(v); + } + this.unsubscribeReasonCodes = Collections.unmodifiableList(list); + } + + public MqttUnsubAckPayload(Iterable unsubscribeReasonCodes) { + ObjectUtil.checkNotNull(unsubscribeReasonCodes, "unsubscribeReasonCodes"); + + List list = new ArrayList(); + for (Short v: unsubscribeReasonCodes) { + ObjectUtil.checkNotNull(v, "unsubscribeReasonCode"); + list.add(v); + } + this.unsubscribeReasonCodes = Collections.unmodifiableList(list); + } + + public List unsubscribeReasonCodes() { + return unsubscribeReasonCodes; + } + + @Override + public String toString() { + return new StringBuilder(StringUtil.simpleClassName(this)) + .append('[') + .append("unsubscribeReasonCodes=").append(unsubscribeReasonCodes) + .append(']') + .toString(); + } +} diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java index 515a6cdae8..4523e389ae 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttUnsubscribeMessage.java @@ -24,16 +24,27 @@ public final class MqttUnsubscribeMessage extends MqttMessage { public MqttUnsubscribeMessage( MqttFixedHeader mqttFixedHeader, - MqttMessageIdVariableHeader variableHeader, + MqttMessageIdAndPropertiesVariableHeader variableHeader, MqttUnsubscribePayload payload) { super(mqttFixedHeader, variableHeader, payload); } + public MqttUnsubscribeMessage( + MqttFixedHeader mqttFixedHeader, + MqttMessageIdVariableHeader variableHeader, + MqttUnsubscribePayload payload) { + this(mqttFixedHeader, variableHeader.withEmptyProperties(), payload); + } + @Override public MqttMessageIdVariableHeader variableHeader() { return (MqttMessageIdVariableHeader) super.variableHeader(); } + public MqttMessageIdAndPropertiesVariableHeader idAndPropertiesVariableHeader() { + return (MqttMessageIdAndPropertiesVariableHeader) super.variableHeader(); + } + @Override public MqttUnsubscribePayload payload() { return (MqttUnsubscribePayload) super.payload(); diff --git a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttVersion.java b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttVersion.java index 985bd9a094..bfa0d98829 100644 --- a/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttVersion.java +++ b/codec-mqtt/src/main/java/io/netty/handler/codec/mqtt/MqttVersion.java @@ -24,7 +24,8 @@ import io.netty.util.internal.ObjectUtil; */ public enum MqttVersion { MQTT_3_1("MQIsdp", (byte) 3), - MQTT_3_1_1("MQTT", (byte) 4); + MQTT_3_1_1("MQTT", (byte) 4), + MQTT_5("MQTT", (byte) 5); private final String name; private final byte level; @@ -48,8 +49,8 @@ public enum MqttVersion { public static MqttVersion fromProtocolNameAndLevel(String protocolName, byte protocolLevel) { for (MqttVersion mv : values()) { - if (mv.name.equals(protocolName)) { - if (mv.level == protocolLevel) { + if (mv.level == protocolLevel) { + if (mv.name.equals(protocolName)) { return mv; } else { throw new MqttUnacceptableProtocolVersionException(protocolName + " and " + diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java index 2264dc4079..8ceb404aa9 100644 --- a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttCodecTest.java @@ -18,20 +18,28 @@ package io.netty.handler.codec.mqtt; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.DecoderException; +import io.netty.handler.codec.EncoderException; +import io.netty.util.Attribute; import io.netty.util.CharsetUtil; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.w3c.dom.Attr; import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicReference; +import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.*; +import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; +import static io.netty.handler.codec.mqtt.MqttSubscriptionOption.RetainedHandlingPolicy.SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -50,12 +58,17 @@ public class MqttCodecTest { private static final ByteBufAllocator ALLOCATOR = new UnpooledByteBufAllocator(false); + private static final int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092; + @Mock private final ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); @Mock private final Channel channel = mock(Channel.class); + @Mock + private final Attribute versionAttrMock = mock(Attribute.class); + private final MqttDecoder mqttDecoder = new MqttDecoder(); /** @@ -67,12 +80,14 @@ public class MqttCodecTest { public void setup() { MockitoAnnotations.initMocks(this); when(ctx.channel()).thenReturn(channel); + when(ctx.alloc()).thenReturn(ALLOCATOR); + when(channel.attr(MqttCodecUtil.MQTT_VERSION_KEY)).thenReturn(versionAttrMock); } @Test public void testConnectMessageForMqtt31() throws Exception { final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -89,7 +104,7 @@ public class MqttCodecTest { @Test public void testConnectMessageForMqtt311() throws Exception { final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -106,7 +121,7 @@ public class MqttCodecTest { @Test public void testConnectMessageWithNonZeroReservedFlagForMqtt311() throws Exception { final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { // Set the reserved flag in the CONNECT Packet to 1 byteBuf.setByte(9, byteBuf.getByte(9) | 0x1); @@ -127,19 +142,24 @@ public class MqttCodecTest { @Test public void testConnectMessageNoPassword() throws Exception { - final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1, null, PASSWORD); + final MqttConnectMessage message = createConnectMessage( + MqttVersion.MQTT_3_1_1, + null, + PASSWORD, + MqttProperties.NO_PROPERTIES, + MqttProperties.NO_PROPERTIES); try { - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); } catch (Exception cause) { - assertTrue(cause instanceof DecoderException); + assertTrue(cause instanceof EncoderException); } } @Test public void testConnAckMessage() throws Exception { final MqttConnAckMessage message = createConnAckMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -154,7 +174,7 @@ public class MqttCodecTest { @Test public void testPublishMessage() throws Exception { final MqttPublishMessage message = createPublishMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -190,7 +210,7 @@ public class MqttCodecTest { @Test public void testSubscribeMessage() throws Exception { final MqttSubscribeMessage message = createSubscribeMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -206,7 +226,7 @@ public class MqttCodecTest { @Test public void testSubAckMessage() throws Exception { final MqttSubAckMessage message = createSubAckMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -228,7 +248,7 @@ public class MqttCodecTest { MqttSubAckMessage message = new MqttSubAckMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttSubAckPayload); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -246,7 +266,7 @@ public class MqttCodecTest { @Test public void testUnSubscribeMessage() throws Exception { final MqttUnsubscribeMessage message = createUnsubscribeMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -279,11 +299,12 @@ public class MqttCodecTest { testMessageWithOnlyFixedHeader(MqttMessage.DISCONNECT); } + //All 0..F message type codes are valid in MQTT 5 @Test public void testUnknownMessageType() throws Exception { final MqttMessage message = createMessageWithFixedHeader(MqttMessageType.PINGREQ); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { // setting an invalid message type (15, reserved and forbidden by MQTT 3.1.1 spec) byteBuf.setByte(0, 0xF0); @@ -295,8 +316,8 @@ public class MqttCodecTest { final MqttMessage decodedMessage = (MqttMessage) out.get(0); assertTrue(decodedMessage.decoderResult().isFailure()); Throwable cause = decodedMessage.decoderResult().cause(); - assertTrue(cause instanceof IllegalArgumentException); - assertEquals("unknown message type: 15", cause.getMessage()); + assertTrue(cause instanceof DecoderException); + assertEquals("AUTH message requires at least MQTT 5", cause.getMessage()); } finally { byteBuf.release(); } @@ -305,7 +326,7 @@ public class MqttCodecTest { @Test public void testConnectMessageForMqtt31TooLarge() throws Exception { final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { final List out = new LinkedList(); @@ -327,7 +348,7 @@ public class MqttCodecTest { @Test public void testConnectMessageForMqtt311TooLarge() throws Exception { final MqttConnectMessage message = createConnectMessage(MqttVersion.MQTT_3_1_1); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { final List out = new LinkedList(); @@ -349,7 +370,7 @@ public class MqttCodecTest { @Test public void testConnAckMessageTooLarge() throws Exception { final MqttConnAckMessage message = createConnAckMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { final List out = new LinkedList(); @@ -368,7 +389,7 @@ public class MqttCodecTest { @Test public void testPublishMessageTooLarge() throws Exception { final MqttPublishMessage message = createPublishMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { final List out = new LinkedList(); @@ -390,7 +411,7 @@ public class MqttCodecTest { @Test public void testSubscribeMessageTooLarge() throws Exception { final MqttSubscribeMessage message = createSubscribeMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { final List out = new LinkedList(); @@ -411,7 +432,7 @@ public class MqttCodecTest { @Test public void testSubAckMessageTooLarge() throws Exception { final MqttSubAckMessage message = createSubAckMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { final List out = new LinkedList(); @@ -432,7 +453,7 @@ public class MqttCodecTest { @Test public void testUnSubscribeMessageTooLarge() throws Exception { final MqttUnsubscribeMessage message = createUnsubscribeMessage(); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); try { final List out = new LinkedList(); @@ -450,8 +471,288 @@ public class MqttCodecTest { } } + @Test + public void testConnectMessageForMqtt5() throws Exception { + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(SESSION_EXPIRY_INTERVAL.value(), 10)); + props.add(new MqttProperties.StringProperty(AUTHENTICATION_METHOD.value(), "Plain")); + MqttProperties willProps = new MqttProperties(); + willProps.add(new MqttProperties.IntegerProperty(WILL_DELAY_INTERVAL.value(), 100)); + final MqttConnectMessage message = + createConnectMessage(MqttVersion.MQTT_5, USER_NAME, PASSWORD, props, willProps); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + + final MqttConnectMessage decodedMessage = (MqttConnectMessage) out.get(0); + + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateConnectVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + validateConnectPayload(message.payload(), decodedMessage.payload()); + } + + @Test + public void testConnAckMessageForMqtt5() throws Exception { + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(SESSION_EXPIRY_INTERVAL.value(), 10)); + props.add(new MqttProperties.IntegerProperty(MAXIMUM_QOS.value(), 1)); + props.add(new MqttProperties.IntegerProperty(MAXIMUM_PACKET_SIZE.value(), 1000)); + final MqttConnAckMessage message = createConnAckMessage(props); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + + final MqttConnAckMessage decodedMessage = (MqttConnAckMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateConnAckVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + } + + @Test + public void testPublishMessageForMqtt5() throws Exception { + when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5); + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); + props.add(new MqttProperties.UserProperty("isSecret", "true")); + props.add(new MqttProperties.UserProperty("isUrgent", "false")); + assertEquals("User properties count mismatch", + ((MqttProperties.UserProperties) props.getProperty(USER_PROPERTY.value())).value.size(), 2); + final MqttPublishMessage message = createPublishMessage(props); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + + final MqttPublishMessage decodedMessage = (MqttPublishMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validatePublishVariableHeader(message.variableHeader(), decodedMessage.variableHeader()); + validatePublishPayload(message.payload(), decodedMessage.payload()); + } + + @Test + public void testPubAckMessageForMqtt5() throws Exception { + when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5); + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); + //0x87 - Not Authorized + final MqttMessage message = createPubAckMessage((byte) 0x87, props); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + + final MqttMessage decodedMessage = (MqttMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validatePubReplyVariableHeader((MqttPubReplyMessageVariableHeader) message.variableHeader(), + (MqttPubReplyMessageVariableHeader) decodedMessage.variableHeader()); + } + + @Test + public void testPubAckMessageSkipCodeForMqtt5() throws Exception { + //Code 0 (Success) and no properties - skip encoding code and properties + final MqttMessage message = createPubAckMessage((byte) 0, MqttProperties.NO_PROPERTIES); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + + final MqttMessage decodedMessage = (MqttMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validatePubReplyVariableHeader((MqttPubReplyMessageVariableHeader) message.variableHeader(), + (MqttPubReplyMessageVariableHeader) decodedMessage.variableHeader()); + } + + @Test + public void testSubAckMessageForMqtt5() throws Exception { + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); + final MqttSubAckMessage message = createSubAckMessage(props); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + + final MqttSubAckMessage decodedMessage = (MqttSubAckMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validatePacketIdAndPropertiesVariableHeader( + (MqttMessageIdAndPropertiesVariableHeader) message.variableHeader(), + (MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader()); + } + + @Test + public void testSubscribeMessageForMqtt5() throws Exception { + when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5); + + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); + final MqttSubscribeMessage message = MqttMessageBuilders.subscribe() + .messageId((short) 1) + .properties(props) + .addSubscription("/topic", new MqttSubscriptionOption(AT_LEAST_ONCE, + true, + true, + SEND_AT_SUBSCRIBE_IF_NOT_YET_EXISTS)) + .build(); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + final MqttSubscribeMessage decodedMessage = (MqttSubscribeMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + final MqttMessageIdAndPropertiesVariableHeader expectedHeader = + (MqttMessageIdAndPropertiesVariableHeader) message.variableHeader(); + final MqttMessageIdAndPropertiesVariableHeader actualHeader = + (MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader(); + validatePacketIdAndPropertiesVariableHeader(expectedHeader, actualHeader); + validateSubscribePayload(message.payload(), decodedMessage.payload()); + } + + @Test + public void testUnsubAckMessageForMqtt5() throws Exception { + when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5); + + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(PAYLOAD_FORMAT_INDICATOR.value(), 6)); + final MqttUnsubAckMessage message = MqttMessageBuilders.unsubAck() + .packetId((short) 1) + .properties(props) + .addReasonCode((short) 0x83) + .build(); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + + final MqttUnsubAckMessage decodedMessage = (MqttUnsubAckMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validatePacketIdAndPropertiesVariableHeader( + (MqttMessageIdAndPropertiesVariableHeader) message.variableHeader(), + (MqttMessageIdAndPropertiesVariableHeader) decodedMessage.variableHeader()); + assertEquals("Reason code list doesn't match", message.payload().unsubscribeReasonCodes(), + decodedMessage.payload().unsubscribeReasonCodes()); + } + + @Test + public void testDisconnectMessageForMqtt5() throws Exception { + when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5); + + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.IntegerProperty(SESSION_EXPIRY_INTERVAL.value(), 6)); + final MqttMessage message = MqttMessageBuilders.disconnect() + .reasonCode((byte) 0x96) // Message rate too high + .properties(props) + .build(); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + final MqttMessage decodedMessage = (MqttMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateReasonCodeAndPropertiesVariableHeader( + (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader(), + (MqttReasonCodeAndPropertiesVariableHeader) decodedMessage.variableHeader()); + } + + @Test + public void testDisconnectMessageSkipCodeForMqtt5() throws Exception { + //code 0 and no properties - skip encoding code and properties + final MqttMessage message = MqttMessageBuilders.disconnect() + .reasonCode((byte) 0) // ok + .properties(MqttProperties.NO_PROPERTIES) + .build(); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + final MqttMessage decodedMessage = (MqttMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateReasonCodeAndPropertiesVariableHeader( + (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader(), + (MqttReasonCodeAndPropertiesVariableHeader) decodedMessage.variableHeader()); + } + + @Test + public void testAuthMessageForMqtt5() throws Exception { + when(versionAttrMock.get()).thenReturn(MqttVersion.MQTT_5); + + MqttProperties props = new MqttProperties(); + props.add(new MqttProperties.BinaryProperty(AUTHENTICATION_DATA.value(), "secret".getBytes(CharsetUtil.UTF_8))); + final MqttMessage message = MqttMessageBuilders.auth() + .reasonCode((byte) 0x18) // Continue authentication + .properties(props) + .build(); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); + + final List out = new LinkedList(); + + mqttDecoder.decode(ctx, byteBuf, out); + + assertEquals("Expected one object but got " + out.size(), 1, out.size()); + final MqttMessage decodedMessage = (MqttMessage) out.get(0); + validateFixedHeaders(message.fixedHeader(), decodedMessage.fixedHeader()); + validateReasonCodeAndPropertiesVariableHeader( + (MqttReasonCodeAndPropertiesVariableHeader) message.variableHeader(), + (MqttReasonCodeAndPropertiesVariableHeader) decodedMessage.variableHeader()); + } + + @Test + public void testMqttVersionDetection() throws Exception { + clearInvocations(versionAttrMock); + //Encode CONNECT message so that encoder would initialize its version + final MqttConnectMessage connectMessage = createConnectMessage(MqttVersion.MQTT_5); + ByteBuf connectByteBuf = MqttEncoder.doEncode(ctx, connectMessage); + + verify(versionAttrMock, times(1)).set(MqttVersion.MQTT_5); + clearInvocations(versionAttrMock); + + final List connectOut = new LinkedList(); + mqttDecoder.decode(ctx, connectByteBuf, connectOut); + + verify(versionAttrMock, times(1)).set(MqttVersion.MQTT_5); + + assertEquals("Expected one CONNECT object but got " + connectOut.size(), 1, connectOut.size()); + + final MqttConnectMessage decodedConnectMessage = (MqttConnectMessage) connectOut.get(0); + + validateFixedHeaders(connectMessage.fixedHeader(), decodedConnectMessage.fixedHeader()); + validateConnectVariableHeader(connectMessage.variableHeader(), decodedConnectMessage.variableHeader()); + validateConnectPayload(connectMessage.payload(), decodedConnectMessage.payload()); + + verifyNoMoreInteractions(versionAttrMock); + } + private void testMessageWithOnlyFixedHeader(MqttMessage message) throws Exception { - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -466,7 +767,7 @@ public class MqttCodecTest { throws Exception { MqttMessage message = createMessageWithFixedHeaderAndMessageIdVariableHeader(messageType); - ByteBuf byteBuf = MqttEncoder.doEncode(ALLOCATOR, message); + ByteBuf byteBuf = MqttEncoder.doEncode(ctx, message); final List out = new LinkedList(); mqttDecoder.decode(ctx, byteBuf, out); @@ -500,36 +801,57 @@ public class MqttCodecTest { } private static MqttConnectMessage createConnectMessage(MqttVersion mqttVersion) { - return createConnectMessage(mqttVersion, USER_NAME, PASSWORD); + return createConnectMessage(mqttVersion, + USER_NAME, + PASSWORD, + MqttProperties.NO_PROPERTIES, + MqttProperties.NO_PROPERTIES); } - private static MqttConnectMessage createConnectMessage(MqttVersion mqttVersion, String username, String password) { + private static MqttConnectMessage createConnectMessage(MqttVersion mqttVersion, + String username, + String password, + MqttProperties properties, + MqttProperties willProperties) { return MqttMessageBuilders.connect() .clientId(CLIENT_ID) .protocolVersion(mqttVersion) .username(username) - .password(password) + .password(password.getBytes(CharsetUtil.UTF_8)) + .properties(properties) .willRetain(true) .willQoS(MqttQoS.AT_LEAST_ONCE) .willFlag(true) .willTopic(WILL_TOPIC) - .willMessage(WILL_MESSAGE) + .willMessage(WILL_MESSAGE.getBytes(CharsetUtil.UTF_8)) + .willProperties(willProperties) .cleanSession(true) .keepAlive(KEEP_ALIVE_SECONDS) .build(); } private static MqttConnAckMessage createConnAckMessage() { + return createConnAckMessage(MqttProperties.NO_PROPERTIES); + } + + private static MqttConnAckMessage createConnAckMessage(MqttProperties properties) { return MqttMessageBuilders.connAck() .returnCode(MqttConnectReturnCode.CONNECTION_ACCEPTED) + .properties(properties) .sessionPresent(true) .build(); } private static MqttPublishMessage createPublishMessage() { + return createPublishMessage(MqttProperties.NO_PROPERTIES); + } + + private static MqttPublishMessage createPublishMessage(MqttProperties properties) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, MqttQoS.AT_LEAST_ONCE, true, 0); - MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader("/abc", 1234); + MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader("/abc", + 1234, + properties); ByteBuf payload = ALLOCATOR.buffer(); payload.writeBytes("whatever".getBytes(CharsetUtil.UTF_8)); return new MqttPublishMessage(mqttFixedHeader, mqttPublishVariableHeader, payload); @@ -550,6 +872,10 @@ public class MqttCodecTest { } private static MqttSubAckMessage createSubAckMessage() { + return createSubAckMessage(MqttProperties.NO_PROPERTIES); + } + + private static MqttSubAckMessage createSubAckMessage(MqttProperties properties) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessageIdVariableHeader mqttMessageIdVariableHeader = MqttMessageIdVariableHeader.from(12345); @@ -571,6 +897,14 @@ public class MqttCodecTest { return new MqttUnsubscribeMessage(mqttFixedHeader, mqttMessageIdVariableHeader, mqttUnsubscribePayload); } + private MqttMessage createPubAckMessage(byte reasonCode, MqttProperties properties) { + return MqttMessageBuilders.pubAck() + .packetId((short) 1) + .reasonCode(reasonCode) + .properties(properties) + .build(); + } + // Helper methods to compare expected and actual // MQTT messages @@ -588,6 +922,8 @@ public class MqttCodecTest { expected.keepAliveTimeSeconds(), actual.keepAliveTimeSeconds()); assertEquals("MqttConnectVariableHeader Version mismatch ", expected.version(), actual.version()); + assertEquals("MqttConnectVariableHeader Version mismatch ", expected.version(), actual.version()); + validateProperties(expected.properties(), actual.properties()); assertEquals("MqttConnectVariableHeader WillQos mismatch ", expected.willQos(), actual.willQos()); assertEquals("MqttConnectVariableHeader HasUserName mismatch ", expected.hasUserName(), actual.hasUserName()); @@ -618,6 +954,7 @@ public class MqttCodecTest { "MqttConnectPayload WillMessage bytes mismatch ", Arrays.equals(expected.willMessageInBytes(), actual.willMessageInBytes())); assertEquals("MqttConnectPayload WillTopic mismatch ", expected.willTopic(), actual.willTopic()); + validateProperties(expected.willProperties(), actual.willProperties()); } private static void validateConnAckVariableHeader( @@ -633,7 +970,8 @@ public class MqttCodecTest { MqttPublishVariableHeader expected, MqttPublishVariableHeader actual) { assertEquals("MqttPublishVariableHeader TopicName mismatch ", expected.topicName(), actual.topicName()); - assertEquals("MqttPublishVariableHeader MessageId mismatch ", expected.messageId(), actual.messageId()); + assertEquals("MqttPublishVariableHeader MessageId mismatch ", expected.packetId(), actual.packetId()); + validateProperties(expected.properties(), actual.properties()); } private static void validatePublishPayload(ByteBuf expected, ByteBuf actual) { @@ -667,6 +1005,10 @@ public class MqttCodecTest { "MqttTopicSubscription Qos mismatch ", expected.qualityOfService(), actual.qualityOfService()); + assertEquals( + "MqttTopicSubscription options mismatch ", + expected.option(), + actual.option()); } private static void validateSubAckPayload(MqttSubAckPayload expected, MqttSubAckPayload actual) { @@ -692,4 +1034,121 @@ public class MqttCodecTest { assertTrue("MqttMessage DecoderResult cause reason expect to contain 'too large message' ", cause.getMessage().contains("too large message:")); } + + private static void validatePubReplyVariableHeader( + MqttPubReplyMessageVariableHeader expected, + MqttPubReplyMessageVariableHeader actual) { + assertEquals("MqttPubReplyMessageVariableHeader MessageId mismatch ", + expected.messageId(), actual.messageId()); + assertEquals("MqttPubReplyMessageVariableHeader reasonCode mismatch ", + expected.reasonCode(), actual.reasonCode()); + + final MqttProperties expectedProps = expected.properties(); + final MqttProperties actualProps = actual.properties(); + validateProperties(expectedProps, actualProps); + } + + private void validatePacketIdAndPropertiesVariableHeader(MqttMessageIdAndPropertiesVariableHeader expected, + MqttMessageIdAndPropertiesVariableHeader actual) { + assertEquals("MqttMessageIdAndPropertiesVariableHeader MessageId mismatch ", + expected.messageId(), actual.messageId()); + final MqttProperties expectedProps = expected.properties(); + final MqttProperties actualProps = actual.properties(); + validateProperties(expectedProps, actualProps); + } + + private void validateReasonCodeAndPropertiesVariableHeader(MqttReasonCodeAndPropertiesVariableHeader expected, + MqttReasonCodeAndPropertiesVariableHeader actual) { + assertEquals("MqttReasonCodeAndPropertiesVariableHeader reason mismatch ", + expected.reasonCode(), actual.reasonCode()); + final MqttProperties expectedProps = expected.properties(); + final MqttProperties actualProps = actual.properties(); + validateProperties(expectedProps, actualProps); + } + + private static void validateProperties(MqttProperties expected, MqttProperties actual) { + for (MqttProperties.MqttProperty expectedProperty : expected.listAll()) { + MqttProperties.MqttProperty actualProperty = actual.getProperty(expectedProperty.propertyId); + switch (MqttProperties.MqttPropertyType.valueOf(expectedProperty.propertyId)) { + // one byte value integer property + case PAYLOAD_FORMAT_INDICATOR: + case REQUEST_PROBLEM_INFORMATION: + case REQUEST_RESPONSE_INFORMATION: + case MAXIMUM_QOS: + case RETAIN_AVAILABLE: + case WILDCARD_SUBSCRIPTION_AVAILABLE: + case SUBSCRIPTION_IDENTIFIER_AVAILABLE: + case SHARED_SUBSCRIPTION_AVAILABLE: { + final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; + final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; + assertEquals("one byte property doesn't match", expectedValue, actualValue); + break; + } + // two byte value integer property + case SERVER_KEEP_ALIVE: + case RECEIVE_MAXIMUM: + case TOPIC_ALIAS_MAXIMUM: + case TOPIC_ALIAS: { + final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; + final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; + assertEquals("two byte property doesn't match", expectedValue, actualValue); + break; + } + // four byte value integer property + case PUBLICATION_EXPIRY_INTERVAL: + case SESSION_EXPIRY_INTERVAL: + case WILL_DELAY_INTERVAL: + case MAXIMUM_PACKET_SIZE: { + final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; + final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; + assertEquals("four byte property doesn't match", expectedValue, actualValue); + break; + } + // four byte value integer property + case SUBSCRIPTION_IDENTIFIER: { + final Integer expectedValue = ((MqttProperties.IntegerProperty) expectedProperty).value; + final Integer actualValue = ((MqttProperties.IntegerProperty) actualProperty).value; + assertEquals("variable byte integer property doesn't match", expectedValue, actualValue); + break; + } + // UTF-8 string value integer property + case CONTENT_TYPE: + case RESPONSE_TOPIC: + case ASSIGNED_CLIENT_IDENTIFIER: + case AUTHENTICATION_METHOD: + case RESPONSE_INFORMATION: + case SERVER_REFERENCE: + case REASON_STRING: { + final String expectedValue = ((MqttProperties.StringProperty) expectedProperty).value; + final String actualValue = ((MqttProperties.StringProperty) actualProperty).value; + assertEquals("String property doesn't match", expectedValue, actualValue); + break; + } + // User property + case USER_PROPERTY: { + final List expectedPairs = + ((MqttProperties.UserProperties) expectedProperty).value; + final List actualPairs = + ((MqttProperties.UserProperties) actualProperty).value; + assertEquals("User properties count doesn't match", expectedPairs, actualPairs); + for (int i = 0; i < expectedPairs.size(); i++) { + assertEquals("User property mismatch", expectedPairs.get(i), actualPairs.get(i)); + } + break; + } + // byte[] property + case CORRELATION_DATA: + case AUTHENTICATION_DATA: { + final byte[] expectedValue = ((MqttProperties.BinaryProperty) expectedProperty).value; + final byte[] actualValue = ((MqttProperties.BinaryProperty) actualProperty).value; + final String expectedHexDump = ByteBufUtil.hexDump(expectedValue); + final String actualHexDump = ByteBufUtil.hexDump(actualValue); + assertEquals("byte[] property doesn't match", expectedHexDump, actualHexDump); + break; + } + default: + fail("Property Id not recognized " + Integer.toHexString(expectedProperty.propertyId)); + } + } + } } diff --git a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttConnectPayloadTest.java b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttConnectPayloadTest.java index f1d9dc0db1..b8cb4ae7b8 100644 --- a/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttConnectPayloadTest.java +++ b/codec-mqtt/src/test/java/io/netty/handler/codec/mqtt/MqttConnectPayloadTest.java @@ -32,8 +32,12 @@ public class MqttConnectPayloadTest { byte[] willMessage = null; String userName = "userName"; byte[] password = "password".getBytes(CharsetUtil.UTF_8); - MqttConnectPayload mqttConnectPayload = - new MqttConnectPayload(clientIdentifier, willTopic, willMessage, userName, password); + MqttConnectPayload mqttConnectPayload = new MqttConnectPayload(clientIdentifier, + MqttProperties.NO_PROPERTIES, + willTopic, + willMessage, + userName, + password); assertNull(mqttConnectPayload.willMessageInBytes()); assertNull(mqttConnectPayload.willMessage()); @@ -46,8 +50,12 @@ public class MqttConnectPayloadTest { byte[] willMessage = "willMessage".getBytes(CharsetUtil.UTF_8); String userName = "userName"; byte[] password = null; - MqttConnectPayload mqttConnectPayload = - new MqttConnectPayload(clientIdentifier, willTopic, willMessage, userName, password); + MqttConnectPayload mqttConnectPayload = new MqttConnectPayload(clientIdentifier, + MqttProperties.NO_PROPERTIES, + willTopic, + willMessage, + userName, + password); assertNull(mqttConnectPayload.passwordInBytes()); assertNull(mqttConnectPayload.password()); diff --git a/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClientHandler.java b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClientHandler.java index d04ecf1084..36268c7e8f 100644 --- a/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClientHandler.java +++ b/example/src/main/java/io/netty/example/mqtt/heartBeat/MqttHeartBeatClientHandler.java @@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttQoS; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; @@ -54,8 +55,13 @@ public class MqttHeartBeatClientHandler extends ChannelInboundHandlerAdapter { new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttConnectVariableHeader connectVariableHeader = new MqttConnectVariableHeader(PROTOCOL_NAME_MQTT_3_1_1, PROTOCOL_VERSION_MQTT_3_1_1, true, true, false, - 0, false, false, 20); - MqttConnectPayload connectPayload = new MqttConnectPayload(clientId, null, null, userName, password); + 0, false, false, 20, MqttProperties.NO_PROPERTIES); + MqttConnectPayload connectPayload = new MqttConnectPayload(clientId, + MqttProperties.NO_PROPERTIES, + null, + null, + userName, + password); MqttConnectMessage connectMessage = new MqttConnectMessage(connectFixedHeader, connectVariableHeader, connectPayload); ctx.writeAndFlush(connectMessage);