diff --git a/composer.json b/composer.json index 0ddc3bf3..90536d64 100644 --- a/composer.json +++ b/composer.json @@ -28,6 +28,7 @@ "amphp/byte-stream": "^1", "amphp/file": "^1", "amphp/mysql": "^2.0", + "amphp/postgres": "^1.2", "danog/dns-over-https": "^0.2", "amphp/http-client-cookies": "^1", "danog/tg-file-decoder": "^0.1", diff --git a/src/danog/MadelineProto/Db/DbPropertiesFabric.php b/src/danog/MadelineProto/Db/DbPropertiesFabric.php index 5894e637..ba880080 100644 --- a/src/danog/MadelineProto/Db/DbPropertiesFabric.php +++ b/src/danog/MadelineProto/Db/DbPropertiesFabric.php @@ -18,6 +18,7 @@ class DbPropertiesFabric * @uses \danog\MadelineProto\Db\MemoryArray * @uses \danog\MadelineProto\Db\SharedMemoryArray * @uses \danog\MadelineProto\Db\MysqlArray + * @uses \danog\MadelineProto\Db\PostgresArray */ public static function get(array $dbSettings, string $namePrefix, string $propertyType, string $name, $value = null): Promise { @@ -30,6 +31,9 @@ class DbPropertiesFabric case 'mysql': $class .= '\Mysql'; break; + case 'postgres': + $class .= '\Postgres'; + break; default: throw new \InvalidArgumentException("Unknown dbType: {$dbSettings['type']}"); diff --git a/src/danog/MadelineProto/Db/Postgres.php b/src/danog/MadelineProto/Db/Postgres.php new file mode 100644 index 00000000..5bb22849 --- /dev/null +++ b/src/danog/MadelineProto/Db/Postgres.php @@ -0,0 +1,102 @@ +getDatabase(); + $user = $config->getUser(); + $connection = pool($config->withDatabase(null)); + + $result = yield $connection->query("SELECT * FROM pg_database WHERE datname = '{$db}'"); + + while (yield $result->advance()) { + $row = $result->getCurrent(); + if ($row===false) + { + yield $connection->query(" + CREATE DATABASE {$db} + OWNER {$user} + ENCODING utf8 + "); + } + } + yield $connection->query(" + CREATE OR REPLACE FUNCTION update_ts() + RETURNS TRIGGER AS $$ + BEGIN + IF row(NEW.*) IS DISTINCT FROM row(OLD.*) THEN + NEW.ts = now(); + RETURN NEW; + ELSE + RETURN OLD; + END IF; + END; + $$ language 'plpgsql' + "); + $connection->close(); + } catch (\Throwable $e) { + Logger::log($e->getMessage(), Logger::ERROR); + } + })); + } +} diff --git a/src/danog/MadelineProto/Db/PostgresArray.php b/src/danog/MadelineProto/Db/PostgresArray.php new file mode 100644 index 00000000..a9a8cdf9 --- /dev/null +++ b/src/danog/MadelineProto/Db/PostgresArray.php @@ -0,0 +1,384 @@ + + * The index to set for. + *

