Use prepared statements, and update to new postgres API
This commit is contained in:
parent
fb20b2d213
commit
b0d25f53cb
@ -3,6 +3,7 @@
|
||||
namespace danog\MadelineProto\Db;
|
||||
|
||||
use danog\MadelineProto\MTProto;
|
||||
use danog\MadelineProto\Tools;
|
||||
|
||||
/**
|
||||
* Include this trait and call DbPropertiesTrait::initDb to use MadelineProto's database backend for properties.
|
||||
@ -32,14 +33,19 @@ trait DbPropertiesTrait
|
||||
$dbSettings = $MadelineProto->settings->getDb();
|
||||
$prefix = static::getSessionId($MadelineProto);
|
||||
|
||||
$promises = [];
|
||||
foreach (static::$dbProperties as $property => $type) {
|
||||
if ($reset) {
|
||||
unset($this->{$property});
|
||||
} else {
|
||||
$table = "{$prefix}_{$property}";
|
||||
$this->{$property} = yield DbPropertiesFactory::get($dbSettings, $table, $type, $this->{$property});
|
||||
$promises[$property] = DbPropertiesFactory::get($dbSettings, $table, $type, $this->{$property});
|
||||
}
|
||||
}
|
||||
$promises = yield Tools::all($promises);
|
||||
foreach ($promises as $key => $data) {
|
||||
$this->{$key} = $data;
|
||||
}
|
||||
}
|
||||
|
||||
private static function getSessionId(MTProto $madelineProto): string
|
||||
|
@ -3,16 +3,16 @@
|
||||
namespace danog\MadelineProto\Db;
|
||||
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Settings\DatabaseAbstract;
|
||||
use danog\MadelineProto\Settings\Database\DatabaseAbstract;
|
||||
|
||||
interface DbType
|
||||
{
|
||||
/**
|
||||
* @param string $table
|
||||
* @param null|DbType|array $value
|
||||
* @param null|DbType|array $previous
|
||||
* @param DatabaseAbstract $settings
|
||||
*
|
||||
* @return Promise<self>
|
||||
*/
|
||||
public static function getInstance(string $table, $value, $settings): Promise;
|
||||
public static function getInstance(string $table, $previous, $settings): Promise;
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ class Mysql
|
||||
->withDatabase($settings->getDatabase());
|
||||
|
||||
yield from static::createDb($config);
|
||||
static::$connections[$dbKey] = pool($config, $settings->getMaxConnections(), $settings->getIdleTimeout());
|
||||
static::$connections[$dbKey] = new Pool($config, $settings->getMaxConnections(), $settings->getIdleTimeout());
|
||||
}
|
||||
|
||||
return static::$connections[$dbKey];
|
||||
|
@ -2,49 +2,146 @@
|
||||
|
||||
namespace danog\MadelineProto\Db;
|
||||
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\Settings\Database\DatabaseAbstract;
|
||||
use danog\MadelineProto\SettingsAbstract;
|
||||
use ReflectionClass;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* Array caching trait.
|
||||
*/
|
||||
abstract class DriverArray implements DbArray
|
||||
{
|
||||
protected string $table;
|
||||
|
||||
use ArrayCacheTrait;
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->stopCacheCleanupLoop();
|
||||
}
|
||||
/**
|
||||
* Initialize connection.
|
||||
*/
|
||||
abstract public function initConnection(DatabaseAbstract $settings): \Generator;
|
||||
/**
|
||||
* Initialize on startup.
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
abstract public function initStartup(): \Generator;
|
||||
|
||||
/**
|
||||
* Get string representation of driver/table.
|
||||
* Create table for property.
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
abstract protected function prepareTable(): \Generator;
|
||||
|
||||
/**
|
||||
* Rename table.
|
||||
*
|
||||
* @param string $from
|
||||
* @param string $to
|
||||
* @return \Generator
|
||||
*/
|
||||
abstract protected function renameTable(string $from, string $to): \Generator;
|
||||
|
||||
/**
|
||||
* Get the value of table.
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
abstract public function __toString(): string;
|
||||
|
||||
public function __wakeup()
|
||||
public function getTable(): string
|
||||
{
|
||||
if (isset($this->settings) && \is_array($this->settings)) {
|
||||
$clazz = (new ReflectionClass($this))->getProperty('dbSettings')->getType()->getName();
|
||||
/**
|
||||
* @var SettingsAbstract
|
||||
* @psalm-suppress UndefinedThisPropertyAssignment
|
||||
*/
|
||||
$this->dbSettings = new $clazz;
|
||||
$this->dbSettings->mergeArray($this->settings);
|
||||
unset($this->settings);
|
||||
return $this->table;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the value of table.
|
||||
*
|
||||
* @param string $table
|
||||
*
|
||||
* @return self
|
||||
*/
|
||||
public function setTable(string $table): self
|
||||
{
|
||||
$this->table = $table;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $table
|
||||
* @param DbArray|array|null $previous
|
||||
* @param DatabaseAbstract $settings
|
||||
*
|
||||
* @return Promise
|
||||
*
|
||||
* @psalm-return Promise<static>
|
||||
*/
|
||||
public static function getInstance(string $table, $previous, $settings): Promise
|
||||
{
|
||||
if ($previous instanceof static && $previous->getTable() === $table) {
|
||||
$instance = &$previous;
|
||||
} else {
|
||||
$instance = new static();
|
||||
$instance->setTable($table);
|
||||
}
|
||||
|
||||
/** @psalm-suppress UndefinedPropertyAssignment */
|
||||
$instance->dbSettings = $settings;
|
||||
$instance->ttl = $settings->getCacheTtl();
|
||||
|
||||
$instance->startCacheCleanupLoop();
|
||||
|
||||
return call(static function () use ($instance, $previous, $settings) {
|
||||
yield from $instance->initConnection($settings);
|
||||
yield from $instance->prepareTable();
|
||||
|
||||
if ($instance !== $previous) {
|
||||
if ($previous instanceof DriverArray) {
|
||||
yield from $previous->initStartup();
|
||||
}
|
||||
yield from static::renameTmpTable($instance, $previous);
|
||||
if ($instance instanceof SqlArray) {
|
||||
Logger::log("Preparing statements...");
|
||||
yield from $instance->prepareStatements();
|
||||
}
|
||||
yield from static::migrateDataToDb($instance, $previous);
|
||||
} elseif ($instance instanceof SqlArray) {
|
||||
Logger::log("Preparing statements...");
|
||||
yield from $instance->prepareStatements();
|
||||
}
|
||||
|
||||
return $instance;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Rename table of old database, if the new one is not a temporary table name.
|
||||
*
|
||||
* Otherwise, simply change name of table in new database to match old table name.
|
||||
*
|
||||
* @param self $new New db
|
||||
* @param DbArray|array|null $old Old db
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
protected static function renameTmpTable(self $new, $old): \Generator
|
||||
{
|
||||
if ($old instanceof static && $old->getTable()) {
|
||||
if (
|
||||
$old->getTable() !== $new->getTable() &&
|
||||
\mb_strpos($new->getTable(), 'tmp') !== 0
|
||||
) {
|
||||
yield from $new->renameTable($old->getTable(), $new->getTable());
|
||||
} else {
|
||||
$new->setTable($old->getTable());
|
||||
}
|
||||
}
|
||||
}
|
||||
public function offsetExists($index): bool
|
||||
{
|
||||
throw new \RuntimeException('Native isset not support promises. Use isset method');
|
||||
}
|
||||
|
||||
abstract public function initConnection(\danog\MadelineProto\Settings\Database\DatabaseAbstract $settings): \Generator;
|
||||
abstract public function initStartup(): \Generator;
|
||||
|
||||
/**
|
||||
* @param self $new
|
||||
@ -56,7 +153,7 @@ abstract class DriverArray implements DbArray
|
||||
protected static function migrateDataToDb(self $new, $old): \Generator
|
||||
{
|
||||
if (!empty($old) && !$old instanceof static) {
|
||||
Logger::log('Converting database.', Logger::ERROR);
|
||||
Logger::log('Converting database to '.\get_class($new), Logger::ERROR);
|
||||
|
||||
if ($old instanceof DbArray) {
|
||||
$old = yield $old->getArrayCopy();
|
||||
@ -77,4 +174,48 @@ abstract class DriverArray implements DbArray
|
||||
Logger::log('Converting database done.', Logger::ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
$this->stopCacheCleanupLoop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the value of table.
|
||||
*
|
||||
* @return string
|
||||
*/
|
||||
public function __toString(): string
|
||||
{
|
||||
return $this->table;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep function.
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function __sleep(): array
|
||||
{
|
||||
return ['table', 'dbSettings'];
|
||||
}
|
||||
|
||||
public function __wakeup()
|
||||
{
|
||||
if (isset($this->settings) && \is_array($this->settings)) {
|
||||
$clazz = (new ReflectionClass($this))->getProperty('dbSettings')->getType()->getName();
|
||||
/**
|
||||
* @var SettingsAbstract
|
||||
* @psalm-suppress UndefinedThisPropertyAssignment
|
||||
*/
|
||||
$this->dbSettings = new $clazz;
|
||||
$this->dbSettings->mergeArray($this->settings);
|
||||
unset($this->settings);
|
||||
}
|
||||
}
|
||||
public function offsetExists($index): bool
|
||||
{
|
||||
throw new \RuntimeException('Native isset not support promises. Use isset method');
|
||||
}
|
||||
}
|
||||
|
@ -3,22 +3,16 @@
|
||||
namespace danog\MadelineProto\Db;
|
||||
|
||||
use Amp\Mysql\Pool;
|
||||
use Amp\Producer;
|
||||
use Amp\Promise;
|
||||
use Amp\Sql\ResultSet;
|
||||
use Amp\Success;
|
||||
use danog\MadelineProto\Db\Driver\Mysql;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\Settings\Database\Mysql as DatabaseMysql;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* MySQL database backend.
|
||||
*/
|
||||
class MysqlArray extends SqlArray
|
||||
{
|
||||
protected string $table;
|
||||
protected DatabaseMysql $dbSettings;
|
||||
private Pool $db;
|
||||
|
||||
@ -32,157 +26,39 @@ class MysqlArray extends SqlArray
|
||||
*/
|
||||
public function initStartup(): \Generator
|
||||
{
|
||||
return $this->initConnection($this->dbSettings);
|
||||
}
|
||||
public function __toString(): string
|
||||
{
|
||||
return $this->table;
|
||||
}
|
||||
|
||||
public function __sleep(): array
|
||||
{
|
||||
return ['table', 'dbSettings'];
|
||||
yield from $this->initConnection($this->dbSettings);
|
||||
yield from $this->prepareStatements();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if key isset.
|
||||
* Prepare statements.
|
||||
*
|
||||
* @param $key
|
||||
*
|
||||
* @return Promise<bool> true if the offset exists, otherwise false
|
||||
* @return \Generator
|
||||
*/
|
||||
public function isset($key): Promise
|
||||
protected function prepareStatements(): \Generator
|
||||
{
|
||||
return call(fn () => yield $this->offsetGet($key) !== null);
|
||||
}
|
||||
|
||||
|
||||
public function offsetGet($offset): Promise
|
||||
{
|
||||
return call(function () use ($offset) {
|
||||
if ($cached = $this->getCache($offset)) {
|
||||
return $cached;
|
||||
}
|
||||
|
||||
$row = yield $this->request(
|
||||
"SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1",
|
||||
['index' => $offset]
|
||||
);
|
||||
|
||||
if ($value = $this->getValue($row)) {
|
||||
$this->setCache($offset, $value);
|
||||
}
|
||||
|
||||
return $value;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Set value for an offset.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.offsetset.php
|
||||
*
|
||||
* @param string|int $index <p>
|
||||
* The index to set for.
|
||||
* </p>
|
||||
* @param $value
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
|
||||
public function offsetSet($index, $value): Promise
|
||||
{
|
||||
if ($this->getCache($index) === $value) {
|
||||
return new Success();
|
||||
}
|
||||
|
||||
$this->setCache($index, $value);
|
||||
|
||||
$request = $this->request(
|
||||
"
|
||||
$this->get = yield $this->db->prepare(
|
||||
"SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1"
|
||||
);
|
||||
$this->set = yield $this->db->prepare("
|
||||
INSERT INTO `{$this->table}`
|
||||
SET `key` = :index, `value` = :value
|
||||
ON DUPLICATE KEY UPDATE `value` = :value
|
||||
",
|
||||
[
|
||||
'index' => $index,
|
||||
'value' => \serialize($value),
|
||||
]
|
||||
);
|
||||
|
||||
//Ensure that cache is synced with latest insert in case of concurrent requests.
|
||||
$request->onResolve(fn () => $this->setCache($index, $value));
|
||||
|
||||
return $request;
|
||||
");
|
||||
$this->unset = yield $this->db->prepare("
|
||||
DELETE FROM `{$this->table}`
|
||||
WHERE `key` = :index
|
||||
");
|
||||
$this->count = yield $this->db->prepare("
|
||||
SELECT count(`key`) as `count` FROM `{$this->table}`
|
||||
");
|
||||
$this->iterate = yield $this->db->prepare("
|
||||
SELECT `key`, `value` FROM `{$this->table}`
|
||||
");
|
||||
}
|
||||
|
||||
/**
|
||||
* Unset value for an offset.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.offsetunset.php
|
||||
*
|
||||
* @param string|int $index <p>
|
||||
* The offset to unset.
|
||||
* </p>
|
||||
*
|
||||
* @return Promise
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function offsetUnset($index): Promise
|
||||
{
|
||||
$this->unsetCache($index);
|
||||
|
||||
return $this->request(
|
||||
"
|
||||
DELETE FROM `{$this->table}`
|
||||
WHERE `key` = :index
|
||||
",
|
||||
['index' => $index]
|
||||
);
|
||||
}
|
||||
|
||||
public function getArrayCopy(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$iterator = $this->getIterator();
|
||||
$result = [];
|
||||
while (yield $iterator->advance()) {
|
||||
[$key, $value] = $iterator->getCurrent();
|
||||
$result[$key] = $value;
|
||||
}
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
public function getIterator(): Producer
|
||||
{
|
||||
return new Producer(function (callable $emit) {
|
||||
$request = yield $this->db->execute("SELECT `key`, `value` FROM `{$this->table}`");
|
||||
|
||||
while (yield $request->advance()) {
|
||||
$row = $request->getCurrent();
|
||||
yield $emit([$row['key'], $this->getValue($row)]);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Count elements.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.count.php
|
||||
* @return Promise<int> The number of elements or public properties in the associated
|
||||
* array or object, respectively.
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function count(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$row = yield $this->request("SELECT count(`key`) as `count` FROM `{$this->table}`");
|
||||
return $row[0]['count'] ?? 0;
|
||||
});
|
||||
}
|
||||
|
||||
private function getValue(array $row)
|
||||
protected function getValue(array $row)
|
||||
{
|
||||
if ($row) {
|
||||
if (!empty($row[0]['value'])) {
|
||||
@ -218,7 +94,7 @@ class MysqlArray extends SqlArray
|
||||
protected function prepareTable(): \Generator
|
||||
{
|
||||
Logger::log("Creating/checking table {$this->table}", Logger::WARNING);
|
||||
return yield $this->request("
|
||||
return yield $this->db->query("
|
||||
CREATE TABLE IF NOT EXISTS `{$this->table}`
|
||||
(
|
||||
`key` VARCHAR(255) NOT NULL,
|
||||
@ -235,55 +111,12 @@ class MysqlArray extends SqlArray
|
||||
protected function renameTable(string $from, string $to): \Generator
|
||||
{
|
||||
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
|
||||
yield $this->request("
|
||||
yield $this->db->query("
|
||||
DROP TABLE IF EXISTS `{$to}`;
|
||||
");
|
||||
|
||||
yield $this->db->query("
|
||||
ALTER TABLE `{$from}` RENAME TO `{$to}`;
|
||||
");
|
||||
|
||||
yield $this->request("
|
||||
DROP TABLE IF EXISTS `{$from}`;
|
||||
");
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform async request to db.
|
||||
*
|
||||
* @param string $query
|
||||
* @param array $params
|
||||
*
|
||||
* @return Promise
|
||||
* @throws \Throwable
|
||||
*/
|
||||
private function request(string $query, array $params = []): Promise
|
||||
{
|
||||
return call(function () use ($query, $params) {
|
||||
//Logger::log([$query, $params], Logger::VERBOSE);
|
||||
|
||||
if (empty($this->db) || !$this->db->isAlive()) {
|
||||
Logger::log('No database connection', Logger::WARNING);
|
||||
return [];
|
||||
}
|
||||
|
||||
if (
|
||||
!empty($params['index'])
|
||||
&& !\mb_check_encoding($params['index'], 'UTF-8')
|
||||
) {
|
||||
$params['index'] = \mb_convert_encoding($params['index'], 'UTF-8');
|
||||
}
|
||||
|
||||
try {
|
||||
$request = yield $this->db->execute($query, $params);
|
||||
} catch (\Throwable $e) {
|
||||
Logger::log($e->getMessage(), Logger::ERROR);
|
||||
return [];
|
||||
}
|
||||
|
||||
$result = [];
|
||||
if ($request instanceof ResultSet) {
|
||||
while (yield $request->advance()) {
|
||||
$result[] = $request->getCurrent();
|
||||
}
|
||||
}
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -2,29 +2,53 @@
|
||||
|
||||
namespace danog\MadelineProto\Db;
|
||||
|
||||
use Amp\Postgres\ByteA;
|
||||
use Amp\Postgres\Pool;
|
||||
use Amp\Producer;
|
||||
use Amp\Promise;
|
||||
use Amp\Sql\ResultSet;
|
||||
use Amp\Success;
|
||||
use danog\MadelineProto\Db\Driver\Postgres;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\Settings\Database\Postgres as DatabasePostgres;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
/**
|
||||
* Postgres database backend.
|
||||
*/
|
||||
class PostgresArray extends SqlArray
|
||||
{
|
||||
protected string $table;
|
||||
public DatabasePostgres $dbSettings;
|
||||
private Pool $db;
|
||||
|
||||
// Legacy
|
||||
protected array $settings;
|
||||
|
||||
/**
|
||||
* Prepare statements.
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
protected function prepareStatements(): \Generator
|
||||
{
|
||||
$this->get = yield $this->db->prepare(
|
||||
"SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1",
|
||||
);
|
||||
$this->set = yield $this->db->prepare("
|
||||
INSERT INTO \"{$this->table}\"
|
||||
(key,value)
|
||||
VALUES (:index, :value)
|
||||
ON CONFLICT (key) DO UPDATE SET value = :value
|
||||
");
|
||||
$this->unset = yield $this->db->prepare("
|
||||
DELETE FROM \"{$this->table}\"
|
||||
WHERE key = :index
|
||||
");
|
||||
$this->count = yield $this->db->prepare("
|
||||
SELECT count(key) as count FROM \"{$this->table}\"
|
||||
");
|
||||
$this->iterate = yield $this->db->prepare("
|
||||
SELECT key, value FROM \"{$this->table}\"
|
||||
");
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize on startup.
|
||||
*
|
||||
@ -32,11 +56,8 @@ class PostgresArray extends SqlArray
|
||||
*/
|
||||
public function initStartup(): \Generator
|
||||
{
|
||||
return $this->initConnection($this->dbSettings);
|
||||
}
|
||||
public function __toString(): string
|
||||
{
|
||||
return $this->table;
|
||||
yield from $this->initConnection($this->dbSettings);
|
||||
yield from $this->prepareStatements();
|
||||
}
|
||||
/**
|
||||
* Initialize connection.
|
||||
@ -51,19 +72,35 @@ class PostgresArray extends SqlArray
|
||||
}
|
||||
}
|
||||
|
||||
protected function getValue(array $row)
|
||||
{
|
||||
if ($row) {
|
||||
if (!empty($row[0]['value'])) {
|
||||
$row = \reset($row);
|
||||
}
|
||||
if (!$row['value']) {
|
||||
return $row['value'];
|
||||
}
|
||||
if ($row['value'][0] === '\\') {
|
||||
$row['value'] = hex2bin(substr($row['value'], 2));
|
||||
}
|
||||
return \unserialize($row['value']);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set value for an offset.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.offsetset.php
|
||||
*
|
||||
* @param string $index <p>
|
||||
* @param string|int $index <p>
|
||||
* The index to set for.
|
||||
* </p>
|
||||
* @param $value
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
|
||||
public function offsetSet($index, $value): Promise
|
||||
{
|
||||
if ($this->getCache($index) === $value) {
|
||||
@ -72,16 +109,11 @@ class PostgresArray extends SqlArray
|
||||
|
||||
$this->setCache($index, $value);
|
||||
|
||||
$request = $this->request(
|
||||
"
|
||||
INSERT INTO \"{$this->table}\"
|
||||
(key,value)
|
||||
VALUES (:index, :value)
|
||||
ON CONFLICT (key) DO UPDATE SET value = :value
|
||||
",
|
||||
$request = $this->execute(
|
||||
$this->set,
|
||||
[
|
||||
'index' => $index,
|
||||
'value' => \serialize($value),
|
||||
'value' => new ByteA(\serialize($value)),
|
||||
]
|
||||
);
|
||||
|
||||
@ -104,203 +136,35 @@ class PostgresArray extends SqlArray
|
||||
{
|
||||
Logger::log("Creating/checking table {$this->table}", Logger::WARNING);
|
||||
|
||||
yield $this->request("
|
||||
yield $this->db->query("
|
||||
CREATE TABLE IF NOT EXISTS \"{$this->table}\"
|
||||
(
|
||||
\"key\" VARCHAR(255) NOT NULL,
|
||||
\"value\" BYTEA NULL,
|
||||
\"ts\" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
CONSTRAINT \"{$this->table}_pkey\" PRIMARY KEY(\"key\")
|
||||
);
|
||||
|
||||
|
||||
|
||||
|
||||
);
|
||||
");
|
||||
|
||||
yield $this->request("
|
||||
yield $this->db->query("
|
||||
DROP TRIGGER IF exists \"{$this->table}_update_ts_trigger\" ON \"{$this->table}\";
|
||||
");
|
||||
|
||||
yield $this->request("
|
||||
yield $this->db->query("
|
||||
CREATE TRIGGER \"{$this->table}_update_ts_trigger\" BEFORE UPDATE ON \"{$this->table}\" FOR EACH ROW EXECUTE PROCEDURE update_ts();
|
||||
");
|
||||
}
|
||||
|
||||
public function __sleep()
|
||||
{
|
||||
return ['table', 'dbSettings'];
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if key isset.
|
||||
*
|
||||
* @param $key
|
||||
*
|
||||
* @return Promise<bool> true if the offset exists, otherwise false
|
||||
*/
|
||||
public function isset($key): Promise
|
||||
{
|
||||
return call(fn () => yield $this->offsetGet($key) !== null);
|
||||
}
|
||||
|
||||
|
||||
public function offsetGet($offset): Promise
|
||||
{
|
||||
return call(function () use ($offset) {
|
||||
if ($cached = $this->getCache($offset)) {
|
||||
return $cached;
|
||||
}
|
||||
|
||||
$row = yield $this->request(
|
||||
"SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1",
|
||||
['index' => $offset]
|
||||
);
|
||||
|
||||
if ($value = $this->getValue($row)) {
|
||||
$this->setCache($offset, $value);
|
||||
}
|
||||
|
||||
return $value;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Unset value for an offset.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.offsetunset.php
|
||||
*
|
||||
* @param string $index <p>
|
||||
* The offset to unset.
|
||||
* </p>
|
||||
*
|
||||
* @return Promise
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function offsetUnset($index): Promise
|
||||
{
|
||||
$this->unsetCache($index);
|
||||
|
||||
return $this->request(
|
||||
"
|
||||
DELETE FROM \"{$this->table}\"
|
||||
WHERE key = :index
|
||||
",
|
||||
['index' => $index]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get array copy.
|
||||
*
|
||||
* @return Promise<array>
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function getArrayCopy(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$iterator = $this->getIterator();
|
||||
$result = [];
|
||||
while (yield $iterator->advance()) {
|
||||
[$key, $value] = $iterator->getCurrent();
|
||||
$result[$key] = $value;
|
||||
}
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
public function getIterator(): Producer
|
||||
{
|
||||
return new Producer(function (callable $emit) {
|
||||
$request = yield $this->db->execute("SELECT key, value FROM \"{$this->table}\"");
|
||||
|
||||
while (yield $request->advance()) {
|
||||
$row = $request->getCurrent();
|
||||
yield $emit([$row['key'], $this->getValue($row)]);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Count elements.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.count.php
|
||||
* @return Promise<int> The number of elements or public properties in the associated
|
||||
* array or object, respectively.
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function count(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$row = yield $this->request("SELECT count(key) as count FROM \"{$this->table}\"");
|
||||
return $row[0]['count'] ?? 0;
|
||||
});
|
||||
}
|
||||
|
||||
private function getValue(array $row)
|
||||
{
|
||||
if ($row) {
|
||||
if (!empty($row[0]['value'])) {
|
||||
$row = \reset($row);
|
||||
}
|
||||
return \unserialize($row['value']);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
protected function renameTable(string $from, string $to): \Generator
|
||||
{
|
||||
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
|
||||
yield $this->request("
|
||||
|
||||
yield $this->db->query("
|
||||
DROP TABLE IF EXISTS \"{$to}\";
|
||||
");
|
||||
|
||||
yield $this->db->query("
|
||||
ALTER TABLE \"{$from}\" RENAME TO \"{$to}\";
|
||||
");
|
||||
|
||||
yield $this->request("
|
||||
DROP TABLE IF EXISTS \"{$from}\";
|
||||
");
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform async request to db.
|
||||
*
|
||||
* @param string $query
|
||||
* @param array $params
|
||||
*
|
||||
* @return Promise
|
||||
* @throws \Throwable
|
||||
*/
|
||||
private function request(string $query, array $params = []): Promise
|
||||
{
|
||||
return call(function () use ($query, $params) {
|
||||
Logger::log([$query, $params], Logger::VERBOSE);
|
||||
|
||||
if (empty($this->db) || !$this->db->isAlive()) {
|
||||
Logger::log('No database connection', Logger::WARNING);
|
||||
return [];
|
||||
}
|
||||
|
||||
if (
|
||||
!empty($params['index'])
|
||||
&& !\mb_check_encoding($params['index'], 'UTF-8')
|
||||
) {
|
||||
$params['index'] = \mb_convert_encoding($params['index'], 'UTF-8');
|
||||
}
|
||||
|
||||
try {
|
||||
$request = yield $this->db->execute($query, $params);
|
||||
} catch (\Throwable $e) {
|
||||
Logger::log($e->getMessage(), Logger::ERROR);
|
||||
return [];
|
||||
}
|
||||
|
||||
$result = [];
|
||||
if ($request instanceof ResultSet) {
|
||||
while (yield $request->advance()) {
|
||||
$result[] = $request->getCurrent();
|
||||
}
|
||||
}
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ use function Amp\call;
|
||||
/**
|
||||
* Redis database backend.
|
||||
*/
|
||||
class RedisArray extends SqlArray
|
||||
class RedisArray extends DriverArray
|
||||
{
|
||||
protected DatabaseRedis $dbSettings;
|
||||
private RedisRedis $db;
|
||||
@ -33,10 +33,6 @@ class RedisArray extends SqlArray
|
||||
{
|
||||
return $this->initConnection($this->dbSettings);
|
||||
}
|
||||
public function __toString(): string
|
||||
{
|
||||
return $this->table;
|
||||
}
|
||||
/**
|
||||
* @return Generator
|
||||
*
|
||||
@ -47,7 +43,6 @@ class RedisArray extends SqlArray
|
||||
yield new Success;
|
||||
}
|
||||
|
||||
|
||||
protected function renameTable(string $from, string $to): \Generator
|
||||
{
|
||||
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
|
||||
@ -77,11 +72,6 @@ class RedisArray extends SqlArray
|
||||
}
|
||||
|
||||
|
||||
public function __sleep()
|
||||
{
|
||||
return ['table', 'dbSettings'];
|
||||
}
|
||||
|
||||
/**
|
||||
* Get redis key name.
|
||||
*
|
||||
|
@ -2,8 +2,12 @@
|
||||
|
||||
namespace danog\MadelineProto\Db;
|
||||
|
||||
use Amp\Producer;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Settings\Database\DatabaseAbstract;
|
||||
use Amp\Sql\ResultSet;
|
||||
use Amp\Sql\Statement;
|
||||
use Amp\Success;
|
||||
use danog\MadelineProto\Logger;
|
||||
|
||||
use function Amp\call;
|
||||
|
||||
@ -12,82 +16,189 @@ use function Amp\call;
|
||||
*/
|
||||
abstract class SqlArray extends DriverArray
|
||||
{
|
||||
protected string $table;
|
||||
protected Statement $get;
|
||||
protected Statement $set;
|
||||
protected Statement $unset;
|
||||
protected Statement $count;
|
||||
|
||||
protected Statement $iterate;
|
||||
|
||||
/**
|
||||
* Create table for property.
|
||||
* Prepare statements.
|
||||
*
|
||||
* @return \Generator
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
abstract protected function prepareTable(): \Generator;
|
||||
|
||||
abstract protected function renameTable(string $from, string $to): \Generator;
|
||||
|
||||
abstract protected function prepareStatements(): \Generator;
|
||||
|
||||
/**
|
||||
* @param string $table
|
||||
* @param DbArray|array|null $value
|
||||
* @param DatabaseAbstract $settings
|
||||
* Get value from row.
|
||||
*
|
||||
* @return Promise
|
||||
*
|
||||
* @psalm-return Promise<static>
|
||||
* @param array $row
|
||||
* @return void
|
||||
*/
|
||||
public static function getInstance(string $table, $value, $settings): Promise
|
||||
abstract protected function getValue(array $row);
|
||||
|
||||
|
||||
public function getIterator(): Producer
|
||||
{
|
||||
if ($value instanceof static && $value->table === $table) {
|
||||
$instance = &$value;
|
||||
} else {
|
||||
$instance = new static();
|
||||
$instance->table = $table;
|
||||
}
|
||||
return new Producer(function (callable $emit) {
|
||||
$request = yield $this->iterate->execute();
|
||||
|
||||
/** @psalm-suppress UndefinedPropertyAssignment */
|
||||
$instance->dbSettings = $settings;
|
||||
$instance->ttl = $settings->getCacheTtl();
|
||||
|
||||
$instance->startCacheCleanupLoop();
|
||||
|
||||
return call(static function () use ($instance, $value, $settings) {
|
||||
yield from $instance->initConnection($settings);
|
||||
yield from $instance->prepareTable();
|
||||
|
||||
// Skip migrations if its same object
|
||||
if ($instance !== $value) {
|
||||
if ($value instanceof DriverArray) {
|
||||
yield from $value->initStartup();
|
||||
}
|
||||
yield from static::renameTmpTable($instance, $value);
|
||||
yield from static::migrateDataToDb($instance, $value);
|
||||
while (yield $request->advance()) {
|
||||
$row = $request->getCurrent();
|
||||
yield $emit([$row['key'], $this->getValue($row)]);
|
||||
}
|
||||
|
||||
return $instance;
|
||||
});
|
||||
}
|
||||
public function getArrayCopy(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$iterator = $this->getIterator();
|
||||
$result = [];
|
||||
while (yield $iterator->advance()) {
|
||||
[$key, $value] = $iterator->getCurrent();
|
||||
$result[$key] = $value;
|
||||
}
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Rename table of old database, if the new one is not a temporary table name.
|
||||
* Check if key isset.
|
||||
*
|
||||
* Otherwise, change name of table in new database to match old table name.
|
||||
* @param $key
|
||||
*
|
||||
* @param self $new New db
|
||||
* @param DbArray|array|null $old Old db
|
||||
*
|
||||
* @return \Generator
|
||||
* @return Promise<bool> true if the offset exists, otherwise false
|
||||
*/
|
||||
protected static function renameTmpTable(self $new, $old): \Generator
|
||||
public function isset($key): Promise
|
||||
{
|
||||
if ($old instanceof static && $old->table) {
|
||||
if (
|
||||
$old->table !== $new->table &&
|
||||
\mb_strpos($new->table, 'tmp') !== 0
|
||||
) {
|
||||
yield from $new->renameTable($old->table, $new->table);
|
||||
} else {
|
||||
$new->table = $old->table;
|
||||
return call(fn () => yield $this->offsetGet($key) !== null);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Unset value for an offset.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.offsetunset.php
|
||||
*
|
||||
* @param string|int $index <p>
|
||||
* The offset to unset.
|
||||
* </p>
|
||||
*
|
||||
* @return Promise
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function offsetUnset($index): Promise
|
||||
{
|
||||
$this->unsetCache($index);
|
||||
|
||||
return $this->execute(
|
||||
$this->unset,
|
||||
['index' => $index]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Count elements.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.count.php
|
||||
* @return Promise<int> The number of elements or public properties in the associated
|
||||
* array or object, respectively.
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function count(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$row = yield $this->execute($this->count);
|
||||
return $row[0]['count'] ?? 0;
|
||||
});
|
||||
}
|
||||
|
||||
public function offsetGet($offset): Promise
|
||||
{
|
||||
return call(function () use ($offset) {
|
||||
if ($cached = $this->getCache($offset)) {
|
||||
return $cached;
|
||||
}
|
||||
|
||||
$row = yield $this->execute($this->get, ['index' => $offset]);
|
||||
|
||||
if ($value = $this->getValue($row)) {
|
||||
$this->setCache($offset, $value);
|
||||
}
|
||||
|
||||
return $value;
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Set value for an offset.
|
||||
*
|
||||
* @link https://php.net/manual/en/arrayiterator.offsetset.php
|
||||
*
|
||||
* @param string|int $index <p>
|
||||
* The index to set for.
|
||||
* </p>
|
||||
* @param $value
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function offsetSet($index, $value): Promise
|
||||
{
|
||||
if ($this->getCache($index) === $value) {
|
||||
return new Success();
|
||||
}
|
||||
|
||||
$this->setCache($index, $value);
|
||||
|
||||
$request = $this->execute(
|
||||
$this->set,
|
||||
[
|
||||
'index' => $index,
|
||||
'value' => \serialize($value),
|
||||
]
|
||||
);
|
||||
|
||||
//Ensure that cache is synced with latest insert in case of concurrent requests.
|
||||
$request->onResolve(fn () => $this->setCache($index, $value));
|
||||
|
||||
return $request;
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform async request to db.
|
||||
*
|
||||
* @param Statement $query
|
||||
* @param array $params
|
||||
*
|
||||
* @return Promise
|
||||
* @throws \Throwable
|
||||
*/
|
||||
protected function execute(Statement $stmt, array $params = []): Promise
|
||||
{
|
||||
return call(function () use ($stmt, $params) {
|
||||
if (
|
||||
!empty($params['index'])
|
||||
&& !\mb_check_encoding($params['index'], 'UTF-8')
|
||||
) {
|
||||
$params['index'] = \mb_convert_encoding($params['index'], 'UTF-8');
|
||||
}
|
||||
|
||||
try {
|
||||
$request = yield $stmt->execute($params);
|
||||
} catch (\Throwable $e) {
|
||||
Logger::log($e->getMessage(), Logger::ERROR);
|
||||
return [];
|
||||
}
|
||||
|
||||
$result = [];
|
||||
if ($request instanceof ResultSet) {
|
||||
while (yield $request->advance()) {
|
||||
$result[] = $request->getCurrent();
|
||||
}
|
||||
}
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -528,6 +528,11 @@ class MTProto extends AsyncConstruct implements TLCallback
|
||||
*/
|
||||
public static function serializeAll(): void
|
||||
{
|
||||
static $done = false;
|
||||
if ($done) {
|
||||
return;
|
||||
}
|
||||
$done = true;
|
||||
Logger::log('Prompting final serialization (SHUTDOWN)...');
|
||||
foreach (self::$references as $instance) {
|
||||
Tools::wait($instance->wrapper->serialize());
|
||||
@ -976,17 +981,18 @@ class MTProto extends AsyncConstruct implements TLCallback
|
||||
if (!isset($this->datacenter)) {
|
||||
$this->datacenter ??= new DataCenter($this, $this->dcList, $this->settings->getConnection());
|
||||
}
|
||||
$db = [];
|
||||
if (!isset($this->referenceDatabase)) {
|
||||
$this->referenceDatabase = new ReferenceDatabase($this);
|
||||
yield from $this->referenceDatabase->init();
|
||||
$db []= $this->referenceDatabase->init();
|
||||
} else {
|
||||
yield from $this->referenceDatabase->init();
|
||||
$db []= $this->referenceDatabase->init();
|
||||
}
|
||||
if (!isset($this->minDatabase)) {
|
||||
$this->minDatabase = new MinDatabase($this);
|
||||
yield from $this->minDatabase->init();
|
||||
$db []= $this->minDatabase->init();
|
||||
} else {
|
||||
yield from $this->minDatabase->init();
|
||||
$db []= $this->minDatabase->init();
|
||||
}
|
||||
if (!isset($this->TL)) {
|
||||
$this->TL = new TL($this);
|
||||
@ -997,7 +1003,8 @@ class MTProto extends AsyncConstruct implements TLCallback
|
||||
$this->TL->init($this->settings->getSchema(), $callbacks);
|
||||
}
|
||||
|
||||
yield from $this->initDb($this);
|
||||
$db []= $this->initDb($this);
|
||||
yield Tools::all($db);
|
||||
yield from $this->fillUsernamesCache();
|
||||
}
|
||||
|
||||
|
@ -232,8 +232,8 @@ class Magic
|
||||
}
|
||||
if (!self::$initedLight) {
|
||||
// Setup error reporting
|
||||
\set_error_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionErrorHandler']);
|
||||
\set_exception_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionHandler']);
|
||||
\set_error_handler([Exception::class, 'ExceptionErrorHandler']);
|
||||
\set_exception_handler([Exception::class, 'ExceptionHandler']);
|
||||
self::$isIpcWorker = \defined('MADELINE_WORKER_TYPE') ? \MADELINE_WORKER_TYPE === 'madeline-ipc' : false;
|
||||
if (PHP_SAPI !== 'cli' && PHP_SAPI !== 'phpdbg') {
|
||||
try {
|
||||
|
@ -21,7 +21,7 @@ abstract class SqlAbstract extends DatabaseAbstract
|
||||
/**
|
||||
* Maximum connection limit.
|
||||
*/
|
||||
protected int $maxConnections = 10;
|
||||
protected int $maxConnections = 100;
|
||||
|
||||
/**
|
||||
* Idle timeout.
|
||||
@ -49,6 +49,12 @@ abstract class SqlAbstract extends DatabaseAbstract
|
||||
parent::mergeArray($settings);
|
||||
}
|
||||
|
||||
public function __wakeup()
|
||||
{
|
||||
if ($this->maxConnections < 40) {
|
||||
$this->maxConnections = 100;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Get maximum connection limit.
|
||||
*
|
||||
|
Loading…
Reference in New Issue
Block a user