Add redis backend

This commit is contained in:
Daniil Gentili 2020-09-12 19:06:42 +02:00
parent 5a5c02e56b
commit 9df1e27780
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
11 changed files with 147 additions and 252 deletions

View File

@ -92,7 +92,8 @@ $settings = [
], ],
'serialization' => [ 'serialization' => [
'serialization_interval' => 30, 'serialization_interval' => 30,
] ],
'db' => ['type' => 'redis']
]; ];
$MadelineProto = new API('bot.madeline', $settings); $MadelineProto = new API('bot.madeline', $settings);

View File

@ -55,7 +55,7 @@ trait ArrayCacheTrait
protected function startCacheCleanupLoop(): void protected function startCacheCleanupLoop(): void
{ {
$this->cacheCleanupId = Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, [$this, 'cleanupCache']); $this->cacheCleanupId = Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
} }
protected function stopCacheCleanupLoop(): void protected function stopCacheCleanupLoop(): void
{ {

View File

@ -14,8 +14,6 @@ interface DbArray extends DbType, \ArrayAccess, \Countable
public function offsetUnset($offset): Promise; public function offsetUnset($offset): Promise;
public function count(): Promise; public function count(): Promise;
public function getIterator(): Producer; public function getIterator(): Producer;
public function init(): Promise;
/** /**
* @deprecated * @deprecated

View File

@ -26,13 +26,16 @@ class DbPropertiesFactory
switch (\strtolower($dbSettings['type'])) { switch (\strtolower($dbSettings['type'])) {
case 'memory': case 'memory':
$class .= '\Memory'; $class .= '\\Memory';
break; break;
case 'mysql': case 'mysql':
$class .= '\Mysql'; $class .= '\\Mysql';
break; break;
case 'postgres': case 'postgres':
$class .= '\Postgres'; $class .= '\\Postgres';
break;
case 'redis':
$class .= '\\Redis';
break; break;
default: default:
throw new \InvalidArgumentException("Unknown dbType: {$dbSettings['type']}"); throw new \InvalidArgumentException("Unknown dbType: {$dbSettings['type']}");

View File

@ -26,23 +26,22 @@ class Redis
* @throws \Amp\Sql\FailureException * @throws \Amp\Sql\FailureException
* @throws \Throwable * @throws \Throwable
* *
* @return RedisRedis * @return \Generator<RedisRedis>
*/ */
public static function getConnection( public static function getConnection(
string $host = '127.0.0.1', string $host = '127.0.0.1',
int $port = 6379, int $port = 6379,
string $password = '', string $password = '',
int $db = 0, int $db = 0
int $maxConnections = ConnectionPool::DEFAULT_MAX_CONNECTIONS, ): \Generator {
int $idleTimeout = ConnectionPool::DEFAULT_IDLE_TIMEOUT
): RedisRedis {
$dbKey = "$host:$port:$db"; $dbKey = "$host:$port:$db";
if (empty(static::$connections[$dbKey])) { if (empty(static::$connections[$dbKey])) {
$config = Config::fromUri( $config = Config::fromUri(
"host={$host} port={$port} password={$password} db={$db}" "{$host}:{$port}?password={$password}&db={$db}"
); );
static::$connections[$dbKey] = new RedisRedis((new RemoteExecutorFactory($config))->createQueryExecutor()); static::$connections[$dbKey] = new RedisRedis((new RemoteExecutorFactory($config))->createQueryExecutor());
yield static::$connections[$dbKey]->ping();
} }
return static::$connections[$dbKey]; return static::$connections[$dbKey];

View File

@ -13,6 +13,12 @@ abstract class DriverArray implements DbArray
$this->stopCacheCleanupLoop(); $this->stopCacheCleanupLoop();
} }
public function offsetExists($index): bool
{
throw new \RuntimeException('Native isset not support promises. Use isset method');
}
abstract protected function initConnection(array $settings): \Generator; abstract protected function initConnection(array $settings): \Generator;
/** /**

View File

@ -21,12 +21,7 @@ class MysqlArray extends SqlArray
{ {
return ['table', 'settings']; return ['table', 'settings'];
} }
public function offsetExists($index): bool
{
throw new \RuntimeException('Native isset not support promises. Use isset method');
}
/** /**
* Check if key isset. * Check if key isset.
* *
@ -203,7 +198,7 @@ class MysqlArray extends SqlArray
* @return array|null * @return array|null
* @throws \Throwable * @throws \Throwable
*/ */
private function prepareTable() protected function prepareTable(): \Generator
{ {
Logger::log("Creating/checking table {$this->table}", Logger::WARNING); Logger::log("Creating/checking table {$this->table}", Logger::WARNING);
return yield $this->request(" return yield $this->request("
@ -220,7 +215,7 @@ class MysqlArray extends SqlArray
"); ");
} }
private function renameTable(string $from, string $to) protected function renameTable(string $from, string $to): \Generator
{ {
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
yield $this->request(" yield $this->request("

View File

@ -6,16 +6,32 @@ use Amp\Postgres\Pool;
use Amp\Producer; use Amp\Producer;
use Amp\Promise; use Amp\Promise;
use Amp\Sql\ResultSet; use Amp\Sql\ResultSet;
use Amp\Success;
use danog\MadelineProto\Db\Driver\Postgres; use danog\MadelineProto\Db\Driver\Postgres;
use danog\MadelineProto\Logger; use danog\MadelineProto\Logger;
use function Amp\call; use function Amp\call;
class PostgresArray extends SqlArray class PostgresArray extends SqlArray
{ {
private string $table; protected string $table;
private array $settings; protected array $settings;
private Pool $db; private Pool $db;
protected function initConnection(array $settings): \Generator
{
if (!isset($this->db)) {
$this->db = yield from Postgres::getConnection(
$settings['host'],
$settings['port'],
$settings['user'],
$settings['password'],
$settings['database'],
$settings['max_connections'],
$settings['idle_timeout']
);
}
}
/** /**
* Set value for an offset. * Set value for an offset.
* *
@ -32,7 +48,7 @@ class PostgresArray extends SqlArray
public function offsetSet($index, $value): Promise public function offsetSet($index, $value): Promise
{ {
if ($this->getCache($index) === $value) { if ($this->getCache($index) === $value) {
return call(fn () =>null); return new Success();
} }
$this->setCache($index, $value); $this->setCache($index, $value);
@ -56,26 +72,13 @@ class PostgresArray extends SqlArray
return $request; return $request;
} }
protected static function getDbConnection(array $settings): \Generator
{
return Postgres::getConnection(
$settings['host'],
$settings['port'],
$settings['user'],
$settings['password'],
$settings['database'],
$settings['max_connections'],
$settings['idle_timeout']
);
}
/** /**
* Create table for property. * Create table for property.
* *
* @return array|null * @return array|null
* @throws \Throwable * @throws \Throwable
*/ */
protected function prepareTable() protected function prepareTable(): \Generator
{ {
Logger::log("Creating/checking table {$this->table}", Logger::WARNING); Logger::log("Creating/checking table {$this->table}", Logger::WARNING);
@ -107,12 +110,6 @@ class PostgresArray extends SqlArray
return ['table', 'settings']; return ['table', 'settings'];
} }
public function offsetExists($index): bool
{
throw new \RuntimeException('Native isset not support promises. Use isset method');
}
/** /**
* Check if key isset. * Check if key isset.
* *
@ -230,7 +227,7 @@ class PostgresArray extends SqlArray
} }
protected function renameTable(string $from, string $to) protected function renameTable(string $from, string $to): \Generator
{ {
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING); Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
yield $this->request(" yield $this->request("

View File

@ -2,25 +2,86 @@
namespace danog\MadelineProto\Db; namespace danog\MadelineProto\Db;
use Amp\Postgres\Pool;
use Amp\Producer; use Amp\Producer;
use Amp\Promise; use Amp\Promise;
use Amp\Redis\Redis; use Amp\Redis\Redis as RedisRedis;
use Amp\Sql\ResultSet; use Amp\Success;
use danog\MadelineProto\Db\Driver\Redis as DriverRedis; use danog\MadelineProto\Db\Driver\Redis as Redis;
use danog\MadelineProto\Logger; use danog\MadelineProto\Logger;
use Generator; use Generator;
use function Amp\call; use function Amp\call;
class RedisArray extends DriverArray class RedisArray extends SqlArray
{ {
use ArrayCacheTrait; protected string $table;
protected array $settings;
private RedisRedis $db;
private string $table; protected function prepareTable(): Generator
private array $settings; {
private Pool $db; yield new Success;
}
protected function renameTable(string $from, string $to): \Generator
{
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
$request = $this->db->scan($this->itKey());
$lenK = \strlen($from);
while (yield $request->advance()) {
$key = $request->getCurrent();
yield $this->db->rename($key, $to.\substr($key, $lenK));
}
}
protected function initConnection(array $settings): \Generator
{
if (!isset($this->db)) {
$this->db = yield from Redis::getConnection(
$settings['host'],
$settings['port'],
$settings['password'],
$settings['database']
);
}
}
public function __sleep()
{
return ['table', 'settings'];
}
/**
* Get redis key name.
*
* @param string $key
* @return string
*/
private function rKey(string $key): string
{
return 'va:'.$this->table.':'.$key;
}
/**
* Get redis ts name.
*
* @param string $key
* @return string
*/
private function tsKey(string $key): string
{
return 'ts:'.$this->table.$key;
}
/**
* Get iterator key.
*
* @return string
*/
private function itKey(): string
{
return 'va:'.$this->table.'*';
}
/** /**
* Set value for an offset. * Set value for an offset.
* *
@ -37,23 +98,19 @@ class RedisArray extends DriverArray
public function offsetSet($index, $value): Promise public function offsetSet($index, $value): Promise
{ {
if ($this->getCache($index) === $value) { if ($this->getCache($index) === $value) {
return call(fn () =>null); return new Success();
} }
$this->setCache($index, $value); $this->setCache($index, $value);
$request = $this->request( /*
" $request = $this->db->setMultiple(
INSERT INTO \"{$this->table}\"
(key,value)
VALUES (:index, :value)
ON CONFLICT (key) DO UPDATE SET value = :value
",
[ [
'index' => $index, $this->rKey($index) => \serialize($value),
'value' => \serialize($value), $this->tsKey($index) => \time()
] ]
); );*/
$request = $this->db->set($this->rKey($index), \serialize($value));
//Ensure that cache is synced with latest insert in case of concurrent requests. //Ensure that cache is synced with latest insert in case of concurrent requests.
$request->onResolve(fn () => $this->setCache($index, $value)); $request->onResolve(fn () => $this->setCache($index, $value));
@ -61,107 +118,6 @@ class RedisArray extends DriverArray
return $request; return $request;
} }
protected function initConnection(array $settings): \Generator
{
}
public function __sleep()
{
return ['table', 'settings'];
}
/**
* @param string $name
* @param DbArray|array|null $value
* @param string $tablePrefix
* @param array $settings
*
* @return Promise
*/
public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): Promise
{
$tableName = "{$tablePrefix}_{$name}";
if ($value instanceof self && $value->table === $tableName) {
$instance = &$value;
} else {
$instance = new static();
$instance->table = $tableName;
}
$instance->settings = $settings;
$instance->db = static::getDbConnection($settings);
$instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;
$instance->startCacheCleanupLoop();
return call(static function () use ($instance, $value, $settings) {
//Skip migrations if its same object
if ($instance !== $value) {
yield from static::renameTmpTable($instance, $value);
yield from static::migrateDataToDb($instance, $value);
}
return $instance;
});
}
/**
* @param PostgresArray $instance
* @param DbArray|array|null $value
*
* @return \Generator
*/
private static function renameTmpTable(self $instance, $value): \Generator
{
if ($value instanceof static && $value->table) {
if (
$value->table !== $instance->table &&
\mb_strpos($instance->table, 'tmp') !== 0
) {
yield from $instance->renameTable($value->table, $instance->table);
} else {
$instance->table = $value->table;
}
}
}
/**
* @param PostgresArray $instance
* @param DbArray|array|null $value
*
* @return \Generator
* @throws \Throwable
*/
private static function migrateDataToDb(self $instance, $value): \Generator
{
if (!empty($value) && !$value instanceof PostgresArray) {
Logger::log('Converting database.', Logger::ERROR);
if ($value instanceof DbArray) {
$value = yield $value->getArrayCopy();
} else {
$value = (array) $value;
}
$counter = 0;
$total = \count($value);
foreach ($value as $key => $item) {
$counter++;
if ($counter % 500 === 0) {
yield $instance->offsetSet($key, $item);
Logger::log("Loading data to table {$instance->table}: $counter/$total", Logger::WARNING);
} else {
$instance->offsetSet($key, $item);
}
}
Logger::log('Converting database done.', Logger::ERROR);
}
}
public function offsetExists($index): bool
{
throw new \RuntimeException('Native isset not support promises. Use isset method');
}
/** /**
* Check if key isset. * Check if key isset.
* *
@ -171,7 +127,7 @@ class RedisArray extends DriverArray
*/ */
public function isset($key): Promise public function isset($key): Promise
{ {
return call(fn () => yield $this->offsetGet($key) !== null); return $this->db->has($this->rKey($key));
} }
@ -182,12 +138,9 @@ class RedisArray extends DriverArray
return $cached; return $cached;
} }
$row = yield $this->request( $value = yield $this->db->get($this->rKey($offset));
"SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1",
['index' => $offset]
);
if ($value = $this->getValue($row)) { if ($value = \unserialize($value)) {
$this->setCache($offset, $value); $this->setCache($offset, $value);
} }
@ -211,13 +164,7 @@ class RedisArray extends DriverArray
{ {
$this->unsetCache($index); $this->unsetCache($index);
return $this->request( return $this->db->delete($this->rkey($index));
"
DELETE FROM \"{$this->table}\"
WHERE key = :index
",
['index' => $index]
);
} }
/** /**
@ -242,11 +189,10 @@ class RedisArray extends DriverArray
public function getIterator(): Producer public function getIterator(): Producer
{ {
return new Producer(function (callable $emit) { return new Producer(function (callable $emit) {
$request = yield $this->db->execute("SELECT key, value FROM \"{$this->table}\""); $request = $this->db->scan($this->itKey());
while (yield $request->advance()) { while (yield $request->advance()) {
$row = $request->getCurrent(); yield $emit([$key = $request->getCurrent(), \unserialize(yield $this->db->get($key))]);
yield $emit([$row['key'], $this->getValue($row)]);
} }
}); });
} }
@ -262,75 +208,14 @@ class RedisArray extends DriverArray
public function count(): Promise public function count(): Promise
{ {
return call(function () { return call(function () {
$row = yield $this->request("SELECT count(key) as count FROM \"{$this->table}\""); $request = $this->db->scan($this->itKey());
return $row[0]['count'] ?? 0; $count = 0;
});
}
private function getValue(array $row) while (yield $request->advance()) {
{ $count++;
if ($row) {
if (!empty($row[0]['value'])) {
$row = \reset($row);
}
return \unserialize($row['value']);
}
return null;
}
private function renameTable(string $from, string $to)
{
Logger::log("Renaming table {$from} to {$to}", Logger::WARNING);
yield $this->request("
ALTER TABLE \"{$from}\" RENAME TO \"{$to}\";
");
yield $this->request("
DROP TABLE IF EXISTS \"{$from}\";
");
}
/**
* Perform async request to db.
*
* @param string $query
* @param array $params
*
* @return Promise
* @throws \Throwable
*/
private function request(string $query, array $params = []): Promise
{
return call(function () use ($query, $params) {
Logger::log([$query, $params], Logger::VERBOSE);
if (empty($this->db) || !$this->db->isAlive()) {
Logger::log('No database connection', Logger::WARNING);
return [];
} }
if ( return $count;
!empty($params['index'])
&& !\mb_check_encoding($params['index'], 'UTF-8')
) {
$params['index'] = \mb_convert_encoding($params['index'], 'UTF-8');
}
try {
$request = yield $this->db->execute($query, $params);
} catch (\Throwable $e) {
Logger::log($e->getMessage(), Logger::ERROR);
return [];
}
$result = [];
if ($request instanceof ResultSet) {
while (yield $request->advance()) {
$result[] = $request->getCurrent();
}
}
return $result;
}); });
} }
} }

