From b0d25f53cb04b2f78c0acf5eb3b7985ee65230cf Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Fri, 23 Oct 2020 00:14:40 +0200 Subject: [PATCH] Use prepared statements, and update to new postgres API --- .../MadelineProto/Db/DbPropertiesTrait.php | 8 +- src/danog/MadelineProto/Db/DbType.php | 6 +- src/danog/MadelineProto/Db/Driver/Mysql.php | 2 +- src/danog/MadelineProto/Db/DriverArray.php | 191 +++++++++++-- src/danog/MadelineProto/Db/MysqlArray.php | 221 ++------------- src/danog/MadelineProto/Db/PostgresArray.php | 260 +++++------------- src/danog/MadelineProto/Db/RedisArray.php | 12 +- src/danog/MadelineProto/Db/SqlArray.php | 223 +++++++++++---- src/danog/MadelineProto/MTProto.php | 17 +- src/danog/MadelineProto/Magic.php | 4 +- .../Settings/Database/SqlAbstract.php | 8 +- 11 files changed, 455 insertions(+), 497 deletions(-) diff --git a/src/danog/MadelineProto/Db/DbPropertiesTrait.php b/src/danog/MadelineProto/Db/DbPropertiesTrait.php index 119fabfd..d963c789 100644 --- a/src/danog/MadelineProto/Db/DbPropertiesTrait.php +++ b/src/danog/MadelineProto/Db/DbPropertiesTrait.php @@ -3,6 +3,7 @@ namespace danog\MadelineProto\Db; use danog\MadelineProto\MTProto; +use danog\MadelineProto\Tools; /** * Include this trait and call DbPropertiesTrait::initDb to use MadelineProto's database backend for properties. @@ -32,14 +33,19 @@ trait DbPropertiesTrait $dbSettings = $MadelineProto->settings->getDb(); $prefix = static::getSessionId($MadelineProto); + $promises = []; foreach (static::$dbProperties as $property => $type) { if ($reset) { unset($this->{$property}); } else { $table = "{$prefix}_{$property}"; - $this->{$property} = yield DbPropertiesFactory::get($dbSettings, $table, $type, $this->{$property}); + $promises[$property] = DbPropertiesFactory::get($dbSettings, $table, $type, $this->{$property}); } } + $promises = yield Tools::all($promises); + foreach ($promises as $key => $data) { + $this->{$key} = $data; + } } private static function getSessionId(MTProto $madelineProto): string diff --git a/src/danog/MadelineProto/Db/DbType.php b/src/danog/MadelineProto/Db/DbType.php index c60ca886..f6ee61cc 100644 --- a/src/danog/MadelineProto/Db/DbType.php +++ b/src/danog/MadelineProto/Db/DbType.php @@ -3,16 +3,16 @@ namespace danog\MadelineProto\Db; use Amp\Promise; -use danog\MadelineProto\Settings\DatabaseAbstract; +use danog\MadelineProto\Settings\Database\DatabaseAbstract; interface DbType { /** * @param string $table - * @param null|DbType|array $value + * @param null|DbType|array $previous * @param DatabaseAbstract $settings * * @return Promise */ - public static function getInstance(string $table, $value, $settings): Promise; + public static function getInstance(string $table, $previous, $settings): Promise; } diff --git a/src/danog/MadelineProto/Db/Driver/Mysql.php b/src/danog/MadelineProto/Db/Driver/Mysql.php index 2d407116..4e42cf2b 100644 --- a/src/danog/MadelineProto/Db/Driver/Mysql.php +++ b/src/danog/MadelineProto/Db/Driver/Mysql.php @@ -45,7 +45,7 @@ class Mysql ->withDatabase($settings->getDatabase()); yield from static::createDb($config); - static::$connections[$dbKey] = pool($config, $settings->getMaxConnections(), $settings->getIdleTimeout()); + static::$connections[$dbKey] = new Pool($config, $settings->getMaxConnections(), $settings->getIdleTimeout()); } return static::$connections[$dbKey]; diff --git a/src/danog/MadelineProto/Db/DriverArray.php b/src/danog/MadelineProto/Db/DriverArray.php index 62ea8eff..c75a2988 100644 --- a/src/danog/MadelineProto/Db/DriverArray.php +++ b/src/danog/MadelineProto/Db/DriverArray.php @@ -2,49 +2,146 @@ namespace danog\MadelineProto\Db; +use Amp\Promise; use danog\MadelineProto\Logger; +use danog\MadelineProto\Settings\Database\DatabaseAbstract; use danog\MadelineProto\SettingsAbstract; use ReflectionClass; +use function Amp\call; + /** * Array caching trait. */ abstract class DriverArray implements DbArray { + protected string $table; + use ArrayCacheTrait; - public function __destruct() - { - $this->stopCacheCleanupLoop(); - } + /** + * Initialize connection. + */ + abstract public function initConnection(DatabaseAbstract $settings): \Generator; + /** + * Initialize on startup. + * + * @return \Generator + */ + abstract public function initStartup(): \Generator; /** - * Get string representation of driver/table. + * Create table for property. + * + * @return \Generator + * + * @throws \Throwable + */ + abstract protected function prepareTable(): \Generator; + + /** + * Rename table. + * + * @param string $from + * @param string $to + * @return \Generator + */ + abstract protected function renameTable(string $from, string $to): \Generator; + + /** + * Get the value of table. * * @return string */ - abstract public function __toString(): string; - - public function __wakeup() + public function getTable(): string { - if (isset($this->settings) && \is_array($this->settings)) { - $clazz = (new ReflectionClass($this))->getProperty('dbSettings')->getType()->getName(); - /** - * @var SettingsAbstract - * @psalm-suppress UndefinedThisPropertyAssignment - */ - $this->dbSettings = new $clazz; - $this->dbSettings->mergeArray($this->settings); - unset($this->settings); + return $this->table; + } + + /** + * Set the value of table. + * + * @param string $table + * + * @return self + */ + public function setTable(string $table): self + { + $this->table = $table; + + return $this; + } + + /** + * @param string $table + * @param DbArray|array|null $previous + * @param DatabaseAbstract $settings + * + * @return Promise + * + * @psalm-return Promise + */ + public static function getInstance(string $table, $previous, $settings): Promise + { + if ($previous instanceof static && $previous->getTable() === $table) { + $instance = &$previous; + } else { + $instance = new static(); + $instance->setTable($table); + } + + /** @psalm-suppress UndefinedPropertyAssignment */ + $instance->dbSettings = $settings; + $instance->ttl = $settings->getCacheTtl(); + + $instance->startCacheCleanupLoop(); + + return call(static function () use ($instance, $previous, $settings) { + yield from $instance->initConnection($settings); + yield from $instance->prepareTable(); + + if ($instance !== $previous) { + if ($previous instanceof DriverArray) { + yield from $previous->initStartup(); + } + yield from static::renameTmpTable($instance, $previous); + if ($instance instanceof SqlArray) { + Logger::log("Preparing statements..."); + yield from $instance->prepareStatements(); + } + yield from static::migrateDataToDb($instance, $previous); + } elseif ($instance instanceof SqlArray) { + Logger::log("Preparing statements..."); + yield from $instance->prepareStatements(); + } + + return $instance; + }); + } + + /** + * Rename table of old database, if the new one is not a temporary table name. + * + * Otherwise, simply change name of table in new database to match old table name. + * + * @param self $new New db + * @param DbArray|array|null $old Old db + * + * @return \Generator + */ + protected static function renameTmpTable(self $new, $old): \Generator + { + if ($old instanceof static && $old->getTable()) { + if ( + $old->getTable() !== $new->getTable() && + \mb_strpos($new->getTable(), 'tmp') !== 0 + ) { + yield from $new->renameTable($old->getTable(), $new->getTable()); + } else { + $new->setTable($old->getTable()); + } } } - public function offsetExists($index): bool - { - throw new \RuntimeException('Native isset not support promises. Use isset method'); - } - - abstract public function initConnection(\danog\MadelineProto\Settings\Database\DatabaseAbstract $settings): \Generator; - abstract public function initStartup(): \Generator; /** * @param self $new @@ -56,7 +153,7 @@ abstract class DriverArray implements DbArray protected static function migrateDataToDb(self $new, $old): \Generator { if (!empty($old) && !$old instanceof static) { - Logger::log('Converting database.', Logger::ERROR); + Logger::log('Converting database to '.\get_class($new), Logger::ERROR); if ($old instanceof DbArray) { $old = yield $old->getArrayCopy(); @@ -77,4 +174,48 @@ abstract class DriverArray implements DbArray Logger::log('Converting database done.', Logger::ERROR); } } + + + public function __destruct() + { + $this->stopCacheCleanupLoop(); + } + + /** + * Get the value of table. + * + * @return string + */ + public function __toString(): string + { + return $this->table; + } + + /** + * Sleep function. + * + * @return array + */ + public function __sleep(): array + { + return ['table', 'dbSettings']; + } + + public function __wakeup() + { + if (isset($this->settings) && \is_array($this->settings)) { + $clazz = (new ReflectionClass($this))->getProperty('dbSettings')->getType()->getName(); + /** + * @var SettingsAbstract + * @psalm-suppress UndefinedThisPropertyAssignment + */ + $this->dbSettings = new $clazz; + $this->dbSettings->mergeArray($this->settings); + unset($this->settings); + } + } + public function offsetExists($index): bool + { + throw new \RuntimeException('Native isset not support promises. Use isset method'); + } } diff --git a/src/danog/MadelineProto/Db/MysqlArray.php b/src/danog/MadelineProto/Db/MysqlArray.php index e6ca2f7c..e0bbe15b 100644 --- a/src/danog/MadelineProto/Db/MysqlArray.php +++ b/src/danog/MadelineProto/Db/MysqlArray.php @@ -3,22 +3,16 @@ namespace danog\MadelineProto\Db; use Amp\Mysql\Pool; -use Amp\Producer; use Amp\Promise; -use Amp\Sql\ResultSet; -use Amp\Success; use danog\MadelineProto\Db\Driver\Mysql; use danog\MadelineProto\Logger; use danog\MadelineProto\Settings\Database\Mysql as DatabaseMysql; -use function Amp\call; - /** * MySQL database backend. */ class MysqlArray extends SqlArray { - protected string $table; protected DatabaseMysql $dbSettings; private Pool $db; @@ -32,157 +26,39 @@ class MysqlArray extends SqlArray */ public function initStartup(): \Generator { - return $this->initConnection($this->dbSettings); - } - public function __toString(): string - { - return $this->table; - } - - public function __sleep(): array - { - return ['table', 'dbSettings']; + yield from $this->initConnection($this->dbSettings); + yield from $this->prepareStatements(); } /** - * Check if key isset. + * Prepare statements. * - * @param $key - * - * @return Promise true if the offset exists, otherwise false + * @return \Generator */ - public function isset($key): Promise + protected function prepareStatements(): \Generator { - return call(fn () => yield $this->offsetGet($key) !== null); - } - - - public function offsetGet($offset): Promise - { - return call(function () use ($offset) { - if ($cached = $this->getCache($offset)) { - return $cached; - } - - $row = yield $this->request( - "SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1", - ['index' => $offset] - ); - - if ($value = $this->getValue($row)) { - $this->setCache($offset, $value); - } - - return $value; - }); - } - - /** - * Set value for an offset. - * - * @link https://php.net/manual/en/arrayiterator.offsetset.php - * - * @param string|int $index

- * The index to set for. - *

- * @param $value - * - * @throws \Throwable - */ - - public function offsetSet($index, $value): Promise - { - if ($this->getCache($index) === $value) { - return new Success(); - } - - $this->setCache($index, $value); - - $request = $this->request( - " + $this->get = yield $this->db->prepare( + "SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1" + ); + $this->set = yield $this->db->prepare(" INSERT INTO `{$this->table}` SET `key` = :index, `value` = :value ON DUPLICATE KEY UPDATE `value` = :value - ", - [ - 'index' => $index, - 'value' => \serialize($value), - ] - ); - - //Ensure that cache is synced with latest insert in case of concurrent requests. - $request->onResolve(fn () => $this->setCache($index, $value)); - - return $request; + "); + $this->unset = yield $this->db->prepare(" + DELETE FROM `{$this->table}` + WHERE `key` = :index + "); + $this->count = yield $this->db->prepare(" + SELECT count(`key`) as `count` FROM `{$this->table}` + "); + $this->iterate = yield $this->db->prepare(" + SELECT `key`, `value` FROM `{$this->table}` + "); } - /** - * Unset value for an offset. - * - * @link https://php.net/manual/en/arrayiterator.offsetunset.php - * - * @param string|int $index

- * The offset to unset. - *

- * - * @return Promise - * @throws \Throwable - */ - public function offsetUnset($index): Promise - { - $this->unsetCache($index); - return $this->request( - " - DELETE FROM `{$this->table}` - WHERE `key` = :index - ", - ['index' => $index] - ); - } - - public function getArrayCopy(): Promise - { - return call(function () { - $iterator = $this->getIterator(); - $result = []; - while (yield $iterator->advance()) { - [$key, $value] = $iterator->getCurrent(); - $result[$key] = $value; - } - return $result; - }); - } - - public function getIterator(): Producer - { - return new Producer(function (callable $emit) { - $request = yield $this->db->execute("SELECT `key`, `value` FROM `{$this->table}`"); - - while (yield $request->advance()) { - $row = $request->getCurrent(); - yield $emit([$row['key'], $this->getValue($row)]); - } - }); - } - - /** - * Count elements. - * - * @link https://php.net/manual/en/arrayiterator.count.php - * @return Promise The number of elements or public properties in the associated - * array or object, respectively. - * @throws \Throwable - */ - public function count(): Promise - { - return call(function () { - $row = yield $this->request("SELECT count(`key`) as `count` FROM `{$this->table}`"); - return $row[0]['count'] ?? 0; - }); - } - - private function getValue(array $row) + protected function getValue(array $row) { if ($row) { if (!empty($row[0]['value'])) { @@ -218,7 +94,7 @@ class MysqlArray extends SqlArray protected function prepareTable(): \Generator { Logger::log("Creating/checking table {$this->table}", Logger::WARNING); - return yield $this->request(" + return yield $this->db->query(" CREATE TABLE IF NOT EXISTS `{$this->table}` ( `key` VARCHAR(255) NOT NULL, @@ -235,55 +111,12 @@ class MysqlArray extends SqlArray protected function renameTable(string $from, string $to): \Generator { Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); - yield $this->request(" + yield $this->db->query(" + DROP TABLE IF EXISTS `{$to}`; + "); + + yield $this->db->query(" ALTER TABLE `{$from}` RENAME TO `{$to}`; "); - - yield $this->request(" - DROP TABLE IF EXISTS `{$from}`; - "); - } - - /** - * Perform async request to db. - * - * @param string $query - * @param array $params - * - * @return Promise - * @throws \Throwable - */ - private function request(string $query, array $params = []): Promise - { - return call(function () use ($query, $params) { - //Logger::log([$query, $params], Logger::VERBOSE); - - if (empty($this->db) || !$this->db->isAlive()) { - Logger::log('No database connection', Logger::WARNING); - return []; - } - - if ( - !empty($params['index']) - && !\mb_check_encoding($params['index'], 'UTF-8') - ) { - $params['index'] = \mb_convert_encoding($params['index'], 'UTF-8'); - } - - try { - $request = yield $this->db->execute($query, $params); - } catch (\Throwable $e) { - Logger::log($e->getMessage(), Logger::ERROR); - return []; - } - - $result = []; - if ($request instanceof ResultSet) { - while (yield $request->advance()) { - $result[] = $request->getCurrent(); - } - } - return $result; - }); } } diff --git a/src/danog/MadelineProto/Db/PostgresArray.php b/src/danog/MadelineProto/Db/PostgresArray.php index 10f6ded9..da849471 100644 --- a/src/danog/MadelineProto/Db/PostgresArray.php +++ b/src/danog/MadelineProto/Db/PostgresArray.php @@ -2,29 +2,53 @@ namespace danog\MadelineProto\Db; +use Amp\Postgres\ByteA; use Amp\Postgres\Pool; -use Amp\Producer; use Amp\Promise; -use Amp\Sql\ResultSet; use Amp\Success; use danog\MadelineProto\Db\Driver\Postgres; use danog\MadelineProto\Logger; use danog\MadelineProto\Settings\Database\Postgres as DatabasePostgres; -use function Amp\call; - /** * Postgres database backend. */ class PostgresArray extends SqlArray { - protected string $table; public DatabasePostgres $dbSettings; private Pool $db; // Legacy protected array $settings; + /** + * Prepare statements. + * + * @return \Generator + */ + protected function prepareStatements(): \Generator + { + $this->get = yield $this->db->prepare( + "SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1", + ); + $this->set = yield $this->db->prepare(" + INSERT INTO \"{$this->table}\" + (key,value) + VALUES (:index, :value) + ON CONFLICT (key) DO UPDATE SET value = :value + "); + $this->unset = yield $this->db->prepare(" + DELETE FROM \"{$this->table}\" + WHERE key = :index + "); + $this->count = yield $this->db->prepare(" + SELECT count(key) as count FROM \"{$this->table}\" + "); + $this->iterate = yield $this->db->prepare(" + SELECT key, value FROM \"{$this->table}\" + "); + } + /** * Initialize on startup. * @@ -32,11 +56,8 @@ class PostgresArray extends SqlArray */ public function initStartup(): \Generator { - return $this->initConnection($this->dbSettings); - } - public function __toString(): string - { - return $this->table; + yield from $this->initConnection($this->dbSettings); + yield from $this->prepareStatements(); } /** * Initialize connection. @@ -51,19 +72,35 @@ class PostgresArray extends SqlArray } } + protected function getValue(array $row) + { + if ($row) { + if (!empty($row[0]['value'])) { + $row = \reset($row); + } + if (!$row['value']) { + return $row['value']; + } + if ($row['value'][0] === '\\') { + $row['value'] = hex2bin(substr($row['value'], 2)); + } + return \unserialize($row['value']); + } + return null; + } + /** * Set value for an offset. * * @link https://php.net/manual/en/arrayiterator.offsetset.php * - * @param string $index

+ * @param string|int $index

* The index to set for. *

* @param $value * * @throws \Throwable */ - public function offsetSet($index, $value): Promise { if ($this->getCache($index) === $value) { @@ -72,16 +109,11 @@ class PostgresArray extends SqlArray $this->setCache($index, $value); - $request = $this->request( - " - INSERT INTO \"{$this->table}\" - (key,value) - VALUES (:index, :value) - ON CONFLICT (key) DO UPDATE SET value = :value - ", + $request = $this->execute( + $this->set, [ 'index' => $index, - 'value' => \serialize($value), + 'value' => new ByteA(\serialize($value)), ] ); @@ -104,203 +136,35 @@ class PostgresArray extends SqlArray { Logger::log("Creating/checking table {$this->table}", Logger::WARNING); - yield $this->request(" + yield $this->db->query(" CREATE TABLE IF NOT EXISTS \"{$this->table}\" ( \"key\" VARCHAR(255) NOT NULL, \"value\" BYTEA NULL, \"ts\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, CONSTRAINT \"{$this->table}_pkey\" PRIMARY KEY(\"key\") - ); - - - - + ); "); - yield $this->request(" + yield $this->db->query(" DROP TRIGGER IF exists \"{$this->table}_update_ts_trigger\" ON \"{$this->table}\"; "); - yield $this->request(" + yield $this->db->query(" CREATE TRIGGER \"{$this->table}_update_ts_trigger\" BEFORE UPDATE ON \"{$this->table}\" FOR EACH ROW EXECUTE PROCEDURE update_ts(); "); } - public function __sleep() - { - return ['table', 'dbSettings']; - } - - /** - * Check if key isset. - * - * @param $key - * - * @return Promise true if the offset exists, otherwise false - */ - public function isset($key): Promise - { - return call(fn () => yield $this->offsetGet($key) !== null); - } - - - public function offsetGet($offset): Promise - { - return call(function () use ($offset) { - if ($cached = $this->getCache($offset)) { - return $cached; - } - - $row = yield $this->request( - "SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1", - ['index' => $offset] - ); - - if ($value = $this->getValue($row)) { - $this->setCache($offset, $value); - } - - return $value; - }); - } - - /** - * Unset value for an offset. - * - * @link https://php.net/manual/en/arrayiterator.offsetunset.php - * - * @param string $index

- * The offset to unset. - *

- * - * @return Promise - * @throws \Throwable - */ - public function offsetUnset($index): Promise - { - $this->unsetCache($index); - - return $this->request( - " - DELETE FROM \"{$this->table}\" - WHERE key = :index - ", - ['index' => $index] - ); - } - - /** - * Get array copy. - * - * @return Promise - * @throws \Throwable - */ - public function getArrayCopy(): Promise - { - return call(function () { - $iterator = $this->getIterator(); - $result = []; - while (yield $iterator->advance()) { - [$key, $value] = $iterator->getCurrent(); - $result[$key] = $value; - } - return $result; - }); - } - - public function getIterator(): Producer - { - return new Producer(function (callable $emit) { - $request = yield $this->db->execute("SELECT key, value FROM \"{$this->table}\""); - - while (yield $request->advance()) { - $row = $request->getCurrent(); - yield $emit([$row['key'], $this->getValue($row)]); - } - }); - } - - /** - * Count elements. - * - * @link https://php.net/manual/en/arrayiterator.count.php - * @return Promise The number of elements or public properties in the associated - * array or object, respectively. - * @throws \Throwable - */ - public function count(): Promise - { - return call(function () { - $row = yield $this->request("SELECT count(key) as count FROM \"{$this->table}\""); - return $row[0]['count'] ?? 0; - }); - } - - private function getValue(array $row) - { - if ($row) { - if (!empty($row[0]['value'])) { - $row = \reset($row); - } - return \unserialize($row['value']); - } - return null; - } - - protected function renameTable(string $from, string $to): \Generator { Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); - yield $this->request(" + + yield $this->db->query(" + DROP TABLE IF EXISTS \"{$to}\"; + "); + + yield $this->db->query(" ALTER TABLE \"{$from}\" RENAME TO \"{$to}\"; "); - - yield $this->request(" - DROP TABLE IF EXISTS \"{$from}\"; - "); - } - - /** - * Perform async request to db. - * - * @param string $query - * @param array $params - * - * @return Promise - * @throws \Throwable - */ - private function request(string $query, array $params = []): Promise - { - return call(function () use ($query, $params) { - Logger::log([$query, $params], Logger::VERBOSE); - - if (empty($this->db) || !$this->db->isAlive()) { - Logger::log('No database connection', Logger::WARNING); - return []; - } - - if ( - !empty($params['index']) - && !\mb_check_encoding($params['index'], 'UTF-8') - ) { - $params['index'] = \mb_convert_encoding($params['index'], 'UTF-8'); - } - - try { - $request = yield $this->db->execute($query, $params); - } catch (\Throwable $e) { - Logger::log($e->getMessage(), Logger::ERROR); - return []; - } - - $result = []; - if ($request instanceof ResultSet) { - while (yield $request->advance()) { - $result[] = $request->getCurrent(); - } - } - return $result; - }); } } diff --git a/src/danog/MadelineProto/Db/RedisArray.php b/src/danog/MadelineProto/Db/RedisArray.php index 52ca2ece..8b5f62e7 100644 --- a/src/danog/MadelineProto/Db/RedisArray.php +++ b/src/danog/MadelineProto/Db/RedisArray.php @@ -16,7 +16,7 @@ use function Amp\call; /** * Redis database backend. */ -class RedisArray extends SqlArray +class RedisArray extends DriverArray { protected DatabaseRedis $dbSettings; private RedisRedis $db; @@ -33,10 +33,6 @@ class RedisArray extends SqlArray { return $this->initConnection($this->dbSettings); } - public function __toString(): string - { - return $this->table; - } /** * @return Generator * @@ -47,7 +43,6 @@ class RedisArray extends SqlArray yield new Success; } - protected function renameTable(string $from, string $to): \Generator { Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); @@ -77,11 +72,6 @@ class RedisArray extends SqlArray } - public function __sleep() - { - return ['table', 'dbSettings']; - } - /** * Get redis key name. * diff --git a/src/danog/MadelineProto/Db/SqlArray.php b/src/danog/MadelineProto/Db/SqlArray.php index b63be2c5..9d5fdd90 100644 --- a/src/danog/MadelineProto/Db/SqlArray.php +++ b/src/danog/MadelineProto/Db/SqlArray.php @@ -2,8 +2,12 @@ namespace danog\MadelineProto\Db; +use Amp\Producer; use Amp\Promise; -use danog\MadelineProto\Settings\Database\DatabaseAbstract; +use Amp\Sql\ResultSet; +use Amp\Sql\Statement; +use Amp\Success; +use danog\MadelineProto\Logger; use function Amp\call; @@ -12,82 +16,189 @@ use function Amp\call; */ abstract class SqlArray extends DriverArray { - protected string $table; + protected Statement $get; + protected Statement $set; + protected Statement $unset; + protected Statement $count; + + protected Statement $iterate; /** - * Create table for property. + * Prepare statements. * * @return \Generator - * - * @throws \Throwable */ - abstract protected function prepareTable(): \Generator; - - abstract protected function renameTable(string $from, string $to): \Generator; - + abstract protected function prepareStatements(): \Generator; /** - * @param string $table - * @param DbArray|array|null $value - * @param DatabaseAbstract $settings + * Get value from row. * - * @return Promise - * - * @psalm-return Promise + * @param array $row + * @return void */ - public static function getInstance(string $table, $value, $settings): Promise + abstract protected function getValue(array $row); + + + public function getIterator(): Producer { - if ($value instanceof static && $value->table === $table) { - $instance = &$value; - } else { - $instance = new static(); - $instance->table = $table; - } + return new Producer(function (callable $emit) { + $request = yield $this->iterate->execute(); - /** @psalm-suppress UndefinedPropertyAssignment */ - $instance->dbSettings = $settings; - $instance->ttl = $settings->getCacheTtl(); - - $instance->startCacheCleanupLoop(); - - return call(static function () use ($instance, $value, $settings) { - yield from $instance->initConnection($settings); - yield from $instance->prepareTable(); - - // Skip migrations if its same object - if ($instance !== $value) { - if ($value instanceof DriverArray) { - yield from $value->initStartup(); - } - yield from static::renameTmpTable($instance, $value); - yield from static::migrateDataToDb($instance, $value); + while (yield $request->advance()) { + $row = $request->getCurrent(); + yield $emit([$row['key'], $this->getValue($row)]); } - - return $instance; + }); + } + public function getArrayCopy(): Promise + { + return call(function () { + $iterator = $this->getIterator(); + $result = []; + while (yield $iterator->advance()) { + [$key, $value] = $iterator->getCurrent(); + $result[$key] = $value; + } + return $result; }); } /** - * Rename table of old database, if the new one is not a temporary table name. + * Check if key isset. * - * Otherwise, change name of table in new database to match old table name. + * @param $key * - * @param self $new New db - * @param DbArray|array|null $old Old db - * - * @return \Generator + * @return Promise true if the offset exists, otherwise false */ - protected static function renameTmpTable(self $new, $old): \Generator + public function isset($key): Promise { - if ($old instanceof static && $old->table) { - if ( - $old->table !== $new->table && - \mb_strpos($new->table, 'tmp') !== 0 - ) { - yield from $new->renameTable($old->table, $new->table); - } else { - $new->table = $old->table; + return call(fn () => yield $this->offsetGet($key) !== null); + } + + + /** + * Unset value for an offset. + * + * @link https://php.net/manual/en/arrayiterator.offsetunset.php + * + * @param string|int $index

+ * The offset to unset. + *

+ * + * @return Promise + * @throws \Throwable + */ + public function offsetUnset($index): Promise + { + $this->unsetCache($index); + + return $this->execute( + $this->unset, + ['index' => $index] + ); + } + + /** + * Count elements. + * + * @link https://php.net/manual/en/arrayiterator.count.php + * @return Promise The number of elements or public properties in the associated + * array or object, respectively. + * @throws \Throwable + */ + public function count(): Promise + { + return call(function () { + $row = yield $this->execute($this->count); + return $row[0]['count'] ?? 0; + }); + } + + public function offsetGet($offset): Promise + { + return call(function () use ($offset) { + if ($cached = $this->getCache($offset)) { + return $cached; } + + $row = yield $this->execute($this->get, ['index' => $offset]); + + if ($value = $this->getValue($row)) { + $this->setCache($offset, $value); + } + + return $value; + }); + } + + + /** + * Set value for an offset. + * + * @link https://php.net/manual/en/arrayiterator.offsetset.php + * + * @param string|int $index

+ * The index to set for. + *

+ * @param $value + * + * @throws \Throwable + */ + public function offsetSet($index, $value): Promise + { + if ($this->getCache($index) === $value) { + return new Success(); } + + $this->setCache($index, $value); + + $request = $this->execute( + $this->set, + [ + 'index' => $index, + 'value' => \serialize($value), + ] + ); + + //Ensure that cache is synced with latest insert in case of concurrent requests. + $request->onResolve(fn () => $this->setCache($index, $value)); + + return $request; + } + + /** + * Perform async request to db. + * + * @param Statement $query + * @param array $params + * + * @return Promise + * @throws \Throwable + */ + protected function execute(Statement $stmt, array $params = []): Promise + { + return call(function () use ($stmt, $params) { + if ( + !empty($params['index']) + && !\mb_check_encoding($params['index'], 'UTF-8') + ) { + $params['index'] = \mb_convert_encoding($params['index'], 'UTF-8'); + } + + try { + $request = yield $stmt->execute($params); + } catch (\Throwable $e) { + Logger::log($e->getMessage(), Logger::ERROR); + return []; + } + + $result = []; + if ($request instanceof ResultSet) { + while (yield $request->advance()) { + $result[] = $request->getCurrent(); + } + } + return $result; + }); } } diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index e79055b7..632c064f 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -528,6 +528,11 @@ class MTProto extends AsyncConstruct implements TLCallback */ public static function serializeAll(): void { + static $done = false; + if ($done) { + return; + } + $done = true; Logger::log('Prompting final serialization (SHUTDOWN)...'); foreach (self::$references as $instance) { Tools::wait($instance->wrapper->serialize()); @@ -976,17 +981,18 @@ class MTProto extends AsyncConstruct implements TLCallback if (!isset($this->datacenter)) { $this->datacenter ??= new DataCenter($this, $this->dcList, $this->settings->getConnection()); } + $db = []; if (!isset($this->referenceDatabase)) { $this->referenceDatabase = new ReferenceDatabase($this); - yield from $this->referenceDatabase->init(); + $db []= $this->referenceDatabase->init(); } else { - yield from $this->referenceDatabase->init(); + $db []= $this->referenceDatabase->init(); } if (!isset($this->minDatabase)) { $this->minDatabase = new MinDatabase($this); - yield from $this->minDatabase->init(); + $db []= $this->minDatabase->init(); } else { - yield from $this->minDatabase->init(); + $db []= $this->minDatabase->init(); } if (!isset($this->TL)) { $this->TL = new TL($this); @@ -997,7 +1003,8 @@ class MTProto extends AsyncConstruct implements TLCallback $this->TL->init($this->settings->getSchema(), $callbacks); } - yield from $this->initDb($this); + $db []= $this->initDb($this); + yield Tools::all($db); yield from $this->fillUsernamesCache(); } diff --git a/src/danog/MadelineProto/Magic.php b/src/danog/MadelineProto/Magic.php index 4e4295ea..435a0596 100644 --- a/src/danog/MadelineProto/Magic.php +++ b/src/danog/MadelineProto/Magic.php @@ -232,8 +232,8 @@ class Magic } if (!self::$initedLight) { // Setup error reporting - \set_error_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionErrorHandler']); - \set_exception_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionHandler']); + \set_error_handler([Exception::class, 'ExceptionErrorHandler']); + \set_exception_handler([Exception::class, 'ExceptionHandler']); self::$isIpcWorker = \defined('MADELINE_WORKER_TYPE') ? \MADELINE_WORKER_TYPE === 'madeline-ipc' : false; if (PHP_SAPI !== 'cli' && PHP_SAPI !== 'phpdbg') { try { diff --git a/src/danog/MadelineProto/Settings/Database/SqlAbstract.php b/src/danog/MadelineProto/Settings/Database/SqlAbstract.php index 768f5ba7..15324133 100644 --- a/src/danog/MadelineProto/Settings/Database/SqlAbstract.php +++ b/src/danog/MadelineProto/Settings/Database/SqlAbstract.php @@ -21,7 +21,7 @@ abstract class SqlAbstract extends DatabaseAbstract /** * Maximum connection limit. */ - protected int $maxConnections = 10; + protected int $maxConnections = 100; /** * Idle timeout. @@ -49,6 +49,12 @@ abstract class SqlAbstract extends DatabaseAbstract parent::mergeArray($settings); } + public function __wakeup() + { + if ($this->maxConnections < 40) { + $this->maxConnections = 100; + } + } /** * Get maximum connection limit. *