+ * @param $value + * + * @throws \Throwable + */ + + public function offsetSet($index, $value): Promise + { + if ($this->getCache($index) === $value) { + return call(fn () =>null); + } + + $this->setCache($index, $value); + + $request = $this->request(" + INSERT INTO \"{$this->table}\" + (key,value) + VALUES (:index, :value) + ON CONFLICT (key) DO UPDATE SET value = :value + ", + [ + 'index' => $index, + 'value' => \serialize($value), + ] + ); + + //Ensure that cache is synced with latest insert in case of concurrent requests. + $request->onResolve(fn () => $this->setCache($index, $value)); + + return $request; + } + + public static function getDbConnection(array $settings): Pool + { + return Postgres::getConnection( + $settings['host'], + $settings['port'], + $settings['user'], + $settings['password'], + $settings['database'], + $settings['max_connections'], + $settings['idle_timeout'] + ); + } + + /** + * Create table for property. + * + * @return array|null + * @throws \Throwable + */ + private function prepareTable() + { + Logger::log("Creating/checking table {$this->table}", Logger::WARNING); + + yield $this->request(" + CREATE TABLE IF NOT EXISTS \"{$this->table}\" + ( + \"key\" VARCHAR(255) NOT NULL, + \"value\" BYTEA NULL, + \"ts\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT \"{$this->table}_pkey\" PRIMARY KEY(\"key\") + ); + + + + + "); + + yield $this->request(" + DROP TRIGGER IF exists \"{$this->table}_update_ts_trigger\" ON \"{$this->table}\"; + "); + + yield $this->request(" + CREATE TRIGGER \"{$this->table}_update_ts_trigger\" BEFORE UPDATE ON \"{$this->table}\" FOR EACH ROW EXECUTE PROCEDURE update_ts(); + "); + } + + public function __serialize(): array + { + return [ + 'table' => $this->table, + 'settings' => $this->settings + ]; + } + + public function __unserialize($data): void + { + foreach ($data as $property => $value) { + $this->{$property} = $value; + } + try { + $this->db = static::getDbConnection($this->settings); + } catch (\Throwable $e) { + Logger::log($e->getMessage(), Logger::ERROR); + } + } + + /** + * @param string $name + * @param DbArray|array|null $value + * @param string $tablePrefix + * @param array $settings + * + * @return Promise + */ + public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise + { + $tableName = "{$tablePrefix}_{$name}"; + if ($value instanceof self && $value->table === $tableName) { + $instance = &$value; + } else { + $instance = new static(); + $instance->table = $tableName; + } + + $instance->settings = $settings; + $instance->db = static::getDbConnection($settings); + $instance->ttl = $settings['cache_ttl'] ?? $instance->ttl; + + $instance->startCacheCleanupLoop(); + + return call(static function () use ($instance, $value) { + yield from $instance->prepareTable(); + + //Skip migrations if its same object + if ($instance !== $value) { + yield from static::renameTmpTable($instance, $value); + yield from static::migrateDataToDb($instance, $value); + } + + return $instance; + }); + } + + /** + * @param PostgresArray $instance + * @param DbArray|array|null $value + * + * @return \Generator + */ + private static function renameTmpTable(PostgresArray $instance, $value): \Generator + { + if ($value instanceof static && $value->table) { + if ( + $value->table !== $instance->table && + \mb_strpos($instance->table, 'tmp') !== 0 + ) { + yield from $instance->renameTable($value->table, $instance->table); + } else { + $instance->table = $value->table; + } + } + } + + /** + * @param PostgresArray $instance + * @param DbArray|array|null $value + * + * @return \Generator + * @throws \Throwable + */ + private static function migrateDataToDb(PostgresArray $instance, $value): \Generator + { + if (!empty($value) && !$value instanceof PostgresArray) { + Logger::log('Converting database.', Logger::ERROR); + + if ($value instanceof DbArray) { + $value = yield $value->getArrayCopy(); + } else { + $value = (array) $value; + } + $counter = 0; + $total = \count($value); + foreach ($value as $key => $item) { + $counter++; + if ($counter % 500 === 0) { + yield $instance->offsetSet($key, $item); + Logger::log("Loading data to table {$instance->table}: $counter/$total", Logger::WARNING); + } else { + $instance->offsetSet($key, $item); + } + } + Logger::log('Converting database done.', Logger::ERROR); + } + } + + public function offsetExists($index): bool + { + throw new \RuntimeException('Native isset not support promises. Use isset method'); + } + + /** + * Check if key isset. + * + * @param $key + * + * @return Promise true if the offset exists, otherwise false + */ + public function isset($key): Promise + { + return call(fn () => yield $this->offsetGet($key) !== null); + } + + + public function offsetGet($offset): Promise + { + return call(function () use ($offset) { + if ($cached = $this->getCache($offset)) { + return $cached; + } + + $row = yield $this->request( + "SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1", + ['index' => $offset] + ); + + if ($value = $this->getValue($row)) { + $this->setCache($offset, $value); + } + + return $value; + }); + } + + /** + * Unset value for an offset. + * + * @link https://php.net/manual/en/arrayiterator.offsetunset.php + * + * @param string $index

+ * The offset to unset. + *

+ * + * @return Promise + * @throws \Throwable + */ + public function offsetUnset($index): Promise + { + $this->unsetCache($index); + + return $this->request( + " + DELETE FROM \"{$this->table}\" + WHERE key = :index + ", + ['index' => $index] + ); + } + + /** + * Get array copy. + * + * @return Promise + * @throws \Throwable + */ + public function getArrayCopy(): Promise + { + return call(function () { + $iterator = $this->getIterator(); + $result = []; + while (yield $iterator->advance()) { + [$key, $value] = $iterator->getCurrent(); + $result[$key] = $value; + } + return $result; + }); + } + + public function getIterator(): Producer + { + return new Producer(function (callable $emit) { + $request = yield $this->db->execute("SELECT key, value FROM \"{$this->table}\""); + + while (yield $request->advance()) { + $row = $request->getCurrent(); + yield $emit([$row['key'], $this->getValue($row)]); + } + }); + } + + /** + * Count elements. + * + * @link https://php.net/manual/en/arrayiterator.count.php + * @return Promise The number of elements or public properties in the associated + * array or object, respectively. + * @throws \Throwable + */ + public function count(): Promise + { + return call(function () { + $row = yield $this->request("SELECT count(key) as count FROM \"{$this->table}\""); + return $row[0]['count'] ?? 0; + }); + } + + private function getValue(array $row) + { + if ($row) { + if (!empty($row[0]['value'])) { + $row = \reset($row); + } + return \unserialize($row['value']); + } + return null; + } + + + private function renameTable(string $from, string $to) + { + Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); + yield $this->request(" + ALTER TABLE \"{$from}\" RENAME TO \"{$to}\"; + "); + + yield $this->request(" + DROP TABLE IF EXISTS \"{$from}\"; + "); + } + + /** + * Perform async request to db. + * + * @param string $query + * @param array $params + * + * @return Promise + * @throws \Throwable + */ + private function request(string $query, array $params = []): Promise + { + return call(function () use ($query, $params) { + Logger::log([$query, $params], Logger::VERBOSE); + + if (empty($this->db) || !$this->db->isAlive()) { + Logger::log('No database connection', Logger::WARNING); + return []; + } + + try { + $request = yield $this->db->execute($query, $params); + } catch (\Throwable $e) { + Logger::log($e->getMessage(), Logger::ERROR); + return []; + } + + $result = []; + if ($request instanceof ResultSet) { + while (yield $request->advance()) { + $result[] = $request->getCurrent(); + } + } + return $result; + }); + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index d34d4aae..69fc73c7 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -28,6 +28,7 @@ use danog\MadelineProto\Db\DbArray; use danog\MadelineProto\Db\DbPropertiesFabric; use danog\MadelineProto\Db\DbPropertiesTrait; use danog\MadelineProto\Db\Mysql; +use danog\MadelineProto\Db\Pgsql; use danog\MadelineProto\Ipc\Server; use danog\MadelineProto\Loop\Generic\PeriodicLoop; use danog\MadelineProto\Loop\Update\FeedLoop; @@ -1359,7 +1360,18 @@ class MTProto extends AsyncConstruct implements TLCallback 'max_connections' => 10, 'idle_timeout' => 60, 'cache_ttl' => '+5 minutes', //keep records in memory after last read - ] + ], + /** @see Postgres */ + 'postgres' => [ + 'host' => '127.0.0.1', + 'port' => 5432, + 'user' => 'root', + 'password' => '', + 'database' => 'MadelineProto', //will be created automatically + 'max_connections' => 10, + 'idle_timeout' => 60, + 'cache_ttl' => '+5 minutes', //keep records in memory after last read + ], ], 'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [ 'pwr' => false,