MadelineProto/src/danog/MadelineProto/Db/MysqlArray.php

331 lines
8.8 KiB
PHP
Raw Normal View History

2020-04-25 21:57:55 +02:00
<?php
namespace danog\MadelineProto\Db;
2020-04-28 02:41:06 +02:00
use Amp\Loop;
2020-04-25 21:57:55 +02:00
use Amp\Mysql\Pool;
2020-04-28 02:41:06 +02:00
use Amp\Producer;
use Amp\Promise;
2020-04-25 21:57:55 +02:00
use Amp\Sql\ResultSet;
2020-04-28 02:41:06 +02:00
use danog\MadelineProto\Logger;
2020-04-25 21:57:55 +02:00
use function Amp\call;
2020-04-28 02:41:06 +02:00
use function Amp\Promise\wait;
2020-04-25 21:57:55 +02:00
2020-04-27 02:21:18 +02:00
/**
 * Array-like key/value storage backed by a MySQL table.
 *
 * Every entry is stored as one row (`key`, serialized `value`) of the table
 * named "{prefix}_{name}". Reads and writes are asynchronous and return Amp
 * promises; an in-memory cache provided by ArrayCacheTrait is consulted before
 * single-key reads and updated on writes.
 */
class MysqlArray implements DbArray
{
    use ArrayCacheTrait;

    /** Name of the table that stores the rows of this array. */
    private string $table;

    /** Connection settings: host, port, user, password, database and optional cache_ttl. */
    private array $settings;

    /** Connection pool. Stays uninitialized when reconnecting in __unserialize() fails. */
    private Pool $db;

    /**
     * Serialize only what is needed to reconnect: the table name and the settings.
     *
     * @return array
     */
    public function __serialize(): array
    {
        return [
            'table' => $this->table,
            'settings' => $this->settings,
        ];
    }

    /**
     * Restore the serialized properties and try to reopen the connection.
     *
     * A connection failure is logged instead of thrown; in that case $db stays
     * unset and request()/getIterator() produce empty results.
     *
     * @param array $data Data produced by __serialize()
     *
     * @return void
     */
    public function __unserialize($data): void
    {
        foreach ($data as $property => $value) {
            $this->{$property} = $value;
        }
        try {
            $this->db = static::getDbConnection($this->settings);
        } catch (\Throwable $e) {
            Logger::log($e->getMessage(), Logger::ERROR);
        }
    }

    /**
     * Create an instance bound to the table "{$tablePrefix}_{$name}".
     *
     * When $value is already an instance of this class, its table is either
     * renamed to the new name (old name starts with "tmp", new one does not)
     * or reused as is (new name starts with "tmp"). Any other non-empty $value
     * is copied into the table from a deferred event-loop callback.
     *
     * @param string $name        Property name, used as table name suffix
     * @param mixed  $value       Previous value of the property
     * @param string $tablePrefix Table name prefix
     * @param array  $settings    Connection settings, see getDbConnection()
     *
     * @return DbType
     * @throws \Throwable
     */
    public static function getInstance(string $name, $value = null, string $tablePrefix = '', array $settings = []): DbType
    {
        $instance = new static();
        $instance->table = "{$tablePrefix}_{$name}";
        $instance->settings = $settings;
        $instance->db = static::getDbConnection($settings);
        $instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;

        if ($value instanceof static) {
            $valueIsTmp = mb_strpos($value->table, 'tmp') === 0;
            $instanceIsTmp = mb_strpos($instance->table, 'tmp') === 0;
            if ($valueIsTmp && !$instanceIsTmp) {
                $instance->renameTable($value->table, $instance->table);
            } elseif ($instanceIsTmp) {
                $instance->table = $value->table;
            }
        }

        $instance->prepareTable();

        // The event loop turns the returned generator into a coroutine.
        Loop::defer(static fn () => $instance->convertFrom($value));

        return $instance;
    }

