This commit is contained in:
Daniil Gentili 2017-03-24 21:13:23 +01:00
commit 7971237fe1
18 changed files with 152 additions and 91 deletions

View File

@ -36,6 +36,7 @@ class API extends APIFactory
$this->API->v = $this->API->getV(); $this->API->v = $this->API->getV();
\danog\MadelineProto\Logger::log(['MadelineProto is ready!'], Logger::NOTICE); \danog\MadelineProto\Logger::log(['MadelineProto is ready!'], Logger::NOTICE);
} }
/* /*
public function __sleep() public function __sleep()
{ {

View File

@ -105,7 +105,8 @@ class Connection
case 'https': case 'https':
try { try {
fclose($this->sock); fclose($this->sock);
} catch (\danog\MadelineProto\Exception $e) { ; } } catch (\danog\MadelineProto\Exception $e) {
}
break; break;
case 'udp': case 'udp':
throw new Exception("Connection: This protocol wasn't implemented yet."); throw new Exception("Connection: This protocol wasn't implemented yet.");

View File

@ -24,9 +24,11 @@ class DataCenter
public $dclist = []; public $dclist = [];
public $settings = []; public $settings = [];
public function __sleep() { public function __sleep()
{
return ['sockets', 'curdc', 'dclist', 'settings']; return ['sockets', 'curdc', 'dclist', 'settings'];
} }
public function __construct(&$dclist, &$settings) public function __construct(&$dclist, &$settings)
{ {
$this->dclist = &$dclist; $this->dclist = &$dclist;
@ -71,13 +73,18 @@ class DataCenter
\danog\MadelineProto\Logger::log(['Connecting to DC '.$dc_number.' ('.$test.' server, '.$ipv6.', '.$this->settings[$dc_number]['protocol'].')...'], \danog\MadelineProto\Logger::VERBOSE); \danog\MadelineProto\Logger::log(['Connecting to DC '.$dc_number.' ('.$test.' server, '.$ipv6.', '.$this->settings[$dc_number]['protocol'].')...'], \danog\MadelineProto\Logger::VERBOSE);
$this->sockets[$dc_number] = new Connection($address, $port, $this->settings[$dc_number]['protocol'], $this->settings[$dc_number]['timeout']); $this->sockets[$dc_number] = new Connection($address, $port, $this->settings[$dc_number]['protocol'], $this->settings[$dc_number]['timeout']);
return true; return true;
} }
public function get_dcs() {
public function get_dcs()
{
$test = $this->settings[2]['test_mode'] ? 'test' : 'main'; $test = $this->settings[2]['test_mode'] ? 'test' : 'main';
$ipv6 = $this->settings[2]['ipv6'] ? 'ipv6' : 'ipv4'; $ipv6 = $this->settings[2]['ipv6'] ? 'ipv6' : 'ipv4';
return array_keys($this->dclist[$test][$ipv6]); return array_keys($this->dclist[$test][$ipv6]);
} }
public function &__get($name) public function &__get($name)
{ {
return $this->sockets[$this->curdc]->{$name}; return $this->sockets[$this->curdc]->{$name};

View File

@ -14,13 +14,14 @@ namespace danog\MadelineProto;
class Exception extends \Exception class Exception extends \Exception
{ {
public function __construct($message = null, $code = 0, Exception $previous = null) { public function __construct($message = null, $code = 0, Exception $previous = null)
{
parent::__construct($message, $code, $previous); parent::__construct($message, $code, $previous);
if (\danog\MadelineProto\Logger::$constructed && $this->file !== __FILE__) { if (\danog\MadelineProto\Logger::$constructed && $this->file !== __FILE__) {
\danog\MadelineProto\Logger::log([$message.' in '.basename($this->file).':'.$this->line], \danog\MadelineProto\Logger::FATAL_ERROR); \danog\MadelineProto\Logger::log([$message.' in '.basename($this->file).':'.$this->line], \danog\MadelineProto\Logger::FATAL_ERROR);
} }
} }
/** /**
* ExceptionErrorHandler. * ExceptionErrorHandler.
* *

View File

@ -29,8 +29,6 @@ class Logger
const ERROR = 1; const ERROR = 1;
const FATAL_ERROR = 0; const FATAL_ERROR = 0;
/* /*
* Constructor function * Constructor function
* Accepts various logger modes: * Accepts various logger modes:
@ -53,7 +51,6 @@ class Logger
public static function log($params, $level = self::NOTICE) public static function log($params, $level = self::NOTICE)
{ {
if (!self::$constructed) { if (!self::$constructed) {
throw new Exception("The constructor function wasn't called! Please call the constructor function before using this method."); throw new Exception("The constructor function wasn't called! Please call the constructor function before using this method.");
} }

View File

@ -100,20 +100,25 @@ class MTProto
$this->v = $this->getV(); $this->v = $this->getV();
$this->should_serialize = true; $this->should_serialize = true;
} }
public function setup_threads() {
if ($this->threads = $this->run_workers = class_exists('\Pool') && php_sapi_name() == "cli" && $this->settings['threading']['allow_threading']) { public function setup_threads()
{
if ($this->threads = $this->run_workers = class_exists('\Pool') && php_sapi_name() == 'cli' && $this->settings['threading']['allow_threading']) {
\danog\MadelineProto\Logger::log(['THREADING IS ENABLED'], \danog\MadelineProto\Logger::NOTICE); \danog\MadelineProto\Logger::log(['THREADING IS ENABLED'], \danog\MadelineProto\Logger::NOTICE);
$this->start_threads(); $this->start_threads();
} }
} }
public function start_threads() {
public function start_threads()
{
if ($this->threads) { if ($this->threads) {
$dcs = $this->datacenter->get_dcs(); $dcs = $this->datacenter->get_dcs();
if (!isset($this->reader_pool)) $this->reader_pool = new \Pool(count($dcs)); if (!isset($this->reader_pool)) {
$this->reader_pool = new \Pool(count($dcs));
}
foreach ($dcs as $dc) { foreach ($dcs as $dc) {
if (!isset($this->readers[$dc])) { if (!isset($this->readers[$dc])) {
$this->readers [$dc] = new \danog\MadelineProto\Threads\SocketReader($this, $dc); $this->readers[$dc] = new \danog\MadelineProto\Threads\SocketReader($this, $dc);
} }
if (!$this->readers[$dc]->isRunning()) { if (!$this->readers[$dc]->isRunning()) {
$this->readers[$dc]->garbage = false; $this->readers[$dc]->garbage = false;
@ -125,14 +130,22 @@ class MTProto
} }
} }
} }
public function __sleep() {
public function __sleep()
{
$t = get_object_vars($this); $t = get_object_vars($this);
if (isset($t['reader_pool'])) unset($t['reader_pool']); if (isset($t['reader_pool'])) {
unset($t['reader_pool']);
}
return array_keys($t); return array_keys($t);
} }
public function __wakeup() public function __wakeup()
{ {
if (debug_backtrace()[0]['file'] === __DIR__.'/Threads/SocketReader.php' || (debug_backtrace()[0]['file'] === __FILE__ && debug_backtrace()[0]['line'] === 117)) return; if (debug_backtrace()[0]['file'] === __DIR__.'/Threads/SocketReader.php' || (debug_backtrace()[0]['file'] === __FILE__ && debug_backtrace()[0]['line'] === 117)) {
return;
}
$this->bigint = PHP_INT_SIZE < 8; $this->bigint = PHP_INT_SIZE < 8;
$this->setup_logger(); $this->setup_logger();
if (!isset($this->v) || $this->v !== $this->getV()) { if (!isset($this->v) || $this->v !== $this->getV()) {
@ -147,13 +160,16 @@ class MTProto
$this->get_updates_difference(); $this->get_updates_difference();
} }
} }
public function __destruct() {
public function __destruct()
{
if (isset($this->reader_pool)) { if (isset($this->reader_pool)) {
$this->run_workers = false; $this->run_workers = false;
\danog\MadelineProto\Logger::log(['Shutting down reader pool...'], Logger::NOTICE); \danog\MadelineProto\Logger::log(['Shutting down reader pool...'], Logger::NOTICE);
$this->reader_pool->shutdown(); $this->reader_pool->shutdown();
} }
} }
public function parse_settings($settings) public function parse_settings($settings)
{ {
// Detect ipv6 // Detect ipv6
@ -304,7 +320,7 @@ Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB
], ],
'threading' => [ 'threading' => [
'allow_threading' => false, // Should I use threading, if it is enabled? 'allow_threading' => false, // Should I use threading, if it is enabled?
'handler_workers' => 10 // How many workers should every message handler pool of each socket reader have 'handler_workers' => 10, // How many workers should every message handler pool of each socket reader have
], ],
'pwr' => ['pwr' => false, 'db_token' => false, 'strict' => false], 'pwr' => ['pwr' => false, 'db_token' => false, 'strict' => false],
]; ];
@ -333,7 +349,6 @@ Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB
$this->should_serialize = true; $this->should_serialize = true;
} }
public function setup_logger() public function setup_logger()
{ {
\danog\MadelineProto\Logger::constructor($this->settings['logger']['logger'], $this->settings['logger']['logger_param'], isset($this->authorization['user']) ? (isset($this->authorization['user']['username']) ? $this->authorization['user']['username'] : $this->authorization['user']['id']) : '', isset($this->settings['logger']['logger_level']) ? $this->settings['logger']['logger_level'] : Logger::VERBOSE); \danog\MadelineProto\Logger::constructor($this->settings['logger']['logger'], $this->settings['logger']['logger_param'], isset($this->authorization['user']) ? (isset($this->authorization['user']['username']) ? $this->authorization['user']['username'] : $this->authorization['user']['id']) : '', isset($this->settings['logger']['logger_level']) ? $this->settings['logger']['logger_level'] : Logger::VERBOSE);
@ -360,11 +375,15 @@ Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB
public function connect_to_all_dcs() public function connect_to_all_dcs()
{ {
foreach ($old = $this->datacenter->get_dcs() as $new_dc) { foreach ($old = $this->datacenter->get_dcs() as $new_dc) {
if (!isset($this->datacenter->sockets[$new_dc])) $this->datacenter->dc_connect($new_dc); if (!isset($this->datacenter->sockets[$new_dc])) {
$this->datacenter->dc_connect($new_dc);
}
} }
$this->setup_threads(); $this->setup_threads();
$this->init_authorization(); $this->init_authorization();
if ($old !== $this->datacenter->get_dcs()) $this->connect_to_all_dcs(); if ($old !== $this->datacenter->get_dcs()) {
$this->connect_to_all_dcs();
}
} }
// Creates authorization keys // Creates authorization keys
@ -393,9 +412,13 @@ Slv8kg9qv1m6XHVQY3PnEw+QQtqSIXklHwIDAQAB
} }
} }
} }
public function sync_authorization($authorized_dc) {
public function sync_authorization($authorized_dc)
{
foreach ($this->datacenter->sockets as $new_dc => &$socket) { foreach ($this->datacenter->sockets as $new_dc => &$socket) {
if ($new_dc === $authorized_dc) continue; if ($new_dc === $authorized_dc) {
continue;
}
\danog\MadelineProto\Logger::log(['Copying authorization from dc '.$authorized_dc.' to dc '.$new_dc.'...'], Logger::VERBOSE); \danog\MadelineProto\Logger::log(['Copying authorization from dc '.$authorized_dc.' to dc '.$new_dc.'...'], Logger::VERBOSE);
$this->should_serialize = true; $this->should_serialize = true;
$exported_authorization = $this->method_call('auth.exportAuthorization', ['dc_id' => $new_dc], ['datacenter' => $authorized_dc]); $exported_authorization = $this->method_call('auth.exportAuthorization', ['dc_id' => $new_dc], ['datacenter' => $authorized_dc]);

View File

@ -22,8 +22,9 @@ trait AckHandler
// The server acknowledges that it received my message // The server acknowledges that it received my message
if (!isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id])) { if (!isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id])) {
\danog\MadelineProto\Logger::log(["WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?'], \danog\MadelineProto\Logger::WARNING); \danog\MadelineProto\Logger::log(["WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?'], \danog\MadelineProto\Logger::WARNING);
var_dump($message_id); var_dump($message_id);
var_dump(debug_backtrace()[0]['file'], debug_backtrace()[0]['line']); var_dump(debug_backtrace()[0]['file'], debug_backtrace()[0]['line']);
return false; return false;
} }

View File

@ -68,10 +68,9 @@ trait AuthKeyHandler
if (!isset($this->key->keydata['fp'])) { if (!isset($this->key->keydata['fp'])) {
$this->key = new \danog\MadelineProto\RSA($this->settings['authorization']['rsa_key']); $this->key = new \danog\MadelineProto\RSA($this->settings['authorization']['rsa_key']);
} }
if (in_array($this->key->keydata['fp'], $ResPQ['server_public_key_fingerprints'])) throw new \danog\MadelineProto\SecurityException("Couldn't find our key in the server_public_key_fingerprints vector."); if (in_array($this->key->keydata['fp'], $ResPQ['server_public_key_fingerprints'])) {
throw new \danog\MadelineProto\SecurityException("Couldn't find our key in the server_public_key_fingerprints vector.");
}
$pq_bytes = $ResPQ['pq']; $pq_bytes = $ResPQ['pq'];
$server_nonce = $ResPQ['server_nonce']; $server_nonce = $ResPQ['server_nonce'];
@ -512,7 +511,6 @@ trait AuthKeyHandler
private $temp_requested_calls = []; private $temp_requested_calls = [];
private $calls = []; private $calls = [];
public function accept_call($params) public function accept_call($params)
{ {
$dh_config = $this->get_dh_config(); $dh_config = $this->get_dh_config();
@ -530,6 +528,7 @@ trait AuthKeyHandler
$this->check_G($g_b, $dh_config['p']); $this->check_G($g_b, $dh_config['p']);
$this->handle_pending_updates(); $this->handle_pending_updates();
} }
public function request_call($user) public function request_call($user)
{ {
$user = $this->get_info($user)['InputUser']; $user = $this->get_info($user)['InputUser'];
@ -549,8 +548,6 @@ trait AuthKeyHandler
return $res['phone_call']['id']; return $res['phone_call']['id'];
} }
public function complete_call($params) public function complete_call($params)
{ {
if ($this->call_status($params['id']) !== 1) { if ($this->call_status($params['id']) !== 1) {
@ -573,7 +570,6 @@ trait AuthKeyHandler
$this->handle_pending_updates(); $this->handle_pending_updates();
} }
public function call_status($id) public function call_status($id)
{ {
if (isset($this->calls[$id])) { if (isset($this->calls[$id])) {

View File

@ -25,7 +25,9 @@ trait CallHandler
if (!is_array($aargs)) { if (!is_array($aargs)) {
throw new \danog\MadelineProto\Exception("Additonal arguments aren't an array."); throw new \danog\MadelineProto\Exception("Additonal arguments aren't an array.");
} }
if (!isset($aargs['datacenter'])) throw new \danog\MadelineProto\Exception("No datacenter provided"); if (!isset($aargs['datacenter'])) {
throw new \danog\MadelineProto\Exception('No datacenter provided');
}
$args = $this->botAPI_to_MTProto($args); $args = $this->botAPI_to_MTProto($args);
$serialized = $this->serialize_method($method, $args); $serialized = $this->serialize_method($method, $args);
$content_related = $this->content_related($method); $content_related = $this->content_related($method);
@ -44,7 +46,7 @@ trait CallHandler
$server_answer = null; $server_answer = null;
$update_count = 0; $update_count = 0;
$only_updates = false; $only_updates = false;
while ($server_answer === null && $res_count++ < $this->settings['max_tries']['response']+1) { // Loop until we get a response, loop for a max of $this->settings['max_tries']['response'] times while ($server_answer === null && $res_count++ < $this->settings['max_tries']['response'] + 1) { // Loop until we get a response, loop for a max of $this->settings['max_tries']['response'] times
try { try {
\danog\MadelineProto\Logger::log(['Getting response (try number '.$res_count.' for '.$method.')...'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); \danog\MadelineProto\Logger::log(['Getting response (try number '.$res_count.' for '.$method.')...'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->start_threads(); $this->start_threads();
@ -66,7 +68,6 @@ trait CallHandler
$this->recv_message($aargs['datacenter']); // This method receives data from the socket, and parses stuff $this->recv_message($aargs['datacenter']); // This method receives data from the socket, and parses stuff
$only_updates = $this->handle_messages($aargs['datacenter']); // This method receives data from the socket, and parses stuff $only_updates = $this->handle_messages($aargs['datacenter']); // This method receives data from the socket, and parses stuff
//} //}
} catch (\danog\MadelineProto\Exception $e) { } catch (\danog\MadelineProto\Exception $e) {
if ($e->getMessage() === 'I had to recreate the temporary authorization key') { if ($e->getMessage() === 'I had to recreate the temporary authorization key') {
continue 2; continue 2;
@ -97,7 +98,7 @@ trait CallHandler
case 16: case 16:
case 17: case 17:
\danog\MadelineProto\Logger::log(['Received bad_msg_notification: '.$this->bad_msg_error_codes[$server_answer['error_code']]], \danog\MadelineProto\Logger::WARNING); \danog\MadelineProto\Logger::log(['Received bad_msg_notification: '.$this->bad_msg_error_codes[$server_answer['error_code']]], \danog\MadelineProto\Logger::WARNING);
$this->datacenter->sockets[$aargs['datacenter']]->timedelta = (int)((new \phpseclib\Math\BigInteger(strrev($this->datacenter->sockets[$aargs['datacenter']]->outgoing_messages[$int_message_id]['response']), 256))->bitwise_rightShift(32)->subtract(new \phpseclib\Math\BigInteger(time()))->toString()); $this->datacenter->sockets[$aargs['datacenter']]->timedelta = (int) ((new \phpseclib\Math\BigInteger(strrev($this->datacenter->sockets[$aargs['datacenter']]->outgoing_messages[$int_message_id]['response']), 256))->bitwise_rightShift(32)->subtract(new \phpseclib\Math\BigInteger(time()))->toString());
\danog\MadelineProto\Logger::log(['Set time delta to '.$this->datacenter->sockets[$aargs['datacenter']]->timedelta], \danog\MadelineProto\Logger::WARNING); \danog\MadelineProto\Logger::log(['Set time delta to '.$this->datacenter->sockets[$aargs['datacenter']]->timedelta], \danog\MadelineProto\Logger::WARNING);
$this->reset_session(); $this->reset_session();
$this->datacenter->sockets[$aargs['datacenter']]->temp_auth_key = null; $this->datacenter->sockets[$aargs['datacenter']]->temp_auth_key = null;
@ -149,13 +150,16 @@ var_dump($this->datacenter->sockets[$aargs['datacenter']]->outgoing_messages);
if (!is_array($args)) { if (!is_array($args)) {
throw new \danog\MadelineProto\Exception("Arguments aren't an array."); throw new \danog\MadelineProto\Exception("Arguments aren't an array.");
} }
if (!isset($aargs['datacenter'])) throw new \danog\MadelineProto\Exception("No datacenter provided"); if (!isset($aargs['datacenter'])) {
throw new \danog\MadelineProto\Exception('No datacenter provided');
}
for ($count = 1; $count <= $this->settings['max_tries']['query']; $count++) { for ($count = 1; $count <= $this->settings['max_tries']['query']; $count++) {
try { try {
\danog\MadelineProto\Logger::log([$object === 'msgs_ack' ? 'ack '.$args['msg_ids'][0] : 'Sending object (try number '.$count.' for '.$object.')...'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); \danog\MadelineProto\Logger::log([$object === 'msgs_ack' ? 'ack '.$args['msg_ids'][0] : 'Sending object (try number '.$count.' for '.$object.')...'], \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$int_message_id = $this->send_message($this->serialize_object(['type' => $object], $args), $this->content_related($object), $aargs); $int_message_id = $this->send_message($this->serialize_object(['type' => $object], $args), $this->content_related($object), $aargs);
if ($object !== 'msgs_ack') $this->datacenter->sockets[$aargs['datacenter']]->outgoing_messages[$int_message_id]['content'] = ['method' => $object, 'args' => $args]; if ($object !== 'msgs_ack') {
$this->datacenter->sockets[$aargs['datacenter']]->outgoing_messages[$int_message_id]['content'] = ['method' => $object, 'args' => $args];
}
} catch (Exception $e) { } catch (Exception $e) {
\danog\MadelineProto\Logger::log(['An error occurred while calling object '.$object.': '.$e->getMessage().' in '.$e->getFile().':'.$e->getLine().'. Recreating connection and retrying to call object...'], \danog\MadelineProto\Logger::WARNING); \danog\MadelineProto\Logger::log(['An error occurred while calling object '.$object.': '.$e->getMessage().' in '.$e->getFile().':'.$e->getLine().'. Recreating connection and retrying to call object...'], \danog\MadelineProto\Logger::WARNING);
$this->datacenter->sockets[$aargs['datacenter']]->close_and_reopen(); $this->datacenter->sockets[$aargs['datacenter']]->close_and_reopen();

View File

@ -90,7 +90,7 @@ trait MessageHandler
} }
$message_id = substr($decrypted_data, 16, 8); $message_id = substr($decrypted_data, 16, 8);
$this->check_message_id($message_id, ['outgoing' => false, 'datacenter' => $datacenter, 'container' => false]); $this->check_message_id($message_id, ['outgoing' => false, 'datacenter' => $datacenter, 'container' => false]);
$seq_no = \danog\PHP\Struct::unpack('<I', substr($decrypted_data, 24, 4))[0]; $seq_no = \danog\PHP\Struct::unpack('<I', substr($decrypted_data, 24, 4))[0];
// Dunno how to handle any incorrect sequence numbers // Dunno how to handle any incorrect sequence numbers

View File

@ -86,10 +86,17 @@ trait MsgIdHandler
return strrev($message_id->toBytes()); return strrev($message_id->toBytes());
} }
public function get_max_id($datacenter, $incoming) {
public function get_max_id($datacenter, $incoming)
{
$keys = array_keys($this->datacenter->sockets[$datacenter]->{$incoming ? 'incoming_messages' : 'outgoing_messages'}); $keys = array_keys($this->datacenter->sockets[$datacenter]->{$incoming ? 'incoming_messages' : 'outgoing_messages'});
if (empty($keys)) return $this->zero; if (empty($keys)) {
array_walk($keys, function (&$value, $key) { $value = is_integer($value) ? new \phpseclib\Math\BigInteger($value) : new \phpseclib\Math\BigInteger(strrev($value), 256); }); return $this->zero;
}
array_walk($keys, function (&$value, $key) {
$value = is_int($value) ? new \phpseclib\Math\BigInteger($value) : new \phpseclib\Math\BigInteger(strrev($value), 256);
});
return \phpseclib\Math\BigInteger::max(...$keys); return \phpseclib\Math\BigInteger::max(...$keys);
} }
} }

View File

@ -49,7 +49,6 @@ trait ResponseHandler
foreach ($msg_ids as $msg_id) { foreach ($msg_ids as $msg_id) {
$cur_info = 0; $cur_info = 0;
if (!in_array($msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages)) { if (!in_array($msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages)) {
$msg_id = new \phpseclib\Math\BigInteger(strrev($msg_id), 256); $msg_id = new \phpseclib\Math\BigInteger(strrev($msg_id), 256);
if ((new \phpseclib\Math\BigInteger(time() + $this->datacenter->sockets[$datacenter]->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) { if ((new \phpseclib\Math\BigInteger(time() + $this->datacenter->sockets[$datacenter]->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) {
$cur_info |= 3; $cur_info |= 3;
@ -135,7 +134,7 @@ trait ResponseHandler
case 'msg_container': case 'msg_container':
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['messages'] as $message) { foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['messages'] as $message) {
$this->check_message_id($message['msg_id'], ['outgoing' => false, 'datacenter' => $datacenter, 'container' => true]); $this->check_message_id($message['msg_id'], ['outgoing' => false, 'datacenter' => $datacenter, 'container' => true]);
$this->datacenter->sockets[$datacenter]->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body']]; $this->datacenter->sockets[$datacenter]->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body']];
$this->datacenter->sockets[$datacenter]->new_incoming[$message['msg_id']] = $message['msg_id']; $this->datacenter->sockets[$datacenter]->new_incoming[$message['msg_id']] = $message['msg_id'];
@ -148,7 +147,7 @@ trait ResponseHandler
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']])) { if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']])) {
$this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id'], $datacenter); // Acknowledge that I received the server's response $this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id'], $datacenter); // Acknowledge that I received the server's response
} else { } else {
$this->check_message_id($message['orig_message']['msg_id'], ['outgoing' => false, 'datacenter' => $datacenter, 'container' => true]); $this->check_message_id($message['orig_message']['msg_id'], ['outgoing' => false, 'datacenter' => $datacenter, 'container' => true]);
$this->datacenter->sockets[$datacenter]->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']]; $this->datacenter->sockets[$datacenter]->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']];
$this->datacenter->sockets[$datacenter]->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id']; $this->datacenter->sockets[$datacenter]->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id'];
@ -287,15 +286,16 @@ trait ResponseHandler
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]); unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]);
} }
} }
return $only_updates; return $only_updates;
} }
public function handle_rpc_error($server_answer, &$datacenter) {
switch ($server_answer['error_code']) { public function handle_rpc_error($server_answer, &$datacenter)
{
switch ($server_answer['error_code']) {
case 303: case 303:
$this->datacenter->curdc = $datacenter = preg_replace('/[^0-9]+/', '', $server_answer['error_message']); $this->datacenter->curdc = $datacenter = preg_replace('/[^0-9]+/', '', $server_answer['error_message']);
throw new \danog\MadelineProto\Exception('Received request to switch to DC '.$this->datacenter->curdc); throw new \danog\MadelineProto\Exception('Received request to switch to DC '.$this->datacenter->curdc);
case 401: case 401:
switch ($server_answer['error_message']) { switch ($server_answer['error_message']) {
case 'USER_DEACTIVATED': case 'USER_DEACTIVATED':
@ -326,6 +326,7 @@ trait ResponseHandler
break; break;
} }
} }
public function handle_pending_updates() public function handle_pending_updates()
{ {
\danog\MadelineProto\Logger::log(['Parsing pending updates...'], \danog\MadelineProto\Logger::VERBOSE); \danog\MadelineProto\Logger::log(['Parsing pending updates...'], \danog\MadelineProto\Logger::VERBOSE);
@ -342,7 +343,7 @@ trait ResponseHandler
\danog\MadelineProto\Logger::log(['Parsing updates received via the socket...'], \danog\MadelineProto\Logger::VERBOSE); \danog\MadelineProto\Logger::log(['Parsing updates received via the socket...'], \danog\MadelineProto\Logger::VERBOSE);
if ($this->getting_state) { if ($this->getting_state) {
\danog\MadelineProto\Logger::log(['Getting state, handle later'], \danog\MadelineProto\Logger::VERBOSE); \danog\MadelineProto\Logger::log(['Getting state, handle later'], \danog\MadelineProto\Logger::VERBOSE);
$this->pending_updates []= $updates; $this->pending_updates[] = $updates;
return false; return false;
} }

View File

@ -22,6 +22,7 @@ trait SeqNoHandler
$in = $content_related ? 1 : 0; $in = $content_related ? 1 : 0;
$value = $this->datacenter->sockets[$datacenter]->session_out_seq_no; $value = $this->datacenter->sockets[$datacenter]->session_out_seq_no;
$this->datacenter->sockets[$datacenter]->session_out_seq_no += $in; $this->datacenter->sockets[$datacenter]->session_out_seq_no += $in;
return ($value * 2) + $in; return ($value * 2) + $in;
} }
@ -30,6 +31,7 @@ trait SeqNoHandler
$in = $content_related ? 1 : 0; $in = $content_related ? 1 : 0;
$value = $this->datacenter->sockets[$datacenter]->session_in_seq_no; $value = $this->datacenter->sockets[$datacenter]->session_in_seq_no;
$this->datacenter->sockets[$datacenter]->session_in_seq_no += $in; $this->datacenter->sockets[$datacenter]->session_in_seq_no += $in;
return ($value * 2) + $in; return ($value * 2) + $in;
} }

View File

@ -39,6 +39,7 @@ trait AuthKeyHandler
$this->notify_layer($params['id']); $this->notify_layer($params['id']);
$this->handle_pending_updates(); $this->handle_pending_updates();
} }
public function request_secret_chat($user) public function request_secret_chat($user)
{ {
$user = $this->get_info($user)['InputUser']; $user = $this->get_info($user)['InputUser'];
@ -82,7 +83,6 @@ trait AuthKeyHandler
$this->handle_pending_updates(); $this->handle_pending_updates();
} }
public function notify_layer($chat) public function notify_layer($chat)
{ {
$this->method_call('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionNotifyLayer', 'layer' => $this->encrypted_layer]]], ['datacenter' => $this->datacenter->curdc]); $this->method_call('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionNotifyLayer', 'layer' => $this->encrypted_layer]]], ['datacenter' => $this->datacenter->curdc]);

View File

@ -25,7 +25,7 @@ trait MessageHandler
return false; return false;
} }
$message = $this->serialize_object(['type' => $message['_']], $message, $this->secret_chats[$chat_id]['layer']); $message = $this->serialize_object(['type' => $message['_']], $message, $this->secret_chats[$chat_id]['layer']);
$this->secret_chats[$chat_id]['outgoing'] []= $message; $this->secret_chats[$chat_id]['outgoing'][] = $message;
$this->secret_chats[$chat_id]['ttr']--; $this->secret_chats[$chat_id]['ttr']--;
if (($this->secret_chats[$chat_id]['ttr'] <= 0 || time() - $this->secret_chats[$chat_id]['updated'] > 7 * 24 * 60 * 60) && $this->secret_chats[$chat_id]['rekeying'] === 0) { if (($this->secret_chats[$chat_id]['ttr'] <= 0 || time() - $this->secret_chats[$chat_id]['updated'] > 7 * 24 * 60 * 60) && $this->secret_chats[$chat_id]['rekeying'] === 0) {
$this->rekey($chat_id); $this->rekey($chat_id);

View File

@ -244,6 +244,7 @@ trait TL
if (strlen($object) !== 8) { if (strlen($object) !== 8) {
throw new Exception('Given value is not 8 bytes long'); throw new Exception('Given value is not 8 bytes long');
} }
return $object; return $object;
} }
@ -256,16 +257,19 @@ trait TL
if (strlen($object) !== 16) { if (strlen($object) !== 16) {
throw new Exception('Given value is not 16 bytes long'); throw new Exception('Given value is not 16 bytes long');
} }
return (string) $object; return (string) $object;
case 'int256': case 'int256':
if (strlen($object) !== 32) { if (strlen($object) !== 32) {
throw new Exception('Given value is not 32 bytes long'); throw new Exception('Given value is not 32 bytes long');
} }
return (string) $object; return (string) $object;
case 'int512': case 'int512':
if (strlen($object) !== 64) { if (strlen($object) !== 64) {
throw new Exception('Given value is not 64 bytes long'); throw new Exception('Given value is not 64 bytes long');
} }
return (string) $object; return (string) $object;
case 'double': case 'double':
return \danog\PHP\Struct::pack('<d', $object); return \danog\PHP\Struct::pack('<d', $object);
@ -561,5 +565,4 @@ trait TL
return $x; return $x;
} }
} }

View File

@ -28,22 +28,26 @@ class SocketHandler extends \Threaded implements \Collectable
*/ */
public function run() public function run()
{ {
require_once(__DIR__.'/../SecurityException.php'); require_once __DIR__.'/../SecurityException.php';
require_once(__DIR__.'/../RPCErrorException.php'); require_once __DIR__.'/../RPCErrorException.php';
require_once(__DIR__.'/../ResponseException.php'); require_once __DIR__.'/../ResponseException.php';
require_once(__DIR__.'/../TL/Conversion/Exception.php'); require_once __DIR__.'/../TL/Conversion/Exception.php';
require_once(__DIR__.'/../TL/Exception.php'); require_once __DIR__.'/../TL/Exception.php';
require_once(__DIR__.'/../NothingInTheSocketException.php'); require_once __DIR__.'/../NothingInTheSocketException.php';
require_once(__DIR__.'/../Exception.php'); require_once __DIR__.'/../Exception.php';
$this->API->handle_messages($current); $this->API->handle_messages($current);
$this->setGarbage(); $this->setGarbage();
} }
private $garbage = false; private $garbage = false;
public function setGarbage():void {
public function setGarbage():void
{
$this->garbage = true; $this->garbage = true;
} }
public function isGarbage():bool {
public function isGarbage():bool
{
return $this->garbage; return $this->garbage;
} }
} }

