From 444d55355ab52982a98ef19dbb51f750da83c5df Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 28 Jul 2020 20:39:32 +0200 Subject: [PATCH] Switch to external loop --- composer.json | 3 +- .../MadelineProto/DataCenterConnection.php | 8 +- .../MadelineProto/Ipc/Runner/WebRunner.php | 6 +- src/danog/MadelineProto/Ipc/Server.php | 2 +- ...ResumableLoopInterface.php => APILoop.php} | 33 ++--- .../Loop/Connection/CheckLoop.php | 38 ++---- .../MadelineProto/Loop/Connection/Common.php | 61 +++++++++ .../Loop/Connection/HttpWaitLoop.php | 33 ++--- .../Loop/Connection/PingLoop.php | 36 ++---- .../Loop/Connection/ReadLoop.php | 33 ++--- .../Loop/Connection/WriteLoop.php | 36 ++---- .../Loop/Generic/GenericLoop.php | 59 +++------ .../Loop/Generic/PeriodicLoop.php | 67 +++------- .../PeriodicLoopInternal.php} | 40 +++--- src/danog/MadelineProto/Loop/Impl/Loop.php | 80 ------------ .../Loop/Impl/ResumableSignalLoop.php | 90 ------------- .../MadelineProto/Loop/Impl/SignalLoop.php | 70 ----------- .../{LoopInterface.php => InternalLoop.php} | 38 +++--- src/danog/MadelineProto/Loop/LoggerLoop.php | 119 ++++++++++++++++++ .../MadelineProto/Loop/Update/FeedLoop.php | 66 +++++++--- .../MadelineProto/Loop/Update/SeqLoop.php | 34 +++-- .../MadelineProto/Loop/Update/UpdateLoop.php | 55 ++++++-- src/danog/MadelineProto/MTProto.php | 60 ++++----- .../MTProtoSession/ResponseHandler.php | 5 +- .../MTProtoTools/AuthKeyHandler.php | 4 +- .../MTProtoTools/UpdateHandler.php | 18 +-- .../MTProtoTools/UpdatesState.php | 40 +++--- .../SecretChats/AuthKeyHandler.php | 9 +- .../MadelineProto/VoIP/AuthKeyHandler.php | 5 +- .../MadelineProto/Wrappers/Templates.php | 2 +- tools/phar.php | 6 +- 31 files changed, 516 insertions(+), 640 deletions(-) rename src/danog/MadelineProto/Loop/{ResumableLoopInterface.php => APILoop.php} (67%) create mode 100644 src/danog/MadelineProto/Loop/Connection/Common.php rename src/danog/MadelineProto/Loop/{SignalLoopInterface.php => Generic/PeriodicLoopInternal.php} (56%) delete mode 100644 src/danog/MadelineProto/Loop/Impl/Loop.php delete mode 100644 src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php delete mode 100644 src/danog/MadelineProto/Loop/Impl/SignalLoop.php rename src/danog/MadelineProto/Loop/{LoopInterface.php => InternalLoop.php} (72%) create mode 100644 src/danog/MadelineProto/Loop/LoggerLoop.php diff --git a/composer.json b/composer.json index 0ddc3bf3..7d2cf53a 100644 --- a/composer.json +++ b/composer.json @@ -35,7 +35,8 @@ "league/uri": "^6", "danog/ipc": "^0.1", "tivie/htaccess-parser": "^0.2.3", - "amphp/log": "^1.1" + "amphp/log": "^1.1", + "danog/loop": "^0.1.0" }, "require-dev": { "vlucas/phpdotenv": "^3", diff --git a/src/danog/MadelineProto/DataCenterConnection.php b/src/danog/MadelineProto/DataCenterConnection.php index 86baed3b..d2e386b1 100644 --- a/src/danog/MadelineProto/DataCenterConnection.php +++ b/src/danog/MadelineProto/DataCenterConnection.php @@ -22,7 +22,7 @@ namespace danog\MadelineProto; use Amp\Deferred; use Amp\Promise; use Amp\Success; -use danog\MadelineProto\Loop\Generic\PeriodicLoop; +use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal; use danog\MadelineProto\MTProto\AuthKey; use danog\MadelineProto\MTProto\PermAuthKey; use danog\MadelineProto\MTProto\TempAuthKey; @@ -99,10 +99,8 @@ class DataCenterConnection implements JsonSerializable private $linked; /** * Loop to keep weights at sane value. - * - * @var \danog\MadelineProto\Loop\Generic\PeriodicLoop */ - private $robinLoop; + private ?PeriodicLoopInternal $robinLoop = null; /** * Decrement roundrobin weight by this value if busy reading. * @@ -366,7 +364,7 @@ class DataCenterConnection implements JsonSerializable $count = $media ? $this->API->settings['connection_settings']['media_socket_count']['min'] : 1; if ($count > 1) { if (!$this->robinLoop) { - $this->robinLoop = new PeriodicLoop($this->API, [$this, 'even'], "robin loop DC {$this->datacenter}", $this->API->settings['connection_settings']['robin_period']); + $this->robinLoop = new PeriodicLoopInternal($this->API, [$this, 'even'], "robin loop DC {$this->datacenter}", $this->API->settings['connection_settings']['robin_period'] * 1000); } $this->robinLoop->start(); } diff --git a/src/danog/MadelineProto/Ipc/Runner/WebRunner.php b/src/danog/MadelineProto/Ipc/Runner/WebRunner.php index fa47e1c8..61663636 100644 --- a/src/danog/MadelineProto/Ipc/Runner/WebRunner.php +++ b/src/danog/MadelineProto/Ipc/Runner/WebRunner.php @@ -12,7 +12,7 @@ final class WebRunner extends RunnerAbstract private static $runPath; /** - * Resources + * Resources. */ private static array $resources = []; /** @@ -46,7 +46,9 @@ final class WebRunner extends RunnerAbstract } $rootDir = \dirname($rootDir).DIRECTORY_SEPARATOR; $uriDir = \dirname($uri); - if ($uriDir !== '/') $uriDir .= DIRECTORY_SEPARATOR; + if ($uriDir !== '/') { + $uriDir .= DIRECTORY_SEPARATOR; + } if (\substr($rootDir, -\strlen($uriDir)) !== $uriDir) { throw new ContextException("Mismatch between absolute root dir ($rootDir) and URI dir ($uriDir)"); diff --git a/src/danog/MadelineProto/Ipc/Server.php b/src/danog/MadelineProto/Ipc/Server.php index 3c143eca..78a07451 100644 --- a/src/danog/MadelineProto/Ipc/Server.php +++ b/src/danog/MadelineProto/Ipc/Server.php @@ -20,10 +20,10 @@ namespace danog\MadelineProto\Ipc; use Amp\Ipc\IpcServer; use Amp\Ipc\Sync\ChannelledSocket; +use danog\Loop\SignalLoop; use danog\MadelineProto\Ipc\Runner\ProcessRunner; use danog\MadelineProto\Ipc\Runner\WebRunner; use danog\MadelineProto\Logger; -use danog\MadelineProto\Loop\Impl\SignalLoop; use danog\MadelineProto\Tools; /** diff --git a/src/danog/MadelineProto/Loop/ResumableLoopInterface.php b/src/danog/MadelineProto/Loop/APILoop.php similarity index 67% rename from src/danog/MadelineProto/Loop/ResumableLoopInterface.php rename to src/danog/MadelineProto/Loop/APILoop.php index 8c9a8e08..0603a1e9 100644 --- a/src/danog/MadelineProto/Loop/ResumableLoopInterface.php +++ b/src/danog/MadelineProto/Loop/APILoop.php @@ -1,7 +1,6 @@ + * API loop trait. */ -interface ResumableLoopInterface extends LoopInterface +trait APILoop { + use LoggerLoop { + __construct as private setLogger; + } + /** - * Pause the loop. - * - * @param int $time For how long to pause the loop, if null will pause forever (until resume is called from outside of the loop) - * - * @return Promise + * API instance. */ - public function pause($time = null): Promise; + protected InternalDoc $API; /** - * Resume the loop. + * Constructor. * - * @return ?Promise + * @param InternalDoc $API API instance */ - public function resume(); + public function __construct(InternalDoc $API) + { + $this->API = $API; + $this->setLogger($API->getLogger()); + } } diff --git a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php index 00434738..b04a1c39 100644 --- a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php @@ -21,8 +21,7 @@ namespace danog\MadelineProto\Loop\Connection; use Amp\Deferred; use Amp\Loop; -use danog\MadelineProto\Connection; -use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use danog\Loop\ResumableSignalLoop; use danog\MadelineProto\Tools; /** @@ -32,31 +31,12 @@ use danog\MadelineProto\Tools; */ class CheckLoop extends ResumableSignalLoop { + use Common; /** - * Connection instance. + * Main loop. * - * @var \danog\MadelineProto\Connection + * @return \Generator */ - protected $connection; - /** - * DC ID. - * - * @var string - */ - protected $datacenter; - /** - * DataCenterConnection instance. - * - * @var \danog\MadelineProto\DataCenterConnection - */ - protected $datacenterConnection; - public function __construct(Connection $connection) - { - $this->connection = $connection; - $this->API = $connection->getExtra(); - $this->datacenter = $connection->getDatacenterID(); - $this->datacenterConnection = $connection->getShared(); - } public function loop(): \Generator { $API = $this->API; @@ -64,6 +44,7 @@ class CheckLoop extends ResumableSignalLoop $connection = $this->connection; $shared = $this->datacenterConnection; $timeout = $shared->getSettings()['timeout']; + $timeoutMs = $timeout * 1000; $timeoutResend = $timeout * $timeout; // Typically 25 seconds, good enough while (true) { @@ -73,7 +54,7 @@ class CheckLoop extends ResumableSignalLoop } } if (!$connection->hasPendingCalls()) { - if (yield $this->waitSignal($this->pause($timeout))) { + if (yield $this->waitSignal($this->pause($timeoutMs))) { return; } continue; @@ -161,7 +142,7 @@ class CheckLoop extends ResumableSignalLoop } $connection->flush(); } - if (yield $this->waitSignal($this->pause($timeout))) { + if (yield $this->waitSignal($this->pause($timeoutMs))) { return; } if ($connection->msgIdHandler->getMaxId(true) === $last_msgid && $connection->getLastChunk() === $last_chunk) { @@ -172,6 +153,11 @@ class CheckLoop extends ResumableSignalLoop } } } + /** + * Loop name. + * + * @return string + */ public function __toString(): string { return "check loop in DC {$this->datacenter}"; diff --git a/src/danog/MadelineProto/Loop/Connection/Common.php b/src/danog/MadelineProto/Loop/Connection/Common.php new file mode 100644 index 00000000..2c8688cf --- /dev/null +++ b/src/danog/MadelineProto/Loop/Connection/Common.php @@ -0,0 +1,61 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2020 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Loop\Connection; + +use danog\MadelineProto\Connection; +use danog\MadelineProto\DataCenterConnection; +use danog\MadelineProto\Loop\InternalLoop; + +/** + * RPC call status check loop. + * + * @author Daniil Gentili + */ +trait Common +{ + use InternalLoop { + __construct as private init; + } + /** + * Connection instance. + */ + protected Connection $connection; + /** + * DC ID. + * + * @var string + */ + protected string $datacenter; + /** + * DataCenterConnection instance. + */ + protected DataCenterConnection $datacenterConnection; + /** + * Constructor function. + * + * @param Connection $connection Connection + */ + public function __construct(Connection $connection) + { + $this->init($connection->getExtra()); + $this->connection = $connection; + $this->datacenter = $connection->getDatacenterID(); + $this->datacenterConnection = $connection->getShared(); + } +} diff --git a/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php b/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php index e3f9316a..56413014 100644 --- a/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php @@ -19,8 +19,7 @@ namespace danog\MadelineProto\Loop\Connection; -use danog\MadelineProto\Connection; -use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use danog\Loop\ResumableSignalLoop; /** * HttpWait loop. @@ -29,31 +28,12 @@ use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; */ class HttpWaitLoop extends ResumableSignalLoop { + use Common; /** - * Connection instance. + * Main loop. * - * @var \danog\MadelineProto\Connection + * @return \Generator */ - protected $connection; - /** - * DC ID. - * - * @var string - */ - protected $datacenter; - /** - * DataCenterConnection instance. - * - * @var \danog\MadelineProto\DataCenterConnection - */ - protected $datacenterConnection; - public function __construct(Connection $connection) - { - $this->connection = $connection; - $this->API = $connection->getExtra(); - $this->datacenter = $connection->getDatacenterID(); - $this->datacenterConnection = $connection->getShared(); - } public function loop(): \Generator { $API = $this->API; @@ -82,6 +62,11 @@ class HttpWaitLoop extends ResumableSignalLoop $API->logger->logger("DC {$datacenter}: request {$connection->countHttpSent()}, response {$connection->countHttpReceived()}"); } } + /** + * Loop name. + * + * @return string + */ public function __toString(): string { return "HTTP wait loop in DC {$this->datacenter}"; diff --git a/src/danog/MadelineProto/Loop/Connection/PingLoop.php b/src/danog/MadelineProto/Loop/Connection/PingLoop.php index eb101ab6..03d8720e 100644 --- a/src/danog/MadelineProto/Loop/Connection/PingLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/PingLoop.php @@ -19,8 +19,7 @@ namespace danog\MadelineProto\Loop\Connection; -use danog\MadelineProto\Connection; -use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use danog\Loop\ResumableSignalLoop; /** * Ping loop. @@ -29,31 +28,12 @@ use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; */ class PingLoop extends ResumableSignalLoop { + use Common; /** - * Connection instance. + * Main loop. * - * @var \danog\MadelineProto\Connection + * @return \Generator */ - protected $connection; - /** - * DC ID. - * - * @var string - */ - protected $datacenter; - /** - * DataCenterConnection instance. - * - * @var \danog\MadelineProto\DataCenterConnection - */ - protected $datacenterConnection; - public function __construct(Connection $connection) - { - $this->connection = $connection; - $this->API = $connection->getExtra(); - $this->datacenter = $connection->getDatacenterID(); - $this->datacenterConnection = $connection->getShared(); - } public function loop(): \Generator { $API = $this->API; @@ -61,13 +41,14 @@ class PingLoop extends ResumableSignalLoop $connection = $this->connection; $shared = $this->datacenterConnection; $timeout = $shared->getSettings()['timeout']; + $timeoutMs = $timeout * 1000; while (true) { while (!$shared->hasTempAuthKey()) { if (yield $this->waitSignal($this->pause())) { return; } } - if (yield $this->waitSignal($this->pause($timeout))) { + if (yield $this->waitSignal($this->pause($timeoutMs))) { return; } if (\time() - $connection->getLastChunk() >= $timeout) { @@ -81,6 +62,11 @@ class PingLoop extends ResumableSignalLoop } } } + /** + * Get loop name. + * + * @return string + */ public function __toString(): string { return "Ping loop in DC {$this->datacenter}"; diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index ddb0d801..e03eddd9 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -23,9 +23,8 @@ use Amp\ByteStream\PendingReadError; use Amp\ByteStream\StreamException; use Amp\Loop; use Amp\Websocket\ClosedException; -use danog\MadelineProto\Connection; +use danog\Loop\SignalLoop; use danog\MadelineProto\Logger; -use danog\MadelineProto\Loop\Impl\SignalLoop; use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\NothingInTheSocketException; use danog\MadelineProto\Tools; @@ -37,31 +36,12 @@ use danog\MadelineProto\Tools; */ class ReadLoop extends SignalLoop { + use Common; /** - * Connection instance. + * Main loop. * - * @var \danog\MadelineProto\Connection + * @return \Generator */ - protected $connection; - /** - * DataCenterConnection instance. - * - * @var \danog\MadelineProto\DataCenterConnection - */ - protected $datacenterConnection; - /** - * DC ID. - * - * @var string - */ - protected $datacenter; - public function __construct(Connection $connection) - { - $this->connection = $connection; - $this->API = $connection->getExtra(); - $this->datacenter = $connection->getDatacenterID(); - $this->datacenterConnection = $connection->getShared(); - } public function loop(): \Generator { $API = $this->API; @@ -229,6 +209,11 @@ class ReadLoop extends SignalLoop } return true; } + /** + * Get loop name. + * + * @return string + */ public function __toString(): string { return "read loop in DC {$this->datacenter}"; diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index d89604eb..18e2ad0b 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -21,9 +21,8 @@ namespace danog\MadelineProto\Loop\Connection; use Amp\ByteStream\StreamException; use Amp\Loop; -use danog\MadelineProto\Connection; +use danog\Loop\ResumableSignalLoop; use danog\MadelineProto\Logger; -use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\Tools; @@ -38,32 +37,12 @@ class WriteLoop extends ResumableSignalLoop const MAX_SIZE = 1 << 15; const MAX_IDS = 8192; + use Common; /** - * Connection instance. + * Main loop. * - * @var \danog\MadelineProto\Connection + * @return \Generator */ - protected $connection; - /** - * DataCenterConnection instance. - * - * @var \danog\MadelineProto\DataCenterConnection - */ - protected $datacenterConnection; - /** - * DC ID. - * - * @var string - */ - protected $datacenter; - public function __construct(Connection $connection) - { - $this->connection = $connection; - $this->datacenterConnection = $connection->getShared(); - $this->API = $connection->getExtra(); - $ctx = $connection->getCtx(); - $this->datacenter = $connection->getDatacenterID(); - } public function loop(): \Generator { $API = $this->API; @@ -91,7 +70,7 @@ class WriteLoop extends ResumableSignalLoop } $connection->writing(true); try { - $please_wait = yield $this->{$shared->hasTempAuthKey() ? 'encryptedWriteLoop' : 'unencryptedWriteLoop'}(); + $please_wait = yield from $this->{$shared->hasTempAuthKey() ? 'encryptedWriteLoop' : 'unencryptedWriteLoop'}(); } catch (StreamException $e) { if ($connection->shouldReconnect()) { return; @@ -358,6 +337,11 @@ class WriteLoop extends ResumableSignalLoop } return $skipped; } + /** + * Get loop name. + * + * @return string + */ public function __toString(): string { return "write loop in DC {$this->datacenter}"; diff --git a/src/danog/MadelineProto/Loop/Generic/GenericLoop.php b/src/danog/MadelineProto/Loop/Generic/GenericLoop.php index 5a06b0bc..d82b4381 100644 --- a/src/danog/MadelineProto/Loop/Generic/GenericLoop.php +++ b/src/danog/MadelineProto/Loop/Generic/GenericLoop.php @@ -1,7 +1,7 @@ + * @deprecated Use the danog/loop API instead */ -class GenericLoop extends ResumableSignalLoop +class GenericLoop extends GenericGenericLoop { - const STOP = -1; - const PAUSE = null; - const CONTINUE = 0; - protected $callback; - protected $name; + use APILoop { + __construct as private init; + } /** * Constructor. * - * The callback will be bound to the GenericLoop instance: this means that you will be able to use `$this` as if the callback were actually the `loop` function (you can access the API property, use the pause/waitSignal methods & so on). - * The return value of the callable can be: - * A number - the loop will be paused for the specified number of seconds - * GenericLoop::STOP - The loop will stop - * GenericLoop::PAUSE - The loop will pause forever (or until the `resume` method is called on the loop object from outside the loop) - * GenericLoop::CONTINUE - Return this if you want to rerun the loop without waiting - * - * @param \danog\MadelineProto\API $API Instance of MadelineProto - * @param callable $callback Callback to run - * @param string $name Fetcher name + * @param InternalDoc $API API instance + * @param callable $callable Method + * @param string $name Loop name */ - public function __construct($API, $callback, $name) + public function __construct(InternalDoc $API, callable $callable, string $name) { - $this->API = $API; - $this->callback = $callback->bindTo($this); - $this->name = $name; - } - public function loop(): \Generator - { - $callback = $this->callback; - while (true) { - $timeout = yield $callback(); - if ($timeout === self::PAUSE) { - $this->API->logger->logger("Pausing {$this}", \danog\MadelineProto\Logger::VERBOSE); - } elseif ($timeout > 0) { - $this->API->logger->logger("Pausing {$this} for {$timeout}", \danog\MadelineProto\Logger::VERBOSE); - } - if ($timeout === self::STOP || yield $this->waitSignal($this->pause($timeout))) { - return; - } - } - } - public function __toString(): string - { - return $this->name; + $this->init($API); + parent::__construct($callable, $name); } } diff --git a/src/danog/MadelineProto/Loop/Generic/PeriodicLoop.php b/src/danog/MadelineProto/Loop/Generic/PeriodicLoop.php index cedc8596..834eb204 100644 --- a/src/danog/MadelineProto/Loop/Generic/PeriodicLoop.php +++ b/src/danog/MadelineProto/Loop/Generic/PeriodicLoop.php @@ -1,7 +1,7 @@ + * @deprecated Use the danog/loop API instead */ -class PeriodicLoop extends ResumableSignalLoop +class PeriodicLoop extends GenericPeriodicLoop { - /** - * Callback. - * - * @var callable - */ - private $callback; - /** - * Loop name. - * - * @var string - */ - private string $name; - /** - * Loop timeeout. - * - * @var int - */ - private $timeout; + use APILoop { + __construct as private init; + } /** * Constructor. * - * @param \danog\MadelineProto\API $API Instance of MTProto class - * @param callable $callback Callback to call - * @param string $name Loop name - * @param int|float $timeout Loop timeout + * @param InternalDoc $API API instance + * @param callable $callable Method + * @param string $name Loop name + * @param ?int $interval Interval */ - public function __construct($API, callable $callback, string $name, $timeout) + public function __construct(InternalDoc $API, callable $callable, string $name, ?int $interval) { - $this->API = $API; - $this->callback = $callback; - $this->name = $name; - $this->timeout = $timeout; - } - public function loop(): \Generator - { - $callback = $this->callback; - $logger = $this->API->logger; - while (true) { - $result = yield $this->waitSignal($this->pause($this->timeout)); - if ($result) { - $logger->logger("Got signal in {$this}, exiting"); - return; - } - yield $callback(); - } - } - public function __toString(): string - { - return $this->name; + $this->init($API); + parent::__construct($callable, $name, $interval === null ? $interval : $interval * 1000); } } diff --git a/src/danog/MadelineProto/Loop/SignalLoopInterface.php b/src/danog/MadelineProto/Loop/Generic/PeriodicLoopInternal.php similarity index 56% rename from src/danog/MadelineProto/Loop/SignalLoopInterface.php rename to src/danog/MadelineProto/Loop/Generic/PeriodicLoopInternal.php index 28dc7daf..357dd1e3 100644 --- a/src/danog/MadelineProto/Loop/SignalLoopInterface.php +++ b/src/danog/MadelineProto/Loop/Generic/PeriodicLoopInternal.php @@ -1,7 +1,7 @@ + * @internal For internal use */ -interface SignalLoopInterface extends LoopInterface +class PeriodicLoopInternal extends GenericPeriodicLoop { + use InternalLoop { + __construct as private init; + } /** - * Resolve the promise or return|throw the signal. + * Constructor. * - * @param Promise $promise The origin promise - * - * @return Promise + * @param MTProto $API API instance + * @param callable $callable Method + * @param string $name Loop name + * @param int|null $interval Interval */ - public function waitSignal($promise): Promise; - /** - * Send a signal to the the loop. - * - * @param \Throwable|mixed $data Signal to send - * - * @return void - */ - public function signal($data): void; + public function __construct(MTProto $API, callable $callable, string $name, ?int $interval) + { + $this->init($API); + parent::__construct($callable, $name, $interval); + } } diff --git a/src/danog/MadelineProto/Loop/Impl/Loop.php b/src/danog/MadelineProto/Loop/Impl/Loop.php deleted file mode 100644 index 09a93614..00000000 --- a/src/danog/MadelineProto/Loop/Impl/Loop.php +++ /dev/null @@ -1,80 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2020 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\Loop\Impl; - -use Amp\Promise; -use danog\MadelineProto\Logger; -use danog\MadelineProto\Loop\LoopInterface; - -/** - * Loop helper trait. - * - * Wraps the asynchronous generator methods with asynchronous promise-based methods - * - * @author Daniil Gentili - */ -abstract class Loop implements LoopInterface -{ - private $count = 0; - /** - * MTProto instance. - * - * @var \danog\MadelineProto\MTProto - */ - public $API; - public function __construct($API) - { - $this->API = $API; - } - public function start() - { - if ($this->count) { - //$this->API->logger->logger("NOT entering $this with running count {$this->count}", Logger::ERROR); - return false; - } - return \danog\MadelineProto\Tools::callFork($this->loopImpl()); - } - private function loopImpl(): \Generator - { - $this->startedLoop(); - $this->API->logger->logger("Entered {$this}", Logger::ULTRA_VERBOSE); - try { - yield from $this->loop(); - } finally { - $this->exitedLoop(); - $this->API->logger->logger("Physically exited {$this}", Logger::ULTRA_VERBOSE); - } - } - public function exitedLoop() - { - if ($this->count) { - $this->API->logger->logger("Exited {$this}", Logger::ULTRA_VERBOSE); - $this->count--; - } - } - public function startedLoop() - { - $this->count++; - } - public function isRunning() - { - return $this->count; - } -} diff --git a/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php b/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php deleted file mode 100644 index aea21013..00000000 --- a/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php +++ /dev/null @@ -1,90 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2020 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\Loop\Impl; - -use Amp\Deferred; -use Amp\Loop; -use Amp\Promise; -use Amp\Success; -use danog\MadelineProto\Loop\ResumableLoopInterface; - -/** - * Resumable signal loop helper trait. - * - * @author Daniil Gentili - */ -abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopInterface -{ - private $resume; - private $pause; - protected $resumeWatcher; - public function pause($time = null): Promise - { - if (!\is_null($time)) { - if ($time <= 0) { - return new Success(0); - } - $resume = \microtime(true) + $time; - if ($this->resumeWatcher) { - Loop::cancel($this->resumeWatcher); - $this->resumeWatcher = null; - } - $this->resumeWatcher = Loop::delay((int) ($time * 1000), [$this, 'resume'], $resume); - } - $this->resume = new Deferred(); - $pause = $this->pause; - $this->pause = new Deferred(); - if ($pause) { - Loop::defer([$pause, 'resolve']); - } - return $this->resume->promise(); - } - public function resume($watcherId = null, $expected = 0) - { - if ($this->resumeWatcher) { - $storedWatcherId = $this->resumeWatcher; - Loop::cancel($storedWatcherId); - $this->resumeWatcher = null; - if ($watcherId && $storedWatcherId !== $watcherId) { - return; - } - } - if ($this->resume) { - $resume = $this->resume; - $this->resume = null; - $resume->resolve(); - return $this->pause ? $this->pause->promise() : null; - } - } - public function resumeDefer() - { - Loop::defer([$this, 'resume']); - return $this->pause ? $this->pause->promise() : null; - } - - public function exitedLoop() - { - parent::exitedLoop(); - if ($this->resumeWatcher) { - Loop::cancel($this->resumeWatcher); - $this->resumeWatcher = null; - } - } -} diff --git a/src/danog/MadelineProto/Loop/Impl/SignalLoop.php b/src/danog/MadelineProto/Loop/Impl/SignalLoop.php deleted file mode 100644 index 999c24a6..00000000 --- a/src/danog/MadelineProto/Loop/Impl/SignalLoop.php +++ /dev/null @@ -1,70 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2020 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\Loop\Impl; - -use Amp\Deferred; -use Amp\Promise; -use danog\MadelineProto\Coroutine; -use danog\MadelineProto\Loop\SignalLoopInterface; - -/** - * Signal loop helper trait. - * - * @author Daniil Gentili - */ -abstract class SignalLoop extends Loop implements SignalLoopInterface -{ - private $signalDeferred; - /** - * Send signal to loop. - * - * @param mixed $what Data to signal - * - * @return void - */ - public function signal($what): void - { - if ($this->signalDeferred) { - $deferred = $this->signalDeferred; - $this->signalDeferred = null; - if ($what instanceof \Exception || $what instanceof \Throwable) { - $deferred->fail($what); - } else { - $deferred->resolve($what); - } - } - } - public function waitSignal($promise): Promise - { - if ($promise instanceof \Generator) { - $promise = new Coroutine($promise); - } - $this->signalDeferred = new Deferred(); - $dpromise = $this->signalDeferred->promise(); - $promise->onResolve(function () use ($promise) { - if ($this->signalDeferred !== null) { - $deferred = $this->signalDeferred; - $this->signalDeferred = null; - $deferred->resolve($promise); - } - }); - return $dpromise; - } -} diff --git a/src/danog/MadelineProto/Loop/LoopInterface.php b/src/danog/MadelineProto/Loop/InternalLoop.php similarity index 72% rename from src/danog/MadelineProto/Loop/LoopInterface.php rename to src/danog/MadelineProto/Loop/InternalLoop.php index 05e5e4b2..5c3b5136 100644 --- a/src/danog/MadelineProto/Loop/LoopInterface.php +++ b/src/danog/MadelineProto/Loop/InternalLoop.php @@ -1,7 +1,6 @@ - */ -interface LoopInterface +use danog\MadelineProto\MTProto; + +trait InternalLoop { + use LoggerLoop { + __construct as private setLogger; + } + /** - * Start the loop. - * - * @return void + * API instance. */ - public function start(); + protected MTProto $API; /** - * The actual loop. + * Constructor. * - * @return void + * @param MTProto $API API instance */ - public function loop(); - /** - * Get name of the loop. - * - * @return string - */ - public function __toString(): string; + public function __construct(MTProto $API) + { + $this->API = $API; + $this->setLogger($API->getLogger()); + } } diff --git a/src/danog/MadelineProto/Loop/LoggerLoop.php b/src/danog/MadelineProto/Loop/LoggerLoop.php new file mode 100644 index 00000000..82ec50d5 --- /dev/null +++ b/src/danog/MadelineProto/Loop/LoggerLoop.php @@ -0,0 +1,119 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2020 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Loop; + +use danog\MadelineProto\Logger; +use danog\MadelineProto\Tools; + +trait LoggerLoop +{ + /** + * Whether the loop was started. + */ + private bool $started = false; + /** + * Logger instance. + */ + protected Logger $logger; + /** + * Constructor. + * + * @param Logger $logger Logger instance + */ + public function __construct(Logger $logger) + { + $this->logger = $logger; + } + + /** + * Start the loop. + * + * Returns false if the loop is already running. + * + * @return bool + */ + public function start(): bool + { + if ($this->started) { + return false; + } + Tools::callFork((function (): \Generator { + $this->startedLoop(); + try { + yield from $this->loop(); + } finally { + $this->exitedLoop(); + } + })()); + return true; + } + /** + * Check whether loop is running. + * + * @return boolean + */ + public function isRunning(): bool + { + return $this->started; + } + + /** + * Signal that loop has started. + * + * @return void + */ + protected function startedLoop(): void + { + parent::startedLoop(); + $this->logger->logger("Entered $this", Logger::ULTRA_VERBOSE); + } + + /** + * Signal that loop has exited. + * + * @return void + */ + protected function exitedLoop(): void + { + parent::exitedLoop(); + $this->logger->logger("Exited $this", Logger::ULTRA_VERBOSE); + } + + /** + * Report pause, can be overriden for logging. + * + * @param integer $timeout Pause duration, 0 = forever + * + * @return void + */ + protected function reportPause(int $timeout): void + { + $this->logger->logger( + "Pausing $this for $timeout", + Logger::ULTRA_VERBOSE + ); + } + + /** + * Get loop name. + * + * @return string + */ + abstract public function __toString(): string; +} diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index 3eaf66b7..68b05822 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -19,30 +19,60 @@ namespace danog\MadelineProto\Loop\Update; -use Amp\Loop; -use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use danog\Loop\ResumableSignalLoop; +use danog\MadelineProto\Loop\InternalLoop; +use danog\MadelineProto\MTProto; + +use function Amp\delay; /** - * update feed loop. + * Update feed loop. * * @author Daniil Gentili */ class FeedLoop extends ResumableSignalLoop { - private $incomingUpdates = []; - private $parsedUpdates = []; - private $channelId; + use InternalLoop { + __construct as private init; + } + /** + * Main loop ID. + */ + const GENERIC = 0; + /** + * Incoming updates array. + */ + private array $incomingUpdates = []; + /** + * Parsed updates array. + */ + private array $parsedUpdates = []; + /** + * Channel ID. + */ + private int $channelId; /** * Update loop. * * @var UpdateLoop */ - private $updater; - public function __construct($API, $channelId = false) + private ?UpdateLoop $updater = null; + /** + * Constructor. + * + * @param MTProto $API API instance + * @param integer $channelId Constructor + */ + public function __construct(MTProto $API, int $channelId = 0) { - $this->API = $API; + $this->init($API); $this->channelId = $channelId; } + /** + * Main loop. + * + * @return \Generator + */ public function loop(): \Generator { $API = $this->API; @@ -55,7 +85,11 @@ class FeedLoop extends ResumableSignalLoop return; } } - $this->state = $this->channelId === false ? yield from $API->loadUpdateState() : $API->loadChannelState($this->channelId); + yield (function (): \Generator { + yield delay(1); + return 1; + })(); + $this->state = $this->channelId === self::GENERIC ? yield from $API->loadUpdateState() : $API->loadChannelState($this->channelId); while (true) { while (!$this->API->settings['updates']['handle_updates'] || !$API->hasAllAuth()) { if (yield $this->waitSignal($this->pause())) { @@ -145,11 +179,11 @@ class FeedLoop extends ResumableSignalLoop } public function feedSingle(array $update): \Generator { - $channelId = false; + $channelId = self::GENERIC; switch ($update['_']) { case 'updateNewChannelMessage': case 'updateEditChannelMessage': - $channelId = isset($update['message']['to_id']['channel_id']) ? $update['message']['to_id']['channel_id'] : false; + $channelId = isset($update['message']['to_id']['channel_id']) ? $update['message']['to_id']['channel_id'] : self::GENERIC; if (!$channelId) { return false; } @@ -159,7 +193,7 @@ class FeedLoop extends ResumableSignalLoop $channelId = $update['channel_id']; break; case 'updateChannelTooLong': - $channelId = isset($update['channel_id']) ? $update['channel_id'] : false; + $channelId = isset($update['channel_id']) ? $update['channel_id'] : self::GENERIC; if (!isset($update['pts'])) { $update['pts'] = 1; } @@ -202,7 +236,7 @@ class FeedLoop extends ResumableSignalLoop $this->API->logger->logger("Not enough data: for message update {$log}, getting difference...", \danog\MadelineProto\Logger::VERBOSE); $update = ['_' => 'updateChannelTooLong']; if ($channelId && $to) { - $channelId = false; + $channelId = self::GENERIC; } } break; @@ -217,7 +251,7 @@ class FeedLoop extends ResumableSignalLoop if (isset($this->API->feeders[$channelId])) { return yield from $this->API->feeders[$channelId]->feedSingle($update); } elseif ($this->channelId) { - return yield from $this->API->feeders[false]->feedSingle($update); + return yield from $this->API->feeders[self::GENERIC]->feedSingle($update); } } $this->API->logger->logger('Was fed an update of type '.$update['_']." in {$this}...", \danog\MadelineProto\Logger::VERBOSE); @@ -238,7 +272,7 @@ class FeedLoop extends ResumableSignalLoop if ($message['_'] !== 'messageEmpty') { $this->API->logger->logger('Getdiff fed me message of type '.$message['_']." in {$this}...", \danog\MadelineProto\Logger::VERBOSE); } - $this->parsedUpdates[] = ['_' => $this->channelId === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1]; + $this->parsedUpdates[] = ['_' => $this->channelId === self::GENERIC ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1]; } } public function __toString(): string diff --git a/src/danog/MadelineProto/Loop/Update/SeqLoop.php b/src/danog/MadelineProto/Loop/Update/SeqLoop.php index 2cd18c90..61e0b7e5 100644 --- a/src/danog/MadelineProto/Loop/Update/SeqLoop.php +++ b/src/danog/MadelineProto/Loop/Update/SeqLoop.php @@ -19,7 +19,8 @@ namespace danog\MadelineProto\Loop\Update; -use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use danog\Loop\ResumableSignalLoop; +use danog\MadelineProto\Loop\InternalLoop; /** * update feed loop. @@ -28,17 +29,28 @@ use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; */ class SeqLoop extends ResumableSignalLoop { - private $incomingUpdates = []; - private $feeder; - private $pendingWakeups = []; - public function __construct($API) - { - $this->API = $API; - } + use InternalLoop; + /** + * Incoming updates. + */ + private array $incomingUpdates = []; + /** + * Update feeder. + */ + private ?FeedLoop $feeder = null; + /** + * Pending updates. + */ + private array $pendingWakeups = []; + /** + * Main loop. + * + * @return \Generator + */ public function loop(): \Generator { $API = $this->API; - $this->feeder = $API->feeders[false]; + $this->feeder = $API->feeders[FeedLoop::GENERIC]; if (!$this->API->settings['updates']['handle_updates']) { return false; } @@ -90,9 +102,9 @@ class SeqLoop extends ResumableSignalLoop $result = $this->state->checkSeq($seq_start); if ($result > 0) { $this->API->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR); - yield $this->pause(1.0); + yield $this->pause(1000); if (!$this->incomingUpdates) { - yield $this->API->updaters[false]->resume(); + yield $this->API->updaters[UpdateLoop::GENERIC]->resume(); } $this->incomingUpdates = \array_merge($this->incomingUpdates, [$update], $updates); continue; diff --git a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php index 3f1760af..214ed14b 100644 --- a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php +++ b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php @@ -19,9 +19,10 @@ namespace danog\MadelineProto\Loop\Update; -use Amp\Loop; +use danog\Loop\ResumableSignalLoop; use danog\MadelineProto\Exception; -use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use danog\MadelineProto\Loop\InternalLoop; +use danog\MadelineProto\MTProto; use danog\MadelineProto\RPCErrorException; /** @@ -31,14 +32,39 @@ use danog\MadelineProto\RPCErrorException; */ class UpdateLoop extends ResumableSignalLoop { + use InternalLoop { + __construct as private init; + } + /** + * Main loop ID. + */ + const GENERIC = 0; + private $toPts; - private $channelId; - private $feeder; - public function __construct($API, $channelId) + /** + * Loop name. + */ + private int $channelId; + /** + * Feed loop. + */ + private ?FeedLoop $feeder = null; + /** + * Constructor. + * + * @param MTProto $API + * @param integer $channelId + */ + public function __construct(MTProto $API, int $channelId) { - $this->API = $API; + $this->init($API); $this->channelId = $channelId; } + /** + * Main loop. + * + * @return \Generator + */ public function loop(): \Generator { $API = $this->API; @@ -49,8 +75,8 @@ class UpdateLoop extends ResumableSignalLoop return; } } - $this->state = $state = $this->channelId === false ? yield from $API->loadUpdateState() : $API->loadChannelState($this->channelId); - $timeout = $API->settings['updates']['getdifference_interval']; + $this->state = $state = $this->channelId === self::GENERIC ? yield from $API->loadUpdateState() : $API->loadChannelState($this->channelId); + $timeout = $API->settings['updates']['getdifference_interval'] * 1000; $first = true; while (true) { while (!$API->settings['updates']['handle_updates'] || !$API->hasAllAuth()) { @@ -184,18 +210,25 @@ class UpdateLoop extends ResumableSignalLoop $API->signalUpdate(); $API->logger->logger("Finished signaling updates in {$this}, pausing"); $first = false; - if (yield $this->waitSignal($this->pause($timeout))) { + if (yield $this->waitSignal($this->pause($timeout * 1000))) { $API->logger->logger("Exiting {$this} due to signal"); return; } } } - public function setLimit($toPts) + public function setLimit($toPts): void { $this->toPts = $toPts; } + /** + * Get loop name. + * + * @return string + */ public function __toString(): string { - return !$this->channelId ? 'getUpdate loop generic' : "getUpdate loop channel {$this->channelId}"; + return $this->channelId ? + "getUpdate loop channel {$this->channelId}" : + 'getUpdate loop generic'; } } diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index d34d4aae..5e828dd8 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -29,7 +29,7 @@ use danog\MadelineProto\Db\DbPropertiesFabric; use danog\MadelineProto\Db\DbPropertiesTrait; use danog\MadelineProto\Db\Mysql; use danog\MadelineProto\Ipc\Server; -use danog\MadelineProto\Loop\Generic\PeriodicLoop; +use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal; use danog\MadelineProto\Loop\Update\FeedLoop; use danog\MadelineProto\Loop\Update\SeqLoop; use danog\MadelineProto\Loop\Update\UpdateLoop; @@ -344,46 +344,32 @@ class MTProto extends AsyncConstruct implements TLCallback public $minDatabase; /** * TOS check loop. - * - * @var PeriodicLoop */ - public $checkTosLoop; + public ?PeriodicLoopInternal $checkTosLoop = null; /** * Phone config loop. - * - * @var PeriodicLoop */ - public $phoneConfigLoop; + public ?PeriodicLoopInternal $phoneConfigLoop = null; /** * Config loop. - * - * @var PeriodicLoop */ - public $configLoop; + public ?PeriodicLoopInternal $configLoop = null; /** * Call checker loop. - * - * @var PeriodicLoop */ - private $callCheckerLoop; + private ?PeriodicLoopInternal $callCheckerLoop = null; /** * Autoserialization loop. - * - * @var PeriodicLoop */ - private $serializeLoop; + private ?PeriodicLoopInternal $serializeLoop = null; /** * RPC reporting loop. - * - * @var PeriodicLoop */ - private $rpcLoop; + private ?PeriodicLoopInternal $rpcLoop = null; /** * IPC server. - * - * @var Server */ - private $ipcServer; + private ?Server $ipcServer = null; /** * Feeder loops. * @@ -401,7 +387,7 @@ class MTProto extends AsyncConstruct implements TLCallback * * @var boolean */ - public $destructing = false; + public bool $destructing = false; /** * DataCenter instance. * @@ -454,7 +440,7 @@ class MTProto extends AsyncConstruct implements TLCallback * * @param array $settings Settings * - * @return void + * @return \Generator */ public function __construct_async($settings = []): \Generator { @@ -708,22 +694,22 @@ class MTProto extends AsyncConstruct implements TLCallback private function startLoops() { if (!$this->callCheckerLoop) { - $this->callCheckerLoop = new PeriodicLoop($this, [$this, 'checkCalls'], 'call check', 10); + $this->callCheckerLoop = new PeriodicLoopInternal($this, [$this, 'checkCalls'], 'call check', 10 * 1000); } if (!$this->serializeLoop) { - $this->serializeLoop = new PeriodicLoop($this, [$this, 'serialize'], 'serialize', $this->settings['serialization']['serialization_interval']); + $this->serializeLoop = new PeriodicLoopInternal($this, [$this, 'serialize'], 'serialize', $this->settings['serialization']['serialization_interval'] * 1000); } if (!$this->phoneConfigLoop) { - $this->phoneConfigLoop = new PeriodicLoop($this, [$this, 'getPhoneConfig'], 'phone config', 24 * 3600); + $this->phoneConfigLoop = new PeriodicLoopInternal($this, [$this, 'getPhoneConfig'], 'phone config', 24 * 3600 * 1000); } if (!$this->checkTosLoop) { - $this->checkTosLoop = new PeriodicLoop($this, [$this, 'checkTos'], 'TOS', 24 * 3600); + $this->checkTosLoop = new PeriodicLoopInternal($this, [$this, 'checkTos'], 'TOS', 24 * 3600 * 1000); } if (!$this->configLoop) { - $this->configLoop = new PeriodicLoop($this, [$this, 'getConfig'], 'config', 24 * 3600); + $this->configLoop = new PeriodicLoopInternal($this, [$this, 'getConfig'], 'config', 24 * 3600 * 1000); } if (!$this->rpcLoop) { - $this->rpcLoop = new PeriodicLoop($this, [$this, 'rpcReport'], 'config', 60); + $this->rpcLoop = new PeriodicLoopInternal($this, [$this, 'rpcReport'], 'config', 60 * 1000); } if (!$this->ipcServer) { $this->ipcServer = new Server($this); @@ -812,7 +798,7 @@ class MTProto extends AsyncConstruct implements TLCallback if (!$this->updates_state instanceof UpdatesState) { $this->updates_state = new UpdatesState($this->updates_state); } - $this->channels_state->__construct([false => $this->updates_state]); + $this->channels_state->__construct([UpdateLoop::GENERIC => $this->updates_state]); unset($this->updates_state); } if (!isset($this->datacenter)) { @@ -1017,9 +1003,9 @@ class MTProto extends AsyncConstruct implements TLCallback } if ($this->authorized === self::LOGGED_IN && $this->settings['updates']['handle_updates']) { $this->logger->logger(Lang::$current_lang['getupdates_deserialization'], Logger::NOTICE); - yield $this->updaters[false]->resume(); + yield $this->updaters[UpdateLoop::GENERIC]->resume(); } - $this->updaters[false]->start(); + $this->updaters[UpdateLoop::GENERIC]->start(); GarbageCollector::start(); } @@ -1165,7 +1151,7 @@ class MTProto extends AsyncConstruct implements TLCallback $lang_pack = 'android'; } // Detect app version - $app_version = self::RELEASE.' ('.self::V.', '.str_replace(' (AN UPDATE IS REQUIRED)', '', Magic::$revision).')'; + $app_version = self::RELEASE.' ('.self::V.', '.\str_replace(' (AN UPDATE IS REQUIRED)', '', Magic::$revision).')'; if (($settings['app_info']['api_id'] ?? 0) === 6) { // TG DEV NOTICE: these app info spoofing measures were implemented for NON-MALICIOUS purposes. // All accounts registered with a custom API ID require manual verification through recover@telegram.org, to avoid instant permabans. @@ -1441,7 +1427,7 @@ class MTProto extends AsyncConstruct implements TLCallback */ public function setupLogger(): void { - $this->logger = Logger::getLoggerFromSettings($this->settings, isset($this->authorization['user']) ? isset($this->authorization['user']['username']) ? $this->authorization['user']['username'] : $this->authorization['user']['id'] : ''); + $this->logger = Logger::getLoggerFromSettings($this->settings, isset($this->authorization['user']) ? (isset($this->authorization['user']['username']) ? $this->authorization['user']['username'] : $this->authorization['user']['id']) : ''); } /** * Reset all MTProto sessions. @@ -1517,7 +1503,7 @@ class MTProto extends AsyncConstruct implements TLCallback */ public function connectToAllDcs(bool $reconnectAll = true): \Generator { - $this->channels_state->get(false); + $this->channels_state->get(FeedLoop::GENERIC); foreach ($this->channels_state->get() as $state) { $channelId = $state->getChannel(); if (!isset($this->feeders[$channelId])) { @@ -1644,7 +1630,7 @@ class MTProto extends AsyncConstruct implements TLCallback if (!isset($this->seqUpdater)) { $this->seqUpdater = new SeqLoop($this); } - $this->channels_state->get(false); + $this->channels_state->get(FeedLoop::GENERIC); $channelIds = []; foreach ($this->channels_state->get() as $state) { $channelIds[] = $state->getChannel(); diff --git a/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php index 2d71765e..883cdb36 100644 --- a/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php @@ -21,6 +21,7 @@ namespace danog\MadelineProto\MTProtoSession; use Amp\Loop; use danog\MadelineProto\Logger; +use danog\MadelineProto\Loop\Update\UpdateLoop; use danog\MadelineProto\MTProto; /** @@ -84,8 +85,8 @@ trait ResponseHandler $this->shared->getTempAuthKey()->setServerSalt($this->incoming_messages[$current_msg_id]['content']['server_salt']); $this->ackIncomingMessageId($current_msg_id); // Acknowledge that I received the server's response - if ($this->API->authorized === MTProto::LOGGED_IN && !$this->API->isInitingAuthorization() && $this->API->datacenter->getDataCenterConnection($this->API->datacenter->curdc)->hasTempAuthKey() && isset($this->API->updaters[false])) { - $this->API->updaters[false]->resumeDefer(); + if ($this->API->authorized === MTProto::LOGGED_IN && !$this->API->isInitingAuthorization() && $this->API->datacenter->getDataCenterConnection($this->API->datacenter->curdc)->hasTempAuthKey() && isset($this->API->updaters[UpdateLoop::GENERIC])) { + $this->API->updaters[UpdateLoop::GENERIC]->resumeDefer(); } unset($this->incoming_messages[$current_msg_id]['content']); break; diff --git a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php index 02ff33c2..cb8ce96f 100644 --- a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php @@ -609,7 +609,7 @@ trait AuthKeyHandler } continue; } - yield $socket->waitGetConnection(); + yield from $socket->waitGetConnection(); if (isset($this->init_auth_dcs[$id])) { $this->pending_auth = true; continue; @@ -620,7 +620,7 @@ trait AuthKeyHandler } if ($dcs) { $first = \array_shift($dcs)(); - yield $first; + yield from $first; } foreach ($dcs as $id => &$dc) { $dc = $dc(); diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index 5ddf1f62..aa4b0c05 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -23,6 +23,8 @@ use Amp\Deferred; use Amp\Http\Client\Request; use Amp\Loop; use danog\MadelineProto\Logger; +use danog\MadelineProto\Loop\Update\FeedLoop; +use danog\MadelineProto\Loop\Update\UpdateLoop; use danog\MadelineProto\RPCErrorException; /** @@ -189,9 +191,9 @@ trait UpdateHandler { if (!$this->got_state) { $this->got_state = true; - $this->channels_state->get(false, yield from $this->getUpdatesState()); + $this->channels_state->get(0, yield from $this->getUpdatesState()); } - return $this->channels_state->get(false); + return $this->channels_state->get(0); } /** * Load channel state. @@ -256,7 +258,7 @@ trait UpdateHandler $result = []; foreach ($updates['updates'] as $key => $update) { if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' || $update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' || $update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' || $update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') { - $result[yield from $this->feeders[false]->feedSingle($update)] = true; + $result[yield from $this->feeders[FeedLoop::GENERIC]->feedSingle($update)] = true; unset($updates['updates'][$key]); } } @@ -272,7 +274,7 @@ trait UpdateHandler $this->seqUpdater->resume(); break; case 'updateShort': - $this->feeders[yield from $this->feeders[false]->feedSingle($updates['update'])]->resume(); + $this->feeders[yield from $this->feeders[FeedLoop::GENERIC]->feedSingle($updates['update'])]->resume(); break; case 'updateShortSentMessage': if (!isset($updates['request']['body'])) { @@ -287,7 +289,7 @@ trait UpdateHandler $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); $to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']); if (!((yield from $this->peerIsset($from_id)) || !((yield from $this->peerIsset($to_id)) || isset($updates['via_bot_id']) && !((yield from $this->peerIsset($updates['via_bot_id'])) || isset($updates['entities']) && !((yield from $this->entitiesPeerIsset($updates['entities'])) || isset($updates['fwd_from']) && !(yield from $this->fwdPeerIsset($updates['fwd_from']))))))) { - yield $this->updaters[false]->resume(); + yield $this->updaters[FeedLoop::GENERIC]->resume(); return; } $message = $updates; @@ -305,10 +307,10 @@ trait UpdateHandler break; } $update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']]; - $this->feeders[yield from $this->feeders[false]->feedSingle($update)]->resume(); + $this->feeders[yield from $this->feeders[FeedLoop::GENERIC]->feedSingle($update)]->resume(); break; case 'updatesTooLong': - $this->updaters[false]->resume(); + $this->updaters[UpdateLoop::GENERIC]->resume(); break; default: throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.\var_export($updates, true)); @@ -389,7 +391,7 @@ trait UpdateHandler } if ($update['qts'] > $cur_state->qts() + 1) { $this->logger->logger('Qts hole. Fetching updates manually: update qts: '.$update['qts'].' > current qts '.$cur_state->qts().'+1, chat id: '.$update['message']['chat_id'], \danog\MadelineProto\Logger::ERROR); - $this->updaters[false]->resumeDefer(); + $this->updaters[UpdateLoop::GENERIC]->resumeDefer(); return false; } $this->logger->logger('Applying qts: '.$update['qts'].' over current qts '.$cur_state->qts().', chat id: '.$update['message']['chat_id'], \danog\MadelineProto\Logger::VERBOSE); diff --git a/src/danog/MadelineProto/MTProtoTools/UpdatesState.php b/src/danog/MadelineProto/MTProtoTools/UpdatesState.php index 5cee0d16..da291420 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdatesState.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdatesState.php @@ -26,47 +26,37 @@ class UpdatesState { /** * PTS. - * - * @var int */ - private $pts = 1; + private int $pts = 1; /** * QTS. - * - * @var int */ - private $qts = -1; + private int $qts = -1; /** * Seq. - * - * @var int */ - private $seq = 0; + private int $seq = 0; /** * Date. - * - * @var int */ - private $date = 1; + private int $date = 1; /** * Channel ID. * - * @var int|bool + * @var int */ private $channelId; /** * Is busy? - * - * @var bool */ - private $syncLoading = false; + private bool $syncLoading = false; /** * Init function. * * @param array $init Initial parameters - * @param bool $channelId Channel ID + * @param int $channelId Channel ID */ - public function __construct($init = [], $channelId = false) + public function __construct(array $init = [], int $channelId = 0) { $this->channelId = $channelId; $this->update($init); @@ -80,6 +70,16 @@ class UpdatesState { return $this->channelId ? ['pts', 'channelId'] : ['pts', 'qts', 'seq', 'date', 'channelId']; } + /** + * Wakeup function. + */ + public function __wakeup() + { + /** @psalm-suppress DocblockTypeContradiction */ + if ($this->channelId === false) { + $this->channelId = 0; + } + } /** * Is this state relative to a channel? * @@ -92,9 +92,9 @@ class UpdatesState /** * Get the channel ID. * - * @return int|null + * @return int */ - public function getChannel() + public function getChannel(): int { return $this->channelId; } diff --git a/src/danog/MadelineProto/SecretChats/AuthKeyHandler.php b/src/danog/MadelineProto/SecretChats/AuthKeyHandler.php index 3442cbb2..12c793be 100644 --- a/src/danog/MadelineProto/SecretChats/AuthKeyHandler.php +++ b/src/danog/MadelineProto/SecretChats/AuthKeyHandler.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\SecretChats; +use danog\MadelineProto\Loop\Update\UpdateLoop; use danog\MadelineProto\MTProto; /** @@ -95,7 +96,7 @@ trait AuthKeyHandler $this->checkG($g_a, $dh_config['p']); $res = yield from $this->methodCallAsyncRead('messages.requestEncryption', ['user_id' => $user, 'g_a' => $g_a->toBytes()], ['datacenter' => $this->datacenter->curdc]); $this->temp_requested_secret_chats[$res['id']] = $a; - $this->updaters[false]->resume(); + $this->updaters[UpdateLoop::GENERIC]->resume(); $this->logger->logger('Secret chat '.$res['id'].' requested successfully!', \danog\MadelineProto\Logger::NOTICE); return $res['id']; } @@ -163,7 +164,7 @@ trait AuthKeyHandler $this->temp_rekeyed_secret_chats[$e] = $a; $this->secret_chats[$chat]['rekeying'] = [1, $e]; yield from $this->methodCallAsyncRead('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionRequestKey', 'g_a' => $g_a->toBytes(), 'exchange_id' => $e]]], ['datacenter' => $this->datacenter->curdc]); - $this->updaters[false]->resume(); + $this->updaters[UpdateLoop::GENERIC]->resume(); return $e; } /** @@ -203,7 +204,7 @@ trait AuthKeyHandler $g_b = $dh_config['g']->powMod($b, $dh_config['p']); $this->checkG($g_b, $dh_config['p']); yield from $this->methodCallAsyncRead('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionAcceptKey', 'g_b' => $g_b->toBytes(), 'exchange_id' => $params['exchange_id'], 'key_fingerprint' => $key['fingerprint']]]], ['datacenter' => $this->datacenter->curdc]); - $this->updaters[false]->resume(); + $this->updaters[UpdateLoop::GENERIC]->resume(); } /** * Commit rekeying of secret chat. @@ -238,7 +239,7 @@ trait AuthKeyHandler $this->secret_chats[$chat]['key'] = $key; $this->secret_chats[$chat]['ttr'] = 100; $this->secret_chats[$chat]['updated'] = \time(); - $this->updaters[false]->resume(); + $this->updaters[UpdateLoop::GENERIC]->resume(); } /** * Complete rekeying. diff --git a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php index a63e2352..db23be2f 100644 --- a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php +++ b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\VoIP; +use danog\MadelineProto\Loop\Update\UpdateLoop; use danog\MadelineProto\Tools; /** @@ -109,7 +110,7 @@ trait AuthKeyHandler $res = yield from $this->methodCallAsyncRead('phone.requestCall', ['user_id' => $user, 'g_a_hash' => \hash('sha256', $g_a->toBytes(), true), 'protocol' => ['_' => 'phoneCallProtocol', 'udp_p2p' => true, 'udp_reflector' => true, 'min_layer' => 65, 'max_layer' => \danog\MadelineProto\VoIP::getConnectionMaxLayer()]], ['datacenter' => $this->datacenter->curdc]); $controller->setCall($res['phone_call']); $this->calls[$res['phone_call']['id']] = $controller; - yield $this->updaters[false]->resume(); + yield $this->updaters[UpdateLoop::GENERIC]->resume(); return $controller; } /** @@ -149,7 +150,7 @@ trait AuthKeyHandler throw $e; } $this->calls[$res['phone_call']['id']]->storage['b'] = $b; - yield $this->updaters[false]->resume(); + yield $this->updaters[UpdateLoop::GENERIC]->resume(); return true; } /** diff --git a/src/danog/MadelineProto/Wrappers/Templates.php b/src/danog/MadelineProto/Wrappers/Templates.php index 2ae412a6..b525384d 100644 --- a/src/danog/MadelineProto/Wrappers/Templates.php +++ b/src/danog/MadelineProto/Wrappers/Templates.php @@ -19,8 +19,8 @@ namespace danog\MadelineProto\Wrappers; -use function Amp\ByteStream\getOutputBufferStream; use \danog\MadelineProto\MTProto; +use function Amp\ByteStream\getOutputBufferStream; trait Templates { diff --git a/tools/phar.php b/tools/phar.php index e8c1d099..c70b7941 100644 --- a/tools/phar.php +++ b/tools/phar.php @@ -6,14 +6,14 @@ function ___install_madeline() { if (\count(\debug_backtrace(0)) === 1) { if (isset($GLOBALS['argv']) && !empty($GLOBALS['argv'])) { - $arguments = array_slice($GLOBALS['argv'], 1); + $arguments = \array_slice($GLOBALS['argv'], 1); } elseif (isset($_GET['argv']) && !empty($_GET['argv'])) { $arguments = $_GET['argv']; } else { $arguments = []; } - if (count($arguments) >= 2) { - \define(\MADELINE_WORKER_TYPE::class, array_shift($arguments)); + if (\count($arguments) >= 2) { + \define(\MADELINE_WORKER_TYPE::class, \array_shift($arguments)); \define(\MADELINE_WORKER_ARGS::class, $arguments); } else { die('MadelineProto loader: you must include this file in another PHP script, see https://docs.madelineproto.xyz for more info.'.PHP_EOL);