Cache for Mysql Data Provider

This commit is contained in:
Alexander Pankratov 2020-05-02 20:36:59 +03:00
parent 26abf9f04e
commit a883684f05
4 changed files with 133 additions and 37 deletions

View File

@ -0,0 +1,76 @@
<?php
namespace danog\MadelineProto\Db;
trait ArrayCacheTrait
{
/**
* Values stored in this format:
* [
* [
* 'value' => mixed,
* 'ttl' => int
* ],
* ...
* ]
* @var array
*/
protected array $cache = [];
protected string $ttl = '+1 day';
private string $ttlCheckInterval = '+1 second';
private int $nextTtlCheckTs = 0;
protected function getCache(string $key, $default = null)
{
if ($cacheItem = $this->cache[$key] ?? null) {
$this->cache[$key]['ttl'] = strtotime($this->ttl);
} else {
return $default;
}
return $cacheItem['value'];
}
/**
* Save item in cache
*
* @param string $key
* @param $value
*/
protected function setCache(string $key, $value): void
{
$now = time();
$this->cache[$key] = [
'value' => $value,
'ttl' => $now,
];
if ($this->nextTtlCheckTs < $now) {
$this->nextTtlCheckTs = strtotime($this->ttlCheckInterval, $now);
foreach ($this->cache as $cacheKey => $cacheValue) {
if ($cacheValue['ttl'] < $now) {
$this->unsetCache($cacheKey);
}
}
}
}
/**
* Remove key from cache
*
* @param string $key
*/
protected function unsetCache(string $key): void
{
unset($this->cache[$key]);
}
/**
* Remove all keys from cache
*/
protected function clearCache(): void
{
$this->cache = [];
}
}

View File

