From 26abf9f04e895e89a804049f934bf7bed5757011 Mon Sep 17 00:00:00 2001 From: Alexander Pankratov Date: Tue, 28 Apr 2020 03:41:06 +0300 Subject: [PATCH] Bugfixes and optimizations --- src/danog/MadelineProto/Db/DbArray.php | 6 + .../MadelineProto/Db/DbPropertiesFabric.php | 16 +- src/danog/MadelineProto/Db/DbType.php | 2 +- src/danog/MadelineProto/Db/MemoryArray.php | 30 ++- src/danog/MadelineProto/Db/Mysql.php | 7 - src/danog/MadelineProto/Db/MysqlArray.php | 174 ++++++++++++++---- .../MadelineProto/Db/SharedMemoryArray.php | 30 --- src/danog/MadelineProto/MTProto.php | 39 ++-- .../MTProtoTools/PeerHandler.php | 26 +-- 9 files changed, 216 insertions(+), 114 deletions(-) delete mode 100644 src/danog/MadelineProto/Db/SharedMemoryArray.php diff --git a/src/danog/MadelineProto/Db/DbArray.php b/src/danog/MadelineProto/Db/DbArray.php index 631bb302..27b7e538 100644 --- a/src/danog/MadelineProto/Db/DbArray.php +++ b/src/danog/MadelineProto/Db/DbArray.php @@ -2,7 +2,13 @@ namespace danog\MadelineProto\Db; +use Amp\Producer; +use Amp\Promise; + interface DbArray extends DbType, \ArrayAccess, \Countable, \Iterator, \SeekableIterator { public function getArrayCopy(); + public function offsetGetAsync(string $offset): Promise; + public function offsetSetAsync(string $offset, $value): Promise; + public function getIterator(): Producer; } \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/DbPropertiesFabric.php b/src/danog/MadelineProto/Db/DbPropertiesFabric.php index 8ac5754b..6739ce3e 100644 --- a/src/danog/MadelineProto/Db/DbPropertiesFabric.php +++ b/src/danog/MadelineProto/Db/DbPropertiesFabric.php @@ -2,10 +2,13 @@ namespace danog\MadelineProto\Db; +use danog\MadelineProto\API; +use danog\MadelineProto\MTProto; + class DbPropertiesFabric { /** - * @param array $dbSettings + * @param MTProto $madelineProto * @param string $propertyType * @param string $name * @param $value @@ -16,16 +19,14 @@ class DbPropertiesFabric * @uses \danog\MadelineProto\Db\SharedMemoryArray * @uses \danog\MadelineProto\Db\MysqlArray */ - public static function get(array $dbSettings, string $propertyType, string $name, $value = null): DbType + public static function get(MTProto $madelineProto, string $propertyType, string $name, $value = null): DbType { $class = __NAMESPACE__; + $dbSettings = $madelineProto->settings['db']; switch (strtolower($dbSettings['type'])) { case 'memory': $class .= '\Memory'; break; - case 'sharedmemory': - $class .= '\SharedMemory'; - break; case 'mysql': $class .= '\Mysql'; break; @@ -34,6 +35,7 @@ class DbPropertiesFabric } + /** @var DbType $class */ switch (strtolower($propertyType)){ case 'array': $class .= 'Array'; @@ -42,8 +44,8 @@ class DbPropertiesFabric throw new \InvalidArgumentException("Unknown $propertyType: {$propertyType}"); } - /** @var DbType $class */ - return $class::getInstance($dbSettings, $name, $value); + $prefix = (string) ($madelineProto->getSelf()['id'] ?? 'tmp'); + return $class::getInstance($name, $value, $prefix, $dbSettings[$dbSettings['type']]??[]); } } \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/DbType.php b/src/danog/MadelineProto/Db/DbType.php index e9bf5dcc..bab9d584 100644 --- a/src/danog/MadelineProto/Db/DbType.php +++ b/src/danog/MadelineProto/Db/DbType.php @@ -4,5 +4,5 @@ namespace danog\MadelineProto\Db; interface DbType { - static function getInstance(array $settings, string $name, $value): self; + static function getInstance(string $name, $value, string $tablePrefix, array $settings): self; } \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/MemoryArray.php b/src/danog/MadelineProto/Db/MemoryArray.php index 658edc56..39d94ac1 100644 --- a/src/danog/MadelineProto/Db/MemoryArray.php +++ b/src/danog/MadelineProto/Db/MemoryArray.php @@ -2,6 +2,10 @@ namespace danog\MadelineProto\Db; +use Amp\Producer; +use Amp\Promise; +use function Amp\call; + class MemoryArray extends \ArrayIterator implements DbArray { protected function __construct($array = [], $flags = 0) @@ -9,11 +13,35 @@ class MemoryArray extends \ArrayIterator implements DbArray parent::__construct((array) $array, $flags | self::STD_PROP_LIST); } - static function getInstance(array $settings, string $name, $value = []): DbArray + public static function getInstance(string $name, $value, string $tablePrefix, array $settings): DbArray { if ($value instanceof DbArray) { $value = $value->getArrayCopy(); } return new static($value); } + + public static function getDbConnection(array $settings) + { + return null; + } + + public function offsetGetAsync(string $offset): Promise + { + return call(fn() => $this->offsetGet($offset)); + } + + public function offsetSetAsync(string $offset, $value): Promise + { + return call(fn() => $this->offsetSet($offset, $value)); + } + + public function getIterator(): Producer + { + return new Producer(function (callable $emit) { + foreach ($this as $value) { + yield $emit($value); + } + }); + } } \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/Mysql.php b/src/danog/MadelineProto/Db/Mysql.php index 4374d71e..569cebe0 100644 --- a/src/danog/MadelineProto/Db/Mysql.php +++ b/src/danog/MadelineProto/Db/Mysql.php @@ -41,11 +41,4 @@ class Mysql return static::$connections[$dbKey]; } - public function __destruct() - { - foreach (static::$connections as $connection) { - $connection->close(); - } - } - } \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/MysqlArray.php b/src/danog/MadelineProto/Db/MysqlArray.php index 0bb89057..d9763efc 100644 --- a/src/danog/MadelineProto/Db/MysqlArray.php +++ b/src/danog/MadelineProto/Db/MysqlArray.php @@ -2,10 +2,15 @@ namespace danog\MadelineProto\Db; +use Amp\Loop; use Amp\Mysql\Pool; +use Amp\Producer; +use Amp\Promise; use Amp\Sql\ResultSet; +use danog\MadelineProto\Logger; use danog\MadelineProto\Tools; use function Amp\call; +use function Amp\Promise\wait; class MysqlArray implements DbArray { @@ -28,25 +33,51 @@ class MysqlArray implements DbArray foreach ($data as $property => $value) { $this->{$property} = $value; } - $this->initDbConnection(); + try { + $this->db = static::getDbConnection($this->settings); + } catch (\Throwable $e) { + Logger::log($e->getMessage(), Logger::ERROR); + } + } - public static function getInstance(array $settings, string $name, $value = []): DbType + public static function getInstance(string $name, $value, string $tablePrefix, array $settings): DbType { $instance = new static(); - $instance->table = $name; - $instance->settings = $settings['mysql']; - $instance->initDbConnection(); - $instance->prepareTable(); - if (!empty($value) && !$value instanceof static) { - if ($value instanceof DbArray) { - $value = $value->getArrayCopy(); - } - foreach ((array) $value as $key => $item) { - $instance[$key] = $item; + $instance->table = "{$tablePrefix}_{$name}"; + $instance->settings = $settings; + $instance->db = static::getDbConnection($settings); + + if ($value instanceof static) { + if ($instance->table !== $value->table) { + $instance->renameTable($value->table, $instance->table); } } + $instance->prepareTable(); + + Loop::defer(function() use($value, $instance){ + if (!empty($value) && !$value instanceof static) { + Logger::log('Converting database.', Logger::ERROR); + if ($value instanceof DbArray) { + $value = $value->getArrayCopy(); + } + $value = (array) $value; + $counter = 0; + $total = count($value); + foreach ((array) $value as $key => $item) { + $counter++; + if ($counter % 100 === 0) { + yield $instance->offsetSetAsync($key, $item); + Logger::log("Converting database. $counter/$total", Logger::WARNING); + } else { + $instance->offsetSetAsync($key, $item); + } + + } + Logger::log('Converting database done.', Logger::ERROR); + } + }); return $instance; @@ -88,12 +119,19 @@ class MysqlArray implements DbArray */ public function offsetGet($index) { - $row = $this->syncRequest( - "SELECT `value` FROM {$this->table} WHERE `key` = :index LIMIT 1", - ['index' => $index] - ); - return $this->getValue($row); + return wait($this->offsetGetAsync($index)); + } + + public function offsetGetAsync(string $offset): Promise + { + return call(function() use($offset) { + $row = yield $this->request( + "SELECT `value` FROM {$this->table} WHERE `key` = :index LIMIT 1", + ['index' => $offset] + ); + return $this->getValue($row); + }); } /** @@ -123,6 +161,20 @@ class MysqlArray implements DbArray ); } + public function offsetSetAsync($index, $value): Promise + { + return $this->request(" + INSERT INTO `{$this->table}` + SET `key` = :index, `value` = :value + ON DUPLICATE KEY UPDATE `value` = :value + ", + [ + 'index' => $index, + 'value' => serialize($value), + ] + ); + } + /** * Unset value for an offset * @@ -164,6 +216,19 @@ class MysqlArray implements DbArray return $result; } + public function getIterator(): Producer + { + return new Producer(function (callable $emit) { + $request = yield $this->db->execute("SELECT `key`, `value` FROM {$this->table}"); + + while (yield $request->advance()) { + $row = $request->getCurrent(); + + yield $emit($this->getValue($row)); + } + }); + } + /** * Count elements * @@ -207,7 +272,9 @@ class MysqlArray implements DbArray private function getValue(array $row) { if ($row) { - $row = reset($row); + if (!empty($row[0]['value'])) { + $row = reset($row); + } return unserialize($row['value']); } return null; @@ -272,21 +339,20 @@ class MysqlArray implements DbArray public function seek($position) { $row = $this->syncRequest( - "SELECT `key` FROM {$this->table} ORDER BY `key` LIMIT 1, :position", + "SELECT `key` FROM {$this->table} ORDER BY `key` LIMIT 1 OFFSET :position", ['offset' => $position] ); $this->key = $row[0]['key'] ?? $this->key; } - private function initDbConnection() + public static function getDbConnection(array $settings): Pool { - //TODO Use MtProto::$settings - $this->db = Mysql::getConnection( - $this->settings['host'], - $this->settings['port'], - $this->settings['user'], - $this->settings['password'], - $this->settings['database'], + return Mysql::getConnection( + $settings['host'], + $settings['port'], + $settings['user'], + $settings['password'], + $settings['database'], ); } @@ -309,6 +375,18 @@ class MysqlArray implements DbArray "); } + private function renameTable(string $from, string $to) + { + try { + $this->syncRequest(" + ALTER TABLE {$from} RENAME TO {$to}; + "); + } catch (\Throwable $e) { + Logger::log("Cant rename table {$from} to {$to}", Logger::WARNING); + } + + } + /** * Perform blocking request to db * @@ -320,19 +398,35 @@ class MysqlArray implements DbArray */ private function syncRequest(string $query, array $params = []): array { - return Tools::wait( - call( - function() use($query, $params) { - $request = yield $this->db->execute($query, $params); - $result = []; - if ($request instanceof ResultSet) { - while (yield $request->advance()) { - $result[] = $request->getCurrent(); - } - } - return $result; + return Tools::wait($this->request($query, $params)); + } + + /** + * Perform blocking request to db + * + * @param string $query + * @param array $params + * + * @return Promise + * @throws \Throwable + */ + private function request(string $query, array $params = []): Promise + { + return call(function() use($query, $params) { + if (empty($this->db)) { + return []; + } + + $request = yield $this->db->execute($query, $params); + $result = []; + if ($request instanceof ResultSet) { + while (yield $request->advance()) { + $result[] = $request->getCurrent(); } - ) - ); + } + return $result; + }); + + } } \ No newline at end of file diff --git a/src/danog/MadelineProto/Db/SharedMemoryArray.php b/src/danog/MadelineProto/Db/SharedMemoryArray.php deleted file mode 100644 index 2d82c1dc..00000000 --- a/src/danog/MadelineProto/Db/SharedMemoryArray.php +++ /dev/null @@ -1,30 +0,0 @@ -getArrayCopy(); - } - $value = array_replace_recursive(static::$instance->getArrayCopy(), (array) $value); - foreach ($value as $key => $item) { - static::$instance[$key] = $item; - } - } - - return static::$instance; - } -} \ No newline at end of file diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index c738173f..45aa8362 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -24,10 +24,8 @@ use Amp\File\StatCache; use Amp\Http\Client\HttpClient; use danog\MadelineProto\Async\AsyncConstruct; use danog\MadelineProto\Db\DbArray; -use danog\MadelineProto\Db\Engines\DbInterface; use danog\MadelineProto\Db\DbPropertiesFabric; use danog\MadelineProto\Db\Mysql; -use danog\MadelineProto\Db\Types\ArrayType; use danog\MadelineProto\Loop\Generic\PeriodicLoop; use danog\MadelineProto\Loop\Update\FeedLoop; use danog\MadelineProto\Loop\Update\SeqLoop; @@ -286,6 +284,13 @@ class MTProto extends AsyncConstruct implements TLCallback * @var DbArray */ public $chats; + + /** + * Cache of usernames for chats + * + * @var DbArray + */ + public $usernames; /** * Cached parameters for fetching channel participants. * @@ -422,11 +427,7 @@ class MTProto extends AsyncConstruct implements TLCallback 'chats' => 'array', 'full_chats' => 'array', 'channel_participants' => 'array', - 'caching_simple' => 'array', - 'caching_simple_username' => 'array', - 'caching_possible_username' => 'array', - 'caching_full_info' => 'array', - 'caching_username_id' => 'array', + 'usernames' => 'array', ]; /** @@ -507,6 +508,7 @@ class MTProto extends AsyncConstruct implements TLCallback 'referenceDatabase', 'minDatabase', 'channel_participants', + 'usernames', // Misc caching 'dialog_params', @@ -569,18 +571,22 @@ class MTProto extends AsyncConstruct implements TLCallback if ($reset) { unset($this->{$property}); } else { - $this->{$property} = DbPropertiesFabric::get($this->settings['db'], $type, $property, $this->{$property}); + $this->{$property} = DbPropertiesFabric::get($this, $type, $property, $this->{$property}); } } - if (!$reset && count($this->caching_username_id) === 0) { - $this->logger('Filling database cache. This can take few minutes.', Logger::WARNING); - foreach ($this->chats as $id => $chat) { - if (isset($chat['username'])) { - $this->caching_username_id[$chat['username']] = $id; + if (!$reset && count($this->usernames) === 0) { + \Amp\Loop::run(function() { + $this->logger('Filling database cache. This can take few minutes.', Logger::WARNING); + $iterator = $this->chats->getIterator(); + while (yield $iterator->advance()) { + $chat = $iterator->getCurrent(); + if (isset($chat['username'])) { + $this->usernames->offsetSetAsync(\strtolower($chat['username']), $this->getId($chat)); + } } - } - $this->logger('Cache filled.', Logger::WARNING); + $this->logger('Cache filled.', Logger::WARNING); + }); } } @@ -1298,8 +1304,7 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Where internal database will be stored? * memory - session file - * sharedMemory - multiples instances share db if run in single process - * mysql - mysql database, shared by all instances in all processes. + * mysql - mysql database */ 'db' => [ 'type' => 'memory', diff --git a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php index cdb395e4..31f7f0e6 100644 --- a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php @@ -34,7 +34,6 @@ trait PeerHandler public $caching_simple_username = []; public $caching_possible_username = []; public $caching_full_info = []; - public $caching_username_id = []; /** * Convert MTProto channel ID to bot API channel ID. @@ -122,6 +121,7 @@ trait PeerHandler } } $this->chats[$user['id']] = $user; + $this->cacheChatUsername($user['id'], $user); $this->cachePwrChat($user['id'], false, true); } break; @@ -149,6 +149,7 @@ trait PeerHandler if (!isset($this->chats[-$chat['id']]) || $this->chats[-$chat['id']] !== $chat) { $this->logger->logger("Updated chat -{$chat['id']}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $this->chats[-$chat['id']] = $chat; + $this->cacheChatUsername(-$chat['id'], $chat); $this->cachePwrChat(-$chat['id'], $this->settings['peer']['full_fetch'], true); } break; @@ -185,6 +186,7 @@ trait PeerHandler $chat = $newchat; } $this->chats[$bot_api_id] = $chat; + $this->cacheChatUsername($bot_api_id, $chat); if ($this->settings['peer']['full_fetch'] && (!isset($this->full_chats[$bot_api_id]) || $this->full_chats[$bot_api_id]['full']['participants_count'] !== (yield from $this->getFullInfo($bot_api_id))['full']['participants_count'])) { $this->cachePwrChat($bot_api_id, $this->settings['peer']['full_fetch'], true); } @@ -192,6 +194,14 @@ trait PeerHandler break; } } + + private function cacheChatUsername(int $id, array $chat) + { + if (!empty($chat['username'])) { + $this->usernames[strtolower($chat['username'])] = $id; + } + } + private function cachePwrChat($id, $full_fetch, $send) { \danog\MadelineProto\Tools::callFork((function () use ($id, $full_fetch, $send): \Generator { @@ -564,19 +574,12 @@ trait PeerHandler } return yield from $this->getInfo($this->supportUser); } - if ($bot_api_id = $this->caching_username_id[$id] ?? null) { + if ($bot_api_id = $this->usernames[$id] ?? null) { $chat = $this->chats[$bot_api_id]; - if (empty($chat['username']) || $chat['username'] !== $id) { - unset($this->caching_username_id[$id]); - } else { - return $this->genAll($this->chats[$bot_api_id], $folder_id); + if (empty($chat['username']) || \strtolower($chat['username']) !== $id) { + unset($this->usernames[$id]); } - } - foreach ($this->chats as $bot_api_id => $chat) { - if (isset($chat['username'])) { - $this->caching_username_id[$id] = $bot_api_id; - } if (isset($chat['username']) && \strtolower($chat['username']) === $id) { if ($chat['min'] ?? false && !isset($this->caching_full_info[$bot_api_id])) { $this->caching_full_info[$bot_api_id] = true; @@ -598,6 +601,7 @@ trait PeerHandler return $this->genAll($this->chats[$bot_api_id], $folder_id); } } + if ($recursive) { yield from $this->resolveUsername($id); return yield from $this->getInfo($id, false);