From 40c0976c5a8791cab7b0483d733dd8938976a024 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sun, 1 Sep 2019 01:52:28 +0200 Subject: [PATCH] Almost finished refactoring --- serializeLoop | 0 src/danog/MadelineProto/Connection.php | 198 ++++++++------ src/danog/MadelineProto/DataCenter.php | 21 ++ .../MadelineProto/DataCenterConnection.php | 141 ++++++++-- .../Loop/Connection/CheckLoop.php | 4 +- src/danog/MadelineProto/MTProto.php | 116 +++++---- .../MTProtoSession/AckHandler.php | 67 +++++ .../MTProtoSession/CallHandler.php | 241 ++++++++++++++++++ .../MTProtoSession/ResponseHandler.php | 5 +- .../MadelineProto/MTProtoSession/Session.php | 3 + .../MTProtoTools/CallHandler.php | 218 ---------------- 11 files changed, 644 insertions(+), 370 deletions(-) create mode 100644 serializeLoop create mode 100644 src/danog/MadelineProto/MTProtoSession/CallHandler.php delete mode 100644 src/danog/MadelineProto/MTProtoTools/CallHandler.php diff --git a/serializeLoop b/serializeLoop new file mode 100644 index 00000000..e69de29b diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index c53c88b7..dec06d17 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -20,6 +20,7 @@ namespace danog\MadelineProto; use Amp\ByteStream\ClosedException; use Amp\Deferred; +use Amp\Promise; use danog\MadelineProto\Loop\Connection\CheckLoop; use danog\MadelineProto\Loop\Connection\HttpWaitLoop; use danog\MadelineProto\Loop\Connection\ReadLoop; @@ -42,33 +43,57 @@ class Connection extends Session const PENDING_MAX = 2000000000; /** - * The actual socket + * Writer loop. + * + * @var \danog\MadelineProto\Loop\Connection\WriteLoop + */ + protected $writer; + /** + * Reader loop. + * + * @var \danog\MadelineProto\Loop\Connection\ReadLoop + */ + protected $reader; + /** + * Checker loop. + * + * @var \danog\MadelineProto\Loop\Connection\CheckLoop + */ + protected $checker; + /** + * Waiter loop. + * + * @var \danog\MadelineProto\Loop\Connection\HttpWaitLoop + */ + protected $waiter; + /** + * The actual socket. * * @var Stream */ private $stream; /** - * Connection context + * Connection context. * - * @var Connection context + * @var ConnectionContext */ private $ctx; /** - * HTTP request count + * HTTP request count. * * @var integer */ private $httpReqCount = 0; /** - * HTTP response count + * HTTP response count. * * @var integer */ private $httpResCount = 0; /** - * Date of last chunk received + * Date of last chunk received. * * @var integer */ @@ -86,9 +111,15 @@ class Connection extends Session * @var MTProto */ protected $API; + /** + * Shared connection instance. + * + * @var DataCenterConnection + */ + protected $shared; /** - * DC ID + * DC ID. * * @var string */ @@ -107,6 +138,19 @@ class Connection extends Session */ private $writing = false; + /** + * Writing callback. + * + * @var callable + */ + private $writingCallback; + /** + * Reading callback. + * + * @var callable + */ + private $readingCallback; + /** * Check if the socket is writing stuff. * @@ -126,30 +170,32 @@ class Connection extends Session return $this->reading; } /** - * Set writing boolean + * Set writing boolean. * * @param boolean $writing - * + * * @return void */ public function writing(bool $writing) { $this->writing = $writing; + ($this->writingCallback)($writing); } /** - * Set reading boolean + * Set reading boolean. * * @param boolean $reading - * + * * @return void */ public function reading(bool $reading) { $this->reading = $reading; + ($this->readingCallback)($writing); } /** - * Tell the class that we have read a chunk of data from the socket + * Tell the class that we have read a chunk of data from the socket. * * @return void */ @@ -168,7 +214,7 @@ class Connection extends Session } /** - * Indicate a received HTTP response + * Indicate a received HTTP response. * * @return void */ @@ -177,7 +223,7 @@ class Connection extends Session $this->httpResCount++; } /** - * Count received HTTP responses + * Count received HTTP responses. * * @return integer */ @@ -186,7 +232,7 @@ class Connection extends Session return $this->httpResCount; } /** - * Indicate a sent HTTP request + * Indicate a sent HTTP request. * * @return void */ @@ -195,7 +241,7 @@ class Connection extends Session $this->httpReqCount++; } /** - * Count sent HTTP requests + * Count sent HTTP requests. * * @return integer */ @@ -206,7 +252,7 @@ class Connection extends Session /** - * Get connection context + * Get connection context. * * @return ConnectionContext */ @@ -271,7 +317,41 @@ class Connection extends Session $this->waiter->start(); } - public function sendMessage($message, $flush = true) + /** + * Send an MTProto message. + * + * Structure of message array: + * [ + * // only in outgoing messages + * 'body' => deserialized body, (optional if container) + * 'serialized_body' => 'serialized body', (optional if container) + * 'content_related' => bool, + * '_' => 'predicate', + * 'promise' => deferred promise that gets resolved when a response to the message is received (optional), + * 'send_promise' => deferred promise that gets resolved when the message is sent (optional), + * 'file' => bool (optional), + * 'type' => 'type' (optional), + * 'queue' => queue ID (optional), + * 'container' => [message ids] (optional), + * + * // only in incoming messages + * 'content' => deserialized body, + * 'seq_no' => number (optional), + * 'from_container' => bool (optional), + * + * // can be present in both + * 'response' => message id (optional), + * 'msg_id' => message id (optional), + * 'sent' => timestamp, + * 'tries' => number + * ] + * + * @param array $message The message to send + * @param boolean $flush Whether to flush the message right away + * + * @return Promise + */ + public function sendMessage(array $message, bool $flush = true): Promise { $deferred = new Deferred(); @@ -314,14 +394,17 @@ class Connection extends Session * * @return void */ - public function setExtra(MTProto $extra) + public function setExtra(DataCenterConnection $extra, $readingCallback, $writingCallback) { - $this->API = $extra; - $this->logger = $extra->logger; + $this->shared = $extra; + $this->readingCallback = $readingCallback; + $this->writingCallback = $writingCallback; + $this->API = $extra->getExtra(); + $this->logger = $this->API->logger; } /** - * Get main instance + * Get main instance. * * @return MTProto */ @@ -329,6 +412,12 @@ class Connection extends Session { return $this->API; } + + /** + * Disconnect from DC + * + * @return void + */ public function disconnect() { $this->API->logger->logger("Disconnecting from DC {$this->datacenter}"); @@ -347,7 +436,12 @@ class Connection extends Session } $this->API->logger->logger("Disconnected from DC {$this->datacenter}"); } - + + /** + * Reconnect to DC + * + * @return \Generator + */ public function reconnect(): \Generator { $this->API->logger->logger("Reconnecting DC {$this->datacenter}"); @@ -364,59 +458,11 @@ class Connection extends Session } } - public function hasPendingCalls() - { - $API = $this->API; - $datacenter = $this->datacenter; - - $dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all'; - $timeout = $API->settings['connection_settings'][$dc_config_number]['timeout']; - $pfs = $API->settings['connection_settings'][$dc_config_number]['pfs']; - - foreach ($this->new_outgoing as $message_id) { - if (isset($this->outgoing_messages[$message_id]['sent']) - && $this->outgoing_messages[$message_id]['sent'] + $timeout < \time() - && ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted'] - && $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req' - ) { - if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') { - continue; - } - - return true; - } - } - - return false; - } - - public function getPendingCalls() - { - $API = $this->API; - $datacenter = $this->datacenter; - - $dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all'; - $timeout = $API->settings['connection_settings'][$dc_config_number]['timeout']; - $pfs = $API->settings['connection_settings'][$dc_config_number]['pfs']; - - $result = []; - foreach ($this->new_outgoing as $message_id) { - if (isset($this->outgoing_messages[$message_id]['sent']) - && $this->outgoing_messages[$message_id]['sent'] + $timeout < \time() - && ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted'] - && $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req' - ) { - if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') { - continue; - } - - $result[] = $message_id; - } - } - - return $result; - } - + /** + * Get name + * + * @return string + */ public function getName(): string { return __CLASS__; diff --git a/src/danog/MadelineProto/DataCenter.php b/src/danog/MadelineProto/DataCenter.php index b19e47fb..c38d3afa 100644 --- a/src/danog/MadelineProto/DataCenter.php +++ b/src/danog/MadelineProto/DataCenter.php @@ -740,6 +740,27 @@ class DataCenter return yield (yield $this->getHTTPClient()->request($url))->getBody(); } + /** + * Get Connection instance + * + * @param string $dc + * @return Connection + */ + public function getConnection(string $dc): Connection + { + return $this->sockets[$dc]->getConnection(); + } + /** + * Check if a DC is present + * + * @param string $dc DC ID + * + * @return boolean + */ + public function has(string $dc): bool + { + return isset($this->sockets[$dc]); + } public function get_dcs($all = true) { $test = $this->settings['all']['test_mode'] ? 'test' : 'main'; diff --git a/src/danog/MadelineProto/DataCenterConnection.php b/src/danog/MadelineProto/DataCenterConnection.php index 701647d0..bea52e68 100644 --- a/src/danog/MadelineProto/DataCenterConnection.php +++ b/src/danog/MadelineProto/DataCenterConnection.php @@ -18,6 +18,7 @@ namespace danog\MadelineProto; +use Amp\Promise; use danog\MadelineProto\Stream\ConnectionContext; class DataCenterConnection @@ -43,33 +44,53 @@ class DataCenterConnection private $authorized = false; /** - * Connections open to a certain DC + * Connections open to a certain DC. * * @var array */ private $connections = []; + /** + * Connection weights + * + * @var array + */ + private $availableConnections = []; /** - * Main API instance + * Main API instance. * * @var \danog\MadelineProto\MTProto */ private $API; /** - * Connection context + * Connection context. * * @var ConnectionContext */ private $ctx; /** - * DC ID + * DC ID. * * @var string */ private $datacenter; - + + /** + * Index for round robin. + * + * @var integer + */ + private $index = 0; + + /** + * Loop to keep weights at sane value + * + * @var \danog\MadelineProto\Loop\Generic\PeriodicLoop + */ + private $robinLoop; + /** * Get auth key. * @@ -127,7 +148,7 @@ class DataCenterConnection } /** - * Get connection context + * Get connection context. * * @return ConnectionContext */ @@ -145,50 +166,126 @@ class DataCenterConnection */ public function connect(ConnectionContext $ctx): \Generator { - $this->API->logger->logger("Trying connection via $ctx", \danog\MadelineProto\Logger::WARNING); + $this->API->logger->logger("Trying shared connection via $ctx", \danog\MadelineProto\Logger::WARNING); $this->ctx = $ctx->getCtx(); $this->datacenter = $ctx->getDc(); $media = $ctx->isMedia(); + $count = $media ? $this->API->settings['connection_settings']['media_socket_count'] : 1; + if ($count > 1) { + if (!$this->robinLoop) { + $this->robinLoop = new PeriodicLoop($this, [$this, 'even'], "Robin loop DC {$this->datacenter}", 10); + } + $this->robinLoop->start(); + } + + $incRead = $media ? 5 : 1; + $this->connections = []; + $this->availableConnections = []; for ($x = 0; $x < $count; $x++) { + $this->availableConnections[$x] = 0; $this->connections[$x] = new Connection(); + $this->connections[$x]->setExtra( + $this, + function (bool $reading) use ($x, $incRead) { + $this->availableConnections[$x] += $reading ? -$incRead : $incRead; + }, + function (bool $writing) use ($x) { + $this->availableConnections[$x] += $writing ? -10 : 10; + } + ); yield $this->connections[$x]->connect(yield $ctx->getStream()); $ctx = $this->ctx->getCtx(); } } - public function sendMessage($message, $flush = true) - { - } - - public function setExtra(API $API) - { - $this->API = $API; - } - + /** + * Close all connections to DC + * + * @return void + */ public function disconnect() { - $this->API->logger->logger("Disconnecting from DC {$this->datacenter}"); + $this->API->logger->logger("Disconnecting from shared DC {$this->datacenter}"); + if ($this->robinLoop) { + $this->robinLoop->signal(true); + $this->robinLoop = null; + } foreach ($this->connections as $connection) { $connection->disconnect(); } $this->connections = []; + $this->availableConnections = []; } + /** + * Reconnect to DC + * + * @return \Generator + */ public function reconnect(): \Generator { - $this->API->logger->logger("Reconnecting DC {$this->datacenter}"); - foreach ($this->connections as $connection) { - yield $connection->reconnect(); - } + $this->API->logger->logger("Reconnecting shared DC {$this->datacenter}"); $this->disconnect(); - yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc()); + yield $this->connect($this->ctx); } + /** + * Get best socket in round robin. + * + * @return Connection + */ + public function getConnection(): Connection + { + if (count($this->availableConnections) === 1) { + return $this->connections[0]; + } + max($this->availableConnections); + $key = key($this->availableConnections); + // Decrease to implement round robin + $this->availableConnections[$key]--; + return $this->connections[$key]; + } + + /** + * Even out round robin values + * + * @return void + */ + public function even() + { + if (\min($this->availableConnections) < 1000) { + foreach ($this->availableConnections as &$value) { + $value += 1000; + } + } + } + + /** + * Set main instance + * + * @param MTProto $API Main instance + * + * @return void + */ + public function setExtra(MTProto $API) + { + $this->API = $API; + } + + /** + * Get main instance + * + * @return MTProto + */ + public function getExtra(): MTProto + { + return $this->API; + } /** * Sleep function. * diff --git a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php index f3a93bf7..2f70e575 100644 --- a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php @@ -106,7 +106,7 @@ class CheckLoop extends ResumableSignalLoop break; } $API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' not received by server, resending...', \danog\MadelineProto\Logger::ERROR); - $API->method_recall('watcherId', ['message_id' => $message_id, 'datacenter' => $datacenter, 'postpone' => true]); + $connection->method_recall('watcherId', ['message_id' => $message_id, 'postpone' => true]); break; case 4: if ($chr & 32) { @@ -144,7 +144,7 @@ class CheckLoop extends ResumableSignalLoop && $connection->outgoing_messages[$message_id]['unencrypted'] ) { $API->logger->logger('Still missing '.$connection->outgoing_messages[$message_id]['_'].' with message id '.($message_id)." on DC $datacenter, resending", \danog\MadelineProto\Logger::ERROR); - $API->method_recall('', ['message_id' => $message_id, 'datacenter' => $datacenter, 'postpone' => true]); + $connection->method_recall('', ['message_id' => $message_id, 'postpone' => true]); } } $connection->writer->resume(); diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 7ecc3e31..c58253d0 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -21,10 +21,10 @@ namespace danog\MadelineProto; use Amp\Loop; use danog\MadelineProto\Async\AsyncConstruct; +use danog\MadelineProto\Loop\Generic\PeriodicLoop; use danog\MadelineProto\Loop\Update\FeedLoop; use danog\MadelineProto\Loop\Update\SeqLoop; use danog\MadelineProto\Loop\Update\UpdateLoop; -use danog\MadelineProto\Loop\Generic\PeriodicLoop; use danog\MadelineProto\MTProtoTools\CombinedUpdatesState; use danog\MadelineProto\MTProtoTools\ReferenceDatabase; use danog\MadelineProto\MTProtoTools\UpdatesState; @@ -150,6 +150,7 @@ class MTProto extends AsyncConstruct implements TLCallback public $referenceDatabase; public $phoneConfigWatcherId; private $callCheckerLoop; + private $serializeLoop; public $feeders = []; public $updaters = []; public $destructing = false; // Avoid problems with exceptions thrown by forked strands, see tools @@ -200,7 +201,7 @@ class MTProto extends AsyncConstruct implements TLCallback if ((!isset($this->authorization['user']['bot']) || !$this->authorization['user']['bot']) && $this->datacenter->sockets[$this->datacenter->curdc]->temp_auth_key !== null) { try { $nearest_dc = yield $this->method_call_async_read('help.getNearestDc', [], ['datacenter' => $this->datacenter->curdc]); - $this->logger->logger(sprintf(\danog\MadelineProto\Lang::$current_lang['nearest_dc'], $nearest_dc['country'], $nearest_dc['nearest_dc']), Logger::NOTICE); + $this->logger->logger(\sprintf(\danog\MadelineProto\Lang::$current_lang['nearest_dc'], $nearest_dc['country'], $nearest_dc['nearest_dc']), Logger::NOTICE); if ($nearest_dc['nearest_dc'] != $nearest_dc['this_dc']) { $this->settings['connection_settings']['default_dc'] = $this->datacenter->curdc = (int) $nearest_dc['nearest_dc']; } @@ -236,7 +237,7 @@ class MTProto extends AsyncConstruct implements TLCallback public function logger($param, $level = Logger::NOTICE, $file = null) { if ($file === null) { - $file = basename(debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php'); + $file = \basename(\debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php'); } return isset($this->logger) ? $this->logger->logger($param, $level, $file) : Logger::$default->logger($param, $level, $file); @@ -280,12 +281,24 @@ class MTProto extends AsyncConstruct implements TLCallback return true; } + public function trySerialize() + { + if ($this->wrapper instanceof API && isset($this->wrapper->session) && !\is_null($this->wrapper->session) && !$this->asyncInitPromise) { + $this->logger->logger("Didn't serialize in a while, doing that now..."); + $this->wrapper->serialize($this->wrapper->session); + } + } public function startLoops() { if (!$this->callCheckerLoop) { $this->callCheckerLoop = new PeriodicLoop($this, [$this, 'checkCalls'], 'call check', 10); } $this->callCheckerLoop->start(); + + if (!$this->serializeLoop) { + $this->serializeLoop = new PeriodicLoop($this, [$this, 'trySerialize'], 'serialize', $this->settings['serialization']['serialization_interval']); + } + $this->serializeLoop->start(); } public function stopLoops() { @@ -293,19 +306,23 @@ class MTProto extends AsyncConstruct implements TLCallback $this->callCheckerLoop->signal(true); $this->callCheckerLoop = null; } + if ($this->serializeLoop) { + $this->serializeLoop->signal(true); + $this->serializeLoop = null; + } } public function __wakeup() { - $backtrace = debug_backtrace(0, 3); + $backtrace = \debug_backtrace(0, 3); $this->asyncInitPromise = true; $this->setInitPromise($this->__wakeup_async($backtrace)); } public function __wakeup_async($backtrace) { - set_error_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionErrorHandler']); + \set_error_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionErrorHandler']); $this->setup_logger(); - if (\danog\MadelineProto\Magic::$has_thread && is_object(\Thread::getCurrentThread())) { + if (\danog\MadelineProto\Magic::$has_thread && \is_object(\Thread::getCurrentThread())) { return; } Lang::$current_lang = &Lang::$lang['en']; @@ -346,14 +363,14 @@ class MTProto extends AsyncConstruct implements TLCallback unset($this->updates_state); } - if ($this->event_handler && class_exists($this->event_handler) && is_subclass_of($this->event_handler, '\danog\MadelineProto\EventHandler')) { + if ($this->event_handler && \class_exists($this->event_handler) && \is_subclass_of($this->event_handler, '\danog\MadelineProto\EventHandler')) { $this->setEventHandler($this->event_handler); } $force = false; $this->reset_session(); if (isset($backtrace[2]['function'], $backtrace[2]['class'], $backtrace[2]['args']) && $backtrace[2]['class'] === 'danog\\MadelineProto\\API' && $backtrace[2]['function'] === '__construct_async') { - if (count($backtrace[2]['args']) >= 2) { - $this->parse_settings(array_replace_recursive($this->settings, $backtrace[2]['args'][1])); + if (\count($backtrace[2]['args']) >= 2) { + $this->parse_settings(\array_replace_recursive($this->settings, $backtrace[2]['args'][1])); } } @@ -364,7 +381,7 @@ class MTProto extends AsyncConstruct implements TLCallback if (!isset($this->v) || $this->v !== self::V) { $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['serialization_ofd'], Logger::WARNING); foreach ($this->datacenter->sockets as $dc_id => $socket) { - if ($this->authorized === self::LOGGED_IN && strpos($dc_id, '_') === false && $socket->auth_key !== null && $socket->temp_auth_key !== null) { + if ($this->authorized === self::LOGGED_IN && \strpos($dc_id, '_') === false && $socket->auth_key !== null && $socket->temp_auth_key !== null) { $socket->authorized = true; } } @@ -392,7 +409,7 @@ class MTProto extends AsyncConstruct implements TLCallback } } foreach ($this->secret_chats as $key => &$chat) { - if (!is_array($chat)) { + if (!\is_array($chat)) { unset($this->secret_chats[$key]); continue; } @@ -403,7 +420,7 @@ class MTProto extends AsyncConstruct implements TLCallback } } foreach ($settings['connection_settings'] as $key => &$connection) { - if (!is_array($connection)) { + if (!\is_array($connection)) { unset($settings['connection_settings'][$key]); continue; } @@ -414,7 +431,7 @@ class MTProto extends AsyncConstruct implements TLCallback $connection['proxy_extra'] = []; } if (!isset($connection['pfs'])) { - $connection['pfs'] = extension_loaded('gmp'); + $connection['pfs'] = \extension_loaded('gmp'); } if ($connection['protocol'] === 'obfuscated2') { $connection['protocol'] = 'tcp_intermediate_padded'; @@ -446,7 +463,7 @@ class MTProto extends AsyncConstruct implements TLCallback }*/ yield $this->connect_to_all_dcs_async(); foreach ($this->calls as $id => $controller) { - if (!is_object($controller)) { + if (!\is_object($controller)) { unset($this->calls[$id]); } elseif ($controller->getCallState() === \danog\MadelineProto\VoIP::CALL_STATE_ENDED) { $controller->setMadeline($this); @@ -488,7 +505,7 @@ class MTProto extends AsyncConstruct implements TLCallback foreach ($this->channels_state->get() as $state) { $channelIds[] = $state->getChannel(); } - sort($channelIds); + \sort($channelIds); foreach ($channelIds as $channelId) { if (isset($this->feeders[$channelId])) { $this->feeders[$channelId]->signal(true); @@ -505,7 +522,7 @@ class MTProto extends AsyncConstruct implements TLCallback public function serialize() { - if ($this->wrapper instanceof \danog\MadelineProto\API && isset($this->wrapper->session) && !is_null($this->wrapper->session)) { + if ($this->wrapper instanceof \danog\MadelineProto\API && isset($this->wrapper->session) && !\is_null($this->wrapper->session)) { $this->wrapper->serialize($this->wrapper->session); } } @@ -525,7 +542,7 @@ class MTProto extends AsyncConstruct implements TLCallback } // Detect device model try { - $device_model = php_uname('s'); + $device_model = \php_uname('s'); } catch (\danog\MadelineProto\Exception $e) { $device_model = 'Web server'; } @@ -548,9 +565,9 @@ class MTProto extends AsyncConstruct implements TLCallback } // Detect system version try { - $system_version = php_uname('r'); + $system_version = \php_uname('r'); } catch (\danog\MadelineProto\Exception $e) { - $system_version = phpversion(); + $system_version = PHP_VERSION; } if ($settings['app_info']['api_id'] === 6) { // TG DEV NOTICE: these app info spoofing measures were implemented for NON-MALICIOUS purposes. @@ -574,9 +591,9 @@ class MTProto extends AsyncConstruct implements TLCallback $lang_code = 'en'; Lang::$current_lang = &Lang::$lang[$lang_code]; if (isset($_SERVER['HTTP_ACCEPT_LANGUAGE'])) { - $lang_code = substr($_SERVER['HTTP_ACCEPT_LANGUAGE'], 0, 2); + $lang_code = \substr($_SERVER['HTTP_ACCEPT_LANGUAGE'], 0, 2); } elseif (isset($_SERVER['LANG'])) { - $lang_code = explode('_', $_SERVER['LANG'])[0]; + $lang_code = \explode('_', $_SERVER['LANG'])[0]; } if (isset(Lang::$lang[$lang_code])) { Lang::$current_lang = &Lang::$lang[$lang_code]; @@ -702,7 +719,7 @@ class MTProto extends AsyncConstruct implements TLCallback // Extra parameters to pass to the proxy class using setExtra 'obfuscated' => false, 'transport' => 'tcp', - 'pfs' => extension_loaded('gmp'), + 'pfs' => \extension_loaded('gmp'), ], 'media_socket_count' => 5, 'default_dc' => 2, @@ -746,7 +763,7 @@ class MTProto extends AsyncConstruct implements TLCallback */ // write to 'logger_param' => Magic::$script_cwd.'/MadelineProto.log', - 'logger' => php_sapi_name() === 'cli' ? 3 : 2, + 'logger' => PHP_SAPI === 'cli' ? 3 : 2, // overwrite previous setting and echo logs 'logger_level' => Logger::VERBOSE, 'max_size' => 100 * 1024 * 1024, @@ -803,7 +820,7 @@ class MTProto extends AsyncConstruct implements TLCallback // Need info ? 'requests' => true, ]]; - $settings = array_replace_recursive($default_settings, $settings); + $settings = \array_replace_recursive($default_settings, $settings); if (isset(Lang::$lang[$settings['app_info']['lang_code']])) { Lang::$current_lang = &Lang::$lang[$settings['app_info']['lang_code']]; } @@ -853,7 +870,7 @@ class MTProto extends AsyncConstruct implements TLCallback public function reset_session($de = true, $auth_key = false) { - if (!is_object($this->datacenter)) { + if (!\is_object($this->datacenter)) { throw new Exception(\danog\MadelineProto\Lang::$current_lang['session_corrupted']); } foreach ($this->datacenter->sockets as $id => $socket) { @@ -879,7 +896,7 @@ class MTProto extends AsyncConstruct implements TLCallback public function is_http($datacenter) { - return in_array($this->datacenter->sockets[$datacenter]->getCtx()->getStreamName(), [HttpStream::getName(), HttpsStream::getName()]); + return \in_array($this->datacenter->sockets[$datacenter]->getCtx()->getStreamName(), [HttpStream::getName(), HttpsStream::getName()]); } // Connects to all datacenters and if necessary creates authorization keys, binds them and writes client info @@ -928,7 +945,7 @@ class MTProto extends AsyncConstruct implements TLCallback foreach ($this->channels_state->get() as $state) { $channelIds[] = $state->getChannel(); } - sort($channelIds); + \sort($channelIds); foreach ($channelIds as $channelId) { if (isset($this->feeders[$channelId])) { $this->feeders[$channelId]->signal(true); @@ -957,7 +974,6 @@ class MTProto extends AsyncConstruct implements TLCallback $this->referenceDatabase = new ReferenceDatabase($this); $this->dialog_params = ['_' => 'MadelineProto.dialogParams', 'limit' => 0, 'offset_date' => 0, 'offset_id' => 0, 'offset_peer' => ['_' => 'inputPeerEmpty'], 'count' => 0]; $this->full_chats = []; - } public function resetUpdateState() { @@ -970,10 +986,10 @@ class MTProto extends AsyncConstruct implements TLCallback $channelIds[] = $state->getChannel(); $channelId = $state->getChannel(); $pts = $state->pts(); - $pts = $channelId ? max(1, $pts-1000000) : ($pts > 4000000 ? $pts-1000000 : max(1, $pts-1000000)); + $pts = $channelId ? \max(1, $pts-1000000) : ($pts > 4000000 ? $pts-1000000 : \max(1, $pts-1000000)); $newStates[$channelId] = new UpdatesState(['pts' => $pts], $channelId); } - sort($channelIds); + \sort($channelIds); foreach ($channelIds as $channelId) { if (isset($this->feeders[$channelId])) { $this->feeders[$channelId]->signal(true); @@ -1002,7 +1018,7 @@ class MTProto extends AsyncConstruct implements TLCallback foreach ($this->channels_state->get() as $state) { $channelIds[] = $state->getChannel(); } - sort($channelIds); + \sort($channelIds); foreach ($channelIds as $channelId) { if (!isset($this->feeders[$channelId])) { $this->feeders[$channelId] = new FeedLoop($this, $channelId); @@ -1017,7 +1033,9 @@ class MTProto extends AsyncConstruct implements TLCallback $this->updaters[$channelId]->resume(); } } - foreach ($this->datacenter->sockets as $datacenter) {$datacenter->writer->resume();} + foreach ($this->datacenter->sockets as $datacenter) { + $datacenter->writer->resume(); + } if ($this->seqUpdater->start()) { $this->seqUpdater->resume(); } @@ -1025,7 +1043,7 @@ class MTProto extends AsyncConstruct implements TLCallback public function get_phone_config_async($watcherId = null) { - if ($this->authorized === self::LOGGED_IN && class_exists('\\danog\\MadelineProto\\VoIPServerConfigInternal') && !$this->authorization['user']['bot'] && $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->temp_auth_key !== null) { + if ($this->authorized === self::LOGGED_IN && \class_exists('\\danog\\MadelineProto\\VoIPServerConfigInternal') && !$this->authorization['user']['bot'] && $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->temp_auth_key !== null) { $this->logger->logger('Fetching phone config...'); VoIPServerConfig::updateDefault(yield $this->method_call_async_read('phone.getCallConfig', [], ['datacenter' => $this->settings['connection_settings']['default_dc']])); } else { @@ -1035,7 +1053,7 @@ class MTProto extends AsyncConstruct implements TLCallback public function get_config_async($config = [], $options = []) { - if ($this->config['expires'] > time()) { + if ($this->config['expires'] > \time()) { return $this->config; } $this->config = empty($config) ? yield $this->method_call_async_read('help.getConfig', $config, empty($options) ? ['datacenter' => $this->settings['connection_settings']['default_dc']] : $options) : $config; @@ -1085,13 +1103,13 @@ class MTProto extends AsyncConstruct implements TLCallback $id .= $dc['media_only'] ? '_media' : ''; $ipv6 = $dc['ipv6'] ? 'ipv6' : 'ipv4'; //$id .= isset($this->settings['connection'][$test][$ipv6][$id]) && $this->settings['connection'][$test][$ipv6][$id]['ip_address'] != $dc['ip_address'] ? '_bk' : ''; - if (is_numeric($id)) { + if (\is_numeric($id)) { $id = (int) $id; } - unset($dc['cdn']); - unset($dc['media_only']); - unset($dc['id']); - unset($dc['ipv6']); + unset($dc['cdn'], $dc['media_only'], $dc['id'], $dc['ipv6']); + + + $this->settings['connection'][$test][$ipv6][$id] = $dc; } $curdc = $this->datacenter->curdc; @@ -1101,9 +1119,9 @@ class MTProto extends AsyncConstruct implements TLCallback } public function content_related($method) { - $method = is_array($method) && isset($method['_']) ? $method['_'] : $method; + $method = \is_array($method) && isset($method['_']) ? $method['_'] : $method; - return is_string($method) ? !in_array($method, MTProto::NOT_CONTENT_RELATED) : true; + return \is_string($method) ? !\in_array($method, MTProto::NOT_CONTENT_RELATED) : true; } public function get_self_async() @@ -1131,9 +1149,9 @@ class MTProto extends AsyncConstruct implements TLCallback public function getConstructorCallbacks(): array { - return array_merge( - array_fill_keys(['chat', 'chatEmpty', 'chatForbidden', 'channel', 'channelEmpty', 'channelForbidden'], [[$this, 'add_chat_async']]), - array_fill_keys(['user', 'userEmpty'], [[$this, 'add_user']]), + return \array_merge( + \array_fill_keys(['chat', 'chatEmpty', 'chatForbidden', 'channel', 'channelEmpty', 'channelForbidden'], [[$this, 'add_chat_async']]), + \array_fill_keys(['user', 'userEmpty'], [[$this, 'add_user']]), ['help.support' => [[$this, 'add_support']]] ); } @@ -1150,17 +1168,17 @@ class MTProto extends AsyncConstruct implements TLCallback public function getTypeMismatchCallbacks(): array { - return array_merge( - array_fill_keys(['User', 'InputUser', 'Chat', 'InputChannel', 'Peer', 'InputPeer', 'InputDialogPeer', 'InputNotifyPeer'], [$this, 'get_info_async']), - array_fill_keys(['InputMedia', 'InputDocument', 'InputPhoto'], [$this, 'get_file_info_async']), - array_fill_keys(['InputFileLocation'], [$this, 'get_download_info_async']) + return \array_merge( + \array_fill_keys(['User', 'InputUser', 'Chat', 'InputChannel', 'Peer', 'InputPeer', 'InputDialogPeer', 'InputNotifyPeer'], [$this, 'get_info_async']), + \array_fill_keys(['InputMedia', 'InputDocument', 'InputPhoto'], [$this, 'get_file_info_async']), + \array_fill_keys(['InputFileLocation'], [$this, 'get_download_info_async']) ); } public function __debugInfo() { - return ['MadelineProto instance '.spl_object_hash($this)]; + return ['MadelineProto instance '.\spl_object_hash($this)]; } const ALL_MIMES = ['webp' => [0 => 'image/webp'], 'png' => [0 => 'image/png', 1 => 'image/x-png'], 'bmp' => [0 => 'image/bmp', 1 => 'image/x-bmp', 2 => 'image/x-bitmap', 3 => 'image/x-xbitmap', 4 => 'image/x-win-bitmap', 5 => 'image/x-windows-bmp', 6 => 'image/ms-bmp', 7 => 'image/x-ms-bmp', 8 => 'application/bmp', 9 => 'application/x-bmp', 10 => 'application/x-win-bitmap'], 'gif' => [0 => 'image/gif'], 'jpeg' => [0 => 'image/jpeg', 1 => 'image/pjpeg'], 'xspf' => [0 => 'application/xspf+xml'], 'vlc' => [0 => 'application/videolan'], 'wmv' => [0 => 'video/x-ms-wmv', 1 => 'video/x-ms-asf'], 'au' => [0 => 'audio/x-au'], 'ac3' => [0 => 'audio/ac3'], 'flac' => [0 => 'audio/x-flac'], 'ogg' => [0 => 'audio/ogg', 1 => 'video/ogg', 2 => 'application/ogg'], 'kmz' => [0 => 'application/vnd.google-earth.kmz'], 'kml' => [0 => 'application/vnd.google-earth.kml+xml'], 'rtx' => [0 => 'text/richtext'], 'rtf' => [0 => 'text/rtf'], 'jar' => [0 => 'application/java-archive', 1 => 'application/x-java-application', 2 => 'application/x-jar'], 'zip' => [0 => 'application/x-zip', 1 => 'application/zip', 2 => 'application/x-zip-compressed', 3 => 'application/s-compressed', 4 => 'multipart/x-zip'], '7zip' => [0 => 'application/x-compressed'], 'xml' => [0 => 'application/xml', 1 => 'text/xml'], 'svg' => [0 => 'image/svg+xml'], '3g2' => [0 => 'video/3gpp2'], '3gp' => [0 => 'video/3gp', 1 => 'video/3gpp'], 'mp4' => [0 => 'video/mp4'], 'm4a' => [0 => 'audio/x-m4a'], 'f4v' => [0 => 'video/x-f4v'], 'flv' => [0 => 'video/x-flv'], 'webm' => [0 => 'video/webm'], 'aac' => [0 => 'audio/x-acc'], 'm4u' => [0 => 'application/vnd.mpegurl'], 'pdf' => [0 => 'application/pdf', 1 => 'application/octet-stream'], 'pptx' => [0 => 'application/vnd.openxmlformats-officedocument.presentationml.presentation'], 'ppt' => [0 => 'application/powerpoint', 1 => 'application/vnd.ms-powerpoint', 2 => 'application/vnd.ms-office', 3 => 'application/msword'], 'docx' => [0 => 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'], 'xlsx' => [0 => 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', 1 => 'application/vnd.ms-excel'], 'xl' => [0 => 'application/excel'], 'xls' => [0 => 'application/msexcel', 1 => 'application/x-msexcel', 2 => 'application/x-ms-excel', 3 => 'application/x-excel', 4 => 'application/x-dos_ms_excel', 5 => 'application/xls', 6 => 'application/x-xls'], 'xsl' => [0 => 'text/xsl'], 'mpeg' => [0 => 'video/mpeg'], 'mov' => [0 => 'video/quicktime'], 'avi' => [0 => 'video/x-msvideo', 1 => 'video/msvideo', 2 => 'video/avi', 3 => 'application/x-troff-msvideo'], 'movie' => [0 => 'video/x-sgi-movie'], 'log' => [0 => 'text/x-log'], 'txt' => [0 => 'text/plain'], 'css' => [0 => 'text/css'], 'html' => [0 => 'text/html'], 'wav' => [0 => 'audio/x-wav', 1 => 'audio/wave', 2 => 'audio/wav'], 'xhtml' => [0 => 'application/xhtml+xml'], 'tar' => [0 => 'application/x-tar'], 'tgz' => [0 => 'application/x-gzip-compressed'], 'psd' => [0 => 'application/x-photoshop', 1 => 'image/vnd.adobe.photoshop'], 'exe' => [0 => 'application/x-msdownload'], 'js' => [0 => 'application/x-javascript'], 'mp3' => [0 => 'audio/mpeg', 1 => 'audio/mpg', 2 => 'audio/mpeg3', 3 => 'audio/mp3'], 'rar' => [0 => 'application/x-rar', 1 => 'application/rar', 2 => 'application/x-rar-compressed'], 'gzip' => [0 => 'application/x-gzip'], 'hqx' => [0 => 'application/mac-binhex40', 1 => 'application/mac-binhex', 2 => 'application/x-binhex40', 3 => 'application/x-mac-binhex40'], 'cpt' => [0 => 'application/mac-compactpro'], 'bin' => [0 => 'application/macbinary', 1 => 'application/mac-binary', 2 => 'application/x-binary', 3 => 'application/x-macbinary'], 'oda' => [0 => 'application/oda'], 'ai' => [0 => 'application/postscript'], 'smil' => [0 => 'application/smil'], 'mif' => [0 => 'application/vnd.mif'], 'wbxml' => [0 => 'application/wbxml'], 'wmlc' => [0 => 'application/wmlc'], 'dcr' => [0 => 'application/x-director'], 'dvi' => [0 => 'application/x-dvi'], 'gtar' => [0 => 'application/x-gtar'], 'php' => [0 => 'application/x-httpd-php', 1 => 'application/php', 2 => 'application/x-php', 3 => 'text/php', 4 => 'text/x-php', 5 => 'application/x-httpd-php-source'], 'swf' => [0 => 'application/x-shockwave-flash'], 'sit' => [0 => 'application/x-stuffit'], 'z' => [0 => 'application/x-compress'], 'mid' => [0 => 'audio/midi'], 'aif' => [0 => 'audio/x-aiff', 1 => 'audio/aiff'], 'ram' => [0 => 'audio/x-pn-realaudio'], 'rpm' => [0 => 'audio/x-pn-realaudio-plugin'], 'ra' => [0 => 'audio/x-realaudio'], 'rv' => [0 => 'video/vnd.rn-realvideo'], 'jp2' => [0 => 'image/jp2', 1 => 'video/mj2', 2 => 'image/jpx', 3 => 'image/jpm'], 'tiff' => [0 => 'image/tiff'], 'eml' => [0 => 'message/rfc822'], 'pem' => [0 => 'application/x-x509-user-cert', 1 => 'application/x-pem-file'], 'p10' => [0 => 'application/x-pkcs10', 1 => 'application/pkcs10'], 'p12' => [0 => 'application/x-pkcs12'], 'p7a' => [0 => 'application/x-pkcs7-signature'], 'p7c' => [0 => 'application/pkcs7-mime', 1 => 'application/x-pkcs7-mime'], 'p7r' => [0 => 'application/x-pkcs7-certreqresp'], 'p7s' => [0 => 'application/pkcs7-signature'], 'crt' => [0 => 'application/x-x509-ca-cert', 1 => 'application/pkix-cert'], 'crl' => [0 => 'application/pkix-crl', 1 => 'application/pkcs-crl'], 'pgp' => [0 => 'application/pgp'], 'gpg' => [0 => 'application/gpg-keys'], 'rsa' => [0 => 'application/x-pkcs7'], 'ics' => [0 => 'text/calendar'], 'zsh' => [0 => 'text/x-scriptzsh'], 'cdr' => [0 => 'application/cdr', 1 => 'application/coreldraw', 2 => 'application/x-cdr', 3 => 'application/x-coreldraw', 4 => 'image/cdr', 5 => 'image/x-cdr', 6 => 'zz-application/zz-winassoc-cdr'], 'wma' => [0 => 'audio/x-ms-wma'], 'vcf' => [0 => 'text/x-vcard'], 'srt' => [0 => 'text/srt'], 'vtt' => [0 => 'text/vtt'], 'ico' => [0 => 'image/x-icon', 1 => 'image/x-ico', 2 => 'image/vnd.microsoft.icon'], 'csv' => [0 => 'text/x-comma-separated-values', 1 => 'text/comma-separated-values', 2 => 'application/vnd.msexcel'], 'json' => [0 => 'application/json', 1 => 'text/json']]; diff --git a/src/danog/MadelineProto/MTProtoSession/AckHandler.php b/src/danog/MadelineProto/MTProtoSession/AckHandler.php index 89739a3f..ee1a4faa 100644 --- a/src/danog/MadelineProto/MTProtoSession/AckHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/AckHandler.php @@ -78,4 +78,71 @@ trait AckHandler return true; } + + + + + /** + * Check if there are some pending calls + * + * @return boolean + */ + public function hasPendingCalls() + { + $API = $this->API; + $datacenter = $this->datacenter; + + $dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all'; + $timeout = $API->settings['connection_settings'][$dc_config_number]['timeout']; + $pfs = $API->settings['connection_settings'][$dc_config_number]['pfs']; + + foreach ($this->new_outgoing as $message_id) { + if (isset($this->outgoing_messages[$message_id]['sent']) + && $this->outgoing_messages[$message_id]['sent'] + $timeout < \time() + && ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted'] + && $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req' + ) { + if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') { + continue; + } + + return true; + } + } + + return false; + } + + /** + * Get all pending calls + * + * @return void + */ + public function getPendingCalls() + { + $API = $this->API; + $datacenter = $this->datacenter; + + $dc_config_number = isset($API->settings['connection_settings'][$datacenter]) ? $datacenter : 'all'; + $timeout = $API->settings['connection_settings'][$dc_config_number]['timeout']; + $pfs = $API->settings['connection_settings'][$dc_config_number]['pfs']; + + $result = []; + foreach ($this->new_outgoing as $message_id) { + if (isset($this->outgoing_messages[$message_id]['sent']) + && $this->outgoing_messages[$message_id]['sent'] + $timeout < \time() + && ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted'] + && $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req' + ) { + if ($pfs && !isset($this->temp_auth_key['bound']) && $this->outgoing_messages[$message_id]['_'] !== 'auth.bindTempAuthKey') { + continue; + } + + $result[] = $message_id; + } + } + + return $result; + } + } diff --git a/src/danog/MadelineProto/MTProtoSession/CallHandler.php b/src/danog/MadelineProto/MTProtoSession/CallHandler.php new file mode 100644 index 00000000..86f7906f --- /dev/null +++ b/src/danog/MadelineProto/MTProtoSession/CallHandler.php @@ -0,0 +1,241 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\MTProtoSession; + +use Amp\Deferred; +use Amp\Promise; +use Amp\Success; +use danog\MadelineProto\Async\Parameters; +use function Amp\Promise\all; + +/** + * Manages method and object calls. + */ +trait CallHandler +{ + /** + * Recall method. + * + * @param string $watcherId Watcher ID for defer + * @param array $args Args + * + * @return void + */ + public function method_recall(string $watcherId, $args) + { + $message_id = $args['message_id']; + $postpone = $args['postpone'] ?? false; + $datacenter = $args['datacenter'] ?? false; + + $message_ids = $this->outgoing_messages[$message_id]['container'] ?? [$message_id]; + + foreach ($message_ids as $message_id) { + if (isset($this->outgoing_messages[$message_id]['body'])) { + if ($datacenter) { + $res = $this->API->datacenter->sockets[$datacenter]->sendMessage($this->outgoing_messages[$message_id], false); + } else { + $res = $this->sendMessage($this->outgoing_messages[$message_id], false); + } + $this->callFork($res); + $this->ack_outgoing_message_id($message_id); + $this->got_response_for_outgoing_message_id($message_id); + } else { + $this->logger->logger('Could not resend '.isset($this->outgoing_messages[$message_id]['_']) ? $this->outgoing_messages[$message_id]['_'] : $message_id); + } + } + if (!$postpone) { + if ($datacenter) { + $this->API->datacenter->sockets[$datacenter]->writer->resume(); + } else { + $this->writer->resume(); + } + } + } + + /** + * Synchronous wrapper for method_call. + * + * @param string $method Method name + * @param array $args Arguments + * @param array $aargs Additional arguments + * + * @return array + */ + public function method_call(string $method, $args = [], array $aargs = ['msg_id' => null]) + { + return $this->wait($this->method_call_async_read($method, $args, $aargs)); + } + + /** + * Call method and wait asynchronously for response. + * + * If the $aargs['noResponse'] is true, will not wait for a response. + * + * @param string $method Method name + * @param array $args Arguments + * @param array $aargs Additional arguments + * + * @return Promise + */ + public function method_call_async_read(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise + { + $deferred = new Deferred(); + $this->method_call_async_write($method, $args, $aargs)->onResolve(function ($e, $read_deferred) use ($deferred) { + if ($e) { + $deferred->fail($e); + } else { + if (\is_array($read_deferred)) { + $read_deferred = \array_map(function ($value) { + return $value->promise(); + }, $read_deferred); + $deferred->resolve(all($read_deferred)); + } else { + $deferred->resolve($read_deferred->promise()); + } + } + }); + + return ($aargs['noResponse'] ?? false) ? new Success() : $deferred->promise(); + } + + /** + * Call method and make sure it is asynchronously sent. + * + * @param string $method Method name + * @param array $args Arguments + * @param array $aargs Additional arguments + * + * @return Promise + */ + public function method_call_async_write(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise + { + return $this->call($this->method_call_async_write_generator($method, $args, $aargs)); + } + + /** + * Call method and make sure it is asynchronously sent (generator). + * + * @param string $method Method name + * @param array $args Arguments + * @param array $aargs Additional arguments + * + * @return Generator + */ + public function method_call_async_write_generator(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator + { + if (\is_array($args) + && isset($args['id']['_']) + && isset($args['id']['dc_id']) + && $args['id']['_'] === 'inputBotInlineMessageID' + && $this->datacenter !== $args['id']['dc_id'] + ) { + return yield $this->API->datacenter->getConnection($args['id']['dc_id'])->method_call_async_write_generator($method, $args, $aargs); + } + if ($aargs['file'] ?? false && !$this->getCtx()->isMedia() && $this->API->datacenter->has($this->datacenter.'_media')) { + $this->logger->logger('Using media DC'); + return yield $this->API->datacenter->getConnection($this->datacenter.'_media')->method_call_async_write_generator($method, $args, $aargs); + } + if (\in_array($method, ['messages.setEncryptedTyping', 'messages.readEncryptedHistory', 'messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService', 'messages.receivedQueue'])) { + $aargs['queue'] = 'secret'; + } + + if (\is_array($args)) { + if (isset($args['multiple'])) { + $aargs['multiple'] = true; + } + if (isset($args['message']) && \is_string($args['message']) && \mb_strlen($args['message'], 'UTF-8') > $this->API->config['message_length_max'] && \mb_strlen((yield $this->parse_mode_async($args))['message'], 'UTF-8') > $this->API->config['message_length_max']) { + $args = yield $this->split_to_chunks_async($args); + $promises = []; + $aargs['queue'] = $method; + $aargs['multiple'] = true; + } + if (isset($aargs['multiple'])) { + $new_aargs = $aargs; + $new_aargs['postpone'] = true; + unset($new_aargs['multiple']); + + if (isset($args['multiple'])) { + unset($args['multiple']); + } + foreach ($args as $single_args) { + $promises[] = $this->method_call_async_write($method, $single_args, $new_aargs); + } + + if (!isset($aargs['postpone'])) { + $this->writer->resume(); + } + + return yield all($promises); + } + $args = yield $this->botAPI_to_MTProto_async($args); + if (isset($args['ping_id']) && \is_int($args['ping_id'])) { + $args['ping_id'] = $this->pack_signed_long($args['ping_id']); + } + } + + $deferred = new Deferred(); + $message = \array_merge( + $aargs, + [ + '_' => $method, + 'type' => $this->methods->find_by_method($method)['type'], + 'content_related' => $this->content_related($method), + 'promise' => $deferred, + 'method' => true, + 'unencrypted' => $this->shared->hasAuthKey() && \strpos($method, '.') === false + ] + ); + + if (\is_object($args) && $args instanceof Parameters) { + $message['body'] = yield $args->fetchParameters(); + } else { + $message['body'] = $args; + } + + if (($method === 'users.getUsers' && $args === ['id' => [['_' => 'inputUserSelf']]]) || $method === 'auth.exportAuthorization' || $method === 'updates.getDifference') { + $message['user_related'] = true; + } + $aargs['postpone'] = $aargs['postpone'] ?? false; + $deferred = yield $this->sendMessage($message, !$aargs['postpone']); + + $this->checker->resume(); + + return $deferred; + } + + /** + * Send object and make sure it is asynchronously sent (generator). + * + * @param string $object Object name + * @param array $args Arguments + * @param array $aargs Additional arguments + * + * @return Promise + */ + public function object_call_async(string $object, $args = [], array $aargs = ['msg_id' => null]): Promise + { + $message = ['_' => $object, 'body' => $args, 'content_related' => $this->content_related($object), 'unencrypted' => $this->shared->hasAuthKey(), 'method' => false]; + if (isset($aargs['promise'])) { + $message['promise'] = $aargs['promise']; + } + + return $this->sendMessage($message, isset($aargs['postpone']) ? !$aargs['postpone'] : true); + } +} diff --git a/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php index e876f742..5de1fffa 100644 --- a/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php @@ -385,7 +385,6 @@ trait ResponseHandler return; case 303: - $old_datacenter = $datacenter; $this->API->datacenter->curdc = $datacenter = (int) \preg_replace('/[^0-9]+/', '', $response['error_message']); if (isset($request['file']) && $request['file'] && isset($this->API->datacenter->sockets[$datacenter.'_media'])) { @@ -396,8 +395,8 @@ trait ResponseHandler if (isset($request['user_related']) && $request['user_related']) { $this->settings['connection_settings']['default_dc'] = $this->API->authorized_dc = $this->API->datacenter->curdc; } - Loop::defer([$this->API, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]); - //$this->API->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]); + Loop::defer([$this->API, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]); + //$this->API->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; case 401: diff --git a/src/danog/MadelineProto/MTProtoSession/Session.php b/src/danog/MadelineProto/MTProtoSession/Session.php index e2982cbf..f5365ce6 100644 --- a/src/danog/MadelineProto/MTProtoSession/Session.php +++ b/src/danog/MadelineProto/MTProtoSession/Session.php @@ -28,6 +28,7 @@ abstract class Session use ResponseHandler; use SaltHandler; use SeqNoHandler; + use CallHandler; public $incoming_messages = []; public $outgoing_messages = []; @@ -41,4 +42,6 @@ abstract class Session public $call_queue = []; public $ack_queue = []; + + } diff --git a/src/danog/MadelineProto/MTProtoTools/CallHandler.php b/src/danog/MadelineProto/MTProtoTools/CallHandler.php deleted file mode 100644 index cccab965..00000000 --- a/src/danog/MadelineProto/MTProtoTools/CallHandler.php +++ /dev/null @@ -1,218 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2019 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\MTProtoTools; - -use Amp\Deferred; -use Amp\Promise; -use danog\MadelineProto\Async\Parameters; -use function Amp\Promise\all; - -/** - * Manages method and object calls. - */ -trait CallHandler -{ - public function method_recall($watcherId, $args) - { - $message_id = $args['message_id']; - $new_datacenter = $args['datacenter']; - $old_datacenter = $new_datacenter; - if (isset($args['old_datacenter'])) { - $old_datacenter = $args['old_datacenter']; - } - $postpone = false; - if (isset($args['postpone'])) { - $postpone = $args['postpone']; - } - - if (isset($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['container'])) { - $message_ids = $this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['container']; - } else { - $message_ids = [$message_id]; - } - - foreach ($message_ids as $message_id) { - if (isset($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['body'])) { - $this->callFork($this->datacenter->sockets[$new_datacenter]->sendMessage($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id], false)); - $this->ack_outgoing_message_id($message_id, $old_datacenter); - $this->got_response_for_outgoing_message_id($message_id, $old_datacenter); - } else { - $this->logger->logger('Could not resend '.isset($this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['_']) ? $this->datacenter->sockets[$old_datacenter]->outgoing_messages[$message_id]['_'] : $message_id); - } - } - if (!$postpone) { - $this->datacenter->sockets[$new_datacenter]->writer->resume(); - } - } - - public function method_call($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]) - { - return $this->wait($this->method_call_async_read($method, $args, $aargs)); - } - - public function method_call_async_read($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]) - { - $deferred = new Deferred(); - $this->method_call_async_write($method, $args, $aargs)->onResolve(function ($e, $read_deferred) use ($deferred) { - if ($e) { - $deferred->fail($e); - } else { - if (is_array($read_deferred)) { - $read_deferred = array_map(function ($value) { - return $value->promise(); - }, $read_deferred); - $deferred->resolve(all($read_deferred)); - } else { - $deferred->resolve($read_deferred->promise()); - } - } - }); - - return isset($aargs['noResponse']) && $aargs['noResponse'] ? 0 : $deferred->promise(); - } - - public function method_call_async_write($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]): Promise - { - return $this->call($this->method_call_async_write_generator($method, $args, $aargs)); - } - - public function method_call_async_write_generator($method, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]): \Generator - { - if (is_array($args) && isset($args['id']['_']) && isset($args['id']['dc_id']) && $args['id']['_'] === 'inputBotInlineMessageID') { - $aargs['datacenter'] = $args['id']['dc_id']; - } - if ($this->wrapper instanceof \danog\MadelineProto\API && isset($this->wrapper->session) && !is_null($this->wrapper->session) && time() - $this->wrapper->serialized > $this->settings['serialization']['serialization_interval'] && !$this->asyncInitPromise) { - $this->logger->logger("Didn't serialize in a while, doing that now..."); - $this->wrapper->serialize($this->wrapper->session); - } - if (isset($aargs['file']) && $aargs['file'] && isset($this->datacenter->sockets[$aargs['datacenter'].'_media'])) { - $this->logger->logger('Using media DC'); - $aargs['datacenter'] .= '_media'; - } - if (in_array($method, ['messages.setEncryptedTyping', 'messages.readEncryptedHistory', 'messages.sendEncrypted', 'messages.sendEncryptedFile', 'messages.sendEncryptedService', 'messages.receivedQueue'])) { - $aargs['queue'] = 'secret'; - } - - if (is_array($args)) { - if (isset($args['multiple'])) { - $aargs['multiple'] = true; - } - if (isset($args['message']) && is_string($args['message']) && mb_strlen($args['message'], 'UTF-8') > $this->config['message_length_max'] && mb_strlen((yield $this->parse_mode_async($args))['message'], 'UTF-8') > $this->config['message_length_max']) { - $args = yield $this->split_to_chunks_async($args); - $promises = []; - $aargs['queue'] = $method; - $aargs['multiple'] = true; - } - if (isset($aargs['multiple'])) { - $new_aargs = $aargs; - $new_aargs['postpone'] = true; - unset($new_aargs['multiple']); - - if (isset($args['multiple'])) { - unset($args['multiple']); - } - foreach ($args as $single_args) { - $promises[] = $this->method_call_async_write($method, $single_args, $new_aargs); - } - - if (!isset($aargs['postpone'])) { - $this->datacenter->sockets[$aargs['datacenter']]->writer->resume(); - } - - return yield all($promises); - } - $args = yield $this->botAPI_to_MTProto_async($args); - if (isset($args['ping_id']) && is_int($args['ping_id'])) { - $args['ping_id'] = $this->pack_signed_long($args['ping_id']); - } - } - - $deferred = new Deferred(); - $message = ['_' => $method, 'type' => $this->methods->find_by_method($method)['type'], 'content_related' => $this->content_related($method), 'promise' => $deferred, 'method' => true, 'unencrypted' => $this->datacenter->sockets[$aargs['datacenter']]->temp_auth_key === null && strpos($method, '.') === false]; - - if (is_object($args) && $args instanceof Parameters) { - $message['body'] = yield $args->fetchParameters(); - } else { - $message['body'] = $args; - } - - if (isset($aargs['msg_id'])) { - $message['msg_id'] = $aargs['msg_id']; - } - if (isset($aargs['queue'])) { - $message['queue'] = $aargs['queue']; - } - if (isset($aargs['file'])) { - $message['file'] = $aargs['file']; - } - if (isset($aargs['botAPI'])) { - $message['botAPI'] = $aargs['botAPI']; - } - if (isset($aargs['FloodWaitLimit'])) { - $message['FloodWaitLimit'] = $aargs['FloodWaitLimit']; - } - if (($method === 'users.getUsers' && $args === ['id' => [['_' => 'inputUserSelf']]]) || $method === 'auth.exportAuthorization' || $method === 'updates.getDifference') { - $message['user_related'] = true; - } - - $deferred = yield $this->datacenter->sockets[$aargs['datacenter']]->sendMessage($message, isset($aargs['postpone']) ? !$aargs['postpone'] : true); - - $this->datacenter->sockets[$aargs['datacenter']]->checker->resume(); - - return $deferred; - } - - public function object_call_async($object, $args = [], $aargs = ['msg_id' => null, 'heavy' => false]) - { - $message = ['_' => $object, 'body' => $args, 'content_related' => $this->content_related($object), 'unencrypted' => $this->datacenter->sockets[$aargs['datacenter']]->temp_auth_key === null, 'method' => false]; - if (isset($aargs['promise'])) { - $message['promise'] = $aargs['promise']; - } - - return $this->datacenter->sockets[$aargs['datacenter']]->sendMessage($message, isset($aargs['postpone']) ? !$aargs['postpone'] : true); - } - - /* -$message = [ -// only in outgoing messages -'body' => deserialized body, (optional if container) -'serialized_body' => 'serialized body', (optional if container) -'content_related' => bool, -'_' => 'predicate', -'promise' => deferred promise that gets resolved when a response to the message is received (optional), -'send_promise' => deferred promise that gets resolved when the message is sent (optional), -'file' => bool (optional), -'type' => 'type' (optional), -'queue' => queue ID (optional), -'container' => [message ids] (optional), - -// only in incoming messages -'content' => deserialized body, -'seq_no' => number (optional), -'from_container' => bool (optional), - -// can be present in both -'response' => message id (optional), -'msg_id' => message id (optional), -'sent' => timestamp, -'tries' => number -]; - */ -}