Bugfixes and optimizations

This commit is contained in:
Alexander Pankratov 2020-04-28 03:41:06 +03:00
parent 0d191f4157
commit 26abf9f04e
9 changed files with 216 additions and 114 deletions

View File

@ -2,7 +2,13 @@
namespace danog\MadelineProto\Db;
use Amp\Producer;
use Amp\Promise;
interface DbArray extends DbType, \ArrayAccess, \Countable, \Iterator, \SeekableIterator
{
public function getArrayCopy();
public function offsetGetAsync(string $offset): Promise;
public function offsetSetAsync(string $offset, $value): Promise;
public function getIterator(): Producer;
}

View File

@ -2,10 +2,13 @@
namespace danog\MadelineProto\Db;
use danog\MadelineProto\API;
use danog\MadelineProto\MTProto;
class DbPropertiesFabric
{
/**
* @param array $dbSettings
* @param MTProto $madelineProto
* @param string $propertyType
* @param string $name
* @param $value
@ -16,16 +19,14 @@ class DbPropertiesFabric
* @uses \danog\MadelineProto\Db\SharedMemoryArray
* @uses \danog\MadelineProto\Db\MysqlArray
*/
public static function get(array $dbSettings, string $propertyType, string $name, $value = null): DbType
public static function get(MTProto $madelineProto, string $propertyType, string $name, $value = null): DbType
{
$class = __NAMESPACE__;
$dbSettings = $madelineProto->settings['db'];
switch (strtolower($dbSettings['type'])) {
case 'memory':
$class .= '\Memory';
break;
case 'sharedmemory':
$class .= '\SharedMemory';
break;
case 'mysql':
$class .= '\Mysql';
break;
@ -34,6 +35,7 @@ class DbPropertiesFabric
}
/** @var DbType $class */
switch (strtolower($propertyType)){
case 'array':
$class .= 'Array';
@ -42,8 +44,8 @@ class DbPropertiesFabric
throw new \InvalidArgumentException("Unknown $propertyType: {$propertyType}");
}
/** @var DbType $class */
return $class::getInstance($dbSettings, $name, $value);
$prefix = (string) ($madelineProto->getSelf()['id'] ?? 'tmp');
return $class::getInstance($name, $value, $prefix, $dbSettings[$dbSettings['type']]??[]);
}
}

View File

@ -4,5 +4,5 @@ namespace danog\MadelineProto\Db;
interface DbType
{
static function getInstance(array $settings, string $name, $value): self;
static function getInstance(string $name, $value, string $tablePrefix, array $settings): self;
}

View File

@ -2,6 +2,10 @@
namespace danog\MadelineProto\Db;
use Amp\Producer;
use Amp\Promise;
use function Amp\call;
class MemoryArray extends \ArrayIterator implements DbArray
{
protected function __construct($array = [], $flags = 0)
@ -9,11 +13,35 @@ class MemoryArray extends \ArrayIterator implements DbArray
parent::__construct((array) $array, $flags | self::STD_PROP_LIST);
}
static function getInstance(array $settings, string $name, $value = []): DbArray
public static function getInstance(string $name, $value, string $tablePrefix, array $settings): DbArray
{
if ($value instanceof DbArray) {
$value = $value->getArrayCopy();
}
return new static($value);
}
public static function getDbConnection(array $settings)
{
return null;
}
public function offsetGetAsync(string $offset): Promise
{
return call(fn() => $this->offsetGet($offset));
}
public function offsetSetAsync(string $offset, $value): Promise
{
return call(fn() => $this->offsetSet($offset, $value));
}
public function getIterator(): Producer
{
return new Producer(function (callable $emit) {
foreach ($this as $value) {
yield $emit($value);
}
});
}
}

View File

@ -41,11 +41,4 @@ class Mysql
return static::$connections[$dbKey];
}
public function __destruct()
{
foreach (static::$connections as $connection) {
$connection->close();
}
}
}

View File

@ -2,10 +2,15 @@
namespace danog\MadelineProto\Db;
use Amp\Loop;
use Amp\Mysql\Pool;
use Amp\Producer;
use Amp\Promise;
use Amp\Sql\ResultSet;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Tools;
use function Amp\call;
use function Amp\Promise\wait;
class MysqlArray implements DbArray
{
@ -28,25 +33,51 @@ class MysqlArray implements DbArray
foreach ($data as $property => $value) {
$this->{$property} = $value;
}
$this->initDbConnection();
try {
$this->db = static::getDbConnection($this->settings);
} catch (\Throwable $e) {
Logger::log($e->getMessage(), Logger::ERROR);
}
}
public static function getInstance(array $settings, string $name, $value = []): DbType
public static function getInstance(string $name, $value, string $tablePrefix, array $settings): DbType
{
$instance = new static();
$instance->table = $name;
$instance->settings = $settings['mysql'];
$instance->initDbConnection();
$instance->prepareTable();
if (!empty($value) && !$value instanceof static) {
if ($value instanceof DbArray) {
$value = $value->getArrayCopy();
}
foreach ((array) $value as $key => $item) {
$instance[$key] = $item;
$instance->table = "{$tablePrefix}_{$name}";
$instance->settings = $settings;
$instance->db = static::getDbConnection($settings);
if ($value instanceof static) {
if ($instance->table !== $value->table) {
$instance->renameTable($value->table, $instance->table);
}
}
$instance->prepareTable();
Loop::defer(function() use($value, $instance){
if (!empty($value) && !$value instanceof static) {
Logger::log('Converting database.', Logger::ERROR);
if ($value instanceof DbArray) {
$value = $value->getArrayCopy();
}
$value = (array) $value;
$counter = 0;
$total = count($value);
foreach ((array) $value as $key => $item) {
$counter++;
if ($counter % 100 === 0) {
yield $instance->offsetSetAsync($key, $item);
Logger::log("Converting database. $counter/$total", Logger::WARNING);
} else {
$instance->offsetSetAsync($key, $item);
}
}
Logger::log('Converting database done.', Logger::ERROR);
}
});
return $instance;
@ -88,12 +119,19 @@ class MysqlArray implements DbArray
*/
public function offsetGet($index)
{
$row = $this->syncRequest(
"SELECT `value` FROM {$this->table} WHERE `key` = :index LIMIT 1",
['index' => $index]
);
return $this->getValue($row);
return wait($this->offsetGetAsync($index));
}
public function offsetGetAsync(string $offset): Promise
{
return call(function() use($offset) {
$row = yield $this->request(
"SELECT `value` FROM {$this->table} WHERE `key` = :index LIMIT 1",
['index' => $offset]
);
return $this->getValue($row);
});
}
/**
@ -123,6 +161,20 @@ class MysqlArray implements DbArray
);
}
public function offsetSetAsync($index, $value): Promise
{
return $this->request("
INSERT INTO `{$this->table}`
SET `key` = :index, `value` = :value
ON DUPLICATE KEY UPDATE `value` = :value
",
[
'index' => $index,
'value' => serialize($value),
]
);
}
/**
* Unset value for an offset
*
@ -164,6 +216,19 @@ class MysqlArray implements DbArray
return $result;
}
public function getIterator(): Producer
{
return new Producer(function (callable $emit) {
$request = yield $this->db->execute("SELECT `key`, `value` FROM {$this->table}");
while (yield $request->advance()) {
$row = $request->getCurrent();
yield $emit($this->getValue($row));
}
});
}
/**
* Count elements
*
@ -207,7 +272,9 @@ class MysqlArray implements DbArray
private function getValue(array $row)
{
if ($row) {
$row = reset($row);
if (!empty($row[0]['value'])) {
$row = reset($row);
}
return unserialize($row['value']);
}
return null;
@ -272,21 +339,20 @@ class MysqlArray implements DbArray
public function seek($position)
{
$row = $this->syncRequest(
"SELECT `key` FROM {$this->table} ORDER BY `key` LIMIT 1, :position",
"SELECT `key` FROM {$this->table} ORDER BY `key` LIMIT 1 OFFSET :position",
['offset' => $position]
);
$this->key = $row[0]['key'] ?? $this->key;
}
private function initDbConnection()
public static function getDbConnection(array $settings): Pool
{
//TODO Use MtProto::$settings
$this->db = Mysql::getConnection(
$this->settings['host'],
$this->settings['port'],
$this->settings['user'],
$this->settings['password'],
$this->settings['database'],
return Mysql::getConnection(
$settings['host'],
$settings['port'],
$settings['user'],
$settings['password'],
$settings['database'],
);
}
@ -309,6 +375,18 @@ class MysqlArray implements DbArray
");
}
private function renameTable(string $from, string $to)
{
try {
$this->syncRequest("
ALTER TABLE {$from} RENAME TO {$to};
");
} catch (\Throwable $e) {
Logger::log("Cant rename table {$from} to {$to}", Logger::WARNING);
}
}
/**
* Perform blocking request to db
*
@ -320,19 +398,35 @@ class MysqlArray implements DbArray
*/
private function syncRequest(string $query, array $params = []): array
{
return Tools::wait(
call(
function() use($query, $params) {
$request = yield $this->db->execute($query, $params);
$result = [];
if ($request instanceof ResultSet) {
while (yield $request->advance()) {
$result[] = $request->getCurrent();
}
}
return $result;
return Tools::wait($this->request($query, $params));
}
/**
* Perform blocking request to db
*
* @param string $query
* @param array $params
*
* @return Promise
* @throws \Throwable
*/
private function request(string $query, array $params = []): Promise
{
return call(function() use($query, $params) {
if (empty($this->db)) {
return [];
}
$request = yield $this->db->execute($query, $params);
$result = [];
if ($request instanceof ResultSet) {
while (yield $request->advance()) {
$result[] = $request->getCurrent();
}
)
);
}
return $result;
});
}
}

View File

@ -1,30 +0,0 @@
<?php
namespace danog\MadelineProto\Db;
class SharedMemoryArray extends \ArrayIterator implements DbArray
{
private static SharedMemoryArray $instance;
protected function __construct($array = [], $flags = 0)
{
parent::__construct((array) $array, $flags | self::STD_PROP_LIST);
}
public static function getInstance(array $settings, string $name, $value = []): DbArray
{
if (empty(static::$instance)) {
static::$instance = new static($value);
} else {
if ($value instanceof DbArray) {
$value = $value->getArrayCopy();
}
$value = array_replace_recursive(static::$instance->getArrayCopy(), (array) $value);
foreach ($value as $key => $item) {
static::$instance[$key] = $item;
}
}
return static::$instance;
}
}

View File

@ -24,10 +24,8 @@ use Amp\File\StatCache;
use Amp\Http\Client\HttpClient;
use danog\MadelineProto\Async\AsyncConstruct;
use danog\MadelineProto\Db\DbArray;
use danog\MadelineProto\Db\Engines\DbInterface;
use danog\MadelineProto\Db\DbPropertiesFabric;
use danog\MadelineProto\Db\Mysql;
use danog\MadelineProto\Db\Types\ArrayType;
use danog\MadelineProto\Loop\Generic\PeriodicLoop;
use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\SeqLoop;
@ -286,6 +284,13 @@ class MTProto extends AsyncConstruct implements TLCallback
* @var DbArray
*/
public $chats;
/**
* Cache of usernames for chats
*
* @var DbArray
*/
public $usernames;
/**
* Cached parameters for fetching channel participants.
*
@ -422,11 +427,7 @@ class MTProto extends AsyncConstruct implements TLCallback
'chats' => 'array',
'full_chats' => 'array',
'channel_participants' => 'array',
'caching_simple' => 'array',
'caching_simple_username' => 'array',
'caching_possible_username' => 'array',
'caching_full_info' => 'array',
'caching_username_id' => 'array',
'usernames' => 'array',
];
/**
@ -507,6 +508,7 @@ class MTProto extends AsyncConstruct implements TLCallback
'referenceDatabase',
'minDatabase',
'channel_participants',
'usernames',
// Misc caching
'dialog_params',
@ -569,18 +571,22 @@ class MTProto extends AsyncConstruct implements TLCallback
if ($reset) {
unset($this->{$property});
} else {
$this->{$property} = DbPropertiesFabric::get($this->settings['db'], $type, $property, $this->{$property});
$this->{$property} = DbPropertiesFabric::get($this, $type, $property, $this->{$property});
}
}
if (!$reset && count($this->caching_username_id) === 0) {
$this->logger('Filling database cache. This can take few minutes.', Logger::WARNING);
foreach ($this->chats as $id => $chat) {
if (isset($chat['username'])) {
$this->caching_username_id[$chat['username']] = $id;
if (!$reset && count($this->usernames) === 0) {
\Amp\Loop::run(function() {
$this->logger('Filling database cache. This can take few minutes.', Logger::WARNING);
$iterator = $this->chats->getIterator();
while (yield $iterator->advance()) {
$chat = $iterator->getCurrent();
if (isset($chat['username'])) {
$this->usernames->offsetSetAsync(\strtolower($chat['username']), $this->getId($chat));
}
}
}
$this->logger('Cache filled.', Logger::WARNING);
$this->logger('Cache filled.', Logger::WARNING);
});
}
}
@ -1298,8 +1304,7 @@ class MTProto extends AsyncConstruct implements TLCallback
/**
* Where internal database will be stored?
* memory - session file
* sharedMemory - multiples instances share db if run in single process
* mysql - mysql database, shared by all instances in all processes.
* mysql - mysql database
*/
'db' => [
'type' => 'memory',

View File

@ -34,7 +34,6 @@ trait PeerHandler
public $caching_simple_username = [];
public $caching_possible_username = [];
public $caching_full_info = [];
public $caching_username_id = [];
/**
* Convert MTProto channel ID to bot API channel ID.
@ -122,6 +121,7 @@ trait PeerHandler
}
}
$this->chats[$user['id']] = $user;
$this->cacheChatUsername($user['id'], $user);
$this->cachePwrChat($user['id'], false, true);
}
break;
@ -149,6 +149,7 @@ trait PeerHandler
if (!isset($this->chats[-$chat['id']]) || $this->chats[-$chat['id']] !== $chat) {
$this->logger->logger("Updated chat -{$chat['id']}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->chats[-$chat['id']] = $chat;
$this->cacheChatUsername(-$chat['id'], $chat);
$this->cachePwrChat(-$chat['id'], $this->settings['peer']['full_fetch'], true);
}
break;
@ -185,6 +186,7 @@ trait PeerHandler
$chat = $newchat;
}
$this->chats[$bot_api_id] = $chat;
$this->cacheChatUsername($bot_api_id, $chat);
if ($this->settings['peer']['full_fetch'] && (!isset($this->full_chats[$bot_api_id]) || $this->full_chats[$bot_api_id]['full']['participants_count'] !== (yield from $this->getFullInfo($bot_api_id))['full']['participants_count'])) {
$this->cachePwrChat($bot_api_id, $this->settings['peer']['full_fetch'], true);
}
@ -192,6 +194,14 @@ trait PeerHandler
break;
}
}
private function cacheChatUsername(int $id, array $chat)
{
if (!empty($chat['username'])) {
$this->usernames[strtolower($chat['username'])] = $id;
}
}
private function cachePwrChat($id, $full_fetch, $send)
{
\danog\MadelineProto\Tools::callFork((function () use ($id, $full_fetch, $send): \Generator {
@ -564,19 +574,12 @@ trait PeerHandler
}
return yield from $this->getInfo($this->supportUser);
}
if ($bot_api_id = $this->caching_username_id[$id] ?? null) {
if ($bot_api_id = $this->usernames[$id] ?? null) {
$chat = $this->chats[$bot_api_id];
if (empty($chat['username']) || $chat['username'] !== $id) {
unset($this->caching_username_id[$id]);
} else {
return $this->genAll($this->chats[$bot_api_id], $folder_id);
if (empty($chat['username']) || \strtolower($chat['username']) !== $id) {
unset($this->usernames[$id]);
}
}
foreach ($this->chats as $bot_api_id => $chat) {
if (isset($chat['username'])) {
$this->caching_username_id[$id] = $bot_api_id;
}
if (isset($chat['username']) && \strtolower($chat['username']) === $id) {
if ($chat['min'] ?? false && !isset($this->caching_full_info[$bot_api_id])) {
$this->caching_full_info[$bot_api_id] = true;
@ -598,6 +601,7 @@ trait PeerHandler
return $this->genAll($this->chats[$bot_api_id], $folder_id);
}
}
if ($recursive) {
yield from $this->resolveUsername($id);
return yield from $this->getInfo($id, false);