From 9df1e27780c5c7514551db422759725fb821f950 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 12 Sep 2020 19:06:42 +0200 Subject: [PATCH] Add redis backend --- examples/bot.php | 3 +- .../MadelineProto/Db/ArrayCacheTrait.php | 2 +- src/danog/MadelineProto/Db/DbArray.php | 2 - .../MadelineProto/Db/DbPropertiesFactory.php | 9 +- src/danog/MadelineProto/Db/Driver/Redis.php | 11 +- src/danog/MadelineProto/Db/DriverArray.php | 6 + src/danog/MadelineProto/Db/MysqlArray.php | 11 +- src/danog/MadelineProto/Db/PostgresArray.php | 45 ++- src/danog/MadelineProto/Db/RedisArray.php | 291 ++++++------------ src/danog/MadelineProto/Db/SqlArray.php | 8 +- src/danog/MadelineProto/MTProto.php | 11 +- 11 files changed, 147 insertions(+), 252 deletions(-) diff --git a/examples/bot.php b/examples/bot.php index 9c622fba..5c907b8b 100755 --- a/examples/bot.php +++ b/examples/bot.php @@ -92,7 +92,8 @@ $settings = [ ], 'serialization' => [ 'serialization_interval' => 30, - ] + ], + 'db' => ['type' => 'redis'] ]; $MadelineProto = new API('bot.madeline', $settings); diff --git a/src/danog/MadelineProto/Db/ArrayCacheTrait.php b/src/danog/MadelineProto/Db/ArrayCacheTrait.php index d11336d2..da4ba3e9 100644 --- a/src/danog/MadelineProto/Db/ArrayCacheTrait.php +++ b/src/danog/MadelineProto/Db/ArrayCacheTrait.php @@ -55,7 +55,7 @@ trait ArrayCacheTrait protected function startCacheCleanupLoop(): void { - $this->cacheCleanupId = Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, [$this, 'cleanupCache']); + $this->cacheCleanupId = Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache()); } protected function stopCacheCleanupLoop(): void { diff --git a/src/danog/MadelineProto/Db/DbArray.php b/src/danog/MadelineProto/Db/DbArray.php index bb5de97e..eedd250a 100644 --- a/src/danog/MadelineProto/Db/DbArray.php +++ b/src/danog/MadelineProto/Db/DbArray.php @@ -14,8 +14,6 @@ interface DbArray extends DbType, \ArrayAccess, \Countable public function offsetUnset($offset): Promise; public function count(): Promise; public function getIterator(): Producer; - - public function init(): Promise; /** * @deprecated diff --git a/src/danog/MadelineProto/Db/DbPropertiesFactory.php b/src/danog/MadelineProto/Db/DbPropertiesFactory.php index cfde10c4..3a7cf1fa 100644 --- a/src/danog/MadelineProto/Db/DbPropertiesFactory.php +++ b/src/danog/MadelineProto/Db/DbPropertiesFactory.php @@ -26,13 +26,16 @@ class DbPropertiesFactory switch (\strtolower($dbSettings['type'])) { case 'memory': - $class .= '\Memory'; + $class .= '\\Memory'; break; case 'mysql': - $class .= '\Mysql'; + $class .= '\\Mysql'; break; case 'postgres': - $class .= '\Postgres'; + $class .= '\\Postgres'; + break; + case 'redis': + $class .= '\\Redis'; break; default: throw new \InvalidArgumentException("Unknown dbType: {$dbSettings['type']}"); diff --git a/src/danog/MadelineProto/Db/Driver/Redis.php b/src/danog/MadelineProto/Db/Driver/Redis.php index dd42c75e..3829afe8 100644 --- a/src/danog/MadelineProto/Db/Driver/Redis.php +++ b/src/danog/MadelineProto/Db/Driver/Redis.php @@ -26,23 +26,22 @@ class Redis * @throws \Amp\Sql\FailureException * @throws \Throwable * - * @return RedisRedis + * @return \Generator */ public static function getConnection( string $host = '127.0.0.1', int $port = 6379, string $password = '', - int $db = 0, - int $maxConnections = ConnectionPool::DEFAULT_MAX_CONNECTIONS, - int $idleTimeout = ConnectionPool::DEFAULT_IDLE_TIMEOUT - ): RedisRedis { + int $db = 0 + ): \Generator { $dbKey = "$host:$port:$db"; if (empty(static::$connections[$dbKey])) { $config = Config::fromUri( - "host={$host} port={$port} password={$password} db={$db}" + "{$host}:{$port}?password={$password}&db={$db}" ); static::$connections[$dbKey] = new RedisRedis((new RemoteExecutorFactory($config))->createQueryExecutor()); + yield static::$connections[$dbKey]->ping(); } return static::$connections[$dbKey]; diff --git a/src/danog/MadelineProto/Db/DriverArray.php b/src/danog/MadelineProto/Db/DriverArray.php index 1bb9cec1..e01bd3bd 100644 --- a/src/danog/MadelineProto/Db/DriverArray.php +++ b/src/danog/MadelineProto/Db/DriverArray.php @@ -13,6 +13,12 @@ abstract class DriverArray implements DbArray $this->stopCacheCleanupLoop(); } + + public function offsetExists($index): bool + { + throw new \RuntimeException('Native isset not support promises. Use isset method'); + } + abstract protected function initConnection(array $settings): \Generator; /** diff --git a/src/danog/MadelineProto/Db/MysqlArray.php b/src/danog/MadelineProto/Db/MysqlArray.php index 0c67fceb..f08f74d8 100644 --- a/src/danog/MadelineProto/Db/MysqlArray.php +++ b/src/danog/MadelineProto/Db/MysqlArray.php @@ -21,12 +21,7 @@ class MysqlArray extends SqlArray { return ['table', 'settings']; } - - public function offsetExists($index): bool - { - throw new \RuntimeException('Native isset not support promises. Use isset method'); - } - + /** * Check if key isset. * @@ -203,7 +198,7 @@ class MysqlArray extends SqlArray * @return array|null * @throws \Throwable */ - private function prepareTable() + protected function prepareTable(): \Generator { Logger::log("Creating/checking table {$this->table}", Logger::WARNING); return yield $this->request(" @@ -220,7 +215,7 @@ class MysqlArray extends SqlArray "); } - private function renameTable(string $from, string $to) + protected function renameTable(string $from, string $to): \Generator { Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); yield $this->request(" diff --git a/src/danog/MadelineProto/Db/PostgresArray.php b/src/danog/MadelineProto/Db/PostgresArray.php index f4bc5057..5a9e1a2e 100644 --- a/src/danog/MadelineProto/Db/PostgresArray.php +++ b/src/danog/MadelineProto/Db/PostgresArray.php @@ -6,16 +6,32 @@ use Amp\Postgres\Pool; use Amp\Producer; use Amp\Promise; use Amp\Sql\ResultSet; +use Amp\Success; use danog\MadelineProto\Db\Driver\Postgres; use danog\MadelineProto\Logger; use function Amp\call; class PostgresArray extends SqlArray { - private string $table; - private array $settings; + protected string $table; + protected array $settings; private Pool $db; + protected function initConnection(array $settings): \Generator + { + if (!isset($this->db)) { + $this->db = yield from Postgres::getConnection( + $settings['host'], + $settings['port'], + $settings['user'], + $settings['password'], + $settings['database'], + $settings['max_connections'], + $settings['idle_timeout'] + ); + } + } + /** * Set value for an offset. * @@ -32,7 +48,7 @@ class PostgresArray extends SqlArray public function offsetSet($index, $value): Promise { if ($this->getCache($index) === $value) { - return call(fn () =>null); + return new Success(); } $this->setCache($index, $value); @@ -56,26 +72,13 @@ class PostgresArray extends SqlArray return $request; } - protected static function getDbConnection(array $settings): \Generator - { - return Postgres::getConnection( - $settings['host'], - $settings['port'], - $settings['user'], - $settings['password'], - $settings['database'], - $settings['max_connections'], - $settings['idle_timeout'] - ); - } - /** * Create table for property. * * @return array|null * @throws \Throwable */ - protected function prepareTable() + protected function prepareTable(): \Generator { Logger::log("Creating/checking table {$this->table}", Logger::WARNING); @@ -107,12 +110,6 @@ class PostgresArray extends SqlArray return ['table', 'settings']; } - - public function offsetExists($index): bool - { - throw new \RuntimeException('Native isset not support promises. Use isset method'); - } - /** * Check if key isset. * @@ -230,7 +227,7 @@ class PostgresArray extends SqlArray } - protected function renameTable(string $from, string $to) + protected function renameTable(string $from, string $to): \Generator { Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); yield $this->request(" diff --git a/src/danog/MadelineProto/Db/RedisArray.php b/src/danog/MadelineProto/Db/RedisArray.php index f1f85625..f9ec2859 100644 --- a/src/danog/MadelineProto/Db/RedisArray.php +++ b/src/danog/MadelineProto/Db/RedisArray.php @@ -2,25 +2,86 @@ namespace danog\MadelineProto\Db; -use Amp\Postgres\Pool; use Amp\Producer; use Amp\Promise; -use Amp\Redis\Redis; -use Amp\Sql\ResultSet; -use danog\MadelineProto\Db\Driver\Redis as DriverRedis; +use Amp\Redis\Redis as RedisRedis; +use Amp\Success; +use danog\MadelineProto\Db\Driver\Redis as Redis; use danog\MadelineProto\Logger; use Generator; use function Amp\call; -class RedisArray extends DriverArray +class RedisArray extends SqlArray { - use ArrayCacheTrait; + protected string $table; + protected array $settings; + private RedisRedis $db; - private string $table; - private array $settings; - private Pool $db; + protected function prepareTable(): Generator + { + yield new Success; + } + + protected function renameTable(string $from, string $to): \Generator + { + Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); + $request = $this->db->scan($this->itKey()); + + $lenK = \strlen($from); + while (yield $request->advance()) { + $key = $request->getCurrent(); + yield $this->db->rename($key, $to.\substr($key, $lenK)); + } + } + + protected function initConnection(array $settings): \Generator + { + if (!isset($this->db)) { + $this->db = yield from Redis::getConnection( + $settings['host'], + $settings['port'], + $settings['password'], + $settings['database'] + ); + } + } + + public function __sleep() + { + return ['table', 'settings']; + } + /** + * Get redis key name. + * + * @param string $key + * @return string + */ + private function rKey(string $key): string + { + return 'va:'.$this->table.':'.$key; + } + /** + * Get redis ts name. + * + * @param string $key + * @return string + */ + private function tsKey(string $key): string + { + return 'ts:'.$this->table.$key; + } + + /** + * Get iterator key. + * + * @return string + */ + private function itKey(): string + { + return 'va:'.$this->table.'*'; + } /** * Set value for an offset. * @@ -37,23 +98,19 @@ class RedisArray extends DriverArray public function offsetSet($index, $value): Promise { if ($this->getCache($index) === $value) { - return call(fn () =>null); + return new Success(); } $this->setCache($index, $value); - $request = $this->request( - " - INSERT INTO \"{$this->table}\" - (key,value) - VALUES (:index, :value) - ON CONFLICT (key) DO UPDATE SET value = :value - ", + /* + $request = $this->db->setMultiple( [ - 'index' => $index, - 'value' => \serialize($value), + $this->rKey($index) => \serialize($value), + $this->tsKey($index) => \time() ] - ); + );*/ + $request = $this->db->set($this->rKey($index), \serialize($value)); //Ensure that cache is synced with latest insert in case of concurrent requests. $request->onResolve(fn () => $this->setCache($index, $value)); @@ -61,107 +118,6 @@ class RedisArray extends DriverArray return $request; } - protected function initConnection(array $settings): \Generator - { - } - - public function __sleep() - { - return ['table', 'settings']; - } - - /** - * @param string $name - * @param DbArray|array|null $value - * @param string $tablePrefix - * @param array $settings - * - * @return Promise - */ - public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise - { - $tableName = "{$tablePrefix}_{$name}"; - if ($value instanceof self && $value->table === $tableName) { - $instance = &$value; - } else { - $instance = new static(); - $instance->table = $tableName; - } - - $instance->settings = $settings; - $instance->db = static::getDbConnection($settings); - $instance->ttl = $settings['cache_ttl'] ?? $instance->ttl; - - $instance->startCacheCleanupLoop(); - - return call(static function () use ($instance, $value, $settings) { - //Skip migrations if its same object - if ($instance !== $value) { - yield from static::renameTmpTable($instance, $value); - yield from static::migrateDataToDb($instance, $value); - } - - return $instance; - }); - } - - /** - * @param PostgresArray $instance - * @param DbArray|array|null $value - * - * @return \Generator - */ - private static function renameTmpTable(self $instance, $value): \Generator - { - if ($value instanceof static && $value->table) { - if ( - $value->table !== $instance->table && - \mb_strpos($instance->table, 'tmp') !== 0 - ) { - yield from $instance->renameTable($value->table, $instance->table); - } else { - $instance->table = $value->table; - } - } - } - - /** - * @param PostgresArray $instance - * @param DbArray|array|null $value - * - * @return \Generator - * @throws \Throwable - */ - private static function migrateDataToDb(self $instance, $value): \Generator - { - if (!empty($value) && !$value instanceof PostgresArray) { - Logger::log('Converting database.', Logger::ERROR); - - if ($value instanceof DbArray) { - $value = yield $value->getArrayCopy(); - } else { - $value = (array) $value; - } - $counter = 0; - $total = \count($value); - foreach ($value as $key => $item) { - $counter++; - if ($counter % 500 === 0) { - yield $instance->offsetSet($key, $item); - Logger::log("Loading data to table {$instance->table}: $counter/$total", Logger::WARNING); - } else { - $instance->offsetSet($key, $item); - } - } - Logger::log('Converting database done.', Logger::ERROR); - } - } - - public function offsetExists($index): bool - { - throw new \RuntimeException('Native isset not support promises. Use isset method'); - } - /** * Check if key isset. * @@ -171,7 +127,7 @@ class RedisArray extends DriverArray */ public function isset($key): Promise { - return call(fn () => yield $this->offsetGet($key) !== null); + return $this->db->has($this->rKey($key)); } @@ -182,12 +138,9 @@ class RedisArray extends DriverArray return $cached; } - $row = yield $this->request( - "SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1", - ['index' => $offset] - ); + $value = yield $this->db->get($this->rKey($offset)); - if ($value = $this->getValue($row)) { + if ($value = \unserialize($value)) { $this->setCache($offset, $value); } @@ -211,13 +164,7 @@ class RedisArray extends DriverArray { $this->unsetCache($index); - return $this->request( - " - DELETE FROM \"{$this->table}\" - WHERE key = :index - ", - ['index' => $index] - ); + return $this->db->delete($this->rkey($index)); } /** @@ -242,11 +189,10 @@ class RedisArray extends DriverArray public function getIterator(): Producer { return new Producer(function (callable $emit) { - $request = yield $this->db->execute("SELECT key, value FROM \"{$this->table}\""); + $request = $this->db->scan($this->itKey()); while (yield $request->advance()) { - $row = $request->getCurrent(); - yield $emit([$row['key'], $this->getValue($row)]); + yield $emit([$key = $request->getCurrent(), \unserialize(yield $this->db->get($key))]); } }); } @@ -262,75 +208,14 @@ class RedisArray extends DriverArray public function count(): Promise { return call(function () { - $row = yield $this->request("SELECT count(key) as count FROM \"{$this->table}\""); - return $row[0]['count'] ?? 0; - }); - } + $request = $this->db->scan($this->itKey()); + $count = 0; - private function getValue(array $row) - { - if ($row) { - if (!empty($row[0]['value'])) { - $row = \reset($row); - } - return \unserialize($row['value']); - } - return null; - } - - - private function renameTable(string $from, string $to) - { - Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); - yield $this->request(" - ALTER TABLE \"{$from}\" RENAME TO \"{$to}\"; - "); - - yield $this->request(" - DROP TABLE IF EXISTS \"{$from}\"; - "); - } - - /** - * Perform async request to db. - * - * @param string $query - * @param array $params - * - * @return Promise - * @throws \Throwable - */ - private function request(string $query, array $params = []): Promise - { - return call(function () use ($query, $params) { - Logger::log([$query, $params], Logger::VERBOSE); - - if (empty($this->db) || !$this->db->isAlive()) { - Logger::log('No database connection', Logger::WARNING); - return []; + while (yield $request->advance()) { + $count++; } - if ( - !empty($params['index']) - && !\mb_check_encoding($params['index'], 'UTF-8') - ) { - $params['index'] = \mb_convert_encoding($params['index'], 'UTF-8'); - } - - try { - $request = yield $this->db->execute($query, $params); - } catch (\Throwable $e) { - Logger::log($e->getMessage(), Logger::ERROR); - return []; - } - - $result = []; - if ($request instanceof ResultSet) { - while (yield $request->advance()) { - $result[] = $request->getCurrent(); - } - } - return $result; + return $count; }); } } diff --git a/src/danog/MadelineProto/Db/SqlArray.php b/src/danog/MadelineProto/Db/SqlArray.php index fcb0c682..ed72484a 100644 --- a/src/danog/MadelineProto/Db/SqlArray.php +++ b/src/danog/MadelineProto/Db/SqlArray.php @@ -2,6 +2,8 @@ namespace danog\MadelineProto\Db; +use Amp\Promise; + use function Amp\call; abstract class SqlArray extends DriverArray @@ -12,9 +14,9 @@ abstract class SqlArray extends DriverArray * @return array|null * @throws \Throwable */ - abstract protected function prepareTable(); + abstract protected function prepareTable(): \Generator; - abstract protected function renameTable(string $from, string $to); + abstract protected function renameTable(string $from, string $to): \Generator; /** * @param string $name @@ -57,7 +59,7 @@ abstract class SqlArray extends DriverArray } /** - * Rename table of old database, if is a temporary table name. + * Rename table of old database, if the new one is not a temporary table name. * * Otherwise, change name of table in new database to match old table name. * diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 8f86e6c1..777c76a5 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -27,6 +27,7 @@ use danog\MadelineProto\Async\AsyncConstruct; use danog\MadelineProto\Db\DbArray; use danog\MadelineProto\Db\DbPropertiesFactory; use danog\MadelineProto\Db\DbPropertiesTrait; +use danog\MadelineProto\Db\Driver\Redis; use danog\MadelineProto\Db\Mysql; use danog\MadelineProto\Ipc\Server; use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal; @@ -92,7 +93,7 @@ class MTProto extends AsyncConstruct implements TLCallback * * @var int */ - const V = 145; + const V = 147; /** * String release version. * @@ -1382,6 +1383,14 @@ class MTProto extends AsyncConstruct implements TLCallback 'idle_timeout' => 60, 'cache_ttl' => '+5 minutes', //keep records in memory after last read ], + /** @see Redis */ + 'redis' => [ + 'host' => 'redis://127.0.0.1', + 'port' => 6379, + 'password' => '', + 'database' => 0, //will be created automatically + 'cache_ttl' => '+5 minutes', //keep records in memory after last read + ], ], 'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [ 'pwr' => false,