Postgres support for sessions (#846)
This commit is contained in:
parent
553e4c4da0
commit
226cb3bbd8
@ -28,6 +28,7 @@
|
||||
"amphp/byte-stream": "^1",
|
||||
"amphp/file": "^1",
|
||||
"amphp/mysql": "^2.0",
|
||||
"amphp/postgres": "^1.2",
|
||||
"danog/dns-over-https": "^0.2",
|
||||
"amphp/http-client-cookies": "^1",
|
||||
"danog/tg-file-decoder": "^0.1",
|
||||
|
@ -18,6 +18,7 @@ class DbPropertiesFabric
|
||||
* @uses \danog\MadelineProto\Db\MemoryArray
|
||||
* @uses \danog\MadelineProto\Db\SharedMemoryArray
|
||||
* @uses \danog\MadelineProto\Db\MysqlArray
|
||||
* @uses \danog\MadelineProto\Db\PostgresArray
|
||||
*/
|
||||
public static function get(array $dbSettings, string $namePrefix, string $propertyType, string $name, $value = null): Promise
|
||||
{
|
||||
@ -30,6 +31,9 @@ class DbPropertiesFabric
|
||||
case 'mysql':
|
||||
$class .= '\Mysql';
|
||||
break;
|
||||
case 'postgres':
|
||||
$class .= '\Postgres';
|
||||
break;
|
||||
default:
|
||||
throw new \InvalidArgumentException("Unknown dbType: {$dbSettings['type']}");
|
||||
|
||||
|
102
src/danog/MadelineProto/Db/Postgres.php
Normal file
102
src/danog/MadelineProto/Db/Postgres.php
Normal file
@ -0,0 +1,102 @@
|
||||
<?php
|
||||
|
||||
namespace danog\MadelineProto\Db;
|
||||
|
||||
use Amp\Postgres\ConnectionConfig;
|
||||
use Amp\Postgres\Pool;
|
||||
use Amp\Sql\Common\ConnectionPool;
|
||||
use danog\MadelineProto\Logger;
|
||||
use function Amp\call;
|
||||
use function Amp\Postgres\Pool;
|
||||
use function Amp\Promise\wait;
|
||||
|
||||
class Postgres
|
||||
{
|
||||
/** @var Pool[] */
|
||||
private static array $connections;
|
||||
|
||||
/**
|
||||
* @param string $host
|
||||
* @param int $port
|
||||
* @param string $user
|
||||
* @param string $password
|
||||
* @param string $db
|
||||
*
|
||||
* @param int $maxConnections
|
||||
* @param int $idleTimeout
|
||||
*
|
||||
* @return Pool
|
||||
* @throws \Amp\Sql\ConnectionException
|
||||
* @throws \Amp\Sql\FailureException
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public static function getConnection(
|
||||
string $host = '127.0.0.1',
|
||||
int $port = 5432,
|
||||
string $user = 'root',
|
||||
string $password = '',
|
||||
string $db = 'MadelineProto',
|
||||
int $maxConnections = ConnectionPool::DEFAULT_MAX_CONNECTIONS,
|
||||
int $idleTimeout = ConnectionPool::DEFAULT_IDLE_TIMEOUT
|
||||
): Pool {
|
||||
$dbKey = "$host:$port:$db";
|
||||
if (empty(static::$connections[$dbKey])) {
|
||||
$config = ConnectionConfig::fromString(
|
||||
"host={$host} port={$port} user={$user} password={$password} db={$db}"
|
||||
);
|
||||
|
||||
static::createDb($config);
|
||||
static::$connections[$dbKey] = pool($config, $maxConnections, $idleTimeout);
|
||||
}
|
||||
|
||||
return static::$connections[$dbKey];
|
||||
}
|
||||
|
||||
/**
|
||||
* @param ConnectionConfig $config
|
||||
*
|
||||
* @throws \Amp\Sql\ConnectionException
|
||||
* @throws \Amp\Sql\FailureException
|
||||
* @throws \Throwable
|
||||
*/
|
||||
private static function createDb(ConnectionConfig $config)
|
||||
{
|
||||
wait(call(static function () use ($config) {
|
||||
try {
|
||||
$db = $config->getDatabase();
|
||||
$user = $config->getUser();
|
||||
$connection = pool($config->withDatabase(null));
|
||||
|
||||
$result = yield $connection->query("SELECT * FROM pg_database WHERE datname = '{$db}'");
|
||||
|
||||
while (yield $result->advance()) {
|
||||
$row = $result->getCurrent();
|
||||
if ($row===false)
|
||||
{
|
||||
yield $connection->query("
|
||||
CREATE DATABASE {$db}
|
||||
OWNER {$user}
|
||||
ENCODING utf8
|
||||
");
|
||||
}
|
||||
}
|
||||
yield $connection->query("
|
||||
CREATE OR REPLACE FUNCTION update_ts()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
IF row(NEW.*) IS DISTINCT FROM row(OLD.*) THEN
|
||||
NEW.ts = now();
|
||||
RETURN NEW;
|
||||
ELSE
|
||||
RETURN OLD;
|
||||
END IF;
|
||||
END;
|
||||
$$ language 'plpgsql'
|
||||
");
|
||||
$connection->close();
|
||||
} catch (\Throwable $e) {
|
||||
Logger::log($e->getMessage(), Logger::ERROR);
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
384
src/danog/MadelineProto/Db/PostgresArray.php
Normal file
384
src/danog/MadelineProto/Db/PostgresArray.php
Normal file
@ -0,0 +1,384 @@
|
||||
<?php
|
||||
|
||||
namespace danog\MadelineProto\Db;
|
||||
|
||||
use Amp\Postgres\Pool;
|
||||
use Amp\Producer;
|
||||
use Amp\Promise;
|
||||
use Amp\Sql\ResultSet;
|
||||
use danog\MadelineProto\Logger;
|
||||
use function Amp\call;
|
||||
|
||||
|
||||
class PostgresArray implements DbArray
|
||||
{
|
||||
use ArrayCacheTrait;
|
||||
|
||||
private string $table;
|
||||
private array $settings;
|
||||
private Pool $db;
|
||||
|
||||
/**
|
||||
* Set value for an offset.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.offsetset.php
|
||||
*
|
||||
* @param string $index <p>
|
||||
* The index to set for.
|
||||
* </p>
|
||||
* @param $value
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
|
||||
public function offsetSet($index, $value): Promise
|
||||
{
|
||||
if ($this->getCache($index) === $value) {
|
||||
return call(fn () =>null);
|
||||
}
|
||||
|
||||
$this->setCache($index, $value);
|
||||
|
||||
$request = $this->request("
|
||||
INSERT INTO \"{$this->table}\"
|
||||
(key,value)
|
||||
VALUES (:index, :value)
|
||||
ON CONFLICT (key) DO UPDATE SET value = :value
|
||||
",
|
||||
[
|
||||
'index' => $index,
|
||||
'value' => \serialize($value),
|
||||
]
|
||||
);
|
||||
|
||||
//Ensure that cache is synced with latest insert in case of concurrent requests.
|
||||
$request->onResolve(fn () => $this->setCache($index, $value));
|
||||
|
||||
return $request;
|
||||
}
|
||||
|
||||
public static function getDbConnection(array $settings): Pool
|
||||
{
|
||||
return Postgres::getConnection(
|
||||
$settings['host'],
|
||||
$settings['port'],
|
||||
$settings['user'],
|
||||
$settings['password'],
|
||||
$settings['database'],
|
||||
$settings['max_connections'],
|
||||
$settings['idle_timeout']
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create table for property.
|
||||
*
|
||||
* @return array|null
|
||||
* @throws \Throwable
|
||||
*/
|
||||
private function prepareTable()
|
||||
{
|
||||
Logger::log("Creating/checking table {$this->table}", Logger::WARNING);
|
||||
|
||||
yield $this->request("
|
||||
CREATE TABLE IF NOT EXISTS \"{$this->table}\"
|
||||
(
|
||||
\"key\" VARCHAR(255) NOT NULL,
|
||||
\"value\" BYTEA NULL,
|
||||
\"ts\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT \"{$this->table}_pkey\" PRIMARY KEY(\"key\")
|
||||
);
|
||||
|
||||
|
||||
|
||||
|
||||
");
|
||||
|
||||
yield $this->request("
|
||||
DROP TRIGGER IF exists \"{$this->table}_update_ts_trigger\" ON \"{$this->table}\";
|
||||
");
|
||||
|
||||
yield $this->request("
|
||||
CREATE TRIGGER \"{$this->table}_update_ts_trigger\" BEFORE UPDATE ON \"{$this->table}\" FOR EACH ROW EXECUTE PROCEDURE update_ts();
|
||||
");
|
||||
}
|
||||
|
||||
public function __serialize(): array
|
||||
{
|
||||
return [
|
||||
'table' => $this->table,
|
||||
'settings' => $this->settings
|
||||
];
|
||||
}
|
||||
|
||||
public function __unserialize($data): void
|
||||
{
|
||||
foreach ($data as $property => $value) {
|
||||
$this->{$property} = $value;
|
||||
}
|
||||
try {
|
||||
$this->db = static::getDbConnection($this->settings);
|
||||
} catch (\Throwable $e) {
|
||||
Logger::log($e->getMessage(), Logger::ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $name
|
||||
* @param DbArray|array|null $value
|
||||
* @param string $tablePrefix
|
||||
* @param array $settings
|
||||
*
|
||||
* @return Promise
|
||||
*/
|
||||
public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise
|
||||
{
|
||||
$tableName = "{$tablePrefix}_{$name}";
|
||||
if ($value instanceof self && $value->table === $tableName) {
|
||||
$instance = &$value;
|
||||
} else {
|
||||
$instance = new static();
|
||||
$instance->table = $tableName;
|
||||
}
|
||||
|
||||
$instance->settings = $settings;
|
||||
$instance->db = static::getDbConnection($settings);
|
||||
$instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;
|
||||
|
||||
$instance->startCacheCleanupLoop();
|
||||
|
||||
return call(static function () use ($instance, $value) {
|
||||
yield from $instance->prepareTable();
|
||||
|
||||
//Skip migrations if its same object
|
||||
if ($instance !== $value) {
|
||||
yield from static::renameTmpTable($instance, $value);
|
||||
yield from static::migrateDataToDb($instance, $value);
|
||||
}
|
||||
|
||||
return $instance;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @param PostgresArray $instance
|
||||
* @param DbArray|array|null $value
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
private static function renameTmpTable(PostgresArray $instance, $value): \Generator
|
||||
{
|
||||
if ($value instanceof static && $value->table) {
|
||||
if (
|
||||
$value->table !== $instance->table &&
|
||||
\mb_strpos($instance->table, 'tmp') !== 0
|
||||
) {
|
||||
yield from $instance->renameTable($value->table, $instance->table);
|
||||
} else {
|
||||
$instance->table = $value->table;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param PostgresArray $instance
|
||||
* @param DbArray|array|null $value
|
||||
*
|
||||
* @return \Generator
|
||||
* @throws \Throwable
|
||||
*/
|
||||
private static function migrateDataToDb(PostgresArray $instance, $value): \Generator
|
||||
{
|
||||
if (!empty($value) && !$value instanceof PostgresArray) {
|
||||
Logger::log('Converting database.', Logger::ERROR);
|
||||
|
||||
if ($value instanceof DbArray) {
|
||||
$value = yield $value->getArrayCopy();
|
||||
} else {
|
||||
$value = (array) $value;
|
||||
}
|
||||
$counter = 0;
|
||||
$total = \count($value);
|
||||
foreach ($value as $key => $item) {
|
||||
$counter++;
|
||||
if ($counter % 500 === 0) {
|
||||
yield $instance->offsetSet($key, $item);
|
||||
Logger::log("Loading data to table {$instance->table}: $counter/$total", Logger::WARNING);
|
||||
} else {
|
||||
$instance->offsetSet($key, $item);
|
||||
}
|
||||
}
|
||||
Logger::log('Converting database done.', Logger::ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
public function offsetExists($index): bool
|
||||
{
|
||||
throw new \RuntimeException('Native isset not support promises. Use isset method');
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if key isset.
|
||||
*
|
||||
* @param $key
|
||||
*
|
||||
* @return Promise<bool> true if the offset exists, otherwise false
|
||||
*/
|
||||
public function isset($key): Promise
|
||||
{
|
||||
return call(fn () => yield $this->offsetGet($key) !== null);
|
||||
}
|
||||
|
||||
|
||||
public function offsetGet($offset): Promise
|
||||
{
|
||||
return call(function () use ($offset) {
|
||||
if ($cached = $this->getCache($offset)) {
|
||||
return $cached;
|
||||
}
|
||||
|
||||
$row = yield $this->request(
|
||||
"SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1",
|
||||
['index' => $offset]
|
||||
);
|
||||
|
||||
if ($value = $this->getValue($row)) {
|
||||
$this->setCache($offset, $value);
|
||||
}
|
||||
|
||||
return $value;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Unset value for an offset.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.offsetunset.php
|
||||
*
|
||||
* @param string $index <p>
|
||||
* The offset to unset.
|
||||
* </p>
|
||||
*
|
||||
* @return Promise
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function offsetUnset($index): Promise
|
||||
{
|
||||
$this->unsetCache($index);
|
||||
|
||||
return $this->request(
|
||||
"
|
||||
DELETE FROM \"{$this->table}\"
|
||||
WHERE key = :index
|
||||
",
|
||||
['index' => $index]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get array copy.
|
||||
*
|
||||
* @return Promise<array>
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function getArrayCopy(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$iterator = $this->getIterator();
|
||||
$result = [];
|
||||
while (yield $iterator->advance()) {
|
||||
[$key, $value] = $iterator->getCurrent();
|
||||
$result[$key] = $value;
|
||||
}
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
public function getIterator(): Producer
|
||||
{
|
||||
return new Producer(function (callable $emit) {
|
||||
$request = yield $this->db->execute("SELECT key, value FROM \"{$this->table}\"");
|
||||
|
||||
while (yield $request->advance()) {
|
||||
$row = $request->getCurrent();
|
||||
yield $emit([$row['key'], $this->getValue($row)]);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Count elements.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.count.php
|
||||
* @return Promise<int> The number of elements or public properties in the associated
|
||||
* array or object, respectively.
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function count(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$row = yield $this->request("SELECT count(key) as count FROM \"{$this->table}\"");
|
||||
return $row[0]['count'] ?? 0;
|
||||
});
|
||||
}
|
||||
|
||||
private function getValue(array $row)
|
||||
{
|
||||
if ($row) {
|
||||
if (!empty($row[0]['value'])) {
|
||||
$row = \reset($row);
|
||||
}
|
||||
return \unserialize($row['value']);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
private function renameTable(string $from, string $to)
|
||||
{
|
||||
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
|
||||
yield $this->request("
|
||||
ALTER TABLE \"{$from}\" RENAME TO \"{$to}\";
|
||||
");
|
||||
|
||||
yield $this->request("
|
||||
DROP TABLE IF EXISTS \"{$from}\";
|
||||
");
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform async request to db.
|
||||
*
|
||||
* @param string $query
|
||||
* @param array $params
|
||||
*
|
||||
* @return Promise
|
||||
* @throws \Throwable
|
||||
*/
|
||||
private function request(string $query, array $params = []): Promise
|
||||
{
|
||||
return call(function () use ($query, $params) {
|
||||
Logger::log([$query, $params], Logger::VERBOSE);
|
||||
|
||||
if (empty($this->db) || !$this->db->isAlive()) {
|
||||
Logger::log('No database connection', Logger::WARNING);
|
||||
return [];
|
||||
}
|
||||
|
||||
try {
|
||||
$request = yield $this->db->execute($query, $params);
|
||||
} catch (\Throwable $e) {
|
||||
Logger::log($e->getMessage(), Logger::ERROR);
|
||||
return [];
|
||||
}
|
||||
|
||||
$result = [];
|
||||
if ($request instanceof ResultSet) {
|
||||
while (yield $request->advance()) {
|
||||
$result[] = $request->getCurrent();
|
||||
}
|
||||
}
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
}
|
@ -28,6 +28,7 @@ use danog\MadelineProto\Db\DbArray;
|
||||
use danog\MadelineProto\Db\DbPropertiesFabric;
|
||||
use danog\MadelineProto\Db\DbPropertiesTrait;
|
||||
use danog\MadelineProto\Db\Mysql;
|
||||
use danog\MadelineProto\Db\Pgsql;
|
||||
use danog\MadelineProto\Ipc\Server;
|
||||
use danog\MadelineProto\Loop\Generic\PeriodicLoop;
|
||||
use danog\MadelineProto\Loop\Update\FeedLoop;
|
||||
@ -1359,7 +1360,18 @@ class MTProto extends AsyncConstruct implements TLCallback
|
||||
'max_connections' => 10,
|
||||
'idle_timeout' => 60,
|
||||
'cache_ttl' => '+5 minutes', //keep records in memory after last read
|
||||
]
|
||||
],
|
||||
/** @see Postgres */
|
||||
'postgres' => [
|
||||
'host' => '127.0.0.1',
|
||||
'port' => 5432,
|
||||
'user' => 'root',
|
||||
'password' => '',
|
||||
'database' => 'MadelineProto', //will be created automatically
|
||||
'max_connections' => 10,
|
||||
'idle_timeout' => 60,
|
||||
'cache_ttl' => '+5 minutes', //keep records in memory after last read
|
||||
],
|
||||
],
|
||||
'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [
|
||||
'pwr' => false,
|
||||
|
Loading…
Reference in New Issue
Block a user