View File

@ -2,6 +2,8 @@
namespace danog\MadelineProto\Db; namespace danog\MadelineProto\Db;
use Amp\Promise;
use function Amp\call; use function Amp\call;
abstract class SqlArray extends DriverArray abstract class SqlArray extends DriverArray
@ -12,9 +14,9 @@ abstract class SqlArray extends DriverArray
* @return array|null * @return array|null
* @throws \Throwable * @throws \Throwable
*/ */
abstract protected function prepareTable(); abstract protected function prepareTable(): \Generator;
abstract protected function renameTable(string $from, string $to); abstract protected function renameTable(string $from, string $to): \Generator;
/** /**
* @param string $name * @param string $name
@ -57,7 +59,7 @@ abstract class SqlArray extends DriverArray
} }
/** /**
* Rename table of old database, if is a temporary table name. * Rename table of old database, if the new one is not a temporary table name.
* *
* Otherwise, change name of table in new database to match old table name. * Otherwise, change name of table in new database to match old table name.
* *

View File

@ -27,6 +27,7 @@ use danog\MadelineProto\Async\AsyncConstruct;
use danog\MadelineProto\Db\DbArray; use danog\MadelineProto\Db\DbArray;
use danog\MadelineProto\Db\DbPropertiesFactory; use danog\MadelineProto\Db\DbPropertiesFactory;
use danog\MadelineProto\Db\DbPropertiesTrait; use danog\MadelineProto\Db\DbPropertiesTrait;
use danog\MadelineProto\Db\Driver\Redis;
use danog\MadelineProto\Db\Mysql; use danog\MadelineProto\Db\Mysql;
use danog\MadelineProto\Ipc\Server; use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal; use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal;
@ -92,7 +93,7 @@ class MTProto extends AsyncConstruct implements TLCallback
* *
* @var int * @var int
*/ */
const V = 145; const V = 147;
/** /**
* String release version. * String release version.
* *
@ -1382,6 +1383,14 @@ class MTProto extends AsyncConstruct implements TLCallback
'idle_timeout' => 60, 'idle_timeout' => 60,
'cache_ttl' => '+5 minutes', //keep records in memory after last read 'cache_ttl' => '+5 minutes', //keep records in memory after last read
], ],
/** @see Redis */
'redis' => [
'host' => 'redis://127.0.0.1',
'port' => 6379,
'password' => '',
'database' => 0, //will be created automatically
'cache_ttl' => '+5 minutes', //keep records in memory after last read
],
], ],
'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [ 'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [
'pwr' => false, 'pwr' => false,