    /**
     * Copy a legacy (non-MysqlArray) value into the table.
     *
     * Writes are issued in batches of 100; each batch is awaited before the
     * next one starts, so failures are noticed and the number of in-flight
     * queries stays bounded.
     *
     * @param mixed $value Array, DbArray or anything castable to array
     *
     * @return \Generator
     */
    private function convertFrom($value): \Generator
    {
        if (empty($value) || $value instanceof static) {
            return;
        }

        Logger::log('Converting database.', Logger::ERROR);
        if ($value instanceof DbArray) {
            $value = $value->getArrayCopy();
        }
        $value = (array) $value;
        $total = \count($value);
        $counter = 0;
        $pending = [];

        try {
            foreach ($value as $key => $item) {
                $pending[] = $this->store($key, $item);
                if (++$counter % 100 === 0) {
                    yield \Amp\Promise\all($pending);
                    $pending = [];
                    Logger::log("Converting database. $counter/$total", Logger::WARNING);
                }
            }
            // Await the tail so that "done" is only logged once everything is written.
            yield \Amp\Promise\all($pending);
        } catch (\Throwable $e) {
            Logger::log("Converting database failed: {$e->getMessage()}", Logger::ERROR);
            return;
        }

        Logger::log('Converting database done.', Logger::ERROR);
    }

    /**
     * Check if offset exists
     *
     * @link https://php.net/manual/en/arrayiterator.offsetexists.php
     *
     * @param string $index <p>
     * The offset being checked.
     * </p>
     *
     * @return Promise<bool> true if the offset exists, otherwise false
     * @throws \Throwable
     */
    public function offsetExists($index): Promise
    {
        return call(function () use ($index) {
            // Parentheses are required: `yield` binds weaker than `!==`.
            return (yield $this->offsetGet($index)) !== null;
        });
    }

    /**
     * Get value for an offset
     *
     * The cache is consulted first. Only truthy values are served from and
     * written to the cache; falsy values are read from the table every time.
     *
     * @param string $offset The offset to read
     *
     * @return Promise Resolves with the stored value, or null when the key is missing
     * @throws \Throwable
     */
    public function offsetGet($offset): Promise
    {
        return call(function () use ($offset) {
            if ($cached = $this->getCache($offset)) {
                return $cached;
            }

            $rows = yield $this->request(
                "SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1",
                ['index' => $offset]
            );

            if ($value = $this->getValue($rows)) {
                $this->setCache($offset, $value);
            }

            return $value;
        });
    }

    /**
     * Set value for an offset
     *
     * The write is started but not awaited; a failed write is logged.
     *
     * @link https://php.net/manual/en/arrayiterator.offsetset.php
     *
     * @param string $index <p>
     * The index to set for.
     * </p>
     * @param $value
     *
     * @return void
     */
    public function offsetSet($index, $value): void
    {
        $this->store($index, $value)->onResolve(static function ($error): void {
            if ($error !== null) {
                Logger::log($error->getMessage(), Logger::ERROR);
            }
        });
    }

    /**
     * Update the cache and insert or update the row for $index.
     *
     * @param string $index Key of the row
     * @param mixed  $value Value, stored serialized
     *
     * @return Promise Resolves once the query has completed
     */
    private function store($index, $value): Promise
    {
        $this->setCache($index, $value);

        return $this->request("
            INSERT INTO `{$this->table}`
            SET `key` = :index, `value` = :value
            ON DUPLICATE KEY UPDATE `value` = :value
        ",
            [
                'index' => $index,
                'value' => serialize($value),
            ]
        );
    }

    /**
     * Unset value for an offset
     *
     * @link https://php.net/manual/en/arrayiterator.offsetunset.php
     *
     * @param string $index <p>
     * The offset to unset.
     * </p>
     *
     * @return Promise
     * @throws \Throwable
     */
    public function offsetUnset($index): Promise
    {
        $this->unsetCache($index);

        return $this->request("
            DELETE FROM `{$this->table}`
            WHERE `key` = :index
        ",
            ['index' => $index]
        );
    }

    /**
     * Get array copy
     *
     * Blocks until the whole table has been read.
     *
     * @link https://php.net/manual/en/arrayiterator.getarraycopy.php
     * @return array All stored values indexed by their key
     * @throws \Throwable
     */
    public function getArrayCopy(): array
    {
        $rows = $this->syncRequest("SELECT `key`, `value` FROM `{$this->table}`");
        $result = [];
        foreach ($rows as $row) {
            $result[$row['key']] = $this->getValue($row);
        }

        return $result;
    }

