diff --git a/.gitmodules b/.gitmodules index 2aef1e0c..0816d508 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,12 +1,12 @@ [submodule "docs"] path = docs - url = git@github.com:danog/MadelineProtoDocs + url = https://github.com/danog/MadelineProtoDocs.git [submodule "examples/magnaluna"] path = examples/magnaluna - url = git@github.com:danog/magnaluna + url = https://github.com/danog/magnaluna.git [submodule "examples/pipesbot"] path = examples/pipesbot - url = git@github.com:danog/pipesbot + url = https://github.com/danog/pipesbot.git [submodule "schemas"] path = schemas - url = git@github.com:danog/schemas + url = https://github.com/danog/schemas.git diff --git a/composer.json b/composer.json index daf717d0..b4bcc079 100644 --- a/composer.json +++ b/composer.json @@ -27,6 +27,7 @@ "amphp/dns": "^1", "amphp/byte-stream": "^1", "amphp/file": "^1", + "amphp/mysql": "^2.0", "danog/dns-over-https": "^0.2", "amphp/http-client-cookies": "^1", "danog/tg-file-decoder": "^0.1", @@ -80,6 +81,9 @@ "url": "https://github.com/danog/dns" }], "scripts": { + "post-autoload-dump": [ + "git submodule init && git submodule update" + ], "build": [ "@docs", "@cs-fix", diff --git a/examples/bot.php b/examples/bot.php index 2e50fa4a..5e9cea43 100755 --- a/examples/bot.php +++ b/examples/bot.php @@ -82,17 +82,9 @@ class MyEventHandler extends EventHandler return; } $res = \json_encode($update, JSON_PRETTY_PRINT); - try { - yield $this->messages->sendMessage(['peer' => $update, 'message' => "$res", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']); - if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') { - yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]); - } - } catch (RPCErrorException $e) { - $this->report("Surfaced: $e"); - } catch (Exception $e) { - if (\stripos($e->getMessage(), 'invalid constructor given') === false) { - $this->report("Surfaced: $e"); - } + yield $this->messages->sendMessage(['peer' => $update, 'message' => "$res", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']); + if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') { + yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]); } } } diff --git a/src/danog/MadelineProto/API.php b/src/danog/MadelineProto/API.php index e3e4f8c1..0919250c 100644 --- a/src/danog/MadelineProto/API.php +++ b/src/danog/MadelineProto/API.php @@ -41,7 +41,7 @@ class API extends InternalDoc /** * Instance of MadelineProto. * - * @var ?MTProto + * @var null|MTProto */ public $API; @@ -66,7 +66,7 @@ class API extends InternalDoc * * @internal * - * @var ?MyTelegramOrgWrapper + * @var null|MyTelegramOrgWrapper */ private $myTelegramOrgWrapper; diff --git a/src/danog/MadelineProto/Db/ArrayCacheTrait.php b/src/danog/MadelineProto/Db/ArrayCacheTrait.php new file mode 100644 index 00000000..20abdb9c --- /dev/null +++ b/src/danog/MadelineProto/Db/ArrayCacheTrait.php @@ -0,0 +1,92 @@ + mixed, + * 'ttl' => int + * ], + * ... + * ] + * @var array + */ + protected array $cache = []; + protected string $ttl = '+5 minutes'; + private string $ttlCheckInterval = '+1 minute'; + + protected function getCache(string $key, $default = null) + { + $cacheItem = $this->cache[$key] ?? null; + $result = $default; + + if (\is_array($cacheItem)) { + $result = $cacheItem['value']; + $this->cache[$key]['ttl'] = strtotime($this->ttl); + } + + return $result; + } + + /** + * Save item in cache + * + * @param string $key + * @param $value + */ + protected function setCache(string $key, $value): void + { + $this->cache[$key] = [ + 'value' => $value, + 'ttl' => strtotime($this->ttl), + ]; + } + + /** + * Remove key from cache + * + * @param string $key + */ + protected function unsetCache(string $key): void + { + unset($this->cache[$key]); + } + + protected function startCacheCleanupLoop(): void + { + Loop::repeat(strtotime($this->ttlCheckInterval, 0) * 1000, fn() => $this->cleanupCache()); + } + + /** + * Remove all keys from cache + */ + protected function cleanupCache(): void + { + $now = time(); + $oldKeys = []; + foreach ($this->cache as $cacheKey => $cacheValue) { + if ($cacheValue['ttl'] < $now) { + $oldKeys[] = $cacheKey; + } + } + foreach ($oldKeys as $oldKey) { + $this->unsetCache($oldKey); + } + + Logger::log( + sprintf( + "cache for table:%s; keys left: %s; keys removed: %s", + $this->table, \count($this->cache), \count($oldKeys) + ), + Logger::VERBOSE + ); + } + +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/DbArray.php b/src/danog/MadelineProto/Db/DbArray.php new file mode 100644 index 00000000..b86385d8 --- /dev/null +++ b/src/danog/MadelineProto/Db/DbArray.php @@ -0,0 +1,28 @@ + + * + * @uses \danog\MadelineProto\Db\MemoryArray + * @uses \danog\MadelineProto\Db\SharedMemoryArray + * @uses \danog\MadelineProto\Db\MysqlArray + */ + public static function get(array $dbSettings, string $namePrefix, string $propertyType, string $name, $value = null): Promise + { + $class = __NAMESPACE__; + + switch (strtolower($dbSettings['type'])) { + case 'memory': + $class .= '\Memory'; + break; + case 'mysql': + $class .= '\Mysql'; + break; + default: + throw new \InvalidArgumentException("Unknown dbType: {$dbSettings['type']}"); + + } + + /** @var DbType $class */ + switch (strtolower($propertyType)){ + case 'array': + $class .= 'Array'; + break; + default: + throw new \InvalidArgumentException("Unknown $propertyType: {$propertyType}"); + } + + return $class::getInstance($name, $value, $namePrefix, $dbSettings[$dbSettings['type']]??[]); + } + +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/DbPropertiesTrait.php b/src/danog/MadelineProto/Db/DbPropertiesTrait.php new file mode 100644 index 00000000..4c5a6f33 --- /dev/null +++ b/src/danog/MadelineProto/Db/DbPropertiesTrait.php @@ -0,0 +1,52 @@ +dbProperies)) { + throw new \LogicException(__CLASS__ . ' must have a $dbProperies'); + } + $dbSettings = $MadelineProto->settings['db']; + $prefix = static::getSessionId($MadelineProto); + + foreach ($this->dbProperies as $property => $type) { + if ($reset) { + unset($this->{$property}); + } else { + $this->{$property} = yield DbPropertiesFabric::get($dbSettings, $prefix, $type, $property, $this->{$property}); + } + } + + if (!$reset && yield $this->usernames->count() === 0) { + $this->logger('Filling database cache. This can take few minutes.', Logger::WARNING); + $iterator = $this->chats->getIterator(); + while (yield $iterator->advance()) { + [$id, $chat] = $iterator->getCurrent(); + if (isset($chat['username'])) { + $this->usernames[\strtolower($chat['username'])] = $this->getId($chat); + } + } + $this->logger('Cache filled.', Logger::WARNING); + } + } + + private static function getSessionId(MTProto $madelineProto): string + { + $result = $madelineProto->getSelf()['id'] ?? null; + if (!$result) { + $result = 'tmp_'; + $result .= str_replace('0','', spl_object_hash($madelineProto)); + } + + $className = explode('\\',__CLASS__); + $result .= '_' . end($className); + return $result; + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/DbType.php b/src/danog/MadelineProto/Db/DbType.php new file mode 100644 index 00000000..19f15e04 --- /dev/null +++ b/src/danog/MadelineProto/Db/DbType.php @@ -0,0 +1,18 @@ + + */ + static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise; +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/MemoryArray.php b/src/danog/MadelineProto/Db/MemoryArray.php new file mode 100644 index 00000000..8a97116f --- /dev/null +++ b/src/danog/MadelineProto/Db/MemoryArray.php @@ -0,0 +1,69 @@ +getArrayCopy(); + } + return new static($value); + }); + } + + public function offsetExists($offset) + { + throw new \RuntimeException('Native isset not support promises. Use isset method'); + } + + public function isset($key): Promise + { + return call(fn() => parent::offsetExists($key)); + } + + public function offsetGet($offset): Promise + { + return call(fn() => parent::offsetExists($offset) ? parent::offsetGet($offset) : null); + } + + public function offsetUnset($offset): Promise + { + return call(fn() => parent::offsetUnset($offset)); + } + + public function count(): Promise + { + return call(fn() => parent::count()); + } + + public function getArrayCopy(): Promise + { + return call(fn() => parent::getArrayCopy()); + } + + public function getIterator(): Producer + { + return new Producer(function (callable $emit) { + foreach ($this as $key => $value) { + yield $emit([$key, $value]); + } + }); + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/Mysql.php b/src/danog/MadelineProto/Db/Mysql.php new file mode 100644 index 00000000..75d5618e --- /dev/null +++ b/src/danog/MadelineProto/Db/Mysql.php @@ -0,0 +1,82 @@ +getDatabase(); + $connection = pool($config->withDatabase(null)); + yield $connection->query(" + CREATE DATABASE IF NOT EXISTS `{$db}` + CHARACTER SET 'utf8mb4' + COLLATE 'utf8mb4_general_ci' + "); + $connection->close(); + } catch (\Throwable $e) { + Logger::log($e->getMessage(), Logger::ERROR); + } + })); + + } + +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/MysqlArray.php b/src/danog/MadelineProto/Db/MysqlArray.php new file mode 100644 index 00000000..e29e4657 --- /dev/null +++ b/src/danog/MadelineProto/Db/MysqlArray.php @@ -0,0 +1,373 @@ + $this->table, + 'settings' => $this->settings + ]; + } + + public function __unserialize($data): void + { + foreach ($data as $property => $value) { + $this->{$property} = $value; + } + try { + $this->db = static::getDbConnection($this->settings); + } catch (\Throwable $e) { + Logger::log($e->getMessage(), Logger::ERROR); + } + + } + + /** + * @param string $name + * @param DbArray|array|null $value + * @param string $tablePrefix + * @param array $settings + * + * @return Promise + */ + public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise + { + $tableName = "{$tablePrefix}_{$name}"; + if ($value instanceof self && $value->table === $tableName) { + $instance = &$value; + } else { + $instance = new static(); + $instance->table = $tableName; + } + + $instance->settings = $settings; + $instance->db = static::getDbConnection($settings); + $instance->ttl = $settings['cache_ttl'] ?? $instance->ttl; + + $instance->startCacheCleanupLoop(); + + return call(static function() use($instance, $value) { + yield from $instance->prepareTable(); + + //Skip migrations if its same object + if ($instance !== $value) { + yield from static::renameTmpTable($instance, $value); + yield from static::migrateDataToDb($instance, $value); + } + + return $instance; + }); + } + + /** + * @param MysqlArray $instance + * @param DbArray|array|null $value + * + * @return \Generator + */ + private static function renameTmpTable(MysqlArray $instance, $value): \Generator + { + if ($value instanceof static && $value->table) { + if ( + $value->table !== $instance->table && + mb_strpos($instance->table, 'tmp') !== 0 + ) { + yield from $instance->renameTable($value->table, $instance->table); + } else { + $instance->table = $value->table; + } + } + } + + /** + * @param MysqlArray $instance + * @param DbArray|array|null $value + * + * @return \Generator + * @throws \Throwable + */ + private static function migrateDataToDb(MysqlArray $instance, $value): \Generator + { + if (!empty($value) && !$value instanceof MysqlArray) { + Logger::log('Converting database.', Logger::ERROR); + + if ($value instanceof DbArray) { + $value = yield $value->getArrayCopy(); + } else { + $value = (array) $value; + } + $counter = 0; + $total = count($value); + foreach ($value as $key => $item) { + $counter++; + if ($counter % 500 === 0) { + yield $instance->offsetSet($key, $item); + Logger::log("Loading data to table {$instance->table}: $counter/$total", Logger::WARNING); + } else { + $instance->offsetSet($key, $item); + } + + } + Logger::log('Converting database done.', Logger::ERROR); + } + } + + public function offsetExists($index): bool + { + throw new \RuntimeException('Native isset not support promises. Use isset method'); + } + + /** + * Check if key isset + * + * @param $key + * + * @return Promise true if the offset exists, otherwise false + */ + public function isset($key): Promise + { + return call(fn() => yield $this->offsetGet($key) !== null); + } + + + public function offsetGet($offset): Promise + { + return call(function() use($offset) { + if ($cached = $this->getCache($offset)) { + return $cached; + } + + $row = yield $this->request( + "SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1", + ['index' => $offset] + ); + + if ($value = $this->getValue($row)) { + $this->setCache($offset, $value); + } + + return $value; + }); + } + + /** + * Set value for an offset + * + * @link https://php.net/manual/en/arrayiterator.offsetset.php + * + * @param string $index

