import sys
import typing
from collections import OrderedDict

from code_writer import CodeWriter

natives = ["int", "long", "boolean", "String", "double", "byte[]", "byte"]

# Java primitive type -> expression passed to class_assign() to read one value.
_PRIMITIVE_READERS = {
    "int": "input.readInt()",
    "long": "input.readLong()",
    "double": "input.readDouble()",
    "boolean": "input.readBoolean()",
}

# Java primitive type -> method name passed to call() to write one value.
_PRIMITIVE_WRITERS = {
    "int": "output.writeInt",
    "long": "output.writeLong",
    "double": "output.writeDouble",
    "boolean": "output.writeBoolean",
}

# (constructor_id, container_name, [(arg_type, arg_name), ...])
_ClassMeta = typing.Tuple[
    typing.Optional[int],
    typing.Optional[str],
    typing.Optional[typing.List[typing.Tuple[str, str]]],
]


def _open_presence_check(output: CodeWriter) -> None:
    """Open the ``input.readBoolean()`` block that guards a nullable field on read."""
    output.indent()
    output.open_if("input.readBoolean()")
    output.newline()


def _open_null_guard(output: CodeWriter, arg_name: str) -> None:
    """Open the null-check prologue used before writing a nullable field.

    Emits an ``if`` on ``this.<arg_name> == null`` that writes ``false``, then
    an ``else`` branch that writes ``true``. The else branch is left open; the
    caller must emit the payload and then call ``close_block(space=True)``.
    """
    output.indent()
    output.open_if(f"this.{arg_name} == null")
    output.newline()
    output.indent()
    output.call("output.writeBoolean", "false")
    output.newline()
    output.open_if_else(space=True)
    output.newline()
    output.indent()
    output.call("output.writeBoolean", "true")
    output.newline()


def deserialize_tdapi(output: CodeWriter, arg_name: str, arg_type: str, cont, classes, null_check: bool = True):
    """Emit code that reads one TdApi object field from ``input``.

    :param output: writer receiving the generated code.
    :param arg_name: field (or indexed element, e.g. ``foo[i]``) to assign.
    :param arg_type: declared type name of the field.
    :param cont: collection of container class names; when ``arg_type`` is one
        of them, a switch over every class whose container is ``arg_type`` is
        emitted instead of a single constructor-id comparison.
    :param classes: mapping ``class_name -> meta`` where ``meta[1]`` is the
        container (parent) name of the class.
    :param null_check: wrap the read in an ``input.readBoolean()`` block.
    """
    if null_check:
        _open_presence_check(output)
    output.indent()
    if arg_type in cont:
        output.open_switch("input.readInt()")
        output.newline()
        for class_name, class_meta in classes.items():
            if class_meta[1] == arg_type and class_name != arg_type:
                output.indent()
                output.open_switch_case(f"{class_name}.CONSTRUCTOR")
                output.newline()
                output.indent()
                output.class_assign(arg_name, f"new {class_name}(input)")
                output.newline()
                output.indent()
                output.switch_break()
                output.newline()
        output.indent()
        output.open_switch_default()
        output.newline()
        output.indent()
        output.exception("UnsupportedOperationException")
        output.newline()
        # NOTE(review): presumably open_switch_default() raised indent_depth and
        # nothing else restores it - confirm against CodeWriter.
        output.indent_depth -= 1
        output.close_block(space=True)
    else:
        output.open_if(f"{arg_type}.CONSTRUCTOR != input.readInt()")
        output.newline()
        output.indent()
        output.exception("UnsupportedOperationException")
        output.newline()
        output.close_block(space=True)
        output.indent()
        output.class_assign(arg_name, f"new {arg_type}(input)")
        output.newline()
    if null_check:
        output.close_block(space=True)


def deserialize_native(output: CodeWriter, arg_name, arg_type, null_check: bool = True):
    """Emit code that reads one native-typed field from ``input``.

    Primitive types (``int``, ``long``, ``double``, ``boolean``) are assigned
    directly and ignore ``null_check``. ``byte[]``/``byte`` and ``String`` are
    read as a length followed by the bytes, optionally wrapped in an
    ``input.readBoolean()`` block. Any other ``arg_type`` emits nothing.
    """
    if arg_type in _PRIMITIVE_READERS:
        output.indent()
        output.class_assign(arg_name, _PRIMITIVE_READERS[arg_type])
        output.newline()
    elif arg_type in ("byte[]", "byte"):
        if null_check:
            _open_presence_check(output)
        output.indent()
        output.class_assign(arg_name, "new byte[input.readInt()]")
        output.newline()
        output.indent()
        output.call("input.readFully", f"this.{arg_name}")
        output.newline()
        if null_check:
            output.close_block(space=True)
    elif arg_type == "String":
        if null_check:
            _open_presence_check(output)
        # Strip any index suffix ("foo[i]" -> "foo") so the temporary is a
        # plain identifier.
        tmp_name = arg_name.split("[")[0] + "Tmp"
        output.indent()
        output.local_assign(tmp_name, "new byte[input.readInt()]")
        output.newline()
        output.indent()
        output.call("input.readFully", tmp_name)
        output.newline()
        output.indent()
        output.class_assign(arg_name, f"new String({tmp_name}, StandardCharsets.UTF_8)")
        output.newline()
        if null_check:
            output.close_block(space=True)


def serialize_tdapi(output: CodeWriter, arg_name, null_check: bool = True):
    """Emit code that writes one TdApi object field by calling its ``serialize``.

    With ``null_check`` the call is preceded by the null-guard prologue (a
    boolean presence flag) and followed by the closing of its else branch.
    """
    if null_check:
        _open_null_guard(output, arg_name)
    output.indent()
    output.call(f"this.{arg_name}.serialize", "output")
    output.newline()
    if null_check:
        output.close_block(space=True)


def serialize_native(output: CodeWriter, arg_type: str, arg_name: str, null_check: bool = True):
    """Emit code that writes one native-typed field to ``output``.

    Mirror image of :func:`deserialize_native`: primitives are written directly
    (``null_check`` is ignored for them); ``byte[]``/``byte`` and ``String``
    are written as a length followed by the bytes, optionally inside the
    null-guard. Any other ``arg_type`` emits nothing.
    """
    if arg_type in _PRIMITIVE_WRITERS:
        output.indent()
        output.call(_PRIMITIVE_WRITERS[arg_type], f"this.{arg_name}")
        output.newline()
    elif arg_type in ("byte[]", "byte"):
        if null_check:
            _open_null_guard(output, arg_name)
        output.indent()
        output.call("output.writeInt", f"this.{arg_name}.length")
        output.newline()
        output.indent()
        output.call("output.write", f"this.{arg_name}")
        output.newline()
        if null_check:
            output.close_block(space=True)
    elif arg_type == "String":
        if null_check:
            _open_null_guard(output, arg_name)
        output.indent()
        # Strip any index suffix ("foo[i]" -> "foo") so the temporary is a
        # plain identifier.
        tmp_name = arg_name.split("[")[0] + "Tmp"
        output.local_assign(tmp_name, f"this.{arg_name}.getBytes(StandardCharsets.UTF_8)")
        output.newline()
        output.indent()
        output.call("output.writeInt", f"{tmp_name}.length")
        output.newline()
        output.indent()
        output.call("output.write", tmp_name)
        output.newline()
        if null_check:
            output.close_block(space=True)


def _parse_tdapi(lines: typing.Iterable[str]):
    """Scan the input Java source line by line and collect class metadata.

    The scanner is purely positional: it splits each stripped line on
    whitespace and decides by token count and token position.

    :return: ``(package, object_classes, function_classes, container_classes)``
        where ``package`` is the full ``package ...`` line (or ``None``), the
        two ordered mappings are ``class_name -> (constructor_id,
        container_name, [(arg_type, arg_name), ...])`` and
        ``container_classes`` lists the 8-token ``abstract`` class names.
    """
    package: typing.Optional[str] = None
    current_constructor: typing.Optional[int] = None
    current_class_name: typing.Optional[str] = None
    inside_abstract_class: bool = False
    inside_object_class: bool = False
    inside_function_class: bool = False
    inside_object_container_class: bool = False
    container_class_name: typing.Optional[str] = None
    function_depth: int = 0  # brace depth of method bodies inside a class
    function_classes: "OrderedDict[str, _ClassMeta]" = OrderedDict()
    object_classes: "OrderedDict[str, _ClassMeta]" = OrderedDict()
    container_classes: typing.List[str] = []  # [class_name, ...]
    # [(arg_type, arg_name), ...] - the type comes first.
    current_arguments: typing.Optional[typing.List[typing.Tuple[str, str]]] = None

    for line in lines:
        line = line.strip()
        keywords = line.split()
        if not keywords:
            continue

        # Skip everything between the braces of a method/constructor body.
        if (inside_object_class or inside_function_class) and keywords[-1] == "{":
            function_depth += 1
            continue
        if (inside_object_class or inside_function_class) and keywords[-1] == "}" and function_depth > 0:
            function_depth -= 1
            continue
        if function_depth > 0:
            continue

        # Closing brace of whichever class is currently open.
        if inside_object_container_class and keywords[-1] == "}":
            inside_object_container_class = False
            container_classes.append(current_class_name)
            current_class_name = None
            continue
        if inside_abstract_class and keywords[-1] == "}":
            inside_abstract_class = False
            continue
        if inside_object_class and keywords[-1] == "}":
            inside_object_class = False
            object_classes[current_class_name] = (current_constructor, container_class_name, current_arguments)
            container_class_name = None
            current_arguments = None
            current_class_name = None
            current_constructor = None
            continue
        if inside_function_class and keywords[0] == "}":
            inside_function_class = False
            function_classes[current_class_name] = (current_constructor, container_class_name, current_arguments)
            current_arguments = None
            container_class_name = None
            current_class_name = None
            current_constructor = None
            continue

        # Members of the open class.
        if inside_function_class or inside_object_class:
            # Three tokens ending in ';' is taken as a field declaration:
            # <modifier> <type> <name>;
            if len(keywords) == 3 and keywords[-1].endswith(";"):
                current_arguments.append((keywords[1], keywords[2][:-1]))
                continue
            # Seven tokens with CONSTRUCTOR in fifth position: the last token
            # is the numeric id followed by ';'.
            if len(keywords) == 7 and keywords[4] == "CONSTRUCTOR":
                current_constructor = int(keywords[6][:-1])
                continue

        if keywords[0] == "package":
            package = line
            continue

        # Class headers, told apart by token count.
        if len(keywords) == 8 and keywords[1] == "abstract":
            inside_object_container_class = True
            current_class_name = keywords[4]
            continue
        if len(keywords) == 4 and keywords[2] == "TdApi":
            continue
        if len(keywords) == 6 and keywords[1] == "abstract":
            inside_abstract_class = True
            continue
        if len(keywords) == 7 and keywords[2] == "class" and keywords[5] == "Object":
            current_class_name = keywords[-4]
            current_arguments = []
            inside_object_class = True
            container_class_name = keywords[5]
            continue
        if len(keywords) == 7 and keywords[2] == "class" and keywords[5] == "Function":
            current_class_name = keywords[-4]
            current_arguments = []
            inside_function_class = True
            container_class_name = keywords[5]
            continue
        if len(keywords) == 7 and keywords[2] == "class" and keywords[5] in container_classes:
            current_class_name = keywords[-4]
            current_arguments = []
            inside_object_class = True
            container_class_name = keywords[5]
            continue

    return package, object_classes, function_classes, container_classes


def _write_deserializer(output: CodeWriter, object_classes, function_classes) -> None:
    """Emit the ``Deserializer`` class: one switch dispatching on constructor id."""
    output.indent()
    output.open_custom_block("public static class Deserializer")
    output.newline()
    output.indent()
    output.open_function("deserialize", [("DataInput", "input")], "static Object", "IOException")
    output.newline()
    output.indent()
    output.open_switch("input.readInt()")
    output.newline()
    for classes in (object_classes, function_classes):
        for class_name in classes:
            output.indent()
            output.open_switch_case(f"{class_name}.CONSTRUCTOR")
            output.newline()
            output.indent()
            output.ret(f"new {class_name}(input)")
            output.newline()
            # No switch_break() after a return, so the depth is restored by
            # hand. NOTE(review): presumably open_switch_case() raised it -
            # confirm against CodeWriter.
            output.indent_depth -= 1
    output.indent()
    output.open_switch_default()
    output.newline()
    output.indent()
    output.exception("UnsupportedOperationException")
    output.newline()
    output.indent_depth -= 1
    output.close_block(space=True)  # switch
    output.close_block(space=True)  # deserialize()
    output.close_block(space=True)  # Deserializer
    output.newline()


def _write_class(output: CodeWriter, class_name: str, class_meta, container_classes, object_classes) -> None:
    """Emit one concrete class: fields, constructors, getConstructor, serialize."""
    constructor_id, container_name, arguments = class_meta

    output.indent()
    output.open_custom_block("public static class", class_name, "extends", container_name)
    output.newline()

    # Field declarations.
    for arg_type, arg_name in arguments:
        output.indent()
        output.declare(arg_name, arg_type, "public")
        output.newline()
    if arguments:
        output.newline()
    output.indent()
    output.declare("CONSTRUCTOR", "int", "public static final", value=str(constructor_id))
    output.newline()
    output.newline()

    # Empty no-argument constructor.
    output.indent()
    output.open_constructor_function(class_name, [])
    output.close_block()
    output.newline()

    # All-fields constructor, only when there are fields.
    if arguments:
        output.indent()
        output.open_constructor_function(class_name, arguments)
        output.newline()
        for arg_type, arg_name in arguments:
            output.indent()
            output.class_assign(arg_name, arg_name)
            output.newline()
        output.close_block(space=True)
    output.newline()

    # Deserializing constructor.
    output.indent()
    output.open_constructor_function(class_name, [("DataInput", "input")], "IOException")
    output.newline()
    for arg_type, arg_name in arguments:
        if arg_type in natives:
            deserialize_native(output, arg_name, arg_type)
        elif not arg_type.endswith("[]"):
            deserialize_tdapi(output, arg_name, arg_type, container_classes, object_classes)
        elif arg_type == "byte[][]" or not arg_type.endswith("[][]"):
            # One-dimensional array; byte[][] is handled here as an array of
            # the native "byte[]".
            element_type = arg_type[:-2]
            _open_presence_check(output)
            output.indent()
            if arg_type == "byte[][]":
                output.class_assign(arg_name, "new byte[input.readInt()][]")
            else:
                output.class_assign(arg_name, f"new {element_type}[input.readInt()]")
            output.newline()
            output.indent()
            output.open_for("int i = 0", f"i < this.{arg_name}.length", "i++")
            output.newline()
            if element_type in natives:
                deserialize_native(output, f"{arg_name}[i]", element_type, null_check=False)
            else:
                deserialize_tdapi(output, f"{arg_name}[i]", element_type, container_classes, object_classes,
                                  null_check=False)
            output.close_block(space=True)  # for i
            output.close_block(space=True)  # presence check
        elif arg_type.endswith("[][]"):
            # Two-dimensional array.
            base_type = arg_type[:-4]
            _open_presence_check(output)
            output.indent()
            output.class_assign(arg_name, f"new {base_type}[input.readInt()][]")
            output.newline()
            output.indent()
            output.open_for("int i = 0", f"i < this.{arg_name}.length", "i++")
            output.newline()
            output.indent()
            output.class_assign(f"{arg_name}[i]", f"new {base_type}[input.readInt()]")
            output.newline()
            output.indent()
            output.open_for("int j = 0", f"j < this.{arg_name}[i].length", "j++")
            output.newline()
            if base_type in natives:
                deserialize_native(output, f"{arg_name}[i][j]", base_type, null_check=False)
            else:
                deserialize_tdapi(output, f"{arg_name}[i][j]", base_type, container_classes, object_classes,
                                  null_check=False)
            output.close_block(space=True)  # for j
            output.close_block(space=True)  # for i
            output.close_block(space=True)  # presence check
    output.close_block(space=True)  # deserializing constructor
    output.newline()

    # getConstructor()
    output.indent()
    output.open_function("getConstructor", [], "int")
    output.newline()
    output.indent()
    output.ret("CONSTRUCTOR")
    output.newline()
    output.close_block(space=True)
    output.newline()

    # serialize()
    output.indent()
    output.open_function("serialize", [("DataOutputStream", "output")], "void", "IOException")
    output.newline()
    output.indent()
    output.call("output.writeInt", "CONSTRUCTOR")
    output.newline()
    for arg_type, arg_name in arguments:
        if arg_type in natives:
            serialize_native(output, arg_type, arg_name)
        elif not arg_type.endswith("[]"):
            serialize_tdapi(output, arg_name)
        elif arg_type == "byte[][]" or not arg_type.endswith("[][]"):
            element_type = arg_type[:-2]
            _open_null_guard(output, arg_name)
            output.indent()
            output.call("output.writeInt", f"this.{arg_name}.length")
            output.newline()
            output.indent()
            output.open_for("int i = 0", f"i < this.{arg_name}.length", "i++")
            output.newline()
            if element_type in natives:
                serialize_native(output, element_type, f"{arg_name}[i]", null_check=False)
            else:
                serialize_tdapi(output, f"{arg_name}[i]", null_check=False)
            output.close_block(space=True)  # for i
            output.close_block(space=True)  # null guard
        elif arg_type.endswith("[][]"):
            base_type = arg_type[:-4]
            _open_null_guard(output, arg_name)
            output.indent()
            output.call("output.writeInt", f"this.{arg_name}.length")
            output.newline()
            output.indent()
            output.open_for("int i = 0", f"i < this.{arg_name}.length", "i++")
            output.newline()
            output.indent()
            output.call("output.writeInt", f"this.{arg_name}[i].length")
            output.newline()
            output.indent()
            output.open_for("int j = 0", f"j < this.{arg_name}[i].length", "j++")
            output.newline()
            if base_type in natives:
                serialize_native(output, base_type, f"{arg_name}[i][j]", null_check=False)
            else:
                serialize_tdapi(output, f"{arg_name}[i][j]", null_check=False)
            output.close_block(space=True)  # for j
            output.close_block(space=True)  # for i
            output.close_block(space=True)  # null guard
    output.close_block(space=True)  # serialize()
    output.close_block(space=True)  # class
    output.newline()


def main(input_path: str, output_path: str, headers_path: str):
    """Generate the serializable Java source.

    :param input_path: Java source to scan for class declarations.
    :param output_path: file to (over)write with the generated source.
    :param headers_path: file whose content is copied verbatim right after the
        ``package`` line.
    :raises TypeError: if the input has no ``package`` line.
    :raises ValueError: if no container class named ``Function`` was found.
    """
    with open(input_path) as data_input:
        package, object_classes, function_classes, container_classes = _parse_tdapi(data_input)

    with open(output_path, "w") as data_output:
        data_output.write(package + "\n\n")
        with open(headers_path) as headers_file:
            data_output.write(headers_file.read())
        # "Function" is excluded from the generated abstract container
        # classes and from the container switch in deserialize_tdapi().
        container_classes.remove("Function")

        output = CodeWriter(data_output, 1)
        _write_deserializer(output, object_classes, function_classes)

        for container_class in container_classes:
            output.indent()
            output.open_custom_block("public abstract static class", container_class, "extends Object")
            output.close_block()
            output.newline()

        for classes in (object_classes, function_classes):
            for class_name, class_meta in classes.items():
                _write_class(output, class_name, class_meta, container_classes, object_classes)

        # Overwrite the last character written with the final closing brace.
        # NOTE(review): arithmetic on tell() of a text-mode file is documented
        # as undefined; it relies on the last character being a single byte
        # and on no newline translation taking place.
        data_output.seek(data_output.tell() - 1)
        data_output.write("}")


if __name__ == '__main__':
    main(sys.argv[-3], sys.argv[-2], sys.argv[-1])