Improve caching mechanism, deduplicate SQL code
This commit is contained in:
parent
5567490c7a
commit
5a5c02e56b
@ -37,7 +37,8 @@
|
|||||||
"tivie/htaccess-parser": "^0.2.3",
|
"tivie/htaccess-parser": "^0.2.3",
|
||||||
"amphp/log": "^1.1",
|
"amphp/log": "^1.1",
|
||||||
"danog/loop": "^0.1.0",
|
"danog/loop": "^0.1.0",
|
||||||
"danog/tgseclib": "^3"
|
"danog/tgseclib": "^3",
|
||||||
|
"amphp/redis": "^1.0"
|
||||||
},
|
},
|
||||||
"require-dev": {
|
"require-dev": {
|
||||||
"vlucas/phpdotenv": "^3",
|
"vlucas/phpdotenv": "^3",
|
||||||
|
@ -8,31 +8,27 @@ use danog\MadelineProto\Logger;
|
|||||||
trait ArrayCacheTrait
|
trait ArrayCacheTrait
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* Values stored in this format:
|
* @var array<mixed>
|
||||||
* [
|
|
||||||
* [
|
|
||||||
* 'value' => mixed,
|
|
||||||
* 'ttl' => int
|
|
||||||
* ],
|
|
||||||
* ...
|
|
||||||
* ].
|
|
||||||
* @var array
|
|
||||||
*/
|
*/
|
||||||
protected array $cache = [];
|
protected array $cache = [];
|
||||||
|
/**
|
||||||
|
* @var array<int>
|
||||||
|
*/
|
||||||
|
protected array $ttlValues = [];
|
||||||
|
|
||||||
|
|
||||||
protected string $ttl = '+5 minutes';
|
protected string $ttl = '+5 minutes';
|
||||||
private string $ttlCheckInterval = '+1 minute';
|
private string $ttlCheckInterval = '+1 minute';
|
||||||
|
|
||||||
|
private ?string $cacheCleanupId = null;
|
||||||
|
|
||||||
protected function getCache(string $key, $default = null)
|
protected function getCache(string $key, $default = null)
|
||||||
{
|
{
|
||||||
$cacheItem = $this->cache[$key] ?? null;
|
if (!isset($this->ttlValues[$key])) {
|
||||||
$result = $default;
|
return $default;
|
||||||
|
|
||||||
if (\is_array($cacheItem)) {
|
|
||||||
$result = $cacheItem['value'];
|
|
||||||
$this->cache[$key]['ttl'] = \strtotime($this->ttl);
|
|
||||||
}
|
}
|
||||||
|
$this->ttlValues[$key] = \strtotime($this->ttl);
|
||||||
return $result;
|
return $this->cache[$key];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -43,10 +39,8 @@ trait ArrayCacheTrait
|
|||||||
*/
|
*/
|
||||||
protected function setCache(string $key, $value): void
|
protected function setCache(string $key, $value): void
|
||||||
{
|
{
|
||||||
$this->cache[$key] = [
|
$this->cache[$key] = $value;
|
||||||
'value' => $value,
|
$this->ttlValues[$key] = \strtotime($this->ttl);
|
||||||
'ttl' => \strtotime($this->ttl),
|
|
||||||
];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -56,12 +50,19 @@ trait ArrayCacheTrait
|
|||||||
*/
|
*/
|
||||||
protected function unsetCache(string $key): void
|
protected function unsetCache(string $key): void
|
||||||
{
|
{
|
||||||
unset($this->cache[$key]);
|
unset($this->cache[$key], $this->ttlValues[$key]);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected function startCacheCleanupLoop(): void
|
protected function startCacheCleanupLoop(): void
|
||||||
{
|
{
|
||||||
Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
|
$this->cacheCleanupId = Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, [$this, 'cleanupCache']);
|
||||||
|
}
|
||||||
|
protected function stopCacheCleanupLoop(): void
|
||||||
|
{
|
||||||
|
if ($this->cacheCleanupId) {
|
||||||
|
Loop::cancel($this->cacheCleanupId);
|
||||||
|
$this->cacheCleanupId = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -70,22 +71,20 @@ trait ArrayCacheTrait
|
|||||||
protected function cleanupCache(): void
|
protected function cleanupCache(): void
|
||||||
{
|
{
|
||||||
$now = \time();
|
$now = \time();
|
||||||
$oldKeys = [];
|
$oldCount = 0;
|
||||||
foreach ($this->cache as $cacheKey => $cacheValue) {
|
foreach ($this->ttlValues as $cacheKey => $ttl) {
|
||||||
if ($cacheValue['ttl'] < $now) {
|
if ($ttl < $now) {
|
||||||
$oldKeys[] = $cacheKey;
|
$this->unsetCache($cacheKey);
|
||||||
|
$oldCount++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
foreach ($oldKeys as $oldKey) {
|
|
||||||
$this->unsetCache($oldKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
Logger::log(
|
Logger::log(
|
||||||
\sprintf(
|
\sprintf(
|
||||||
"cache for table:%s; keys left: %s; keys removed: %s",
|
"cache for table:%s; keys left: %s; keys removed: %s",
|
||||||
$this->table,
|
$this->table,
|
||||||
\count($this->cache),
|
\count($this->cache),
|
||||||
\count($oldKeys)
|
$oldCount
|
||||||
),
|
),
|
||||||
Logger::VERBOSE
|
Logger::VERBOSE
|
||||||
);
|
);
|
||||||
|
@ -15,6 +15,8 @@ interface DbArray extends DbType, \ArrayAccess, \Countable
|
|||||||
public function count(): Promise;
|
public function count(): Promise;
|
||||||
public function getIterator(): Producer;
|
public function getIterator(): Producer;
|
||||||
|
|
||||||
|
public function init(): Promise;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated
|
* @deprecated
|
||||||
* @internal
|
* @internal
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
namespace danog\MadelineProto\Db;
|
namespace danog\MadelineProto\Db\Driver;
|
||||||
|
|
||||||
use Amp\Mysql\ConnectionConfig;
|
use Amp\Mysql\ConnectionConfig;
|
||||||
use Amp\Mysql\Pool;
|
use Amp\Mysql\Pool;
|
@ -1,6 +1,6 @@
|
|||||||
<?php
|
<?php
|
||||||
|
|
||||||
namespace danog\MadelineProto\Db;
|
namespace danog\MadelineProto\Db\Driver;
|
||||||
|
|
||||||
use Amp\Postgres\ConnectionConfig;
|
use Amp\Postgres\ConnectionConfig;
|
||||||
use Amp\Postgres\Pool;
|
use Amp\Postgres\Pool;
|
50
src/danog/MadelineProto/Db/Driver/Redis.php
Normal file
50
src/danog/MadelineProto/Db/Driver/Redis.php
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace danog\MadelineProto\Db\Driver;
|
||||||
|
|
||||||
|
use Amp\Redis\Config;
|
||||||
|
use Amp\Redis\Redis as RedisRedis;
|
||||||
|
use Amp\Redis\RemoteExecutorFactory;
|
||||||
|
use Amp\Sql\Common\ConnectionPool;
|
||||||
|
|
||||||
|
class Redis
|
||||||
|
{
|
||||||
|
/** @var RedisRedis[] */
|
||||||
|
private static array $connections = [];
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $host
|
||||||
|
* @param int $port
|
||||||
|
* @param string $user
|
||||||
|
* @param string $password
|
||||||
|
* @param string $db
|
||||||
|
*
|
||||||
|
* @param int $maxConnections
|
||||||
|
* @param int $idleTimeout
|
||||||
|
*
|
||||||
|
* @throws \Amp\Sql\ConnectionException
|
||||||
|
* @throws \Amp\Sql\FailureException
|
||||||
|
* @throws \Throwable
|
||||||
|
*
|
||||||
|
* @return RedisRedis
|
||||||
|
*/
|
||||||
|
public static function getConnection(
|
||||||
|
string $host = '127.0.0.1',
|
||||||
|
int $port = 6379,
|
||||||
|
string $password = '',
|
||||||
|
int $db = 0,
|
||||||
|
int $maxConnections = ConnectionPool::DEFAULT_MAX_CONNECTIONS,
|
||||||
|
int $idleTimeout = ConnectionPool::DEFAULT_IDLE_TIMEOUT
|
||||||
|
): RedisRedis {
|
||||||
|
$dbKey = "$host:$port:$db";
|
||||||
|
if (empty(static::$connections[$dbKey])) {
|
||||||
|
$config = Config::fromUri(
|
||||||
|
"host={$host} port={$port} password={$password} db={$db}"
|
||||||
|
);
|
||||||
|
|
||||||
|
static::$connections[$dbKey] = new RedisRedis((new RemoteExecutorFactory($config))->createQueryExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
return static::$connections[$dbKey];
|
||||||
|
}
|
||||||
|
}
|
49
src/danog/MadelineProto/Db/DriverArray.php
Normal file
49
src/danog/MadelineProto/Db/DriverArray.php
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace danog\MadelineProto\Db;
|
||||||
|
|
||||||
|
use danog\MadelineProto\Logger;
|
||||||
|
|
||||||
|
abstract class DriverArray implements DbArray
|
||||||
|
{
|
||||||
|
use ArrayCacheTrait;
|
||||||
|
|
||||||
|
public function __destruct()
|
||||||
|
{
|
||||||
|
$this->stopCacheCleanupLoop();
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract protected function initConnection(array $settings): \Generator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param self $new
|
||||||
|
* @param DbArray|array|null $old
|
||||||
|
*
|
||||||
|
* @return \Generator
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
protected static function migrateDataToDb(self $new, $old): \Generator
|
||||||
|
{
|
||||||
|
if (!empty($old) && !$old instanceof static) {
|
||||||
|
Logger::log('Converting database.', Logger::ERROR);
|
||||||
|
|
||||||
|
if ($old instanceof DbArray) {
|
||||||
|
$old = yield $old->getArrayCopy();
|
||||||
|
} else {
|
||||||
|
$old = (array) $old;
|
||||||
|
}
|
||||||
|
$counter = 0;
|
||||||
|
$total = \count($old);
|
||||||
|
foreach ($old as $key => $item) {
|
||||||
|
$counter++;
|
||||||
|
if ($counter % 500 === 0) {
|
||||||
|
yield $new->offsetSet($key, $item);
|
||||||
|
Logger::log("Loading data to table {$new->table}: $counter/$total", Logger::WARNING);
|
||||||
|
} else {
|
||||||
|
$new->offsetSet($key, $item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Logger::log('Converting database done.', Logger::ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -7,15 +7,14 @@ use Amp\Producer;
|
|||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Sql\ResultSet;
|
use Amp\Sql\ResultSet;
|
||||||
use Amp\Success;
|
use Amp\Success;
|
||||||
|
use danog\MadelineProto\Db\Driver\Mysql;
|
||||||
use danog\MadelineProto\Logger;
|
use danog\MadelineProto\Logger;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
class MysqlArray implements DbArray
|
class MysqlArray extends SqlArray
|
||||||
{
|
{
|
||||||
use ArrayCacheTrait;
|
protected string $table;
|
||||||
|
protected array $settings;
|
||||||
private string $table;
|
|
||||||
private array $settings;
|
|
||||||
private Pool $db;
|
private Pool $db;
|
||||||
|
|
||||||
public function __sleep(): array
|
public function __sleep(): array
|
||||||
@ -23,95 +22,6 @@ class MysqlArray implements DbArray
|
|||||||
return ['table', 'settings'];
|
return ['table', 'settings'];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param string $name
|
|
||||||
* @param DbArray|array|null $value
|
|
||||||
* @param string $tablePrefix
|
|
||||||
* @param array $settings
|
|
||||||
*
|
|
||||||
* @return Promise
|
|
||||||
*/
|
|
||||||
public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise
|
|
||||||
{
|
|
||||||
$tableName = "{$tablePrefix}_{$name}";
|
|
||||||
if ($value instanceof self && $value->table === $tableName) {
|
|
||||||
$instance = &$value;
|
|
||||||
} else {
|
|
||||||
$instance = new static();
|
|
||||||
$instance->table = $tableName;
|
|
||||||
}
|
|
||||||
|
|
||||||
$instance->settings = $settings;
|
|
||||||
$instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;
|
|
||||||
|
|
||||||
$instance->startCacheCleanupLoop();
|
|
||||||
|
|
||||||
return call(static function () use ($instance, $value, $settings) {
|
|
||||||
$instance->db = yield from static::getDbConnection($settings);
|
|
||||||
yield from $instance->prepareTable();
|
|
||||||
|
|
||||||
//Skip migrations if its same object
|
|
||||||
if ($instance !== $value) {
|
|
||||||
yield from static::renameTmpTable($instance, $value);
|
|
||||||
yield from static::migrateDataToDb($instance, $value);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $instance;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param MysqlArray $instance
|
|
||||||
* @param DbArray|array|null $value
|
|
||||||
*
|
|
||||||
* @return \Generator
|
|
||||||
*/
|
|
||||||
private static function renameTmpTable(MysqlArray $instance, $value): \Generator
|
|
||||||
{
|
|
||||||
if ($value instanceof static && $value->table) {
|
|
||||||
if (
|
|
||||||
$value->table !== $instance->table &&
|
|
||||||
\mb_strpos($instance->table, 'tmp') !== 0
|
|
||||||
) {
|
|
||||||
yield from $instance->renameTable($value->table, $instance->table);
|
|
||||||
} else {
|
|
||||||
$instance->table = $value->table;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param MysqlArray $instance
|
|
||||||
* @param DbArray|array|null $value
|
|
||||||
*
|
|
||||||
* @return \Generator
|
|
||||||
* @throws \Throwable
|
|
||||||
*/
|
|
||||||
private static function migrateDataToDb(MysqlArray $instance, $value): \Generator
|
|
||||||
{
|
|
||||||
if (!empty($value) && !$value instanceof MysqlArray) {
|
|
||||||
Logger::log('Converting database.', Logger::ERROR);
|
|
||||||
|
|
||||||
if ($value instanceof DbArray) {
|
|
||||||
$value = yield $value->getArrayCopy();
|
|
||||||
} else {
|
|
||||||
$value = (array) $value;
|
|
||||||
}
|
|
||||||
$counter = 0;
|
|
||||||
$total = \count($value);
|
|
||||||
foreach ($value as $key => $item) {
|
|
||||||
$counter++;
|
|
||||||
if ($counter % 500 === 0) {
|
|
||||||
yield $instance->offsetSet($key, $item);
|
|
||||||
Logger::log("Loading data to table {$instance->table}: $counter/$total", Logger::WARNING);
|
|
||||||
} else {
|
|
||||||
$instance->offsetSet($key, $item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Logger::log('Converting database done.', Logger::ERROR);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public function offsetExists($index): bool
|
public function offsetExists($index): bool
|
||||||
{
|
{
|
||||||
throw new \RuntimeException('Native isset not support promises. Use isset method');
|
throw new \RuntimeException('Native isset not support promises. Use isset method');
|
||||||
@ -272,17 +182,19 @@ class MysqlArray implements DbArray
|
|||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function getDbConnection(array $settings): \Generator
|
protected function initConnection(array $settings): \Generator
|
||||||
{
|
{
|
||||||
return Mysql::getConnection(
|
if (!isset($this->db)) {
|
||||||
$settings['host'],
|
$this->db = yield from Mysql::getConnection(
|
||||||
$settings['port'],
|
$settings['host'],
|
||||||
$settings['user'],
|
$settings['port'],
|
||||||
$settings['password'],
|
$settings['user'],
|
||||||
$settings['database'],
|
$settings['password'],
|
||||||
$settings['max_connections'],
|
$settings['database'],
|
||||||
$settings['idle_timeout']
|
$settings['max_connections'],
|
||||||
);
|
$settings['idle_timeout']
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -6,13 +6,12 @@ use Amp\Postgres\Pool;
|
|||||||
use Amp\Producer;
|
use Amp\Producer;
|
||||||
use Amp\Promise;
|
use Amp\Promise;
|
||||||
use Amp\Sql\ResultSet;
|
use Amp\Sql\ResultSet;
|
||||||
|
use danog\MadelineProto\Db\Driver\Postgres;
|
||||||
use danog\MadelineProto\Logger;
|
use danog\MadelineProto\Logger;
|
||||||
use function Amp\call;
|
use function Amp\call;
|
||||||
|
|
||||||
class PostgresArray implements DbArray
|
class PostgresArray extends SqlArray
|
||||||
{
|
{
|
||||||
use ArrayCacheTrait;
|
|
||||||
|
|
||||||
private string $table;
|
private string $table;
|
||||||
private array $settings;
|
private array $settings;
|
||||||
private Pool $db;
|
private Pool $db;
|
||||||
@ -57,7 +56,7 @@ class PostgresArray implements DbArray
|
|||||||
return $request;
|
return $request;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static function getDbConnection(array $settings): \Generator
|
protected static function getDbConnection(array $settings): \Generator
|
||||||
{
|
{
|
||||||
return Postgres::getConnection(
|
return Postgres::getConnection(
|
||||||
$settings['host'],
|
$settings['host'],
|
||||||
@ -76,7 +75,7 @@ class PostgresArray implements DbArray
|
|||||||
* @return array|null
|
* @return array|null
|
||||||
* @throws \Throwable
|
* @throws \Throwable
|
||||||
*/
|
*/
|
||||||
private function prepareTable()
|
protected function prepareTable()
|
||||||
{
|
{
|
||||||
Logger::log("Creating/checking table {$this->table}", Logger::WARNING);
|
Logger::log("Creating/checking table {$this->table}", Logger::WARNING);
|
||||||
|
|
||||||
@ -108,95 +107,6 @@ class PostgresArray implements DbArray
|
|||||||
return ['table', 'settings'];
|
return ['table', 'settings'];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param string $name
|
|
||||||
* @param DbArray|array|null $value
|
|
||||||
* @param string $tablePrefix
|
|
||||||
* @param array $settings
|
|
||||||
*
|
|
||||||
* @return Promise
|
|
||||||
*/
|
|
||||||
public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise
|
|
||||||
{
|
|
||||||
$tableName = "{$tablePrefix}_{$name}";
|
|
||||||
if ($value instanceof self && $value->table === $tableName) {
|
|
||||||
$instance = &$value;
|
|
||||||
} else {
|
|
||||||
$instance = new static();
|
|
||||||
$instance->table = $tableName;
|
|
||||||
}
|
|
||||||
|
|
||||||
$instance->settings = $settings;
|
|
||||||
$instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;
|
|
||||||
|
|
||||||
$instance->startCacheCleanupLoop();
|
|
||||||
|
|
||||||
return call(static function () use ($instance, $value, $settings) {
|
|
||||||
$instance->db = yield from static::getDbConnection($settings);
|
|
||||||
|
|
||||||
yield from $instance->prepareTable();
|
|
||||||
|
|
||||||
//Skip migrations if its same object
|
|
||||||
if ($instance !== $value) {
|
|
||||||
yield from static::renameTmpTable($instance, $value);
|
|
||||||
yield from static::migrateDataToDb($instance, $value);
|
|
||||||
}
|
|
||||||
|
|
||||||
return $instance;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param PostgresArray $instance
|
|
||||||
* @param DbArray|array|null $value
|
|
||||||
*
|
|
||||||
* @return \Generator
|
|
||||||
*/
|
|
||||||
private static function renameTmpTable(PostgresArray $instance, $value): \Generator
|
|
||||||
{
|
|
||||||
if ($value instanceof static && $value->table) {
|
|
||||||
if (
|
|
||||||
$value->table !== $instance->table &&
|
|
||||||
\mb_strpos($instance->table, 'tmp') !== 0
|
|
||||||
) {
|
|
||||||
yield from $instance->renameTable($value->table, $instance->table);
|
|
||||||
} else {
|
|
||||||
$instance->table = $value->table;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param PostgresArray $instance
|
|
||||||
* @param DbArray|array|null $value
|
|
||||||
*
|
|
||||||
* @return \Generator
|
|
||||||
* @throws \Throwable
|
|
||||||
*/
|
|
||||||
private static function migrateDataToDb(PostgresArray $instance, $value): \Generator
|
|
||||||
{
|
|
||||||
if (!empty($value) && !$value instanceof PostgresArray) {
|
|
||||||
Logger::log('Converting database.', Logger::ERROR);
|
|
||||||
|
|
||||||
if ($value instanceof DbArray) {
|
|
||||||
$value = yield $value->getArrayCopy();
|
|
||||||
} else {
|
|
||||||
$value = (array) $value;
|
|
||||||
}
|
|
||||||
$counter = 0;
|
|
||||||
$total = \count($value);
|
|
||||||
foreach ($value as $key => $item) {
|
|
||||||
$counter++;
|
|
||||||
if ($counter % 500 === 0) {
|
|
||||||
yield $instance->offsetSet($key, $item);
|
|
||||||
Logger::log("Loading data to table {$instance->table}: $counter/$total", Logger::WARNING);
|
|
||||||
} else {
|
|
||||||
$instance->offsetSet($key, $item);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Logger::log('Converting database done.', Logger::ERROR);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public function offsetExists($index): bool
|
public function offsetExists($index): bool
|
||||||
{
|
{
|
||||||
@ -320,7 +230,7 @@ class PostgresArray implements DbArray
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private function renameTable(string $from, string $to)
|
protected function renameTable(string $from, string $to)
|
||||||
{
|
{
|
||||||
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
|
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
|
||||||
yield $this->request("
|
yield $this->request("
|
||||||
|
336
src/danog/MadelineProto/Db/RedisArray.php
Normal file
336
src/danog/MadelineProto/Db/RedisArray.php
Normal file
@ -0,0 +1,336 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace danog\MadelineProto\Db;
|
||||||
|
|
||||||
|
use Amp\Postgres\Pool;
|
||||||
|
use Amp\Producer;
|
||||||
|
use Amp\Promise;
|
||||||
|
use Amp\Redis\Redis;
|
||||||
|
use Amp\Sql\ResultSet;
|
||||||
|
use danog\MadelineProto\Db\Driver\Redis as DriverRedis;
|
||||||
|
use danog\MadelineProto\Logger;
|
||||||
|
use Generator;
|
||||||
|
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
|
class RedisArray extends DriverArray
|
||||||
|
{
|
||||||
|
use ArrayCacheTrait;
|
||||||
|
|
||||||
|
private string $table;
|
||||||
|
private array $settings;
|
||||||
|
private Pool $db;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set value for an offset.
|
||||||
|
*
|
||||||
|
* @link https://php.net/manual/en/arrayiterator.offsetset.php
|
||||||
|
*
|
||||||
|
* @param string $index <p>
|
||||||
|
* The index to set for.
|
||||||
|
* </p>
|
||||||
|
* @param $value
|
||||||
|
*
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
|
||||||
|
public function offsetSet($index, $value): Promise
|
||||||
|
{
|
||||||
|
if ($this->getCache($index) === $value) {
|
||||||
|
return call(fn () =>null);
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->setCache($index, $value);
|
||||||
|
|
||||||
|
$request = $this->request(
|
||||||
|
"
|
||||||
|
INSERT INTO \"{$this->table}\"
|
||||||
|
(key,value)
|
||||||
|
VALUES (:index, :value)
|
||||||
|
ON CONFLICT (key) DO UPDATE SET value = :value
|
||||||
|
",
|
||||||
|
[
|
||||||
|
'index' => $index,
|
||||||
|
'value' => \serialize($value),
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
|
//Ensure that cache is synced with latest insert in case of concurrent requests.
|
||||||
|
$request->onResolve(fn () => $this->setCache($index, $value));
|
||||||
|
|
||||||
|
return $request;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected function initConnection(array $settings): \Generator
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
public function __sleep()
|
||||||
|
{
|
||||||
|
return ['table', 'settings'];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $name
|
||||||
|
* @param DbArray|array|null $value
|
||||||
|
* @param string $tablePrefix
|
||||||
|
* @param array $settings
|
||||||
|
*
|
||||||
|
* @return Promise
|
||||||
|
*/
|
||||||
|
public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise
|
||||||
|
{
|
||||||
|
$tableName = "{$tablePrefix}_{$name}";
|
||||||
|
if ($value instanceof self && $value->table === $tableName) {
|
||||||
|
$instance = &$value;
|
||||||
|
} else {
|
||||||
|
$instance = new static();
|
||||||
|
$instance->table = $tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
$instance->settings = $settings;
|
||||||
|
$instance->db = static::getDbConnection($settings);
|
||||||
|
$instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;
|
||||||
|
|
||||||
|
$instance->startCacheCleanupLoop();
|
||||||
|
|
||||||
|
return call(static function () use ($instance, $value, $settings) {
|
||||||
|
//Skip migrations if its same object
|
||||||
|
if ($instance !== $value) {
|
||||||
|
yield from static::renameTmpTable($instance, $value);
|
||||||
|
yield from static::migrateDataToDb($instance, $value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $instance;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param PostgresArray $instance
|
||||||
|
* @param DbArray|array|null $value
|
||||||
|
*
|
||||||
|
* @return \Generator
|
||||||
|
*/
|
||||||
|
private static function renameTmpTable(self $instance, $value): \Generator
|
||||||
|
{
|
||||||
|
if ($value instanceof static && $value->table) {
|
||||||
|
if (
|
||||||
|
$value->table !== $instance->table &&
|
||||||
|
\mb_strpos($instance->table, 'tmp') !== 0
|
||||||
|
) {
|
||||||
|
yield from $instance->renameTable($value->table, $instance->table);
|
||||||
|
} else {
|
||||||
|
$instance->table = $value->table;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param PostgresArray $instance
|
||||||
|
* @param DbArray|array|null $value
|
||||||
|
*
|
||||||
|
* @return \Generator
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
private static function migrateDataToDb(self $instance, $value): \Generator
|
||||||
|
{
|
||||||
|
if (!empty($value) && !$value instanceof PostgresArray) {
|
||||||
|
Logger::log('Converting database.', Logger::ERROR);
|
||||||
|
|
||||||
|
if ($value instanceof DbArray) {
|
||||||
|
$value = yield $value->getArrayCopy();
|
||||||
|
} else {
|
||||||
|
$value = (array) $value;
|
||||||
|
}
|
||||||
|
$counter = 0;
|
||||||
|
$total = \count($value);
|
||||||
|
foreach ($value as $key => $item) {
|
||||||
|
$counter++;
|
||||||
|
if ($counter % 500 === 0) {
|
||||||
|
yield $instance->offsetSet($key, $item);
|
||||||
|
Logger::log("Loading data to table {$instance->table}: $counter/$total", Logger::WARNING);
|
||||||
|
} else {
|
||||||
|
$instance->offsetSet($key, $item);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Logger::log('Converting database done.', Logger::ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public function offsetExists($index): bool
|
||||||
|
{
|
||||||
|
throw new \RuntimeException('Native isset not support promises. Use isset method');
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if key isset.
|
||||||
|
*
|
||||||
|
* @param $key
|
||||||
|
*
|
||||||
|
* @return Promise<bool> true if the offset exists, otherwise false
|
||||||
|
*/
|
||||||
|
public function isset($key): Promise
|
||||||
|
{
|
||||||
|
return call(fn () => yield $this->offsetGet($key) !== null);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public function offsetGet($offset): Promise
|
||||||
|
{
|
||||||
|
return call(function () use ($offset) {
|
||||||
|
if ($cached = $this->getCache($offset)) {
|
||||||
|
return $cached;
|
||||||
|
}
|
||||||
|
|
||||||
|
$row = yield $this->request(
|
||||||
|
"SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1",
|
||||||
|
['index' => $offset]
|
||||||
|
);
|
||||||
|
|
||||||
|
if ($value = $this->getValue($row)) {
|
||||||
|
$this->setCache($offset, $value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $value;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unset value for an offset.
|
||||||
|
*
|
||||||
|
* @link https://php.net/manual/en/arrayiterator.offsetunset.php
|
||||||
|
*
|
||||||
|
* @param string $index <p>
|
||||||
|
* The offset to unset.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @return Promise
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
public function offsetUnset($index): Promise
|
||||||
|
{
|
||||||
|
$this->unsetCache($index);
|
||||||
|
|
||||||
|
return $this->request(
|
||||||
|
"
|
||||||
|
DELETE FROM \"{$this->table}\"
|
||||||
|
WHERE key = :index
|
||||||
|
",
|
||||||
|
['index' => $index]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get array copy.
|
||||||
|
*
|
||||||
|
* @return Promise<array>
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
public function getArrayCopy(): Promise
|
||||||
|
{
|
||||||
|
return call(function () {
|
||||||
|
$iterator = $this->getIterator();
|
||||||
|
$result = [];
|
||||||
|
while (yield $iterator->advance()) {
|
||||||
|
[$key, $value] = $iterator->getCurrent();
|
||||||
|
$result[$key] = $value;
|
||||||
|
}
|
||||||
|
return $result;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public function getIterator(): Producer
|
||||||
|
{
|
||||||
|
return new Producer(function (callable $emit) {
|
||||||
|
$request = yield $this->db->execute("SELECT key, value FROM \"{$this->table}\"");
|
||||||
|
|
||||||
|
while (yield $request->advance()) {
|
||||||
|
$row = $request->getCurrent();
|
||||||
|
yield $emit([$row['key'], $this->getValue($row)]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Count elements.
|
||||||
|
*
|
||||||
|
* @link https://php.net/manual/en/arrayiterator.count.php
|
||||||
|
* @return Promise<int> The number of elements or public properties in the associated
|
||||||
|
* array or object, respectively.
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
public function count(): Promise
|
||||||
|
{
|
||||||
|
return call(function () {
|
||||||
|
$row = yield $this->request("SELECT count(key) as count FROM \"{$this->table}\"");
|
||||||
|
return $row[0]['count'] ?? 0;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private function getValue(array $row)
|
||||||
|
{
|
||||||
|
if ($row) {
|
||||||
|
if (!empty($row[0]['value'])) {
|
||||||
|
$row = \reset($row);
|
||||||
|
}
|
||||||
|
return \unserialize($row['value']);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private function renameTable(string $from, string $to)
|
||||||
|
{
|
||||||
|
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
|
||||||
|
yield $this->request("
|
||||||
|
ALTER TABLE \"{$from}\" RENAME TO \"{$to}\";
|
||||||
|
");
|
||||||
|
|
||||||
|
yield $this->request("
|
||||||
|
DROP TABLE IF EXISTS \"{$from}\";
|
||||||
|
");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Perform async request to db.
|
||||||
|
*
|
||||||
|
* @param string $query
|
||||||
|
* @param array $params
|
||||||
|
*
|
||||||
|
* @return Promise
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
private function request(string $query, array $params = []): Promise
|
||||||
|
{
|
||||||
|
return call(function () use ($query, $params) {
|
||||||
|
Logger::log([$query, $params], Logger::VERBOSE);
|
||||||
|
|
||||||
|
if (empty($this->db) || !$this->db->isAlive()) {
|
||||||
|
Logger::log('No database connection', Logger::WARNING);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
!empty($params['index'])
|
||||||
|
&& !\mb_check_encoding($params['index'], 'UTF-8')
|
||||||
|
) {
|
||||||
|
$params['index'] = \mb_convert_encoding($params['index'], 'UTF-8');
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
$request = yield $this->db->execute($query, $params);
|
||||||
|
} catch (\Throwable $e) {
|
||||||
|
Logger::log($e->getMessage(), Logger::ERROR);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
$result = [];
|
||||||
|
if ($request instanceof ResultSet) {
|
||||||
|
while (yield $request->advance()) {
|
||||||
|
$result[] = $request->getCurrent();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return $result;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
82
src/danog/MadelineProto/Db/SqlArray.php
Normal file
82
src/danog/MadelineProto/Db/SqlArray.php
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
<?php
|
||||||
|
|
||||||
|
namespace danog\MadelineProto\Db;
|
||||||
|
|
||||||
|
use function Amp\call;
|
||||||
|
|
||||||
|
abstract class SqlArray extends DriverArray
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Create table for property.
|
||||||
|
*
|
||||||
|
* @return array|null
|
||||||
|
* @throws \Throwable
|
||||||
|
*/
|
||||||
|
abstract protected function prepareTable();
|
||||||
|
|
||||||
|
abstract protected function renameTable(string $from, string $to);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param string $name
|
||||||
|
* @param DbArray|array|null $value
|
||||||
|
* @param string $tablePrefix
|
||||||
|
* @param array $settings
|
||||||
|
*
|
||||||
|
* @return Promise
|
||||||
|
*/
|
||||||
|
public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise
|
||||||
|
{
|
||||||
|
$tableName = "{$tablePrefix}_{$name}";
|
||||||
|
if ($value instanceof static && $value->table === $tableName) {
|
||||||
|
$instance = &$value;
|
||||||
|
} else {
|
||||||
|
$instance = new static();
|
||||||
|
$instance->table = $tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
$instance->settings = $settings;
|
||||||
|
$instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;
|
||||||
|
|
||||||
|
$instance->startCacheCleanupLoop();
|
||||||
|
|
||||||
|
return call(static function () use ($instance, $value, $settings) {
|
||||||
|
yield from $instance->initConnection($settings);
|
||||||
|
yield from $instance->prepareTable();
|
||||||
|
|
||||||
|
// Skip migrations if its same object
|
||||||
|
if ($instance !== $value) {
|
||||||
|
if ($value instanceof DriverArray) {
|
||||||
|
yield from $value->initConnection($value->settings);
|
||||||
|
}
|
||||||
|
yield from static::renameTmpTable($instance, $value);
|
||||||
|
yield from static::migrateDataToDb($instance, $value);
|
||||||
|
}
|
||||||
|
|
||||||
|
return $instance;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rename table of old database, if is a temporary table name.
|
||||||
|
*
|
||||||
|
* Otherwise, change name of table in new database to match old table name.
|
||||||
|
*
|
||||||
|
* @param self $new New db
|
||||||
|
* @param DbArray|array|null $old Old db
|
||||||
|
*
|
||||||
|
* @return \Generator
|
||||||
|
*/
|
||||||
|
protected static function renameTmpTable(self $new, $old): \Generator
|
||||||
|
{
|
||||||
|
if ($old instanceof static && $old->table) {
|
||||||
|
if (
|
||||||
|
$old->table !== $new->table &&
|
||||||
|
\mb_strpos($new->table, 'tmp') !== 0
|
||||||
|
) {
|
||||||
|
yield from $new->renameTable($old->table, $new->table);
|
||||||
|
} else {
|
||||||
|
$new->table = $old->table;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user