+ * The index to set for. + *

+ * @param $value + * + * @throws \Throwable + */ + + public function offsetSet($index, $value): Promise + { + if ($this->getCache($index) === $value) { + return call(fn()=>null); + } + + $this->setCache($index, $value); + + $request = $this->request(" + INSERT INTO `{$this->table}` + SET `key` = :index, `value` = :value + ON DUPLICATE KEY UPDATE `value` = :value + ", + [ + 'index' => $index, + 'value' => serialize($value), + ] + ); + + //Ensure that cache is synced with latest insert in case of concurrent requests. + $request->onResolve(fn() => $this->setCache($index, $value)); + + return $request; + } + + /** + * Unset value for an offset + * + * @link https://php.net/manual/en/arrayiterator.offsetunset.php + * + * @param string $index

+ * The offset to unset. + *

+ * + * @return Promise + * @throws \Throwable + */ + public function offsetUnset($index): Promise + { + $this->unsetCache($index); + + return $this->request(" + DELETE FROM `{$this->table}` + WHERE `key` = :index + ", + ['index' => $index] + ); + } + + /** + * Get array copy + * + * @return Promise + * @throws \Throwable + */ + public function getArrayCopy(): Promise + { + return call(function(){ + $iterator = $this->getIterator(); + $result = []; + while (yield $iterator->advance()) { + [$key, $value] = $iterator->getCurrent(); + $result[$key] = $value; + } + return $result; + }); + } + + public function getIterator(): Producer + { + return new Producer(function (callable $emit) { + $request = yield $this->db->execute("SELECT `key`, `value` FROM `{$this->table}`"); + + while (yield $request->advance()) { + $row = $request->getCurrent(); + yield $emit([$row['key'], $this->getValue($row)]); + } + }); + } + + /** + * Count elements + * + * @link https://php.net/manual/en/arrayiterator.count.php + * @return Promise The number of elements or public properties in the associated + * array or object, respectively. + * @throws \Throwable + */ + public function count(): Promise + { + return call(function(){ + $row = yield $this->request("SELECT count(`key`) as `count` FROM `{$this->table}`"); + return $row[0]['count'] ?? 0; + }); + } + + private function getValue(array $row) + { + if ($row) { + if (!empty($row[0]['value'])) { + $row = reset($row); + } + return unserialize($row['value']); + } + return null; + } + + public static function getDbConnection(array $settings): Pool + { + return Mysql::getConnection( + $settings['host'], + $settings['port'], + $settings['user'], + $settings['password'], + $settings['database'], + $settings['max_connections'], + $settings['idle_timeout'] + ); + } + + /** + * Create table for property + * + * @return array|null + * @throws \Throwable + */ + private function prepareTable() + { + Logger::log("Creating/checking table {$this->table}", Logger::WARNING); + return yield $this->request(" + CREATE TABLE IF NOT EXISTS `{$this->table}` + ( + `key` VARCHAR(255) NOT NULL, + `value` MEDIUMBLOB NULL, + `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`key`) + ) + ENGINE = InnoDB + CHARACTER SET 'utf8mb4' + COLLATE 'utf8mb4_general_ci' + "); + } + + private function renameTable(string $from, string $to) + { + Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); + yield $this->request(" + ALTER TABLE `{$from}` RENAME TO `{$to}`; + "); + + yield $this->request(" + DROP TABLE IF EXISTS `{$from}`; + "); + } + + /** + * Perform async request to db + * + * @param string $query + * @param array $params + * + * @return Promise + * @throws \Throwable + */ + private function request(string $query, array $params = []): Promise + { + return call(function() use($query, $params) { + + Logger::log([$query, $params], Logger::VERBOSE); + + if (empty($this->db) || !$this->db->isAlive()) { + Logger::log('No database connection', Logger::WARNING); + return []; + } + + try { + $request = yield $this->db->execute($query, $params); + } catch (\Throwable $e) { + Logger::log($e->getMessage(), Logger::ERROR); + return []; + } + + $result = []; + if ($request instanceof ResultSet) { + while (yield $request->advance()) { + $result[] = $request->getCurrent(); + } + } + return $result; + }); + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/InternalDoc.php b/src/danog/MadelineProto/InternalDoc.php index b14eac10..d49a4120 100644 --- a/src/danog/MadelineProto/InternalDoc.php +++ b/src/danog/MadelineProto/InternalDoc.php @@ -4285,11 +4285,12 @@ class InternalDoc extends APIFactory * * @param array $user User info * - * @return void + * @return \Generator + * @throws Exception */ - public function addUser(array $user): void + public function addUser(array $user): \Generator { - $this->API->addUser($user); + yield from $this->API->addUser($user); } /** * Call promise $b after promise $a. @@ -4754,11 +4755,11 @@ class InternalDoc extends APIFactory * * @param mixed $id Chat ID * - * @return integer + * @return \Generator */ - public function fullChatLastUpdated($id): int + public function fullChatLastUpdated($id): \Generator { - return $this->API->fullChatLastUpdated($id); + return yield from $this->API->fullChatLastUpdated($id); } /** * Get info about the logged-in user, not cached. diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 256a17f5..aeb69538 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -22,12 +22,17 @@ namespace danog\MadelineProto; use Amp\Dns\Resolver; use Amp\File\StatCache; use Amp\Http\Client\HttpClient; -use Amp\Loop; +use Amp\Promise; use danog\MadelineProto\Async\AsyncConstruct; +use danog\MadelineProto\Db\DbArray; +use danog\MadelineProto\Db\DbPropertiesFabric; +use danog\MadelineProto\Db\DbPropertiesTrait; +use danog\MadelineProto\Db\Mysql; use danog\MadelineProto\Loop\Generic\PeriodicLoop; use danog\MadelineProto\Loop\Update\FeedLoop; use danog\MadelineProto\Loop\Update\SeqLoop; use danog\MadelineProto\Loop\Update\UpdateLoop; +use danog\MadelineProto\MTProtoTools\GarbageCollector; use danog\MadelineProto\MTProtoTools\CombinedUpdatesState; use danog\MadelineProto\MTProtoTools\MinDatabase; use danog\MadelineProto\MTProtoTools\ReferenceDatabase; @@ -68,6 +73,7 @@ class MTProto extends AsyncConstruct implements TLCallback use \danog\MadelineProto\Wrappers\Start; use \danog\MadelineProto\Wrappers\Templates; use \danog\MadelineProto\Wrappers\TOS; + use DbPropertiesTrait; /** * Old internal version of MadelineProto. * @@ -85,7 +91,7 @@ class MTProto extends AsyncConstruct implements TLCallback * * @var int */ - const V = 138; + const V = 139; /** * String release version. * @@ -206,7 +212,7 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Instance of wrapper API. * - * @var ?APIWrapper + * @var null|APIWrapper */ public $wrapper; /** @@ -278,15 +284,22 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Internal peer database. * - * @var array + * @var DbArray */ - public $chats = []; + public $chats; + + /** + * Cache of usernames for chats + * + * @var DbArray|Promise[] + */ + public $usernames; /** * Cached parameters for fetching channel participants. * - * @var array + * @var DbArray|Promise[] */ - public $channel_participants = []; + public $channel_participants; /** * When we last stored data in remote peer database (now doesn't exist anymore). * @@ -302,9 +315,9 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Full chat info database. * - * @var array + * @var DbArray|Promise[] */ - public $full_chats = []; + public $full_chats; /** * Latest chat message ID map for update handling. * @@ -407,6 +420,19 @@ class MTProto extends AsyncConstruct implements TLCallback * @var \danog\MadelineProto\TL\TL */ private $TL; + + /** + * List of properties stored in database (memory or external) + * @see DbPropertiesFabric + * @var array + */ + protected array $dbProperies = [ + 'chats' => 'array', + 'full_chats' => 'array', + 'channel_participants' => 'array', + 'usernames' => 'array', + ]; + /** * Constructor function. * @@ -431,7 +457,7 @@ class MTProto extends AsyncConstruct implements TLCallback // Parse and store settings yield from $this->updateSettings($settings, false); $this->logger->logger(Lang::$current_lang['inst_dc'], Logger::ULTRA_VERBOSE); - $this->cleanupProperties(); + yield from $this->cleanupProperties(); // Load rsa keys $this->logger->logger(Lang::$current_lang['load_rsa'], Logger::ULTRA_VERBOSE); $this->rsa_keys = []; @@ -465,6 +491,8 @@ class MTProto extends AsyncConstruct implements TLCallback yield from $this->getConfig([], ['datacenter' => $this->datacenter->curdc]); $this->startUpdateSystem(true); $this->v = self::V; + + GarbageCollector::start(); } /** * Sleep function. @@ -483,6 +511,7 @@ class MTProto extends AsyncConstruct implements TLCallback 'referenceDatabase', 'minDatabase', 'channel_participants', + 'usernames', // Misc caching 'dialog_params', @@ -538,6 +567,7 @@ class MTProto extends AsyncConstruct implements TLCallback 'reportDest' ]; } + /** * Cleanup memory and session file. * @@ -749,11 +779,18 @@ class MTProto extends AsyncConstruct implements TLCallback } $this->TL->init($this->settings['tl_schema']['src'], $callbacks); } + + yield from $this->initDb($this); + } + /** * Upgrade MadelineProto instance. * * @return \Generator + * @throws Exception + * @throws RPCErrorException + * @throws \Throwable */ private function upgradeMadelineProto(): \Generator { @@ -775,17 +812,20 @@ class MTProto extends AsyncConstruct implements TLCallback if (isset($settings['authorization']['rsa_key'])) { unset($settings['authorization']['rsa_key']); } - if (!isset($this->full_chats)) { - $this->full_chats = []; - } + + yield from $this->initDb($this); + if (!isset($this->secret_chats)) { $this->secret_chats = []; } - foreach ($this->full_chats as $id => $full) { + $iterator = $this->full_chats->getIterator(); + while (yield $iterator->advance()) { + [$id, $full] = $iterator->getCurrent(); if (isset($full['full'], $full['last_update'])) { $this->full_chats[$id] = ['full' => $full['full'], 'last_update' => $full['last_update']]; } } + foreach ($this->secret_chats as $key => &$chat) { if (!\is_array($chat)) { unset($this->secret_chats[$key]); @@ -797,6 +837,8 @@ class MTProto extends AsyncConstruct implements TLCallback $chat['mtproto'] = 1; } } + unset($chat); + foreach ($settings['connection_settings'] as $key => &$connection) { if (\in_array($key, ['default_dc', 'media_socket_count', 'robin_period'])) { continue; @@ -819,6 +861,8 @@ class MTProto extends AsyncConstruct implements TLCallback $connection['obfuscated'] = true; } } + unset($connection); + $this->resetMTProtoSession(true, true); $this->config = ['expires' => -1]; $this->dh_config = ['version' => 0]; @@ -883,7 +927,7 @@ class MTProto extends AsyncConstruct implements TLCallback $force = true; } // Cleanup old properties, init new stuffs - $this->cleanupProperties(); + yield from $this->cleanupProperties(); // Update TL callbacks $callbacks = [$this, $this->referenceDatabase]; if (!($this->authorization['user']['bot'] ?? false)) { @@ -921,6 +965,8 @@ class MTProto extends AsyncConstruct implements TLCallback yield $this->updaters[false]->resume(); } $this->updaters[false]->start(); + + GarbageCollector::start(); } /** * Unreference instance, allowing destruction. @@ -1240,6 +1286,25 @@ class MTProto extends AsyncConstruct implements TLCallback 'run_callback' => true, ], 'secret_chats' => ['accept_chats' => true], 'serialization' => ['serialization_interval' => 30, 'cleanup_before_serialization' => false], + /** + * Where internal database will be stored? + * memory - session file + * mysql - mysql database + */ + 'db' => [ + 'type' => 'memory', + /** @see Mysql */ + 'mysql' => [ + 'host' => '127.0.0.1', + 'port' => 3306, + 'user' => 'root', + 'password' => '', + 'database' => 'MadelineProto', //will be created automatically + 'max_connections' => 10, + 'idle_timeout' => 60, + 'cache_ttl' => '+5 minutes', //keep records in memory after last read + ] + ], 'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [ 'pwr' => false, // Need info ? @@ -1431,9 +1496,9 @@ class MTProto extends AsyncConstruct implements TLCallback * * @internal * - * @return void + * @return \Generator */ - public function resetSession(): void + public function resetSession(): \Generator { if (isset($this->seqUpdater)) { $this->seqUpdater->signal(true); @@ -1465,13 +1530,13 @@ class MTProto extends AsyncConstruct implements TLCallback $this->authorization = null; $this->updates = []; $this->secret_chats = []; - $this->chats = []; - $this->users = []; + + yield from $this->initDb($this,true); + $this->tos = ['expires' => 0, 'accepted' => true]; $this->referenceDatabase = new ReferenceDatabase($this); $this->minDatabase = new MinDatabase($this); $this->dialog_params = ['_' => 'MadelineProto.dialogParams', 'limit' => 0, 'offset_date' => 0, 'offset_id' => 0, 'offset_peer' => ['_' => 'inputPeerEmpty'], 'count' => 0]; - $this->full_chats = []; } /** * Reset the update state and fetch all updates from the beginning. diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 670eeb53..a64d89fa 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -499,7 +499,7 @@ trait Files $cb = [$bridge, 'callback']; $read = $this->uploadFromCallable($reader, $size, $mime, '', $cb, true, $encrypted); $write = $this->downloadToCallable($media, $writer, null, true, 0, -1, $chunk_size); - list($res) = yield \danog\MadelineProto\Tools::all([$read, $write]); + [$res] = yield \danog\MadelineProto\Tools::all([$read, $write]); return $res; } @@ -645,7 +645,7 @@ trait Files */ public function getPropicInfo($data): \Generator { - return yield from $this->getDownloadInfo($this->chats[(yield from $this->getInfo($data))['bot_api_id']]); + return yield from $this->getDownloadInfo(yield $this->chats[(yield from $this->getInfo($data))['bot_api_id']]); } /** * Extract file info from bot API message. @@ -996,7 +996,7 @@ trait Files } $response = new Response($result['code'], $result['headers'], $body); - if ($result['serve']) { + if ($result['serve'] && !empty($result['headers']['Content-Length'])) { $response->setHeader('content-length', $result['headers']['Content-Length']); } @@ -1038,7 +1038,7 @@ trait Files if (\count($range) == 1) { $range[1] = ''; } - list($size_unit, $range_orig) = $range; + [$size_unit, $range_orig] = $range; if ($size_unit == 'bytes') { //multiple ranges could be specified at the same time, but for simplicity only serve the first range //http://tools.ietf.org/id/draft-ietf-http-range-retrieval-00.txt @@ -1046,7 +1046,7 @@ trait Files if (\count($list) == 1) { $list[1] = ''; } - list($range, $extra_ranges) = $list; + [$range, $extra_ranges] = $list; } else { return [ 'serve' => false, @@ -1061,9 +1061,10 @@ trait Files if (\count($listseek) == 1) { $listseek[1] = ''; } - list($seek_start, $seek_end) = $listseek; + [$seek_start, $seek_end] = $listseek; - $seek_end = empty($seek_end) ? ($messageMedia['size'] - 1) : \min(\abs(\intval($seek_end)), $messageMedia['size'] - 1); + $size = $messageMedia['size'] ?? 0; + $seek_end = empty($seek_end) ? ($size - 1) : \min(\abs(\intval($seek_end)), $size - 1); if (!empty($seek_start) && $seek_end < \abs(\intval($seek_start))) { return [ @@ -1079,12 +1080,12 @@ trait Files 'code' => Status::OK, 'headers' => [] ]; - if ($seek_start > 0 || $seek_end < $messageMedia['size'] - 1) { + if ($seek_start > 0 || $seek_end < $size - 1) { $result['code'] = Status::PARTIAL_CONTENT; - $result['headers']['Content-Range'] = "bytes ${seek_start}-${seek_end}/${messageMedia['size']}"; + $result['headers']['Content-Range'] = "bytes ${seek_start}-${seek_end}/${$size}"; $result['headers']['Content-Length'] = $seek_end - $seek_start + 1; - } else { - $result['headers']['Content-Length'] = $messageMedia['size']; + } elseif ($size > 0) { + $result['headers']['Content-Length'] = $size; } $result['headers']['Content-Type'] = $messageMedia['mime']; $result['headers']['Cache-Control'] = 'max-age=31556926'; @@ -1092,7 +1093,11 @@ trait Files $result['headers']['Accept-Ranges'] = 'bytes'; if ($result['serve']) { - $result['serve'] = [$seek_start, $seek_end + 1]; + if ($seek_start === 0 && $seek_end === -1) { + $result['serve'] = [0, -1]; + } else { + $result['serve'] = [$seek_start, $seek_end + 1]; + } } return $result; @@ -1274,7 +1279,7 @@ trait Files $time = 0; $speed = 0; $origCb = $cb; - $cb = function () use ($cb, $count, &$time, &$speed) { + $cb = static function () use ($cb, $count, &$time, &$speed) { static $cur = 0; $cur++; \danog\MadelineProto\Tools::callFork($cb($cur * 100 / $count, $time, $speed)); diff --git a/src/danog/MadelineProto/MTProtoTools/GarbageCollector.php b/src/danog/MadelineProto/MTProtoTools/GarbageCollector.php new file mode 100644 index 00000000..55ee2a4d --- /dev/null +++ b/src/danog/MadelineProto/MTProtoTools/GarbageCollector.php @@ -0,0 +1,59 @@ + static::$memoryConsumption + static::$memoryDiffMb) { + gc_collect_cycles(); + static::$memoryConsumption = static::getMemoryConsumption(); + $cleanedMemory = $currentMemory - static::$memoryConsumption; + Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::NOTICE); + } + }); + } + + private static function getMemoryConsumption(): int + { + $memory = round(memory_get_usage()/1024/1024, 1); + Logger::log("Memory consumption: $memory Mb", Logger::VERBOSE); + return (int) $memory; + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php index debf2f8f..461c67d4 100644 --- a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php @@ -22,6 +22,7 @@ namespace danog\MadelineProto\MTProtoTools; use Amp\Http\Client\Request; use danog\Decoder\FileId; use danog\Decoder\PhotoSizeSource\PhotoSizeSourceDialogPhoto; +use danog\MadelineProto\Db\DbArray; use const danog\Decoder\PROFILE_PHOTO; @@ -34,6 +35,7 @@ trait PeerHandler public $caching_simple_username = []; public $caching_possible_username = []; public $caching_full_info = []; + /** * Convert MTProto channel ID to bot API channel ID. * @@ -81,26 +83,32 @@ trait PeerHandler { $this->supportUser = $support['user']['id']; } + /** * Add user info. * * @param array $user User info * - * @return void + * @return \Generator + * @throws \danog\MadelineProto\Exception */ - public function addUser(array $user): void + public function addUser(array $user): \Generator { + $existingChat = yield $this->chats[$user['id']]; + if ($existingChat) { + $this->cacheChatUsername($user['id'], $user); + } if (!isset($user['access_hash']) && !($user['min'] ?? false)) { - if (isset($this->chats[$user['id']]['access_hash']) && $this->chats[$user['id']]['access_hash']) { + if (!empty($existingChat['access_hash'])) { $this->logger->logger("No access hash with user {$user['id']}, using backup"); - $user['access_hash'] = $this->chats[$user['id']]['access_hash']; + $user['access_hash'] = $existingChat['access_hash']; } elseif (!isset($this->caching_simple[$user['id']]) && !(isset($user['username']) && isset($this->caching_simple_username[$user['username']]))) { $this->logger->logger("No access hash with user {$user['id']}, trying to fetch by ID..."); if (isset($user['username']) && !isset($this->caching_simple_username[$user['username']])) { $this->caching_possible_username[$user['id']] = $user['username']; } $this->cachePwrChat($user['id'], false, true); - } elseif (isset($user['username']) && !isset($this->chats[$user['id']]) && !isset($this->caching_simple_username[$user['username']])) { + } elseif (isset($user['username']) && !$existingChat && !isset($this->caching_simple_username[$user['username']])) { $this->logger->logger("No access hash with user {$user['id']}, trying to fetch by username..."); $this->cachePwrChat($user['username'], false, true); } else { @@ -110,18 +118,19 @@ trait PeerHandler } switch ($user['_']) { case 'user': - if (!isset($this->chats[$user['id']]) || $this->chats[$user['id']] !== $user) { + if (!$existingChat || $existingChat !== $user) { $this->logger->logger("Updated user {$user['id']}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); - if (($user['min'] ?? false) && isset($this->chats[$user['id']]) && !($this->chats[$user['id']]['min'] ?? false)) { + if (($user['min'] ?? false) && !($existingChat['min'] ?? false)) { $this->logger->logger("{$user['id']} is min, filling missing fields", \danog\MadelineProto\Logger::ULTRA_VERBOSE); - if (isset($this->chats[$user['id']]['access_hash'])) { + if (isset($existingChat['access_hash'])) { $user['min'] = false; - $user['access_hash'] = $this->chats[$user['id']]['access_hash']; + $user['access_hash'] = $existingChat['access_hash']; } } $this->chats[$user['id']] = $user; $this->cachePwrChat($user['id'], false, true); } + $this->cacheChatUsername($user['id'], $user); break; case 'userEmpty': break; @@ -136,7 +145,7 @@ trait PeerHandler * * @internal * - * @return void + * @return \Generator */ public function addChat($chat): \Generator { @@ -144,11 +153,13 @@ trait PeerHandler case 'chat': case 'chatEmpty': case 'chatForbidden': - if (!isset($this->chats[-$chat['id']]) || $this->chats[-$chat['id']] !== $chat) { + $existingChat = yield $this->chats[-$chat['id']]; + if (!$existingChat || $existingChat !== $chat) { $this->logger->logger("Updated chat -{$chat['id']}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $this->chats[-$chat['id']] = $chat; $this->cachePwrChat(-$chat['id'], $this->settings['peer']['full_fetch'], true); } + $this->cacheChatUsername(-$chat['id'], $chat); break; case 'channelEmpty': break; @@ -162,7 +173,7 @@ trait PeerHandler $this->caching_possible_username[$bot_api_id] = $chat['username']; } $this->cachePwrChat($bot_api_id, false, true); - } elseif (isset($chat['username']) && !isset($this->chats[$bot_api_id]) && !isset($this->caching_simple_username[$chat['username']])) { + } elseif (isset($chat['username']) && !(yield $this->chats[$bot_api_id]) && !isset($this->caching_simple_username[$chat['username']])) { $this->logger->logger("No access hash with {$chat['_']} {$bot_api_id}, trying to fetch by username..."); $this->cachePwrChat($chat['username'], false, true); } else { @@ -170,11 +181,12 @@ trait PeerHandler } return; } - if (!isset($this->chats[$bot_api_id]) || $this->chats[$bot_api_id] !== $chat) { + $existingChat = yield $this->chats[$bot_api_id]; + if (!$existingChat || $existingChat !== $chat) { $this->logger->logger("Updated chat {$bot_api_id}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); - if (($chat['min'] ?? false) && isset($this->chats[$bot_api_id]) && !($this->chats[$bot_api_id]['min'] ?? false)) { + if (($chat['min'] ?? false) && $existingChat && !($existingChat['min'] ?? false)) { $this->logger->logger("{$bot_api_id} is min, filling missing fields", \danog\MadelineProto\Logger::ULTRA_VERBOSE); - $newchat = $this->chats[$bot_api_id]; + $newchat = $existingChat; foreach (['title', 'username', 'photo', 'banned_rights', 'megagroup', 'verified'] as $field) { if (isset($chat[$field])) { $newchat[$field] = $chat[$field]; @@ -183,13 +195,23 @@ trait PeerHandler $chat = $newchat; } $this->chats[$bot_api_id] = $chat; - if ($this->settings['peer']['full_fetch'] && (!isset($this->full_chats[$bot_api_id]) || $this->full_chats[$bot_api_id]['full']['participants_count'] !== (yield from $this->getFullInfo($bot_api_id))['full']['participants_count'])) { + $fullChat = yield $this->full_chats[$bot_api_id]; + if ($this->settings['peer']['full_fetch'] && (!$fullChat || $fullChat['full']['participants_count'] !== (yield from $this->getFullInfo($bot_api_id))['full']['participants_count'])) { $this->cachePwrChat($bot_api_id, $this->settings['peer']['full_fetch'], true); } } + $this->cacheChatUsername($bot_api_id, $chat); break; } } + + private function cacheChatUsername(int $id, array $chat) + { + if ($id && !empty($chat['username'])) { + $this->usernames[strtolower($chat['username'])] = $id; + } + } + private function cachePwrChat($id, $full_fetch, $send) { \danog\MadelineProto\Tools::callFork((function () use ($id, $full_fetch, $send): \Generator { @@ -212,7 +234,9 @@ trait PeerHandler public function peerIsset($id): \Generator { try { - return isset($this->chats[(yield from $this->getInfo($id))['bot_api_id']]); + $info = yield from $this->getInfo($id); + $chatId = $info['bot_api_id']; + return (yield $this->chats[$chatId]) !== null; } catch (\danog\MadelineProto\Exception $e) { return false; } catch (\danog\MadelineProto\RPCErrorException $e) { @@ -469,7 +493,7 @@ trait PeerHandler } $tried_simple = false; if (\is_numeric($id)) { - if (!isset($this->chats[$id])) { + if (! yield $this->chats[$id]) { try { $this->logger->logger("Try fetching {$id} with access hash 0"); $this->caching_simple[$id] = true; @@ -493,15 +517,15 @@ trait PeerHandler $tried_simple = true; } } - if (isset($this->chats[$id])) { - if (($this->chats[$id]['min'] ?? false) && $this->minDatabase->hasPeer($id) && !isset($this->caching_full_info[$id])) { + if (yield $this->chats[$id]) { + if (((yield $this->chats[$id])['min'] ?? false) && $this->minDatabase->hasPeer($id) && !isset($this->caching_full_info[$id])) { $this->caching_full_info[$id] = true; $this->logger->logger("Only have min peer for {$id} in database, trying to fetch full info"); try { if ($id < 0) { - yield from $this->methodCallAsyncRead('channels.getChannels', ['id' => [$this->genAll($this->chats[$id], $folder_id)['InputChannel']]], ['datacenter' => $this->datacenter->curdc]); + yield from $this->methodCallAsyncRead('channels.getChannels', ['id' => [$this->genAll(yield $this->chats[$id], $folder_id)['InputChannel']]], ['datacenter' => $this->datacenter->curdc]); } else { - yield from $this->methodCallAsyncRead('users.getUsers', ['id' => [$this->genAll($this->chats[$id], $folder_id)['InputUser']]], ['datacenter' => $this->datacenter->curdc]); + yield from $this->methodCallAsyncRead('users.getUsers', ['id' => [$this->genAll(yield $this->chats[$id], $folder_id)['InputUser']]], ['datacenter' => $this->datacenter->curdc]); } } catch (\danog\MadelineProto\Exception $e) { $this->logger->logger($e->getMessage(), \danog\MadelineProto\Logger::WARNING); @@ -512,10 +536,10 @@ trait PeerHandler } } try { - return $this->genAll($this->chats[$id], $folder_id); + return $this->genAll(yield $this->chats[$id], $folder_id); } catch (\danog\MadelineProto\Exception $e) { if ($e->getMessage() === 'This peer is not present in the internal peer database') { - unset($this->chats[$id]); + yield $this->chats->offsetUnset($id);/** @uses DbArray::offsetUnset() */ } else { throw $e; } @@ -562,16 +586,21 @@ trait PeerHandler } return yield from $this->getInfo($this->supportUser); } - foreach ($this->chats as $bot_api_id => $chat) { + if ($bot_api_id = yield $this->usernames[$id]) { + $chat = yield $this->chats[$bot_api_id]; + if (empty($chat['username']) || \strtolower($chat['username']) !== $id) { + yield $this->usernames->offsetUnset($id); /** @uses DbArray::offsetUnset() */ + } + if (isset($chat['username']) && \strtolower($chat['username']) === $id) { if ($chat['min'] ?? false && !isset($this->caching_full_info[$bot_api_id])) { $this->caching_full_info[$bot_api_id] = true; $this->logger->logger("Only have min peer for {$bot_api_id} in database, trying to fetch full info"); try { if ($bot_api_id < 0) { - yield from $this->methodCallAsyncRead('channels.getChannels', ['id' => [$this->genAll($this->chats[$bot_api_id], $folder_id)['InputChannel']]], ['datacenter' => $this->datacenter->curdc]); + yield from $this->methodCallAsyncRead('channels.getChannels', ['id' => [$this->genAll(yield $this->chats[$bot_api_id], $folder_id)['InputChannel']]], ['datacenter' => $this->datacenter->curdc]); } else { - yield from $this->methodCallAsyncRead('users.getUsers', ['id' => [$this->genAll($this->chats[$bot_api_id], $folder_id)['InputUser']]], ['datacenter' => $this->datacenter->curdc]); + yield from $this->methodCallAsyncRead('users.getUsers', ['id' => [$this->genAll(yield $this->chats[$bot_api_id], $folder_id)['InputUser']]], ['datacenter' => $this->datacenter->curdc]); } } catch (\danog\MadelineProto\Exception $e) { $this->logger->logger($e->getMessage(), \danog\MadelineProto\Logger::WARNING); @@ -581,9 +610,10 @@ trait PeerHandler unset($this->caching_full_info[$bot_api_id]); } } - return $this->genAll($this->chats[$bot_api_id], $folder_id); + return $this->genAll(yield $this->chats[$bot_api_id], $folder_id); } } + if ($recursive) { yield from $this->resolveUsername($id); return yield from $this->getInfo($id, false); @@ -656,11 +686,11 @@ trait PeerHandler * * @param mixed $id Chat ID * - * @return integer + * @return \Generator */ - public function fullChatLastUpdated($id): int + public function fullChatLastUpdated($id): \Generator { - return isset($this->full_chats[$id]['last_update']) ? $this->full_chats[$id]['last_update'] : 0; + return (yield $this->full_chats[$id])['last_update'] ?? 0; } /** * Get full info about peer, returns an FullInfo object. @@ -674,8 +704,8 @@ trait PeerHandler public function getFullInfo($id): \Generator { $partial = (yield from $this->getInfo($id)); - if (\time() - $this->fullChatLastUpdated($partial['bot_api_id']) < (isset($this->settings['peer']['full_info_cache_time']) ? $this->settings['peer']['full_info_cache_time'] : 0)) { - return \array_merge($partial, $this->full_chats[$partial['bot_api_id']]); + if (\time() - (yield from $this->fullChatLastUpdated($partial['bot_api_id'])) < (isset($this->settings['peer']['full_info_cache_time']) ? $this->settings['peer']['full_info_cache_time'] : 0)) { + return \array_merge($partial, yield $this->full_chats[$partial['bot_api_id']]); } switch ($partial['type']) { case 'user': @@ -886,7 +916,7 @@ trait PeerHandler $last_count = -1; do { try { - $gres = yield from $this->methodCallAsyncRead('channels.getParticipants', ['channel' => $channel, 'filter' => ['_' => $filter, 'q' => $q], 'offset' => $offset, 'limit' => $limit, 'hash' => $hash = $this->getParticipantsHash($channel, $filter, $q, $offset, $limit)], ['datacenter' => $this->datacenter->curdc, 'heavy' => true]); + $gres = yield from $this->methodCallAsyncRead('channels.getParticipants', ['channel' => $channel, 'filter' => ['_' => $filter, 'q' => $q], 'offset' => $offset, 'limit' => $limit, 'hash' => $hash = yield from $this->getParticipantsHash($channel, $filter, $q, $offset, $limit)], ['datacenter' => $this->datacenter->curdc, 'heavy' => true]); } catch (\danog\MadelineProto\RPCErrorException $e) { if ($e->rpc === 'CHAT_ADMIN_REQUIRED') { $this->logger->logger($e->rpc); @@ -894,10 +924,10 @@ trait PeerHandler } throw $e; } - if ($cached = $gres['_'] === 'channels.channelParticipantsNotModified') { - $gres = $this->fetchParticipantsCache($channel, $filter, $q, $offset, $limit); + if ($cached = ($gres['_'] === 'channels.channelParticipantsNotModified')) { + $gres = yield from $this->fetchParticipantsCache($channel, $filter, $q, $offset, $limit); } else { - $this->storeParticipantsCache($gres, $channel, $filter, $q, $offset, $limit); + yield from $this->storeParticipantsCache($gres, $channel, $filter, $q, $offset, $limit); } if ($last_count !== -1 && $last_count !== $gres['count']) { $has_more = true; @@ -957,11 +987,10 @@ trait PeerHandler } private function fetchParticipantsCache($channel, $filter, $q, $offset, $limit) { - return $this->channel_participants[$channel['channel_id']][$filter][$q][$offset][$limit]; + return (yield $this->channel_participants[$channel['channel_id']])[$filter][$q][$offset][$limit]; } - private function storeParticipantsCache($gres, $channel, $filter, $q, $offset, $limit) + private function storeParticipantsCache($gres, $channel, $filter, $q, $offset, $limit): \Generator { - //return; unset($gres['users']); $ids = []; foreach ($gres['participants'] as $participant) { @@ -969,11 +998,13 @@ trait PeerHandler } \sort($ids, SORT_NUMERIC); $gres['hash'] = \danog\MadelineProto\Tools::genVectorHash($ids); - $this->channel_participants[$channel['channel_id']][$filter][$q][$offset][$limit] = $gres; + $participant = yield $this->channel_participants[$channel['channel_id']]; + $participant[$filter][$q][$offset][$limit] = $gres; + $this->channel_participants[$channel['channel_id']] = $participant; } - private function getParticipantsHash($channel, $filter, $q, $offset, $limit) + private function getParticipantsHash($channel, $filter, $q, $offset, $limit): \Generator { - return isset($this->channel_participants[$channel['channel_id']][$filter][$q][$offset][$limit]) ? $this->channel_participants[$channel['channel_id']][$filter][$q][$offset][$limit]['hash'] : 0; + return (yield $this->channel_participants[$channel['channel_id']])[$filter][$q][$offset][$limit]['hash'] ?? 0; } private function storeDb($res, $force = false): \Generator { @@ -1034,6 +1065,12 @@ trait PeerHandler } } if ($res['_'] === 'contacts.resolvedPeer') { + foreach ($res['chats'] as $chat) { + yield from $this->addChat($chat); + } + foreach ($res['users'] as $user) { + yield from $this->addUser($user); + } return $res; } return false; diff --git a/src/danog/MadelineProto/MTProtoTools/ReferenceDatabase.php b/src/danog/MadelineProto/MTProtoTools/ReferenceDatabase.php index 236fb4f9..b877a57c 100644 --- a/src/danog/MadelineProto/MTProtoTools/ReferenceDatabase.php +++ b/src/danog/MadelineProto/MTProtoTools/ReferenceDatabase.php @@ -422,8 +422,10 @@ class ReferenceDatabase implements TLCallback break; // Peer + photo ID case self::PEER_PHOTO_ORIGIN: - if (isset($this->API->full_chats[$origin['peer']]['last_update'])) { - $this->API->full_chats[$origin['peer']]['last_update'] = 0; + $fullChat = yield $this->API->full_chats[$origin['peer']]; + if (isset($fullChat['last_update'])) { + $fullChat['last_update'] = 0; + $this->API->full_chats[$origin['peer']] = $fullChat; } $this->API->getFullInfo($origin['peer']); break; diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index 67424f42..942e4d7c 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -332,7 +332,9 @@ trait UpdateHandler } if (\in_array($update['_'], ['updateUserName', 'updateUserPhone', 'updateUserBlocked', 'updateUserPhoto', 'updateContactRegistered', 'updateContactLink'])) { $id = $this->getId($update); - $this->full_chats[$id]['last_update'] = 0; + $chat = yield $this->full_chats[$id]; + $chat['last_update'] = 0; + $this->full_chats[$id] = $chat; yield from $this->getFullInfo($id); } if ($update['_'] === 'updateDcOptions') { diff --git a/src/danog/MadelineProto/Wrappers/DialogHandler.php b/src/danog/MadelineProto/Wrappers/DialogHandler.php index c9d80767..26f0eab1 100644 --- a/src/danog/MadelineProto/Wrappers/DialogHandler.php +++ b/src/danog/MadelineProto/Wrappers/DialogHandler.php @@ -32,8 +32,15 @@ trait DialogHandler { if ($this->authorization['user']['bot']) { $res = []; - foreach ($this->chats as $chat) { - $res[] = $this->genAll($chat)['Peer']; + /** @uses DbArray::getIterator() */ + $iterator = $this->chats->getIterator(); + while (yield $iterator->advance()) { + [$id, $chat] = $iterator->getCurrent(); + try { + $res[] = $this->genAll($chat)['Peer']; + } catch (\Throwable $e) { + continue; + } } return $res; } diff --git a/src/danog/MadelineProto/Wrappers/Login.php b/src/danog/MadelineProto/Wrappers/Login.php index 64809d66..82536ea9 100644 --- a/src/danog/MadelineProto/Wrappers/Login.php +++ b/src/danog/MadelineProto/Wrappers/Login.php @@ -35,7 +35,7 @@ trait Login public function logout(): \Generator { yield from $this->methodCallAsyncRead('auth.logOut', [], ['datacenter' => $this->datacenter->curdc]); - $this->resetSession(); + yield from $this->resetSession(); $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['logout_ok'], \danog\MadelineProto\Logger::NOTICE); $this->startUpdateSystem(); return true;