From 4923eec9cd58e22f3dd16b9e579ccf57883ecbad Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 9 Jul 2020 18:23:16 +0200 Subject: [PATCH] 2GB file limit, async RPC reporting, file reference and MTProto fixes --- composer.json | 3 +- examples/bot.php | 1 - src/danog/MadelineProto/APIWrapper.php | 6 +- src/danog/MadelineProto/Connection.php | 3 +- .../MadelineProto/DataCenterConnection.php | 2 +- .../Loop/Connection/CheckLoop.php | 9 +- .../Loop/Connection/ReadLoop.php | 2 +- .../Loop/Connection/WriteLoop.php | 143 +++++++++++------- src/danog/MadelineProto/MTProto.php | 50 +++++- .../MTProtoSession/AckHandler.php | 44 ++++-- .../MTProtoSession/CallHandler.php | 4 +- .../MadelineProto/MTProtoSession/Reliable.php | 65 ++++++++ .../MTProtoSession/ResponseHandler.php | 27 ---- .../MadelineProto/MTProtoSession/Session.php | 1 + .../MadelineProto/MTProtoTools/Files.php | 10 +- .../MTProtoTools/GarbageCollector.php | 4 +- src/danog/MadelineProto/RPCErrorException.php | 11 +- 17 files changed, 262 insertions(+), 123 deletions(-) create mode 100644 src/danog/MadelineProto/MTProtoSession/Reliable.php diff --git a/composer.json b/composer.json index 1dab7b8d..4a62900a 100644 --- a/composer.json +++ b/composer.json @@ -53,7 +53,8 @@ "danog/7to5": "^1", "vimeo/psalm": "dev-master", "phpstan/phpstan": "^0.12.14", - "friendsofphp/php-cs-fixer": "^2.16" + "friendsofphp/php-cs-fixer": "^2.16", + "squizlabs/php_codesniffer": "^3.5" }, "suggest": { "ext-libtgvoip": "Install the php-libtgvoip extension to make phone calls (https://github.com/danog/php-libtgvoip)" diff --git a/examples/bot.php b/examples/bot.php index 0aba7268..d63751dd 100755 --- a/examples/bot.php +++ b/examples/bot.php @@ -84,7 +84,6 @@ class MyEventHandler extends EventHandler if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') { yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]); } - yield $this->ping(['multiple' => true] + \array_map(fn ($v) => ['ping_id' => $v], \range(0, 1020))); } } $settings = [ diff --git a/src/danog/MadelineProto/APIWrapper.php b/src/danog/MadelineProto/APIWrapper.php index c8093d27..c254311a 100644 --- a/src/danog/MadelineProto/APIWrapper.php +++ b/src/danog/MadelineProto/APIWrapper.php @@ -184,9 +184,9 @@ final class APIWrapper } $this->serialized = \time(); $realpaths = Serialization::realpaths($this->session); - //Logger::log('Waiting for exclusive lock of serialization lockfile...'); + Logger::log('Waiting for exclusive lock of serialization lockfile...'); $unlock = yield Tools::flock($realpaths['lockfile'], LOCK_EX); - //Logger::log('Lock acquired, serializing'); + Logger::log('Lock acquired, serializing'); try { if (!$this->gettingApiId) { $update_closure = $this->API->settings['updates']['callback']; @@ -207,7 +207,7 @@ final class APIWrapper } $unlock(); } - //Logger::log('Done serializing'); + Logger::log('Done serializing'); return $wrote; })()); } diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index dacafeea..22a6bd77 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -409,8 +409,7 @@ class Connection extends Session $deferred = new Deferred(); if (!isset($message['serialized_body'])) { $body = \is_object($message['body']) ? yield from $message['body'] : $message['body']; - $refreshNext = isset($message['refreshNext']) && $message['refreshNext']; - //$refreshNext = true; + $refreshNext = $message['refreshReferences'] ?? false; if ($refreshNext) { $this->API->referenceDatabase->refreshNext(true); } diff --git a/src/danog/MadelineProto/DataCenterConnection.php b/src/danog/MadelineProto/DataCenterConnection.php index e25b5e7f..96a737f4 100644 --- a/src/danog/MadelineProto/DataCenterConnection.php +++ b/src/danog/MadelineProto/DataCenterConnection.php @@ -374,7 +374,7 @@ class DataCenterConnection implements JsonSerializable $this->decWrite = self::WRITE_WEIGHT; if ($id === -1 || !isset($this->connections[$id])) { if ($this->connections) { - $this->API->logger("Already connected!", Logger::WARNING); + $this->API->logger->logger("Already connected!", Logger::WARNING); return; } yield from $this->connectMore($count); diff --git a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php index cd7ce296..00434738 100644 --- a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php @@ -131,14 +131,17 @@ class CheckLoop extends ResumableSignalLoop $API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.$message_id.' received by server and was already sent, requesting reply...', \danog\MadelineProto\Logger::ERROR); $reply[] = $message_id; } else { - $API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.$message_id.' received by server, requesting reply...', \danog\MadelineProto\Logger::ERROR); + $API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.$message_id.' received by server, waiting...', \danog\MadelineProto\Logger::ERROR); $reply[] = $message_id; } } } + /* if ($reply) { - \danog\MadelineProto\Tools::callFork($connection->objectCall('msg_resend_ans_req', ['msg_ids' => $reply], ['postpone' => true])); - } + $deferred= new Deferred; + $deferred->promise()->onResolve(fn($e, $res) => var_dump(ord($res['info'][0]))); + \danog\MadelineProto\Tools::callFork($connection->objectCall('msg_resend_req', ['msg_ids' => $reply], ['postpone' => true, 'promise' => $deferred])); + }*/ $connection->flush(); }); $list = ''; diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index 76a39bcb..ddb0d801 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -106,7 +106,7 @@ class ReadLoop extends SignalLoop yield from $connection->reconnect(); } elseif ($error === -429) { $API->logger->logger("Got -429 from DC {$datacenter}", Logger::WARNING); - yield Tools::sleep(1); + yield Tools::sleep(3); yield from $connection->reconnect(); } else { yield from $connection->reconnect(); diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index 7248d741..b34352ae 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -20,6 +20,7 @@ namespace danog\MadelineProto\Loop\Connection; use Amp\ByteStream\StreamException; +use Amp\Loop; use danog\MadelineProto\Connection; use danog\MadelineProto\Logger; use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; @@ -33,6 +34,10 @@ use danog\MadelineProto\Tools; */ class WriteLoop extends ResumableSignalLoop { + const MAX_COUNT = 1020; + const MAX_SIZE = 1 << 15; + const MAX_IDS = 8192; + /** * Connection instance. * @@ -157,37 +162,22 @@ class WriteLoop extends ResumableSignalLoop if ($shared->isHttp() && empty($connection->pending_outgoing)) { return; } - $temporary_keys = []; - if (\count($to_ack = $connection->ack_queue)) { - foreach (\array_chunk($connection->ack_queue, 8192) as $acks) { - $connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msgs_ack', 'serialized_body' => yield from $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msgs_ack', 'msg_ids' => $acks], 'msgs_ack'), 'contentRelated' => false, 'unencrypted' => false, 'method' => false]; - $temporary_keys[$connection->pending_outgoing_key] = true; - $API->logger->logger("Adding msgs_ack {$connection->pending_outgoing_key}", Logger::ULTRA_VERBOSE); - $connection->pending_outgoing_key++; - } - } - $has_http_wait = false; + + \ksort($connection->pending_outgoing); + $messages = []; $keys = []; - if ($shared->isHttp()) { - foreach ($connection->pending_outgoing as $message) { - if ($message['_'] === 'http_wait') { - $has_http_wait = true; - break; - } - } - if (!$has_http_wait) { - $API->logger->logger("Adding http_wait {$connection->pending_outgoing_key}", Logger::ULTRA_VERBOSE); - $connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'http_wait', 'serialized_body' => yield from $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait'), 'contentRelated' => true, 'unencrypted' => false, 'method' => true]; - $temporary_keys[$connection->pending_outgoing_key] = true; - $connection->pending_outgoing_key++; - } - } + $total_length = 0; $count = 0; - \ksort($connection->pending_outgoing); $skipped = false; $inited = false; + + $has_seq = false; + + $has_state = false; + $has_resend = false; + $has_http_wait = false; foreach ($connection->pending_outgoing as $k => $message) { if ($message['unencrypted']) { continue; @@ -201,15 +191,42 @@ class WriteLoop extends ResumableSignalLoop $skipped = true; continue; } + if ($message['_'] === 'http_wait') { + $has_http_wait = true; + } + if ($message['_'] === 'msgs_state_req') { + if ($has_state) { + $API->logger->logger("Already have a state request queued for the current container in DC {$datacenter}"); + continue; + } + $has_state = true; + } + if ($message['_'] === 'msg_resend_req') { + if ($has_resend) { + $API->logger->logger("Already have a resend request queued for the current container in DC {$datacenter}"); + continue; + } + $has_resend = true; + } + $body_length = \strlen($message['serialized_body']); $actual_length = $body_length + 32; if ($total_length && $total_length + $actual_length > 32760 || $count >= 1020) { $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::ULTRA_VERBOSE); break; } - $message_id = isset($message['msg_id']) ? $message['msg_id'] : $connection->msgIdHandler->generateMessageId(); + if (isset($message['seqno'])) { + $has_seq = true; + } + + $message_id = $message['msg_id'] ?? $connection->msgIdHandler->generateMessageId(); $API->logger->logger("Sending {$message['_']} as encrypted message to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); - $MTmessage = ['_' => 'MTmessage', 'msg_id' => $message_id, 'body' => $message['serialized_body'], 'seqno' => $connection->generateOutSeqNo($message['contentRelated'])]; + $MTmessage = [ + '_' => 'MTmessage', + 'msg_id' => $message_id, + 'body' => $message['serialized_body'], + 'seqno' => $message['seqno'] ?? $connection->generateOutSeqNo($message['contentRelated']) + ]; if (isset($message['method']) && $message['method'] && $message['_'] !== 'http_wait') { if (!$shared->getTempAuthKey()->isInited() && $message['_'] !== 'auth.bindTempAuthKey' && !$inited) { $inited = true; @@ -229,13 +246,14 @@ class WriteLoop extends ResumableSignalLoop } } // TODO - /* if ($API->settings['requests']['gzip_encode_if_gt'] !== -1 && ($l = strlen($MTmessage['body'])) > $API->settings['requests']['gzip_encode_if_gt']) { - if (($g = strlen($gzipped = gzencode($MTmessage['body']))) < $l) { - $MTmessage['body'] = yield $API->getTL()->serializeObject(['type' => ''], ['_' => 'gzip_packed', 'packed_data' => $gzipped], 'gzipped data'); - $API->logger->logger('Using GZIP compression for ' . $message['_'] . ', saved ' . ($l - $g) . ' bytes of data, reduced call size by ' . $g * 100 / $l . '%', \danog\MadelineProto\Logger::ULTRA_VERBOSE); - } - unset($gzipped); - }*/ + /* + if ($API->settings['requests']['gzip_encode_if_gt'] !== -1 && ($l = strlen($MTmessage['body'])) > $API->settings['requests']['gzip_encode_if_gt']) { + if (($g = strlen($gzipped = gzencode($MTmessage['body']))) < $l) { + $MTmessage['body'] = yield $API->getTL()->serializeObject(['type' => ''], ['_' => 'gzip_packed', 'packed_data' => $gzipped], 'gzipped data'); + $API->logger->logger('Using GZIP compression for ' . $message['_'] . ', saved ' . ($l - $g) . ' bytes of data, reduced call size by ' . $g * 100 / $l . '%', \danog\MadelineProto\Logger::ULTRA_VERBOSE); + } + unset($gzipped); + }*/ } } $body_length = \strlen($MTmessage['body']); @@ -249,20 +267,45 @@ class WriteLoop extends ResumableSignalLoop $MTmessage['bytes'] = $body_length; $messages[] = $MTmessage; $keys[$k] = $message_id; - } - if ($shared->isHttp() && $skipped && $count === \count($temporary_keys)) { - foreach ($temporary_keys as $key => $true) { - $API->logger->logger("Removing temporary {$connection->pending_outgoing[$key]['_']} by {$key}", Logger::ULTRA_VERBOSE); - unset($connection->pending_outgoing[$key]); - $count--; - } + + $connection->pending_outgoing[$k]['seqno'] = $MTmessage['seqno']; + $connection->pending_outgoing[$k]['msg_id'] = $MTmessage['msg_id']; } $MTmessage = null; - if ($count > 1) { + + $acks = \array_slice($connection->ack_queue, 0, self::MAX_COUNT); + if ($ackCount = \count($acks)) { + $API->logger->logger("Adding msgs_ack", Logger::ERROR); + + $body = yield from $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msgs_ack', 'msg_ids' => $acks], 'msgs_ack'); + $messages []= [ + '_' => 'MTmessage', + 'msg_id' => $connection->msgIdHandler->generateMessageId(), + 'body' => $body, + 'seqno' => $connection->generateOutSeqNo(false), + 'bytes' => \strlen($body) + ]; + $count++; + unset($acks, $body); + } + if ($shared->isHttp() && !$has_http_wait) { + $API->logger->logger("Adding http_wait", Logger::ULTRA_VERBOSE); + $body = yield from $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'http_wait', 'max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'http_wait'); + $messages []= [ + '_' => 'MTmessage', + 'msg_id' => $connection->msgIdHandler->generateMessageId(), + 'body' => $body, + 'seqno' => $connection->generateOutSeqNo(true), + 'bytes' => \strlen($body) + ]; + $count++; + unset($body); + } + + if ($count > 1 || $has_seq) { $API->logger->logger("Wrapping in msg_container ({$count} messages of total size {$total_length}) as encrypted message for DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $message_id = $connection->msgIdHandler->generateMessageId(); $connection->pending_outgoing[$connection->pending_outgoing_key] = ['_' => 'msg_container', 'container' => \array_values($keys), 'contentRelated' => false, 'method' => false, 'unencrypted' => false]; - //var_dumP("container ".bin2hex($message_id)); $keys[$connection->pending_outgoing_key++] = $message_id; $message_data = (yield from $API->getTL()->serializeObject(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container')); $message_data_length = \strlen($message_data); @@ -287,31 +330,29 @@ class WriteLoop extends ResumableSignalLoop $message_key = \substr(\hash('sha256', \substr($shared->getTempAuthKey()->getAuthKey(), 88, 32).$plaintext.$padding, true), 8, 16); list($aes_key, $aes_iv) = Crypt::aesCalculate($message_key, $shared->getTempAuthKey()->getAuthKey()); $message = $shared->getTempAuthKey()->getID().$message_key.Crypt::igeEncrypt($plaintext.$padding, $aes_key, $aes_iv); - $buffer = yield $connection->stream->getWriteBuffer($len = \strlen($message)); - //$t = \microtime(true); + $buffer = yield $connection->stream->getWriteBuffer(\strlen($message)); yield $buffer->bufferWrite($message); $connection->httpSent(); $API->logger->logger("Sent encrypted payload to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $sent = \time(); - if ($to_ack) { - $connection->ack_queue = []; + + if ($ackCount) { + $connection->ack_queue = \array_slice($connection->ack_queue, $ackCount); } + foreach ($keys as $key => $message_id) { $connection->outgoing_messages[$message_id] =& $connection->pending_outgoing[$key]; if (isset($connection->outgoing_messages[$message_id]['promise'])) { $connection->new_outgoing[$message_id] = $message_id; $connection->outgoing_messages[$message_id]['sent'] = $sent; - $connection->outgoing_messages[$message_id]['tries'] = 0; } if (isset($connection->outgoing_messages[$message_id]['send_promise'])) { $connection->outgoing_messages[$message_id]['send_promise']->resolve(isset($connection->outgoing_messages[$message_id]['promise']) ? $connection->outgoing_messages[$message_id]['promise'] : true); unset($connection->outgoing_messages[$message_id]['send_promise']); } - //var_dumP("encrypted ".bin2hex($message_id)." ".$connection->outgoing_messages[$message_id]['_']); unset($connection->pending_outgoing[$key]); } - //if (!empty($connection->pending_outgoing)) $connection->select(); - } while (!empty($connection->pending_outgoing) && !$skipped); + } while ($connection->pending_outgoing && !$skipped); if (empty($connection->pending_outgoing)) { $connection->pending_outgoing_key = 'a'; } diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index bd352528..31cc670e 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -91,7 +91,7 @@ class MTProto extends AsyncConstruct implements TLCallback * * @var int */ - const V = 141; + const V = 143; /** * String release version. * @@ -378,6 +378,12 @@ class MTProto extends AsyncConstruct implements TLCallback * @var PeriodicLoop */ private $serializeLoop; + /** + * RPC reporting loop. + * + * @var PeriodicLoop + */ + private $rpcLoop; /** * Feeder loops. * @@ -702,11 +708,15 @@ class MTProto extends AsyncConstruct implements TLCallback if (!$this->configLoop) { $this->configLoop = new PeriodicLoop($this, [$this, 'getConfig'], 'config', 24 * 3600); } + if (!$this->rpcLoop) { + $this->rpcLoop = new PeriodicLoop($this, [$this, 'rpcReport'], 'config', 60); + } $this->callCheckerLoop->start(); $this->serializeLoop->start(); $this->phoneConfigLoop->start(); $this->configLoop->start(); $this->checkTosLoop->start(); + $this->rpcLoop->start(); } /** * Stop all internal loops. @@ -735,6 +745,33 @@ class MTProto extends AsyncConstruct implements TLCallback $this->checkTosLoop->signal(true); $this->checkTosLoop = null; } + if ($this->rpcLoop) { + $this->rpcLoop->signal(true); + $this->rpcLoop = null; + } + } + /** + * Report RPC errors. + * + * @internal + * + * @return \Generator + */ + public function rpcReport(): \Generator + { + $toReport = RPCErrorException::$toReport; + RPCErrorException::$toReport = []; + foreach ($toReport as [$method, $code, $error, $time]) { + try { + $res = \json_decode(yield from $this->fileGetContents('https://rpc.pwrtelegram.xyz/?method='.$method.'&code='.$code.'&error='.$error.'&t='.$time), true); + if (isset($res['ok']) && $res['ok'] && isset($res['result'])) { + $description = $res['result']; + RPCErrorException::$descriptions[$error] = $description; + RPCErrorException::$errorMethodMap[$code][$method][$error] = $error; + } + } catch (\Throwable $e) { + } + } } /** * Clean up properties from previous versions of MadelineProto. @@ -909,7 +946,7 @@ class MTProto extends AsyncConstruct implements TLCallback yield from $this->updateSettings($backtrace['args'][1], false); } } - if (($this->settings['tl_schema']['src']['botAPI'] ?? '') !== __DIR__.'/../../../schemas/TL_botAPI.tl') { + if (($this->settings['tl_schema']['src']['botAPI'] ?? '') !== __DIR__.'/TL_botAPI.tl') { unset($this->v); } if (!\file_exists($this->settings['tl_schema']['src']['telegram'])) { @@ -944,8 +981,6 @@ class MTProto extends AsyncConstruct implements TLCallback $this->startLoops(); if (yield from $this->fullGetSelf()) { $this->authorized = self::LOGGED_IN; - } - if ($this->authorized === self::LOGGED_IN) { yield from $this->getCdnConfig($this->datacenter->curdc); $this->setupLogger(); } @@ -1194,6 +1229,8 @@ class MTProto extends AsyncConstruct implements TLCallback 'ipv6' => Magic::$ipv6, // decides whether to use ipv6, ipv6 attribute of API attribute of API class contains autodetected boolean 'timeout' => 2, + // RPC timeout + 'drop_timeout' => 5*60, // timeout for sockets 'proxy' => Magic::$altervista ? '\\HttpProxy' : '\\Socket', // The proxy class to use @@ -1667,6 +1704,8 @@ class MTProto extends AsyncConstruct implements TLCallback } $this->config = empty($config) ? yield from $this->methodCallAsyncRead('help.getConfig', $config, $options ?: ['datacenter' => $this->settings['connection_settings']['default_dc']]) : $config; yield from $this->parseConfig(); + $this->logger->logger(Lang::$current_lang['config_updated'], Logger::NOTICE); + $this->logger->logger($this->config, Logger::NOTICE); return $this->config; } /** @@ -1681,8 +1720,6 @@ class MTProto extends AsyncConstruct implements TLCallback unset($this->config['dc_options']); yield from $this->parseDcOptions($options); } - $this->logger->logger(Lang::$current_lang['config_updated'], Logger::NOTICE); - $this->logger->logger($this->config, Logger::NOTICE); } /** * Parse DC options from config. @@ -1705,7 +1742,6 @@ class MTProto extends AsyncConstruct implements TLCallback } $id .= $dc['media_only'] ? '_media' : ''; $ipv6 = $dc['ipv6'] ? 'ipv6' : 'ipv4'; - //$id .= isset($this->settings['connection'][$test][$ipv6][$id]) && $this->settings['connection'][$test][$ipv6][$id]['ip_address'] != $dc['ip_address'] ? '_bk' : ''; if (\is_numeric($id)) { $id = (int) $id; } diff --git a/src/danog/MadelineProto/MTProtoSession/AckHandler.php b/src/danog/MadelineProto/MTProtoSession/AckHandler.php index e92543a3..224ad793 100644 --- a/src/danog/MadelineProto/MTProtoSession/AckHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/AckHandler.php @@ -19,11 +19,22 @@ namespace danog\MadelineProto\MTProtoSession; +use Amp\Deferred; +use Amp\Promise; +use danog\MadelineProto\Loop\Connection\WriteLoop; + /** * Manages acknowledgement of messages. */ trait AckHandler { + /** + * Acknowledge outgoing message ID + * + * @param string|int $message_id Message Id + * + * @return boolean + */ public function ackOutgoingMessageId($message_id): bool { // The server acknowledges that it received my message @@ -31,16 +42,15 @@ trait AckHandler $this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING); return false; } - //$this->logger->logger("Ack-ed ".$this->outgoing_messages[$message_id]['_']." with message ID $message_id on DC $datacenter"); - /* - if (isset($this->outgoing_messages[$message_id]['body'])) { - unset($this->outgoing_messages[$message_id]['body']); - } - if (isset($this->new_outgoing[$message_id])) { - unset($this->new_outgoing[$message_id]); - }*/ return true; } + /** + * We have gotten response for outgoing message ID + * + * @param string|int $message_id Message ID + * + * @return boolean + */ public function gotResponseForOutgoingMessageId($message_id): bool { // The server acknowledges that it received my message @@ -59,19 +69,23 @@ trait AckHandler } return true; } + /** + * Acknowledge incoming message ID + * + * @param string|int $message_id Message ID + * + * @return boolean + */ public function ackIncomingMessageId($message_id): bool { // I let the server know that I received its message if (!isset($this->incoming_messages[$message_id])) { $this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of incoming messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING); } - /*if ($this->temp_auth_key['id'] === null || $this->temp_auth_key['id'] === "\0\0\0\0\0\0\0\0") { - // || (isset($this->incoming_messages[$message_id]['ack']) && $this->incoming_messages[$message_id]['ack'])) { - return; - }*/ $this->ack_queue[$message_id] = $message_id; return true; } + /** * Check if there are some pending calls. * @@ -103,6 +117,7 @@ trait AckHandler public function getPendingCalls(): array { $settings = $this->shared->getSettings(); + $dropTimeout = $settings['drop_timeout']; $timeout = $settings['timeout']; $pfs = $settings['pfs']; $unencrypted = !$this->shared->hasTempAuthKey(); @@ -118,6 +133,11 @@ trait AckHandler unset($this->new_outgoing[$k], $this->outgoing_messages[$message_id]); continue; } + if ($this->outgoing_messages[$message_id]['sent'] + $dropTimeout < \time()) { + $this->gotResponseForOutgoingMessageId($message_id); + $this->handleReject($this->outgoing_messages[$message_id], new \danog\MadelineProto\Exception("Request timeout")); + continue; + } $result[] = $message_id; } } diff --git a/src/danog/MadelineProto/MTProtoSession/CallHandler.php b/src/danog/MadelineProto/MTProtoSession/CallHandler.php index a8c7dfad..2103540f 100644 --- a/src/danog/MadelineProto/MTProtoSession/CallHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/CallHandler.php @@ -56,8 +56,8 @@ trait CallHandler } else { Tools::callFork($this->sendMessage($this->outgoing_messages[$message_id], false)); } - $this->ackOutgoingMessageId($message_id); - $this->gotResponseForOutgoingMessageId($message_id); + //$this->ackOutgoingMessageId($message_id); + //$this->gotResponseForOutgoingMessageId($message_id); } else { $this->logger->logger('Could not resend '.(isset($this->outgoing_messages[$message_id]['_']) ? $this->outgoing_messages[$message_id]['_'] : $message_id)); } diff --git a/src/danog/MadelineProto/MTProtoSession/Reliable.php b/src/danog/MadelineProto/MTProtoSession/Reliable.php new file mode 100644 index 00000000..cc267c06 --- /dev/null +++ b/src/danog/MadelineProto/MTProtoSession/Reliable.php @@ -0,0 +1,65 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2020 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\MTProtoSession; + +use Amp\Loop; +use danog\MadelineProto\Logger; +use danog\MadelineProto\MTProto; + +/** + * Manages responses. + */ +trait Reliable +{ + /** + * Send state info for message IDs + * + * @param string|int $req_msg_id Message ID of msgs_state_req that initiated this + * @param array $msg_ids Message IDs to send info about + * + * @return \Generator + */ + public function sendMsgsStateInfo($req_msg_id, array $msg_ids): \Generator + { + $this->logger->logger('Sending state info for '.\count($msg_ids).' message IDs'); + $info = ''; + foreach ($msg_ids as $msg_id) { + $cur_info = 0; + if (!isset($this->incoming_messages[$msg_id])) { + $msg_id = new \tgseclib\Math\BigInteger(\strrev($msg_id), 256); + if ((new \tgseclib\Math\BigInteger(\time() + $this->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) { + $this->logger->logger("Do not know anything about {$msg_id} and it is too big"); + $cur_info |= 3; + } elseif ((new \tgseclib\Math\BigInteger(\time() + $this->time_delta - 300))->bitwise_leftShift(32)->compare($msg_id) > 0) { + $this->logger->logger("Do not know anything about {$msg_id} and it is too small"); + $cur_info |= 1; + } else { + $this->logger->logger("Do not know anything about {$msg_id}"); + $cur_info |= 2; + } + } else { + $this->logger->logger("Know about {$msg_id}"); + $cur_info |= 4; + } + $info .= \chr($cur_info); + } + $this->outgoing_messages[yield from $this->objectCall('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['postpone' => true])]['response'] = $req_msg_id; + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php index f1457745..4250d745 100644 --- a/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php @@ -28,32 +28,6 @@ use danog\MadelineProto\MTProto; */ trait ResponseHandler { - public function sendMsgsStateInfo($req_msg_id, $msg_ids): \Generator - { - $this->logger->logger('Sending state info for '.\count($msg_ids).' message IDs'); - $info = ''; - foreach ($msg_ids as $msg_id) { - $cur_info = 0; - if (!isset($this->incoming_messages[$msg_id])) { - $msg_id = new \tgseclib\Math\BigInteger(\strrev($msg_id), 256); - if ((new \tgseclib\Math\BigInteger(\time() + $this->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) { - $this->logger->logger("Do not know anything about {$msg_id} and it is too small"); - $cur_info |= 3; - } elseif ((new \tgseclib\Math\BigInteger(\time() + $this->time_delta - 300))->bitwise_leftShift(32)->compare($msg_id) > 0) { - $this->logger->logger("Do not know anything about {$msg_id} and it is too big"); - $cur_info |= 1; - } else { - $this->logger->logger("Do not know anything about {$msg_id}"); - $cur_info |= 2; - } - } else { - $this->logger->logger("Know about {$msg_id}"); - $cur_info |= 4; - } - $info .= \chr($cur_info); - } - $this->outgoing_messages[yield from $this->objectCall('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['postpone' => true])]['response'] = $req_msg_id; - } public $n = 0; public function handleMessages() { @@ -271,7 +245,6 @@ trait ResponseHandler if ($this->pending_outgoing) { $this->writer->resume(); } - //$this->n--; return $only_updates; } /** diff --git a/src/danog/MadelineProto/MTProtoSession/Session.php b/src/danog/MadelineProto/MTProtoSession/Session.php index d98fe0da..5dab9e6e 100644 --- a/src/danog/MadelineProto/MTProtoSession/Session.php +++ b/src/danog/MadelineProto/MTProtoSession/Session.php @@ -28,6 +28,7 @@ abstract class Session use ResponseHandler; use SeqNoHandler; use CallHandler; + use Reliable; /** * Incoming message array. * diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index a64d89fa..74f8658c 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -96,7 +96,7 @@ trait Files } StatCacheAsync::clear($file); $size = (yield statAsync($file))['size']; - if ($size > 512 * 1024 * 3000) { + if ($size > 512 * 1024 * 4000) { throw new \danog\MadelineProto\Exception('Given file is too big!'); } $stream = yield open($file, 'rb'); @@ -127,7 +127,7 @@ trait Files /** @var $response \Amp\Http\Client\Response */ $request = new Request($url); $request->setTransferTimeout(10 * 1000 * 3600); - $request->setBodySizeLimit(512 * 1024 * 3000); + $request->setBodySizeLimit(512 * 1024 * 4000); $response = yield $this->datacenter->getHTTPClient()->request($request); if (200 !== ($status = $response->getStatus())) { throw new Exception("Wrong status code: {$status} ".$response->getReason()); @@ -269,7 +269,7 @@ trait Files $datacenter .= '_media'; } $part_size = $this->settings['upload']['part_size']; - $parallel_chunks = $this->settings['upload']['parallel_chunks'] ? $this->settings['upload']['parallel_chunks'] : 3000; + $parallel_chunks = $this->settings['upload']['parallel_chunks'] ? $this->settings['upload']['parallel_chunks'] : 4000; $part_total_num = (int) \ceil($size / $part_size); $part_num = 0; $method = $size > 10 * 1024 * 1024 ? 'upload.saveBigFilePart' : 'upload.saveFilePart'; @@ -1234,7 +1234,7 @@ trait Files $end = $messageMedia['size']; } $part_size = $part_size ?? $this->settings['download']['part_size']; - $parallel_chunks = $this->settings['download']['parallel_chunks'] ? $this->settings['download']['parallel_chunks'] : 3000; + $parallel_chunks = $this->settings['download']['parallel_chunks'] ? $this->settings['download']['parallel_chunks'] : 4000; $datacenter = isset($messageMedia['InputFileLocation']['dc_id']) ? $messageMedia['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc']; if ($this->datacenter->has($datacenter.'_media')) { $datacenter .= '_media'; @@ -1257,7 +1257,7 @@ trait Files } $params = []; $start_at = $offset % $part_size; - $probable_end = $end !== -1 ? $end : 512 * 1024 * 3000; + $probable_end = $end !== -1 ? $end : 512 * 1024 * 4000; $breakOut = false; for ($x = $offset - $start_at; $x < $probable_end; $x += $part_size) { $end_at = $part_size; diff --git a/src/danog/MadelineProto/MTProtoTools/GarbageCollector.php b/src/danog/MadelineProto/MTProtoTools/GarbageCollector.php index 1f3463ed..1e84ae13 100644 --- a/src/danog/MadelineProto/MTProtoTools/GarbageCollector.php +++ b/src/danog/MadelineProto/MTProtoTools/GarbageCollector.php @@ -45,7 +45,7 @@ class GarbageCollector \gc_collect_cycles(); static::$memoryConsumption = static::getMemoryConsumption(); $cleanedMemory = $currentMemory - static::$memoryConsumption; - Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::NOTICE); + Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE); } }); } @@ -53,7 +53,7 @@ class GarbageCollector private static function getMemoryConsumption(): int { $memory = \round(\memory_get_usage()/1024/1024, 1); - Logger::log("Memory consumption: $memory Mb", Logger::VERBOSE); + Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE); return (int) $memory; } } diff --git a/src/danog/MadelineProto/RPCErrorException.php b/src/danog/MadelineProto/RPCErrorException.php index 6d4eb228..60c0d939 100644 --- a/src/danog/MadelineProto/RPCErrorException.php +++ b/src/danog/MadelineProto/RPCErrorException.php @@ -29,6 +29,7 @@ class RPCErrorException extends \Exception private $fetched = false; public static $descriptions = ['RPC_MCGET_FAIL' => 'Telegram is having internal issues, please try again later.', 'RPC_CALL_FAIL' => 'Telegram is having internal issues, please try again later.', 'USER_PRIVACY_RESTRICTED' => "The user's privacy settings do not allow you to do this", 'CHANNEL_PRIVATE' => "You haven't joined this channel/supergroup", 'USER_IS_BOT' => "Bots can't send messages to other bots", 'BOT_METHOD_INVALID' => 'This method cannot be run by a bot', 'PHONE_CODE_EXPIRED' => 'The phone code you provided has expired, this may happen if it was sent to any chat on telegram (if the code is sent through a telegram chat (not the official account) to avoid it append or prepend to the code some chars)', 'USERNAME_INVALID' => 'The provided username is not valid', 'ACCESS_TOKEN_INVALID' => 'The provided token is not valid', 'ACTIVE_USER_REQUIRED' => 'The method is only available to already activated users', 'FIRSTNAME_INVALID' => 'The first name is invalid', 'LASTNAME_INVALID' => 'The last name is invalid', 'PHONE_NUMBER_INVALID' => 'The phone number is invalid', 'PHONE_CODE_HASH_EMPTY' => 'phone_code_hash is missing', 'PHONE_CODE_EMPTY' => 'phone_code is missing', 'API_ID_INVALID' => 'The api_id/api_hash combination is invalid', 'PHONE_NUMBER_OCCUPIED' => 'The phone number is already in use', 'PHONE_NUMBER_UNOCCUPIED' => 'The phone number is not yet being used', 'USERS_TOO_FEW' => 'Not enough users (to create a chat, for example)', 'USERS_TOO_MUCH' => 'The maximum number of users has been exceeded (to create a chat, for example)', 'TYPE_CONSTRUCTOR_INVALID' => 'The type constructor is invalid', 'FILE_PART_INVALID' => 'The file part number is invalid', 'FILE_PARTS_INVALID' => 'The number of file parts is invalid', 'MD5_CHECKSUM_INVALID' => 'The MD5 checksums do not match', 'PHOTO_INVALID_DIMENSIONS' => 'The photo dimensions are invalid', 'FIELD_NAME_INVALID' => 'The field with the name FIELD_NAME is invalid', 'FIELD_NAME_EMPTY' => 'The field with the name FIELD_NAME is missing', 'MSG_WAIT_FAILED' => 'A waiting call returned an error', 'USERNAME_NOT_OCCUPIED' => 'The provided username is not occupied', 'PHONE_NUMBER_BANNED' => 'The provided phone number is banned from telegram', 'AUTH_KEY_UNREGISTERED' => 'The authorization key has expired', 'INVITE_HASH_EXPIRED' => 'The invite link has expired', 'USER_DEACTIVATED' => 'The user was deactivated', 'USER_ALREADY_PARTICIPANT' => 'The user is already in the group', 'MESSAGE_ID_INVALID' => 'The provided message id is invalid', 'PEER_ID_INVALID' => 'The provided peer id is invalid', 'CHAT_ID_INVALID' => 'The provided chat id is invalid', 'MESSAGE_DELETE_FORBIDDEN' => "You can't delete one of the messages you tried to delete, most likely because it is a service message.", 'CHAT_ADMIN_REQUIRED' => 'You must be an admin in this chat to do this', -429 => 'Too many requests', 'PEER_FLOOD' => "You are spamreported, you can't do this"]; public static $errorMethodMap = []; + public static $toReport = []; private $caller = ''; public static function localizeMessage($method, int $code, string $error) { @@ -38,12 +39,12 @@ class RPCErrorException extends \Exception $error = \preg_replace('/\\d+$/', "X", $error); $description = self::$descriptions[$error] ?? ''; if (!isset(self::$errorMethodMap[$code][$method][$error]) || !isset(self::$descriptions[$error]) || $code === 500) { - $res = \json_decode(@\file_get_contents('https://rpc.pwrtelegram.xyz/?method='.$method.'&code='.$code.'&error='.$error, false, \stream_context_create(['http' => ['timeout' => 3]])), true); - if (isset($res['ok']) && $res['ok'] && isset($res['result'])) { - $description = $res['result']; - self::$descriptions[$error] = $description; - self::$errorMethodMap[$code][$method][$error] = $error; + if (\count(self::$toReport) > 100) { + self::$toReport = \array_slice(self::$toReport, -100); } + self::$toReport []= [ + $method, $code, $error, time() + ]; } if (!$description) { return $error;