Async reference/min database

This commit is contained in:
Daniil Gentili 2020-09-12 14:24:57 +02:00
parent 07248a4803
commit 1c45c6c65b
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
5 changed files with 51 additions and 88 deletions

View File

@ -4,7 +4,7 @@ namespace danog\MadelineProto\Db;
use Amp\Promise; use Amp\Promise;
class DbPropertiesFabric class DbPropertiesFactory
{ {
/** /**
* @param array $dbSettings * @param array $dbSettings

View File

@ -19,7 +19,7 @@ trait DbPropertiesTrait
if ($reset) { if ($reset) {
unset($this->{$property}); unset($this->{$property});
} else { } else {
$this->{$property} = yield DbPropertiesFabric::get($dbSettings, $prefix, $type, $property, $this->{$property}); $this->{$property} = yield DbPropertiesFactory::get($dbSettings, $prefix, $type, $property, $this->{$property});
} }
} }
} }

View File

@ -25,7 +25,7 @@ use Amp\Http\Client\HttpClient;
use Amp\Promise; use Amp\Promise;
use danog\MadelineProto\Async\AsyncConstruct; use danog\MadelineProto\Async\AsyncConstruct;
use danog\MadelineProto\Db\DbArray; use danog\MadelineProto\Db\DbArray;
use danog\MadelineProto\Db\DbPropertiesFabric; use danog\MadelineProto\Db\DbPropertiesFactory;
use danog\MadelineProto\Db\DbPropertiesTrait; use danog\MadelineProto\Db\DbPropertiesTrait;
use danog\MadelineProto\Db\Mysql; use danog\MadelineProto\Db\Mysql;
use danog\MadelineProto\Ipc\Server; use danog\MadelineProto\Ipc\Server;
@ -414,7 +414,7 @@ class MTProto extends AsyncConstruct implements TLCallback
/** /**
* List of properties stored in database (memory or external). * List of properties stored in database (memory or external).
* @see DbPropertiesFabric * @see DbPropertiesFactory
* @var array * @var array
*/ */
protected array $dbProperies = [ protected array $dbProperies = [
@ -569,12 +569,14 @@ class MTProto extends AsyncConstruct implements TLCallback
*/ */
public function cleanup(): void public function cleanup(): void
{ {
/*
// :)
$this->referenceDatabase = new ReferenceDatabase($this); $this->referenceDatabase = new ReferenceDatabase($this);
$callbacks = [$this, $this->referenceDatabase]; $callbacks = [$this, $this->referenceDatabase];
if (!($this->authorization['user']['bot'] ?? false)) { if (!($this->authorization['user']['bot'] ?? false)) {
$callbacks[] = $this->minDatabase; $callbacks[] = $this->minDatabase;
} }
$this->TL->updateCallbacks($callbacks); $this->TL->updateCallbacks($callbacks);*/
} }
private function fillUsernamesCache(): \Generator private function fillUsernamesCache(): \Generator
@ -824,9 +826,15 @@ class MTProto extends AsyncConstruct implements TLCallback
} }
if (!isset($this->referenceDatabase)) { if (!isset($this->referenceDatabase)) {
$this->referenceDatabase = new ReferenceDatabase($this); $this->referenceDatabase = new ReferenceDatabase($this);
yield from $this->referenceDatabase->init();
} else {
yield from $this->referenceDatabase->init();
} }
if (!isset($this->minDatabase)) { if (!isset($this->minDatabase)) {
$this->minDatabase = new MinDatabase($this); $this->minDatabase = new MinDatabase($this);
yield from $this->minDatabase->init();
} else {
yield from $this->minDatabase->init();
} }
if (!isset($this->TL)) { if (!isset($this->TL)) {
$this->TL = new TL($this); $this->TL = new TL($this);
@ -1606,9 +1614,13 @@ class MTProto extends AsyncConstruct implements TLCallback
yield from $this->initDb($this, true); yield from $this->initDb($this, true);
$this->tos = ['expires' => 0, 'accepted' => true]; $this->tos = ['expires' => 0, 'accepted' => true];
$this->referenceDatabase = new ReferenceDatabase($this);
$this->minDatabase = new MinDatabase($this);
$this->dialog_params = ['_' => 'MadelineProto.dialogParams', 'limit' => 0, 'offset_date' => 0, 'offset_id' => 0, 'offset_peer' => ['_' => 'inputPeerEmpty'], 'count' => 0]; $this->dialog_params = ['_' => 'MadelineProto.dialogParams', 'limit' => 0, 'offset_date' => 0, 'offset_id' => 0, 'offset_peer' => ['_' => 'inputPeerEmpty'], 'count' => 0];
$this->referenceDatabase = new ReferenceDatabase($this);
yield from $this->referenceDatabase->init();
$this->minDatabase = new MinDatabase($this);
yield from $this->minDatabase->init();
} }
/** /**
* Reset the update state and fetch all updates from the beginning. * Reset the update state and fetch all updates from the beginning.

View File

@ -19,13 +19,11 @@
namespace danog\MadelineProto\MTProtoTools; namespace danog\MadelineProto\MTProtoTools;
use Amp\Loop;
use Amp\Promise; use Amp\Promise;
use danog\MadelineProto\Db\DbArray; use danog\MadelineProto\Db\DbArray;
use danog\MadelineProto\Db\DbPropertiesTrait; use danog\MadelineProto\Db\DbPropertiesTrait;
use danog\MadelineProto\MTProto; use danog\MadelineProto\MTProto;
use danog\MadelineProto\TL\TLCallback; use danog\MadelineProto\TL\TLCallback;
use danog\MadelineProto\Tools;
/** /**
* Manages min peers. * Manages min peers.
@ -58,7 +56,7 @@ class MinDatabase implements TLCallback
/** /**
* List of properties stored in database (memory or external). * List of properties stored in database (memory or external).
* @see DbPropertiesFabric * @see DbPropertiesFactory
* @var array * @var array
*/ */
protected array $dbProperies = [ protected array $dbProperies = [
@ -68,20 +66,14 @@ class MinDatabase implements TLCallback
public function __construct(MTProto $API) public function __construct(MTProto $API)
{ {
$this->API = $API; $this->API = $API;
$this->init();
}
public function __wakeup()
{
$this->init();
} }
public function __sleep() public function __sleep()
{ {
return ['db', 'API']; return ['db', 'API'];
} }
public function init() public function init(): \Generator
{ {
Tools::wait($this->initDb($this->API)); yield from $this->initDb($this->API);
Loop::defer(function() {
$iterator = $this->db->getIterator(); $iterator = $this->db->getIterator();
while (yield $iterator->advance()) { while (yield $iterator->advance()) {
[$id, $origin] = $iterator->getCurrent(); [$id, $origin] = $iterator->getCurrent();
@ -89,7 +81,6 @@ class MinDatabase implements TLCallback
$this->db->offsetUnset($id); $this->db->offsetUnset($id);
} }
} }
});
} }
public function getMethodCallbacks(): array public function getMethodCallbacks(): array
{ {
@ -115,48 +106,15 @@ class MinDatabase implements TLCallback
{ {
return []; return [];
} }
public function reset() public function reset(): void
{ {
if ($this->cache) { if ($this->cache) {
$this->API->logger->logger('Found '.\count($this->cache).' pending contexts', \danog\MadelineProto\Logger::ERROR); $this->API->logger->logger('Found '.\count($this->cache).' pending contexts', \danog\MadelineProto\Logger::ERROR);
$this->cache = []; $this->cache = [];
} }
} }
public function addPeer(array $location) public function addPeer(array $location): bool
{ {
if (!$this->cache) {
return;
$this->API->logger->logger('Trying to add peer out of context, report the following message to @danogentili!', \danog\MadelineProto\Logger::ERROR);
$frames = [];
$previous = '';
foreach (\debug_backtrace(0) as $k => $frame) {
if (isset($frame['function']) && $frame['function'] === 'deserialize') {
if (isset($frame['args'][1]['subtype'])) {
if ($frame['args'][1]['subtype'] === $previous) {
continue;
}
$frames[] = $frame['args'][1]['subtype'];
$previous = $frame['args'][1]['subtype'];
} elseif (isset($frame['args'][1]['type'])) {
if ($frame['args'][1]['type'] === '') {
break;
}
if ($frame['args'][1]['type'] === $previous) {
continue;
}
$frames[] = $frame['args'][1]['type'];
$previous = $frame['args'][1]['type'];
}
}
}
$frames = \array_reverse($frames);
$tlTrace = \array_shift($frames);
foreach ($frames as $frame) {
$tlTrace .= "['".$frame."']";
}
$this->API->logger->logger($tlTrace, \danog\MadelineProto\Logger::ERROR);
return false;
}
$peers = []; $peers = [];
switch ($location['_']) { switch ($location['_']) {
case 'messageFwdHeader': case 'messageFwdHeader':
@ -189,12 +147,12 @@ class MinDatabase implements TLCallback
} }
return true; return true;
} }
public function addOriginContext(string $type) public function addOriginContext(string $type): void
{ {
$this->API->logger->logger("Adding peer origin context for {$type}!", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $this->API->logger->logger("Adding peer origin context for {$type}!", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->cache[] = []; $this->cache[] = [];
} }
public function addOrigin(array $data = []) public function addOrigin(array $data = []): void
{ {
$cache = \array_pop($this->cache); $cache = \array_pop($this->cache);
if ($cache === null) { if ($cache === null) {

View File

@ -19,14 +19,12 @@
namespace danog\MadelineProto\MTProtoTools; namespace danog\MadelineProto\MTProtoTools;
use Amp\Loop;
use Amp\Promise; use Amp\Promise;
use danog\MadelineProto\Db\DbArray; use danog\MadelineProto\Db\DbArray;
use danog\MadelineProto\Db\DbPropertiesTrait; use danog\MadelineProto\Db\DbPropertiesTrait;
use danog\MadelineProto\Exception; use danog\MadelineProto\Exception;
use danog\MadelineProto\MTProto; use danog\MadelineProto\MTProto;
use danog\MadelineProto\TL\TLCallback; use danog\MadelineProto\TL\TLCallback;
use danog\MadelineProto\Tools;
/** /**
* Manages upload and download of files. * Manages upload and download of files.
@ -83,7 +81,7 @@ class ReferenceDatabase implements TLCallback
/** /**
* List of properties stored in database (memory or external). * List of properties stored in database (memory or external).
* @see DbPropertiesFabric * @see DbPropertiesFactory
* @var array * @var array
*/ */
protected array $dbProperies = [ protected array $dbProperies = [
@ -93,19 +91,14 @@ class ReferenceDatabase implements TLCallback
public function __construct(MTProto $API) public function __construct(MTProto $API)
{ {
$this->API = $API; $this->API = $API;
$this->init();
}
public function __wakeup()
{
$this->init();
} }
public function __sleep() public function __sleep()
{ {
return ['db', 'API']; return ['db', 'API'];
} }
public function init() public function init(): \Generator
{ {
Tools::wait($this->initDb($this->API)); return $this->initDb($this->API);
} }
public function getMethodCallbacks(): array public function getMethodCallbacks(): array
{ {
@ -131,7 +124,7 @@ class ReferenceDatabase implements TLCallback
{ {
return []; return [];
} }
public function reset() public function reset(): void
{ {
if ($this->cacheContexts) { if ($this->cacheContexts) {
$this->API->logger->logger('Found '.\count($this->cacheContexts).' pending contexts', \danog\MadelineProto\Logger::ERROR); $this->API->logger->logger('Found '.\count($this->cacheContexts).' pending contexts', \danog\MadelineProto\Logger::ERROR);
@ -142,7 +135,7 @@ class ReferenceDatabase implements TLCallback
$this->cache = []; $this->cache = [];
} }
} }
public function addReference(array $location) public function addReference(array $location): bool
{ {
if (!$this->cacheContexts) { if (!$this->cacheContexts) {
$this->API->logger->logger('Trying to add reference out of context, report the following message to @danogentili!', \danog\MadelineProto\Logger::ERROR); $this->API->logger->logger('Trying to add reference out of context, report the following message to @danogentili!', \danog\MadelineProto\Logger::ERROR);
@ -201,7 +194,7 @@ class ReferenceDatabase implements TLCallback
$this->cache[$key][$this->serializeLocation($locationType, $location)] = (string) $location['file_reference']; $this->cache[$key][$this->serializeLocation($locationType, $location)] = (string) $location['file_reference'];
return true; return true;
} }
public function addOriginContext(string $type) public function addOriginContext(string $type): void
{ {
if (!isset(self::CONSTRUCTOR_CONTEXT[$type])) { if (!isset(self::CONSTRUCTOR_CONTEXT[$type])) {
throw new \danog\MadelineProto\Exception("Unknown origin type provided: {$type}"); throw new \danog\MadelineProto\Exception("Unknown origin type provided: {$type}");
@ -210,7 +203,7 @@ class ReferenceDatabase implements TLCallback
$this->API->logger->logger("Adding origin context {$originContext} for {$type}!", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $this->API->logger->logger("Adding origin context {$originContext} for {$type}!", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->cacheContexts[] = $originContext; $this->cacheContexts[] = $originContext;
} }
public function addOrigin(array $data = []) public function addOrigin(array $data = []): \Generator
{ {
$key = \count($this->cacheContexts) - 1; $key = \count($this->cacheContexts) - 1;
if ($key === -1) { if ($key === -1) {
@ -286,11 +279,11 @@ class ReferenceDatabase implements TLCallback
throw new \danog\MadelineProto\Exception("Unknown origin type provided: {$data['_']}"); throw new \danog\MadelineProto\Exception("Unknown origin type provided: {$data['_']}");
} }
foreach ($cache as $location => $reference) { foreach ($cache as $location => $reference) {
$this->storeReference($location, $reference, $originType, $origin); yield from $this->storeReference($location, $reference, $originType, $origin);
} }
$this->API->logger->logger("Added origin {$originType} ({$data['_']}) to ".\count($cache).' references', \danog\MadelineProto\Logger::ULTRA_VERBOSE); $this->API->logger->logger("Added origin {$originType} ({$data['_']}) to ".\count($cache).' references', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
} }
public function addOriginMethodContext(string $type) public function addOriginMethodContext(string $type): void
{ {
if (!isset(self::METHOD_CONTEXT[$type])) { if (!isset(self::METHOD_CONTEXT[$type])) {
throw new \danog\MadelineProto\Exception("Unknown origin type provided: {$type}"); throw new \danog\MadelineProto\Exception("Unknown origin type provided: {$type}");
@ -299,7 +292,7 @@ class ReferenceDatabase implements TLCallback
$this->API->logger->logger("Adding origin context {$originContext} for {$type}!", \danog\MadelineProto\Logger::ULTRA_VERBOSE); $this->API->logger->logger("Adding origin context {$originContext} for {$type}!", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->cacheContexts[] = $originContext; $this->cacheContexts[] = $originContext;
} }
public function addOriginMethod(array $data, array $res) public function addOriginMethod(array $data, array $res): \Generator
{ {
$key = \count($this->cacheContexts) - 1; $key = \count($this->cacheContexts) - 1;
if ($key === -1) { if ($key === -1) {
@ -339,7 +332,7 @@ class ReferenceDatabase implements TLCallback
if (isset($cache[$location])) { if (isset($cache[$location])) {
$reference = $cache[$location]; $reference = $cache[$location];
unset($cache[$location]); unset($cache[$location]);
$this->storeReference($location, $reference, $originType, $origin); yield from $this->storeReference($location, $reference, $originType, $origin);
$count++; $count++;
} }
if (isset($photo['sizes'])) { if (isset($photo['sizes'])) {
@ -350,7 +343,7 @@ class ReferenceDatabase implements TLCallback
if (isset($cache[$location])) { if (isset($cache[$location])) {
$reference = $cache[$location]; $reference = $cache[$location];
unset($cache[$location]); unset($cache[$location]);
$this->storeReference($location, $reference, $originType, $origin); yield from $this->storeReference($location, $reference, $originType, $origin);
$count++; $count++;
} }
} }
@ -366,13 +359,13 @@ class ReferenceDatabase implements TLCallback
throw new \danog\MadelineProto\Exception("Unknown origin type provided: {$data['_']}"); throw new \danog\MadelineProto\Exception("Unknown origin type provided: {$data['_']}");
} }
foreach ($cache as $location => $reference) { foreach ($cache as $location => $reference) {
$this->storeReference($location, $reference, $originType, $origin); yield from $this->storeReference($location, $reference, $originType, $origin);
} }
$this->API->logger->logger("Added origin {$originType} ({$data['_']}) to ".\count($cache).' references', \danog\MadelineProto\Logger::ULTRA_VERBOSE); $this->API->logger->logger("Added origin {$originType} ({$data['_']}) to ".\count($cache).' references', \danog\MadelineProto\Logger::ULTRA_VERBOSE);
} }
public function storeReference(string $location, string $reference, int $originType, array $origin) public function storeReference(string $location, string $reference, int $originType, array $origin): \Generator
{ {
$locationValue = Tools::wait($this->db[$location]); $locationValue = yield $this->db[$location];
if (!$locationValue) { if (!$locationValue) {
$locationValue = ['origins' => []]; $locationValue = ['origins' => []];
} }
@ -388,7 +381,7 @@ class ReferenceDatabase implements TLCallback
$this->cache[$key][$location] = $reference; $this->cache[$key][$location] = $reference;
} }
} }
public function refreshNext(bool $refresh = false) public function refreshNext(bool $refresh = false): void
{ {
if ($this->refreshCount === 1 && !$refresh) { if ($this->refreshCount === 1 && !$refresh) {
$this->refreshed = []; $this->refreshed = [];
@ -405,7 +398,7 @@ class ReferenceDatabase implements TLCallback
$this->refreshCount--; $this->refreshCount--;
} }
} }
public function refreshReference(int $locationType, array $location) public function refreshReference(int $locationType, array $location): \Generator
{ {
return $this->refreshReferenceInternal($this->serializeLocation($locationType, $location)); return $this->refreshReferenceInternal($this->serializeLocation($locationType, $location));
} }
@ -479,7 +472,7 @@ class ReferenceDatabase implements TLCallback
$object['file_reference'] = yield $this->getReference(self::LOCATION_CONTEXT[$object['_']], $object); $object['file_reference'] = yield $this->getReference(self::LOCATION_CONTEXT[$object['_']], $object);
return $object; return $object;
} }
public function getReference(int $locationType, array $location) public function getReference(int $locationType, array $location): \Generator
{ {
$locationString = $this->serializeLocation($locationType, $location); $locationString = $this->serializeLocation($locationType, $location);
if (!isset((yield $this->db[$locationString])['reference'])) { if (!isset((yield $this->db[$locationString])['reference'])) {
@ -499,7 +492,7 @@ class ReferenceDatabase implements TLCallback
} }
return (yield $this->db[$locationString])['reference']; return (yield $this->db[$locationString])['reference'];
} }
private function serializeLocation(int $locationType, array $location) private function serializeLocation(int $locationType, array $location): string
{ {
switch ($locationType) { switch ($locationType) {
case self::DOCUMENT_LOCATION: case self::DOCUMENT_LOCATION: