crypt module refactor. need to be tested in py2
This commit is contained in:
parent
31313ceabf
commit
e8bab0987c
59
crypt.py
Normal file
59
crypt.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Author: Sammy Pfeiffer
|
||||||
|
# Author: Anton Grigoryev
|
||||||
|
# This file implements the AES 256 IGE cipher
|
||||||
|
# working in Python 2.7 and Python 3.4 (other versions untested)
|
||||||
|
# as it's needed for the implementation of Telegram API
|
||||||
|
# It's based on PyCryto
|
||||||
|
|
||||||
|
from __future__ import print_function
|
||||||
|
from Crypto.Util.strxor import strxor
|
||||||
|
from Crypto.Cipher import AES
|
||||||
|
|
||||||
|
def ige(message, key, iv, operation="decrypt"):
|
||||||
|
"""Given a key, given an iv, and message
|
||||||
|
do whatever operation asked in the operation field.
|
||||||
|
Operation will be checked for: "decrypt" and "encrypt" strings.
|
||||||
|
Returns the message encrypted/decrypted.
|
||||||
|
message must be a multiple by 16 bytes (for division in 16 byte blocks)
|
||||||
|
key must be 32 byte
|
||||||
|
iv must be 32 byte (it's not internally used in AES 256 ECB, but it's
|
||||||
|
needed for IGE)"""
|
||||||
|
|
||||||
|
if len(key) != 32:
|
||||||
|
raise ValueError("key must be 32 bytes long (was " +
|
||||||
|
str(len(key)) + " bytes)")
|
||||||
|
if len(iv) != 32:
|
||||||
|
raise ValueError("iv must be 32 bytes long (was " +
|
||||||
|
str(len(iv)) + " bytes)")
|
||||||
|
|
||||||
|
cipher = AES.new(key, AES.MODE_ECB, iv)
|
||||||
|
blocksize = cipher.block_size
|
||||||
|
if len(message) % blocksize != 0:
|
||||||
|
raise ValueError("message must be a multiple of 16 bytes (try adding " +
|
||||||
|
str(16 - len(message) % 16) + " bytes of padding)")
|
||||||
|
|
||||||
|
ivp = iv[0:blocksize]
|
||||||
|
ivp2 = iv[blocksize:]
|
||||||
|
|
||||||
|
ciphered = bytearray()
|
||||||
|
|
||||||
|
for i in range(0, len(message), blocksize):
|
||||||
|
indata = message[i:i+blocksize]
|
||||||
|
if operation == "decrypt":
|
||||||
|
xored = strxor(indata, ivp2)
|
||||||
|
decrypt_xored = cipher.decrypt(xored)
|
||||||
|
outdata = strxor(decrypt_xored, ivp)
|
||||||
|
ivp = indata
|
||||||
|
ivp2 = outdata
|
||||||
|
elif operation == "encrypt":
|
||||||
|
xored = strxor(indata, ivp)
|
||||||
|
encrypt_xored = cipher.encrypt(xored)
|
||||||
|
outdata = strxor(encrypt_xored, ivp2)
|
||||||
|
ivp = outdata
|
||||||
|
ivp2 = indata
|
||||||
|
else:
|
||||||
|
raise ValueError("operation must be either 'decrypt' or 'encrypt'")
|
||||||
|
|
||||||
|
ciphered.extend(outdata)
|
||||||
|
return ciphered
|
14
testing.py
14
testing.py
@ -1,10 +1,7 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import mtproto
|
|
||||||
import os
|
import os
|
||||||
import io
|
import io
|
||||||
import prime
|
|
||||||
import struct
|
import struct
|
||||||
from Crypto.Cipher import AES
|
|
||||||
# Deal with py2 and py3 differences
|
# Deal with py2 and py3 differences
|
||||||
try:
|
try:
|
||||||
import configparser
|
import configparser
|
||||||
@ -12,7 +9,12 @@ except ImportError:
|
|||||||
import ConfigParser as configparser
|
import ConfigParser as configparser
|
||||||
from Crypto.Hash import SHA
|
from Crypto.Hash import SHA
|
||||||
from Crypto.PublicKey import RSA
|
from Crypto.PublicKey import RSA
|
||||||
from Crypto.Cipher import AES
|
|
||||||
|
# local modules
|
||||||
|
import crypt
|
||||||
|
import mtproto
|
||||||
|
import prime
|
||||||
|
|
||||||
|
|
||||||
config = configparser.ConfigParser()
|
config = configparser.ConfigParser()
|
||||||
# Check if credentials is correctly loaded (when it doesn't read anything it returns [])
|
# Check if credentials is correctly loaded (when it doesn't read anything it returns [])
|
||||||
@ -89,9 +91,7 @@ print("\ntmp_aes_iv:")
|
|||||||
mtproto.vis(tmp_aes_iv)
|
mtproto.vis(tmp_aes_iv)
|
||||||
print(tmp_aes_iv.__repr__())
|
print(tmp_aes_iv.__repr__())
|
||||||
|
|
||||||
|
decrypted_answer = crypt.ige(encrypted_answer, tmp_aes_key, tmp_aes_iv)
|
||||||
from ige import ige
|
|
||||||
decrypted_answer = ige(encrypted_answer, tmp_aes_key, tmp_aes_iv)
|
|
||||||
print("decrypted_answer is:")
|
print("decrypted_answer is:")
|
||||||
print(decrypted_answer.__repr__())
|
print(decrypted_answer.__repr__())
|
||||||
mtproto.vis(decrypted_answer[20:]) # To start off BA0D89 ...
|
mtproto.vis(decrypted_answer[20:]) # To start off BA0D89 ...
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
# This file tests the AES 256 IGE cipher
|
# This file tests the AES 256 IGE cipher
|
||||||
# working in Python 2.7 and Python 3.4 (other versions untested)
|
# working in Python 2.7 and Python 3.4 (other versions untested)
|
||||||
# as it's needed for the implementation of Telegram API
|
# as it's needed for the implementation of Telegram API
|
||||||
from ige import ige
|
from crypt import ige
|
||||||
|
|
||||||
# AES 256 IGE is using AES ECB internally, it implies (extract from PyCrypto.cipher.AES):
|
# AES 256 IGE is using AES ECB internally, it implies (extract from PyCrypto.cipher.AES):
|
||||||
# key : byte string
|
# key : byte string
|
@ -1,13 +1,3 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
# Author: Sammy Pfeiffer
|
|
||||||
# This file implements the AES 256 IGE cipher
|
|
||||||
# working in Python 2.7 and Python 3.4 (other versions untested)
|
|
||||||
# as it's needed for the implementation of Telegram API
|
|
||||||
# It's based on PyCryto
|
|
||||||
from __future__ import print_function
|
|
||||||
from Crypto.Util import number
|
|
||||||
from Crypto.Cipher import AES
|
|
||||||
from sys import version_info
|
|
||||||
if version_info >= (3, 4, 0):
|
if version_info >= (3, 4, 0):
|
||||||
from binascii import hexlify
|
from binascii import hexlify
|
||||||
long = int
|
long = int
|
||||||
@ -41,85 +31,6 @@ def hex_string_to_long(val):
|
|||||||
Convert it to int, which is actually long"""
|
Convert it to int, which is actually long"""
|
||||||
return int(val, 16)
|
return int(val, 16)
|
||||||
|
|
||||||
def xor_stuff(a, b):
|
|
||||||
"""XOR applied to every element of a with every element of b.
|
|
||||||
Depending on python version and depeding on input some arrangements need to be done."""
|
|
||||||
if version_info < (3, 4, 0):
|
|
||||||
if len(a) > len(b):
|
|
||||||
return "".join([chr(ord(x) ^ ord(y)) for (x, y) in zip(a[:len(b)], b)])
|
|
||||||
else:
|
|
||||||
return "".join([chr(ord(x) ^ ord(y)) for (x, y) in zip(a, b[:len(a)])])
|
|
||||||
else:
|
|
||||||
if type(a) == str and type(b) == bytes:# cipher.encrypt returns string
|
|
||||||
return bytes(ord(x) ^ y for x, y in zip(a, b))
|
|
||||||
elif type(a) == bytes and type(b) == str:
|
|
||||||
return bytes(x ^ ord(y) for x, y in zip(a, b))
|
|
||||||
else:
|
|
||||||
return bytes(x ^ y for x, y in zip(a, b))
|
|
||||||
|
|
||||||
def ige(message, key, iv, operation="decrypt"):
|
|
||||||
"""Given a key, given an iv, and message
|
|
||||||
do whatever operation asked in the operation field.
|
|
||||||
Operation will be checked for: "decrypt" and "encrypt" strings.
|
|
||||||
Returns the message encrypted/decrypted.
|
|
||||||
message must be a multiple by 16 bytes (for division in 16 byte blocks)
|
|
||||||
key must be 32 byte
|
|
||||||
iv must be 32 byte (it's not internally used in AES 256 ECB, but it's
|
|
||||||
needed for IGE)"""
|
|
||||||
if type(message) == long:
|
|
||||||
message = number.long_to_bytes(message)
|
|
||||||
if type(key) == long:
|
|
||||||
key = number.long_to_bytes(key)
|
|
||||||
if type(iv) == long:
|
|
||||||
iv = number.long_to_bytes(iv)
|
|
||||||
|
|
||||||
if len(key) != 32:
|
|
||||||
raise ValueError("key must be 32 bytes long (was " +
|
|
||||||
str(len(key)) + " bytes)")
|
|
||||||
if len(iv) != 32:
|
|
||||||
raise ValueError("iv must be 32 bytes long (was " +
|
|
||||||
str(len(iv)) + " bytes)")
|
|
||||||
|
|
||||||
cipher = AES.new(key, AES.MODE_ECB, iv)
|
|
||||||
blocksize = cipher.block_size
|
|
||||||
if len(message) % blocksize != 0:
|
|
||||||
raise ValueError("message must be a multiple of 16 bytes (try adding " +
|
|
||||||
str(16 - len(message) % 16) + " bytes of padding)")
|
|
||||||
|
|
||||||
ivp = iv[0:blocksize]
|
|
||||||
ivp2 = iv[blocksize:]
|
|
||||||
|
|
||||||
ciphered = None
|
|
||||||
|
|
||||||
for i in range(0, len(message), blocksize):
|
|
||||||
indata = message[i:i+blocksize]
|
|
||||||
if operation == "decrypt":
|
|
||||||
xored = xor_stuff(indata, ivp2)
|
|
||||||
decrypt_xored = cipher.decrypt(xored)
|
|
||||||
outdata = xor_stuff(decrypt_xored, ivp)
|
|
||||||
ivp = indata
|
|
||||||
ivp2 = outdata
|
|
||||||
elif operation == "encrypt":
|
|
||||||
xored = xor_stuff(indata, ivp)
|
|
||||||
encrypt_xored = cipher.encrypt(xored)
|
|
||||||
outdata = xor_stuff(encrypt_xored, ivp2)
|
|
||||||
ivp = outdata
|
|
||||||
ivp2 = indata
|
|
||||||
else:
|
|
||||||
raise ValueError("operation must be either 'decrypt' or 'encrypt'")
|
|
||||||
|
|
||||||
if ciphered is None:
|
|
||||||
ciphered = outdata
|
|
||||||
else:
|
|
||||||
ciphered_ba = bytearray(ciphered)
|
|
||||||
ciphered_ba.extend(outdata)
|
|
||||||
if version_info >= (3, 4, 0):
|
|
||||||
ciphered = bytes(ciphered_ba)
|
|
||||||
else:
|
|
||||||
ciphered = str(ciphered_ba)
|
|
||||||
|
|
||||||
return ciphered
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
# Example data from https://core.telegram.org/mtproto/samples-auth_key#conversion-of-encrypted-answer-into-answer
|
# Example data from https://core.telegram.org/mtproto/samples-auth_key#conversion-of-encrypted-answer-into-answer
|
Loading…
x
Reference in New Issue
Block a user