Add more async requests

This commit is contained in:
Alexander Pankratov 2020-05-12 20:35:15 +03:00
parent 123e3fadfd
commit fa1d73009d

View File

@ -7,7 +7,6 @@ use Amp\Mysql\Pool;
use Amp\Producer; use Amp\Producer;
use Amp\Promise; use Amp\Promise;
use Amp\Sql\ResultSet; use Amp\Sql\ResultSet;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Logger; use danog\MadelineProto\Logger;
use function Amp\call; use function Amp\call;
use function Amp\Promise\wait; use function Amp\Promise\wait;
@ -50,19 +49,20 @@ class MysqlArray implements DbArray
$instance->db = static::getDbConnection($settings); $instance->db = static::getDbConnection($settings);
$instance->ttl = $settings['cache_ttl'] ?? $instance->ttl; $instance->ttl = $settings['cache_ttl'] ?? $instance->ttl;
if ($value instanceof static && $value->table) {
if (
mb_strpos($value->table, 'tmp') === 0 &&
mb_strpos($instance->table, 'tmp') !== 0
) {
$instance->renameTable($value->table, $instance->table);
} elseif (mb_strpos($instance->table, 'tmp') === 0){
$instance->table = $value->table;
}
}
$instance->prepareTable();
Loop::defer(static function() use($value, $instance) { Loop::defer(static function() use($value, $instance) {
if ($value instanceof static && $value->table) {
if (
mb_strpos($value->table, 'tmp') === 0 &&
mb_strpos($instance->table, 'tmp') !== 0
) {
yield from $instance->renameTable($value->table, $instance->table);
} elseif (mb_strpos($instance->table, 'tmp') === 0){
$instance->table = $value->table;
}
}
yield from $instance->prepareTable();
if (!empty($value) && !$value instanceof static) { if (!empty($value) && !$value instanceof static) {
Logger::log('Converting database.', Logger::ERROR); Logger::log('Converting database.', Logger::ERROR);
if ($value instanceof DbArray) { if ($value instanceof DbArray) {
@ -74,7 +74,7 @@ class MysqlArray implements DbArray
foreach ((array) $value as $key => $item) { foreach ((array) $value as $key => $item) {
$counter++; $counter++;
if ($counter % 100 === 0) { if ($counter % 100 === 0) {
yield from $instance->offsetSet($key, $item); yield $instance->offsetSet($key, $item);
Logger::log("Converting database. $counter/$total", Logger::WARNING); Logger::log("Converting database. $counter/$total", Logger::WARNING);
} else { } else {
$instance->offsetSet($key, $item); $instance->offsetSet($key, $item);
@ -136,18 +136,17 @@ class MysqlArray implements DbArray
* </p> * </p>
* @param $value * @param $value
* *
* @return void
* @throws \Throwable * @throws \Throwable
*/ */
public function offsetSet($index, $value) public function offsetSet($index, $value): Promise
{ {
if ($this->getCache($index) === $value) { if ($this->getCache($index) === $value) {
return; return call(fn()=>null);
} }
$this->setCache($index, $value); $this->setCache($index, $value);
yield $this->request(" return $this->request("
INSERT INTO `{$this->table}` INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value ON DUPLICATE KEY UPDATE `value` = :value
@ -263,7 +262,7 @@ class MysqlArray implements DbArray
*/ */
private function prepareTable() private function prepareTable()
{ {
return $this->syncRequest(" return yield $this->request("
CREATE TABLE IF NOT EXISTS `{$this->table}` CREATE TABLE IF NOT EXISTS `{$this->table}`
( (
`key` VARCHAR(255) NOT NULL, `key` VARCHAR(255) NOT NULL,
@ -279,10 +278,11 @@ class MysqlArray implements DbArray
private function renameTable(string $from, string $to) private function renameTable(string $from, string $to)
{ {
$this->syncRequest(" yield $this->request("
ALTER TABLE {$from} RENAME TO {$to}; ALTER TABLE {$from} RENAME TO {$to};
"); ");
$this->syncRequest("
yield $this->request("
DROP TABLE IF EXISTS {$from}; DROP TABLE IF EXISTS {$from};
"); ");
} }
@ -313,7 +313,11 @@ class MysqlArray implements DbArray
private function request(string $query, array $params = []): Promise private function request(string $query, array $params = []): Promise
{ {
return call(function() use($query, $params) { return call(function() use($query, $params) {
if (empty($this->db)) {
Logger::log([$query, $params], Logger::VERBOSE);
if (empty($this->db) || !$this->db->isAlive()) {
Logger::log('No database connection', Logger::WARNING);
return []; return [];
} }