    /**
     * Iterate asynchronously over all rows of the table.
     *
     * @return Producer Emits [key, value] pairs; emits nothing when there is no connection
     */
    public function getIterator(): Producer
    {
        return new Producer(function (callable $emit) {
            // Same guard as request(): $db is unset when reconnecting failed.
            if (!isset($this->db)) {
                return;
            }
            $request = yield $this->db->execute("SELECT `key`, `value` FROM `{$this->table}`");
            while (yield $request->advance()) {
                $row = $request->getCurrent();
                yield $emit([$row['key'], $this->getValue($row)]);
            }
        });
    }

    /**
     * Count elements
     *
     * @link https://php.net/manual/en/arrayiterator.count.php
     * @return Promise<int> The number of rows in the table
     * @throws \Throwable
     */
    public function count(): Promise
    {
        return call(function () {
            $rows = yield $this->request("SELECT count(`key`) as `count` FROM `{$this->table}`");
            return $rows[0]['count'] ?? 0;
        });
    }

    /**
     * Extract and unserialize the value of a row.
     *
     * Accepts either a single row or the list of rows returned by request(),
     * in which case the first row is used.
     *
     * @param array $row Single row or list of rows
     *
     * @return mixed The unserialized value, or null when there is no row or its value is NULL
     */
    private function getValue(array $row)
    {
        if (isset($row[0]) && \is_array($row[0])) {
            $row = $row[0];
        }
        if (!isset($row['value'])) {
            return null;
        }

        // NOTE(review): unserialize() runs on whatever the table contains; the
        // database must only be writable by trusted code.
        return unserialize($row['value']);
    }

    /**
     * Get a connection pool for the given settings.
     *
     * @param array $settings Must contain host, port, user, password and database
     *
     * @return Pool
     */
    public static function getDbConnection(array $settings): Pool
    {
        return Mysql::getConnection(
            $settings['host'],
            $settings['port'],
            $settings['user'],
            $settings['password'],
            $settings['database']
        );
    }

    /**
     * Create table for property
     *
     * Blocks until the query has completed.
     *
     * @return array
     * @throws \Throwable
     */
    private function prepareTable()
    {
        return $this->syncRequest("
            CREATE TABLE IF NOT EXISTS `{$this->table}`
            (
                `key` VARCHAR(255) NOT NULL,
                `value` MEDIUMBLOB NULL,
                `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                PRIMARY KEY (`key`)
            )
        ");
    }

    /**
     * Rename a table; when renaming fails, drop the source table instead.
     *
     * Both failures are logged and never thrown.
     *
     * @param string $from Current table name
     * @param string $to   New table name
     *
     * @return void
     */
    private function renameTable(string $from, string $to)
    {
        try {
            $this->syncRequest("
                ALTER TABLE `{$from}` RENAME TO `{$to}`;
            ");
        } catch (\Throwable $renameError) {
            Logger::log("Cant rename table {$from} to {$to}", Logger::WARNING);

            try {
                $this->syncRequest("
                    DROP TABLE `{$from}`;
                ");
            } catch (\Throwable $dropError) {
                Logger::log("Cant drop table {$from}", Logger::WARNING);
            }
        }
    }

    /**
     * Perform blocking request to db
     *
     * @param string $query
     * @param array $params
     *
     * @return array Fetched rows, empty for queries without a result set
     * @throws \Throwable
     */
    private function syncRequest(string $query, array $params = []): array
    {
        return wait($this->request($query, $params));
    }

    /**
     * Perform asynchronous request to db
     *
     * @param string $query
     * @param array $params
     *
     * @return Promise<array> Resolves with the fetched rows; empty when the query
     *                        returns no result set or there is no connection
     * @throws \Throwable
     */
    private function request(string $query, array $params = []): Promise
    {
        return call(function () use ($query, $params) {
            // $db is unset when reconnecting in __unserialize() failed.
            if (!isset($this->db)) {
                return [];
            }

            $request = yield $this->db->execute($query, $params);
            $result = [];
            if ($request instanceof ResultSet) {
                while (yield $request->advance()) {
                    $result[] = $request->getCurrent();
                }
            }

            return $result;
        });
    }
}