View File

@ -21,46 +21,59 @@ class SocketReader extends \Threaded implements \Collectable
{ {
$this->API = $me; $this->API = $me;
$this->current = $current; $this->current = $current;
} }
public function __sleep() {
public function __sleep()
{
return ['current', 'API', 'garbage']; return ['current', 'API', 'garbage'];
} }
public function __destruct() {
public function __destruct()
{
\danog\MadelineProto\Logger::log(['Shutting down handler pool for DC '.$this->current], \danog\MadelineProto\Logger::NOTICE); \danog\MadelineProto\Logger::log(['Shutting down handler pool for DC '.$this->current], \danog\MadelineProto\Logger::NOTICE);
if (isset($this->handler_pool)) $this->handler_pool->shutdown(); if (isset($this->handler_pool)) {
$this->handler_pool->shutdown();
}
} }
/** /**
* Reading connection and receiving message from server. Check the CRC32. * Reading connection and receiving message from server. Check the CRC32.
*/ */
public function run() public function run()
{ {
require_once(__DIR__.'/../SecurityException.php'); require_once __DIR__.'/../SecurityException.php';
require_once(__DIR__.'/../RPCErrorException.php'); require_once __DIR__.'/../RPCErrorException.php';
require_once(__DIR__.'/../ResponseException.php'); require_once __DIR__.'/../ResponseException.php';
require_once(__DIR__.'/../TL/Conversion/Exception.php'); require_once __DIR__.'/../TL/Conversion/Exception.php';
require_once(__DIR__.'/../TL/Exception.php'); require_once __DIR__.'/../TL/Exception.php';
require_once(__DIR__.'/../NothingInTheSocketException.php'); require_once __DIR__.'/../NothingInTheSocketException.php';
require_once(__DIR__.'/../Exception.php'); require_once __DIR__.'/../Exception.php';
var_dump($this->API->settings['threading']); var_dump($this->API->settings['threading']);
if (!isset($this->handler_pool)) $this->handler_pool = new \Pool(2); if (!isset($this->handler_pool)) {
$this->handler_pool = new \Pool(2);
}
var_dump($this->API->settings['threading']); var_dump($this->API->settings['threading']);
while ($this->API->run_workers) { while ($this->API->run_workers) {
try { try {
$this->API->recv_message($this->current); $this->API->recv_message($this->current);
$this->handler_pool->submit(new SocketHandler($this->API, $this->current)); $this->handler_pool->submit(new SocketHandler($this->API, $this->current));
} catch (\danog\MadelineProto\NothingInTheSocketException $e) { ; } } catch (\danog\MadelineProto\NothingInTheSocketException $e) {
}
} }
$this->setGarbage(); $this->setGarbage();
} }
public $garbage = false; public $garbage = false;
public function setGarbage():void {
public function setGarbage():void
{
$this->garbage = true; $this->garbage = true;
} }
public function isGarbage():bool {
public function isGarbage():bool
{
return $this->garbage; return $this->garbage;
} }
} }