@ -5,26 +5,25 @@ namespace danog\MadelineProto\Db;
use Amp\Mysql\ConnectionConfig; use Amp\Mysql\ConnectionConfig;
use Amp\Mysql\Pool; use Amp\Mysql\Pool;
use function Amp\Mysql\Pool; use function Amp\Mysql\Pool;
use function Amp\Promise\wait;
class Mysql class Mysql
{ {
/** @var Pool[] */ /** @var Pool[] */
private static array $connections; private static array $connections;
private static function connect( /**
string $host = '127.0.0.1', * @param string $host
int $port = 3306, * @param int $port
string $user = 'root', * @param string $user
string $password = '', * @param string $password
string $db = 'MadelineProto' * @param string $db
) { *
$config = ConnectionConfig::fromString( * @return Pool
"host={$host} port={$port} user={$user} password={$password} db={$db}" * @throws \Amp\Sql\ConnectionException
); * @throws \Amp\Sql\FailureException
* @throws \Throwable
return Pool($config); */
}
public static function getConnection( public static function getConnection(
string $host = '127.0.0.1', string $host = '127.0.0.1',
int $port = 3306, int $port = 3306,
@ -35,10 +34,31 @@ class Mysql
{ {
$dbKey = "$host:$port:$db"; $dbKey = "$host:$port:$db";
if (empty(static::$connections[$dbKey])) { if (empty(static::$connections[$dbKey])) {
static::$connections[$dbKey] = static::connect($host, $port, $user, $password, $db); $config = ConnectionConfig::fromString(
"host={$host} port={$port} user={$user} password={$password} db={$db}"
);
static::createDb($config);
static::$connections[$dbKey] = pool($config);
} }
return static::$connections[$dbKey]; return static::$connections[$dbKey];
} }
/**
* @param ConnectionConfig $config
*
* @throws \Amp\Sql\ConnectionException
* @throws \Amp\Sql\FailureException
* @throws \Throwable
*/
private static function createDb(ConnectionConfig $config) {
$db = $config->getDatabase();
wait(pool($config->withDatabase(null))->query("
CREATE DATABASE IF NOT EXISTS `{$db}`
CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci
"));
}
} }

View File

@ -8,12 +8,13 @@ use Amp\Producer;
use Amp\Promise; use Amp\Promise;
use Amp\Sql\ResultSet; use Amp\Sql\ResultSet;
use danog\MadelineProto\Logger; use danog\MadelineProto\Logger;
use danog\MadelineProto\Tools;
use function Amp\call; use function Amp\call;
use function Amp\Promise\wait; use function Amp\Promise\wait;
class MysqlArray implements DbArray class MysqlArray implements DbArray
{ {
use ArrayCacheTrait;
private string $table; private string $table;
private array $settings; private array $settings;
private Pool $db; private Pool $db;
@ -48,6 +49,7 @@ class MysqlArray implements DbArray
$instance->table = "{$tablePrefix}_{$name}"; $instance->table = "{$tablePrefix}_{$name}";
$instance->settings = $settings; $instance->settings = $settings;
$instance->db = static::getDbConnection($settings); $instance->db = static::getDbConnection($settings);
$instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;
if ($value instanceof static) { if ($value instanceof static) {
if ($instance->table !== $value->table) { if ($instance->table !== $value->table) {
@ -97,12 +99,7 @@ class MysqlArray implements DbArray
*/ */
public function offsetExists($index) public function offsetExists($index)
{ {
$row = $this->syncRequest( return $this->offsetGet($index) !== null;
"SELECT count(`key`) as `count` FROM {$this->table} WHERE `key` = :index LIMIT 1",
['index' => $index]
);
return !empty($row[0]['count']);
} }
/** /**
@ -126,11 +123,20 @@ class MysqlArray implements DbArray
public function offsetGetAsync(string $offset): Promise public function offsetGetAsync(string $offset): Promise
{ {
return call(function() use($offset) { return call(function() use($offset) {
if ($cached = $this->getCache($offset)) {
return $cached;
}
$row = yield $this->request( $row = yield $this->request(
"SELECT `value` FROM {$this->table} WHERE `key` = :index LIMIT 1", "SELECT `value` FROM {$this->table} WHERE `key` = :index LIMIT 1",
['index' => $offset] ['index' => $offset]
); );
return $this->getValue($row);
if ($value = $this->getValue($row)) {
$this->setCache($offset, $value);
}
return $value;
}); });
} }
@ -149,20 +155,13 @@ class MysqlArray implements DbArray
*/ */
public function offsetSet($index, $value) public function offsetSet($index, $value)
{ {
$this->syncRequest(" wait($this->offsetSetAsync($index, $value));
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
",
[
'index' => $index,
'value' => serialize($value),
]
);
} }
public function offsetSetAsync($index, $value): Promise public function offsetSetAsync($index, $value): Promise
{ {
$this->setCache($index, $value);
return $this->request(" return $this->request("
INSERT INTO `{$this->table}` INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value SET `key` = :index, `value` = :value
@ -189,6 +188,8 @@ class MysqlArray implements DbArray
*/ */
public function offsetUnset($index) public function offsetUnset($index)
{ {
$this->unsetCache($index);
$this->syncRequest(" $this->syncRequest("
DELETE FROM `{$this->table}` DELETE FROM `{$this->table}`
WHERE `key` = :index WHERE `key` = :index
@ -398,7 +399,7 @@ class MysqlArray implements DbArray
*/ */
private function syncRequest(string $query, array $params = []): array private function syncRequest(string $query, array $params = []): array
{ {
return Tools::wait($this->request($query, $params)); return wait($this->request($query, $params));
} }
/** /**
@ -426,7 +427,5 @@ class MysqlArray implements DbArray
} }
return $result; return $result;
}); });
} }
} }

View File

@ -1314,7 +1314,8 @@ class MTProto extends AsyncConstruct implements TLCallback
'port' => 3306, 'port' => 3306,
'user' => 'root', 'user' => 'root',
'password' => '', 'password' => '',
'database' => 'MadelineProto' 'database' => 'MadelineProto',
'cache_ttl' => '+1 day',
] ]
], ],
'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [ 'upload' => ['allow_automatic_upload' => true, 'part_size' => 512 * 1024, 'parallel_chunks' => 20], 'download' => ['report_broken_media' => true, 'part_size' => 1024 * 1024, 'parallel_chunks' => 20], 'pwr' => [