From bc13e615264cd86132689ecfc326c24571b35efa Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 24 Sep 2020 11:45:20 +0200 Subject: [PATCH] Final fixes --- .php_cs.dist | 3 +- src/danog/MadelineProto/API.php | 114 +++++++++------ src/danog/MadelineProto/APIWrapper.php | 18 +-- .../MadelineProto/AbstractAPIFactory.php | 26 +++- src/danog/MadelineProto/Ipc/Client.php | 21 +++ src/danog/MadelineProto/Ipc/IpcState.php | 64 +++++++++ .../Ipc/Runner/ProcessRunner.php | 19 ++- .../Ipc/Runner/RunnerAbstract.php | 3 +- .../MadelineProto/Ipc/Runner/WebRunner.php | 16 +-- src/danog/MadelineProto/Ipc/Runner/entry.php | 43 +++--- src/danog/MadelineProto/Ipc/Server.php | 75 ++++++++-- .../MadelineProto/{Ipc => }/LightState.php | 5 +- src/danog/MadelineProto/Logger.php | 5 +- src/danog/MadelineProto/MTProto.php | 8 +- .../MTProtoSession/ResponseHandler.php | 2 +- .../MTProtoTools/PeerHandler.php | 6 +- src/danog/MadelineProto/Serialization.php | 88 +++++------- src/danog/MadelineProto/SessionPaths.php | 130 +++++++++++++++--- .../MadelineProto/Settings/Serialization.php | 30 ++++ src/danog/MadelineProto/Tools.php | 87 ++++++++---- 20 files changed, 557 insertions(+), 206 deletions(-) create mode 100644 src/danog/MadelineProto/Ipc/IpcState.php rename src/danog/MadelineProto/{Ipc => }/LightState.php (94%) diff --git a/.php_cs.dist b/.php_cs.dist index a75477be..98210e9c 100644 --- a/.php_cs.dist +++ b/.php_cs.dist @@ -5,8 +5,7 @@ $config->getFinder() ->in(__DIR__ . '/src') ->in(__DIR__ . '/tests') ->in(__DIR__ . '/examples') - ->in(__DIR__ . '/tools') - ->in(__DIR__); + ->in(__DIR__ . '/tools'); $cacheDir = getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__; diff --git a/src/danog/MadelineProto/API.php b/src/danog/MadelineProto/API.php index 5aef29bb..07d79270 100644 --- a/src/danog/MadelineProto/API.php +++ b/src/danog/MadelineProto/API.php @@ -19,10 +19,11 @@ namespace danog\MadelineProto; -use Amp\Failure; use Amp\Ipc\Sync\ChannelledSocket; use danog\MadelineProto\Ipc\Client; +use danog\MadelineProto\Ipc\Server; use danog\MadelineProto\Settings\Logger as SettingsLogger; +use danog\MadelineProto\Settings\Serialization as SettingsSerialization; /** * Main API wrapper for MadelineProto. @@ -92,13 +93,6 @@ class API extends InternalDoc */ private $wrapper; - /** - * Global session unlock callback. - * - * @var ?callable - */ - private $unlock; - /** * Magic constructor function. @@ -128,44 +122,20 @@ class API extends InternalDoc /** * Async constructor function. * - * @param Settings|SettingsEmpty $settings Settings + * @param Settings|SettingsEmpty|SettingsSerialization $settings Settings * * @return \Generator */ private function internalInitAPI(SettingsAbstract $settings): \Generator { - Logger::constructorFromSettings($settings instanceof SettingsEmpty - ? new SettingsLogger - : $settings->getLogger()); + Logger::constructorFromSettings($settings instanceof Settings + ? $settings->getLogger() + : new SettingsLogger); - [$unserialized, $this->unlock] = yield Tools::timeoutWithDefault( - Serialization::unserialize($this->session), - 30000, - new Failure(new \RuntimeException("Could not connect to MadelineProto, please check the logs for more details.")) - ); - if ($unserialized instanceof ChannelledSocket) { - $this->API = new Client($unserialized, Logger::$default); - $this->APIFactory(); - return; - } elseif ($unserialized) { - $unserialized->storage = $unserialized->storage ?? []; - $unserialized->session = $this->session; - APIWrapper::link($this, $unserialized); - APIWrapper::link($this->wrapper, $this); - AbstractAPIFactory::link($this->wrapper->getFactory(), $this); - if (isset($this->API)) { - $this->storage = $this->API->storage ?? $this->storage; - - unset($unserialized); - - yield from $this->API->wakeup($settings, $this->wrapper); - $this->APIFactory(); - $this->logger->logger(Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); - return; - } + if (yield from $this->connectToMadelineProto($settings)) { + return; // OK } - - if ($settings instanceof SettingsEmpty) { + if (!$settings instanceof Settings) { $settings = new Settings; } @@ -185,6 +155,61 @@ class API extends InternalDoc $this->logger->logger(Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); } + /** + * Connect to MadelineProto. + * + * @param Settings|SettingsEmpty $settings Settings + * @param bool $forceFull Whether to force full initialization + * + * @return \Generator + */ + protected function connectToMadelineProto(SettingsAbstract $settings, bool $forceFull = false): \Generator + { + if ($settings instanceof SettingsSerialization) { + $forceFull = $forceFull || $settings->getForceFull(); + } elseif ($settings instanceof Settings) { + $forceFull = $forceFull || $settings->getSerialization()->getForceFull(); + } + + [$unserialized, $this->unlock] = yield Tools::timeoutWithDefault( + Serialization::unserialize($this->session, $forceFull), + 30000, + [0, null] + ); + if ($unserialized === 0) { + // Timeout + throw new \RuntimeException("Could not connect to MadelineProto, please check the logs for more details."); + } elseif ($unserialized instanceof \Throwable) { + // IPC server error, try fetching full session + return yield from $this->connectToMadelineProto($settings, true); + } elseif ($unserialized instanceof ChannelledSocket) { + // Success, IPC client + $this->API = new Client($unserialized, Logger::$default); + $this->APIFactory(); + return true; + } elseif ($unserialized) { + // Success, full session + $unserialized->storage = $unserialized->storage ?? []; + $unserialized->session = $this->session; + APIWrapper::link($this, $unserialized); + APIWrapper::link($this->wrapper, $this); + AbstractAPIFactory::link($this->wrapper->getFactory(), $this); + if (isset($this->API)) { + $this->storage = $this->API->storage ?? $this->storage; + + unset($unserialized); + + if ($settings instanceof SettingsSerialization) { + $settings = new SettingsEmpty; + } + yield from $this->API->wakeup($settings, $this->wrapper); + $this->APIFactory(); + $this->logger->logger(Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); + return true; + } + } + return false; + } /** * Wakeup function. * @@ -303,15 +328,24 @@ class API extends InternalDoc { $errors = []; $this->async(true); + + if ($this->API instanceof Client) { + yield $this->API->stopIpcServer(); + yield $this->API->disconnect(); + yield from $this->connectToMadelineProto(new SettingsEmpty, true); + } + + $started = false; while (true) { try { yield $this->start(); + $started = true; yield $this->setEventHandler($eventHandler); return yield from $this->API->loop(); } catch (\Throwable $e) { $errors = [\time() => $errors[\time()] ?? 0]; $errors[\time()]++; - if ($errors[\time()] > 100 && !$this->inited()) { + if ($errors[\time()] > 100 && (!$this->inited() || !$started)) { $this->logger->logger("More than 100 errors in a second and not inited, exiting!", Logger::FATAL_ERROR); return; } diff --git a/src/danog/MadelineProto/APIWrapper.php b/src/danog/MadelineProto/APIWrapper.php index 1fd03d7d..76810b91 100644 --- a/src/danog/MadelineProto/APIWrapper.php +++ b/src/danog/MadelineProto/APIWrapper.php @@ -21,10 +21,8 @@ namespace danog\MadelineProto; use Amp\Promise; use Amp\Success; use danog\MadelineProto\Ipc\Client; -use danog\MadelineProto\Ipc\LightState; use function Amp\File\open; -use function Amp\File\rename as renameAsync; final class APIWrapper { @@ -190,22 +188,10 @@ final class APIWrapper yield from $this->API->initAsynchronously(); } - $file = yield open($this->session->getTempPath(), 'bw+'); - yield $file->write(Serialization::PHP_HEADER); - yield $file->write(\chr(Serialization::VERSION)); - yield $file->write(\serialize($this)); - yield $file->close(); - - yield renameAsync($this->session->getTempPath(), $this->session->getSessionPath()); + yield from $this->session->serialize($this, $this->session->getSessionPath()); if ($this->API) { - $file = yield open($this->session->getTempPath(), 'bw+'); - yield $file->write(Serialization::PHP_HEADER); - yield $file->write(\chr(Serialization::VERSION)); - yield $file->write(\serialize(new LightState($this->API))); - yield $file->close(); - - yield renameAsync($this->session->getTempPath(), $this->session->getIpcStatePath()); + yield from $this->session->storeLightState($this->API); } diff --git a/src/danog/MadelineProto/AbstractAPIFactory.php b/src/danog/MadelineProto/AbstractAPIFactory.php index 2f7b9ef3..26cd7f8f 100644 --- a/src/danog/MadelineProto/AbstractAPIFactory.php +++ b/src/danog/MadelineProto/AbstractAPIFactory.php @@ -20,6 +20,7 @@ namespace danog\MadelineProto; use danog\MadelineProto\Async\AsyncConstruct; +use danog\MadelineProto\Ipc\Client; abstract class AbstractAPIFactory extends AsyncConstruct { @@ -36,7 +37,7 @@ abstract class AbstractAPIFactory extends AsyncConstruct * * @internal * - * @var MTProto + * @var MTProto|Client */ public $API; /** @@ -61,6 +62,12 @@ abstract class AbstractAPIFactory extends AsyncConstruct * @var string[] */ protected array $methods = []; + + /** + * Main API instance. + */ + private API $mainAPI; + /** * Export APIFactory instance with the specified namespace. * @@ -92,6 +99,11 @@ abstract class AbstractAPIFactory extends AsyncConstruct $a->lua =& $b->lua; $a->async =& $b->async; $a->methods =& $b->methods; + if ($b instanceof API) { + $a->mainAPI = $b; + } else { + $a->mainAPI =& $b->mainAPI; + } if (!$b->inited()) { $a->setInitPromise($b->initAsynchronously()); } @@ -173,6 +185,18 @@ abstract class AbstractAPIFactory extends AsyncConstruct $args = isset($arguments[0]) && \is_array($arguments[0]) ? $arguments[0] : []; return yield from $this->API->methodCallAsyncRead($name, $args, $aargs); } + if ($this->API instanceof Client + && ($lower_name === 'seteventhandler' + || ($lower_name === 'loop' && !isset($arguments[0]))) + ) { + yield $this->API->stopIpcServer(); + yield $this->API->disconnect(); + if ($this instanceof API) { + yield from $this->connectToMadelineProto(new SettingsEmpty, true); + } else { + yield from $this->mainAPI->connectToMadelineProto(new SettingsEmpty, true); + } + } $res = $this->methods[$lower_name](...$arguments); return $res instanceof \Generator ? yield from $res : yield $res; } diff --git a/src/danog/MadelineProto/Ipc/Client.php b/src/danog/MadelineProto/Ipc/Client.php index d257097c..e2a43673 100644 --- a/src/danog/MadelineProto/Ipc/Client.php +++ b/src/danog/MadelineProto/Ipc/Client.php @@ -20,6 +20,8 @@ namespace danog\MadelineProto\Ipc; use Amp\Deferred; use Amp\Ipc\Sync\ChannelledSocket; +use Amp\Promise; +use Amp\Success; use danog\MadelineProto\API; use danog\MadelineProto\Exception; use danog\MadelineProto\Logger; @@ -118,6 +120,25 @@ class Client Tools::wait($this->server->disconnect()); } } + /** + * Disconnect cleanly from main instance. + * + * @return Promise + */ + public function disconnect(): Promise + { + return isset($this->server) ? $this->server->disconnect() : new Success(); + } + /** + * Stop IPC server instance. + * + * @internal + */ + public function stopIpcServer(): \Generator + { + yield $this->server->send(Server::SHUTDOWN); + //yield $this->disconnect(); + } /** * Call function. * diff --git a/src/danog/MadelineProto/Ipc/IpcState.php b/src/danog/MadelineProto/Ipc/IpcState.php new file mode 100644 index 00000000..d792da6f --- /dev/null +++ b/src/danog/MadelineProto/Ipc/IpcState.php @@ -0,0 +1,64 @@ +startupTime = \microtime(true); + $this->startupId = $startupId; + $this->exception = $exception; + } + + /** + * Get startup time. + * + * @return float + */ + public function getStartupTime(): float + { + return $this->startupTime; + } + + /** + * Get startup ID. + * + * @return int + */ + public function getStartupId(): int + { + return $this->startupId; + } + + /** + * Get exception. + * + * @return ?\Throwable + */ + public function getException(): ?\Throwable + { + return $this->exception; + } +} diff --git a/src/danog/MadelineProto/Ipc/Runner/ProcessRunner.php b/src/danog/MadelineProto/Ipc/Runner/ProcessRunner.php index 2d510ee3..bf2b02a7 100644 --- a/src/danog/MadelineProto/Ipc/Runner/ProcessRunner.php +++ b/src/danog/MadelineProto/Ipc/Runner/ProcessRunner.php @@ -2,18 +2,28 @@ namespace danog\MadelineProto\Ipc\Runner; +use danog\MadelineProto\Logger; +use danog\MadelineProto\Tools; + final class ProcessRunner extends RunnerAbstract { /** @var string|null Cached path to located PHP binary. */ private static $binaryPath; + /** + * Resources. + */ + private static array $resources = []; /** * Runner. * * @param string $session Session path + * + * @return void */ - public static function start(string $session): void + public static function start(string $session, int $request): void { + $request = Tools::randomInt(); if (\PHP_SAPI === "cli") { $binary = \PHP_BINARY; } else { @@ -29,15 +39,16 @@ final class ProcessRunner extends RunnerAbstract $runner = self::getScriptPath(); $command = \implode(" ", [ - 'nohup', \escapeshellarg($binary), self::formatOptions($options), $runner, 'madeline-ipc', \escapeshellarg($session), - '&>/dev/null &' + $request ]); - \proc_close(\proc_open($command, [], $foo)); + Logger::log("Starting process with $command"); + + self::$resources []= \proc_open($command, [], $foo); } private static function locateBinary(): string { diff --git a/src/danog/MadelineProto/Ipc/Runner/RunnerAbstract.php b/src/danog/MadelineProto/Ipc/Runner/RunnerAbstract.php index e08fc2ef..9d84aa4c 100644 --- a/src/danog/MadelineProto/Ipc/Runner/RunnerAbstract.php +++ b/src/danog/MadelineProto/Ipc/Runner/RunnerAbstract.php @@ -60,8 +60,9 @@ abstract class RunnerAbstract * Runner. * * @param string $session Session path + * @param int $startup ID * * @return void */ - abstract public static function start(string $session): void; + abstract public static function start(string $session, int $startupId): void; } diff --git a/src/danog/MadelineProto/Ipc/Runner/WebRunner.php b/src/danog/MadelineProto/Ipc/Runner/WebRunner.php index 61663636..8a36c45b 100644 --- a/src/danog/MadelineProto/Ipc/Runner/WebRunner.php +++ b/src/danog/MadelineProto/Ipc/Runner/WebRunner.php @@ -2,7 +2,6 @@ namespace danog\MadelineProto\Ipc\Runner; -use Amp\ByteStream\ResourceOutputStream; use Amp\Parallel\Context\ContextException; use danog\MadelineProto\Magic; @@ -15,21 +14,18 @@ final class WebRunner extends RunnerAbstract * Resources. */ private static array $resources = []; - /** - * Socket. - * - * @var ResourceOutputStream - */ - private $res; + /** * Start. * * @param string $session Session path + * + * @return void */ - public static function start(string $session): void + public static function start(string $session, int $id): void { if (!isset($_SERVER['SERVER_NAME'])) { - throw new ContextException("Could not initialize web runner!"); + return; } if (!self::$runPath) { @@ -79,7 +75,7 @@ final class WebRunner extends RunnerAbstract } $params = [ - 'argv' => ['madeline-ipc', $session], + 'argv' => ['madeline-ipc', $session, $id], 'cwd' => Magic::getcwd() ]; diff --git a/src/danog/MadelineProto/Ipc/Runner/entry.php b/src/danog/MadelineProto/Ipc/Runner/entry.php index 8a54511c..bc7def5e 100644 --- a/src/danog/MadelineProto/Ipc/Runner/entry.php +++ b/src/danog/MadelineProto/Ipc/Runner/entry.php @@ -16,12 +16,13 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -use Amp\Deferred; use danog\MadelineProto\API; +use danog\MadelineProto\Ipc\IpcState; use danog\MadelineProto\Ipc\Server; use danog\MadelineProto\Logger; use danog\MadelineProto\Magic; use danog\MadelineProto\SessionPaths; +use danog\MadelineProto\Settings; use danog\MadelineProto\Tools; (static function (): void { @@ -48,6 +49,13 @@ use danog\MadelineProto\Tools; \define(\MADELINE_WORKER_TYPE::class, \array_shift($arguments)); \define(\MADELINE_WORKER_ARGS::class, $arguments); } + + if (\defined(\SIGHUP::class)) { + try { + \pcntl_signal(SIGHUP, fn () => null); + } catch (\Throwable $e) { + } + } if (!\class_exists(API::class)) { $paths = [ \dirname(__DIR__, 7)."/autoload.php", @@ -82,29 +90,32 @@ use danog\MadelineProto\Tools; } \define(\MADELINE_WORKER::class, 1); + $runnerId = \MADELINE_WORKER_ARGS[1]; + $session = new SessionPaths($ipcPath); + try { Magic::classExists(); Magic::$script_cwd = $_GET['cwd'] ?? Magic::getcwd(); - $API = new API($ipcPath); + $API = new API($ipcPath, (new Settings)->getSerialization()->setForceFull(true)); $API->init(); - if ($API->hasEventHandler()) { - unset($API); - \gc_collect_cycles(); - Logger::log("Session has event handler, can't start IPC server like this!"); - $ipc = (new SessionPaths($ipcPath))->getIpcPath(); - @\unlink($ipc); - \file_put_contents($ipc, Server::EVENT_HANDLER); - } else { - $API->initSelfRestart(); - Tools::wait((new Deferred)->promise()); + $API->initSelfRestart(); + Tools::wait($session->storeIpcState(new IpcState($runnerId))); + + while (true) { + try { + Tools::wait(Server::waitShutdown()); + return; + } catch (\Throwable $e) { + Logger::log((string) $e, Logger::FATAL_ERROR); + Tools::wait($API->report("Surfaced: $e")); + } } } catch (\Throwable $e) { Logger::log("Got exception $e in IPC server, exiting...", Logger::FATAL_ERROR); \trigger_error("Got exception $e in IPC server, exiting...", E_USER_ERROR); - if ($e->getMessage() === 'Not inited!') { - $ipc = (new SessionPaths($ipcPath))->getIpcPath(); - @\unlink($ipc); - \file_put_contents($ipc, Server::NOT_INITED); + $ipc = Tools::wait($session->getIpcState()); + if (!($ipc && $ipc->getRunnerId() === $runnerId && !$ipc->getException())) { + Tools::wait($session->storeIpcState(new IpcState($runnerId, $e))); } } } diff --git a/src/danog/MadelineProto/Ipc/Server.php b/src/danog/MadelineProto/Ipc/Server.php index e1610385..d76c5105 100644 --- a/src/danog/MadelineProto/Ipc/Server.php +++ b/src/danog/MadelineProto/Ipc/Server.php @@ -18,13 +18,16 @@ namespace danog\MadelineProto\Ipc; +use Amp\Deferred; use Amp\Ipc\IpcServer; use Amp\Ipc\Sync\ChannelledSocket; +use Amp\Promise; use danog\Loop\SignalLoop; use danog\MadelineProto\Ipc\Runner\ProcessRunner; use danog\MadelineProto\Ipc\Runner\WebRunner; use danog\MadelineProto\Logger; use danog\MadelineProto\Loop\InternalLoop; +use danog\MadelineProto\SessionPaths; use danog\MadelineProto\Tools; /** @@ -34,13 +37,17 @@ class Server extends SignalLoop { use InternalLoop; /** - * Session not initialized, should initialize. + * Shutdown server. */ - const NOT_INITED = 'not inited'; + const SHUTDOWN = 0; /** - * Session uses event handler, should start from main event handler file. + * Boolean to shut down worker, if started. */ - const EVENT_HANDLER = 'event'; + private static bool $shutdown = false; + /** + * Deferred to shut down worker, if started. + */ + private static ?Deferred $shutdownDeferred = null; /** * IPC server. */ @@ -54,31 +61,67 @@ class Server extends SignalLoop */ public function setIpcPath(string $path): void { + self::$shutdownDeferred = new Deferred; $this->server = new IpcServer($path); } /** * Start IPC server in background. * - * @param string $session Session path + * @param SessionPaths $session Session path * - * @return void + * @return Promise */ - public static function startMe(string $session): void + public static function startMe(SessionPaths $session): Promise { + $id = Tools::randomInt(); try { Logger::log("Starting IPC server $session (process)"); - ProcessRunner::start($session); - WebRunner::start($session); - return; + ProcessRunner::start($session, $id); + WebRunner::start($session, $id); + return Tools::call(self::monitor($session, $id)); } catch (\Throwable $e) { Logger::log($e); } try { Logger::log("Starting IPC server $session (web)"); - WebRunner::start($session); + WebRunner::start($session, $id); } catch (\Throwable $e) { Logger::log($e); } + return Tools::call(self::monitor($session, $id)); + } + /** + * Monitor session. + * + * @param SessionPaths $session + * @param int $id + * + * @return \Generator + */ + private static function monitor(SessionPaths $session, int $id): \Generator + { + while (true) { + $state = yield $session->getIpcState(); + if ($state && $state->getStartupId() === $id) { + if ($e = $state->getException()) { + Logger::log("IPC server got exception $e"); + return $e; + } + Logger::log("IPC server started successfully!"); + return true; + } + yield Tools::sleep(1); + } + return false; + } + /** + * Wait for shutdown. + * + * @return Promise + */ + public static function waitShutdown(): Promise + { + return self::$shutdownDeferred->promise(); } /** * Main loop. @@ -104,11 +147,19 @@ class Server extends SignalLoop $this->API->logger("Accepted IPC client connection!"); $id = 0; + $payload = null; try { while ($payload = yield $socket->receive()) { Tools::callFork($this->clientRequest($socket, $id++, $payload)); } - } catch (\Throwable $e) { + } finally { + yield $socket->disconnect(); + if ($payload === self::SHUTDOWN) { + $this->signal(null); + if (self::$shutdownDeferred) { + self::$shutdownDeferred->resolve(); + } + } } } /** diff --git a/src/danog/MadelineProto/Ipc/LightState.php b/src/danog/MadelineProto/LightState.php similarity index 94% rename from src/danog/MadelineProto/Ipc/LightState.php rename to src/danog/MadelineProto/LightState.php index 7410ae34..4c84cf04 100644 --- a/src/danog/MadelineProto/Ipc/LightState.php +++ b/src/danog/MadelineProto/LightState.php @@ -16,10 +16,7 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\Ipc; - -use danog\MadelineProto\EventHandler; -use danog\MadelineProto\MTProto; +namespace danog\MadelineProto; /** * Light state. diff --git a/src/danog/MadelineProto/Logger.php b/src/danog/MadelineProto/Logger.php index 7db1188e..8b563ebf 100644 --- a/src/danog/MadelineProto/Logger.php +++ b/src/danog/MadelineProto/Logger.php @@ -160,10 +160,7 @@ class Logger */ public static function constructorFromSettings(SettingsLogger $settings): self { - if (!self::$default) { - self::$default = new self($settings); - } - return self::$default; + return self::$default = new self($settings); } /** diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 1dce31dc..1e21abe6 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -22,6 +22,7 @@ namespace danog\MadelineProto; use Amp\Dns\Resolver; use Amp\File\StatCache; use Amp\Http\Client\HttpClient; +use Amp\Loop; use Amp\Promise; use Closure; use danog\MadelineProto\Async\AsyncConstruct; @@ -1604,11 +1605,12 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Report an error to the previously set peer. * - * @param string $message Error to report + * @param string $message Error to report + * @param string $parseMode Parse mode * * @return \Generator */ - public function report(string $message): \Generator + public function report(string $message, string $parseMode = ''): \Generator { if (!$this->reportDest) { return; @@ -1640,7 +1642,7 @@ class MTProto extends AsyncConstruct implements TLCallback $sent = true; foreach ($this->reportDest as $id) { try { - yield from $this->methodCallAsyncRead('messages.sendMessage', ['peer' => $id, 'message' => $message]); + yield from $this->methodCallAsyncRead('messages.sendMessage', ['peer' => $id, 'message' => $message, 'parse_mode' => $parseMode]); if ($file) { yield from $this->methodCallAsyncRead('messages.sendMedia', ['peer' => $id, 'media' => $file]); } diff --git a/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php index 8951219d..fa96d6c7 100644 --- a/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php @@ -481,7 +481,7 @@ trait ResponseHandler if (isset($response['_']) && !$this->isCdn() && $this->API->getTL()->getConstructors()->findByPredicate($response['_'])['type'] === 'Updates') { $body = []; if (isset($request['body']['peer'])) { - $body['peer'] = $this->API->getID($request['body']['peer']); + $body['peer'] = \is_string($request['body']['peer']) ? $request['body']['peer'] : $this->API->getId($request['body']['peer']); } if (isset($request['body']['message'])) { $body['message'] = (string) $request['body']['message']; diff --git a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php index ef781ef6..8ee6a12e 100644 --- a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php @@ -1052,9 +1052,13 @@ trait PeerHandler */ public function resolveUsername(string $username): \Generator { + $username = \str_replace('@', '', $username); + if (!$username) { + return false; + } try { $this->caching_simple_username[$username] = true; - $res = yield from $this->methodCallAsyncRead('contacts.resolveUsername', ['username' => \str_replace('@', '', $username)], ['datacenter' => $this->datacenter->curdc]); + $res = yield from $this->methodCallAsyncRead('contacts.resolveUsername', ['username' => $username], ['datacenter' => $this->datacenter->curdc]); } catch (\danog\MadelineProto\RPCErrorException $e) { $this->logger->logger('Username resolution failed with error '.$e->getMessage(), \danog\MadelineProto\Logger::ERROR); if (\strpos($e->rpc, 'FLOOD_WAIT_') === 0 || $e->rpc === 'AUTH_KEY_UNREGISTERED' || $e->rpc === 'USERNAME_INVALID') { diff --git a/src/danog/MadelineProto/Serialization.php b/src/danog/MadelineProto/Serialization.php index bfd1c37b..749e1da2 100644 --- a/src/danog/MadelineProto/Serialization.php +++ b/src/danog/MadelineProto/Serialization.php @@ -19,17 +19,14 @@ namespace danog\MadelineProto; -use Amp\CancellationTokenSource; -use Amp\Ipc\Sync\ChannelledSocket; +use Amp\Deferred; use Amp\Loop; use Amp\Promise; -use danog\MadelineProto\Ipc\LightState; use danog\MadelineProto\Ipc\Server; +use danog\MadelineProto\MTProtoSession\Session; use function Amp\File\exists; use function Amp\File\get; -use function Amp\File\open; -use function Amp\File\stat; use function Amp\Ipc\connect; /** @@ -98,13 +95,14 @@ abstract class Serialization * - Start IPC server * - Store IPC state * - * @param SessionPaths $session Session name + * @param SessionPaths $session Session name + * @param bool $forceFull Whether to force full session deserialization * * @internal * * @return \Generator */ - public static function unserialize(SessionPaths $session): \Generator + public static function unserialize(SessionPaths $session, bool $forceFull = false): \Generator { if (yield exists($session->getSessionPath())) { // Is new session @@ -131,17 +129,18 @@ abstract class Serialization Loop::unreference($warningId); $lightState = null; - $cancelFlock = new CancellationTokenSource; + $cancelFlock = new Deferred; + $cancelIpc = new Deferred; $canContinue = true; $ipcSocket = null; - $unlock = yield Tools::flock($session->getLockPath(), LOCK_EX, 1, $cancelFlock->getToken(), static function () use ($session, $cancelFlock, &$canContinue, &$ipcSocket, &$lightState) { - $ipcSocket = Tools::call(self::tryConnect($session->getIpcPath(), $cancelFlock)); - $session->getIpcState()->onResolve(static function (?\Throwable $e, ?LightState $res) use ($cancelFlock, &$canContinue, &$lightState) { + $unlock = yield from Tools::flockGenerator($session->getLockPath(), LOCK_EX, 1, $cancelFlock->promise(), $forceFull ? null : static function () use ($session, $cancelFlock, $cancelIpc, &$canContinue, &$ipcSocket, &$lightState) { + $ipcSocket = Tools::call(self::tryConnect($session->getIpcPath(), $cancelIpc->promise(), $cancelFlock)); + $session->getLightState()->onResolve(static function (?\Throwable $e, ?LightState $res) use ($cancelFlock, &$canContinue, &$lightState) { if ($res) { $lightState = $res; if (!$res->canStartIpc()) { $canContinue = false; - $cancelFlock->cancel(); + $cancelFlock->resolve(true); } } else { $lightState = false; @@ -154,28 +153,32 @@ abstract class Serialization return $ipcSocket; } if (!$canContinue) { // Have lock, can't use it - Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR); - Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR); + Logger::log("Session has event handler, but it's not started.", Logger::ERROR); + Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR); + Logger::log("Please start the event handler or unset it to use the IPC server.", Logger::ERROR); $unlock(); return $ipcSocket; } try { /** @var LightState */ - $lightState ??= yield $session->getIpcState(); + $lightState ??= yield $session->getLightState(); } catch (\Throwable $e) { } - if ($lightState) { + if ($lightState && !$forceFull) { if (!$class = $lightState->getEventHandler()) { // Unlock and fork $unlock(); - Server::startMe($session); - return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath()); + $cancelIpc->resolve(Server::startMe($session)); + return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath(), $cancelIpc->promise()); } elseif (!\class_exists($class)) { - Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR); - Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR); - return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath()); + // Have lock, can't use it + $unlock(); + Logger::log("Session has event handler, but it's not started.", Logger::ERROR); + Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR); + Logger::log("Please start the event handler or unset it to use the IPC server.", Logger::ERROR); + return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath(), $cancelIpc->promise()); } } @@ -187,7 +190,7 @@ abstract class Serialization Logger::log("Got exclusive session lock!"); if ($isNew) { - $unserialized = yield from self::newUnserialize($session->getSessionPath()); + $unserialized = yield from $session->unserialize(); } else { $unserialized = yield from self::legacyUnserialize($session->getLegacySessionPath()); } @@ -203,51 +206,36 @@ abstract class Serialization /** * Try connecting to IPC socket. * - * @param string $ipcPath IPC path - * @param ?CancellationTokenSource $cancel Cancelation token + * @param string $ipcPath IPC path + * @param Promise $cancelConnect Cancelation token (triggers cancellation of connection) + * @param ?Deferred $cancelFull Cancelation token source (can trigger cancellation of full unserialization) * - * @return \Generator, mixed, void> + * @return \Generator */ - private static function tryConnect(string $ipcPath, ?CancellationTokenSource $cancel = null): \Generator + private static function tryConnect(string $ipcPath, Promise $cancelConnect, ?Deferred $cancelFull = null): \Generator { for ($x = 0; $x < 30; $x++) { Logger::log("Trying to connect to IPC socket..."); try { \clearstatcache(true, $ipcPath); $socket = yield connect($ipcPath); - if ($cancel) { - $cancel->cancel(); + if ($cancelFull) { + $cancelFull->resolve(true); } return [$socket, null]; } catch (\Throwable $e) { $e = $e->getMessage(); Logger::log("$e while connecting to IPC socket"); } - yield Tools::sleep(1); + if ($res = yield Tools::timeoutWithDefault($cancelConnect, 1000, null)) { + if ($res instanceof \Throwable) { + return [$res, null]; + } + $cancelConnect = (new Deferred)->promise(); + } } } - /** - * @internal Deserialize new object - * - * @param string $path - * @return \Generator - */ - public static function newUnserialize(string $path): \Generator - { - $headerLen = \strlen(self::PHP_HEADER) + 1; - - $file = yield open($path, 'rb'); - $size = yield stat($path); - $size = $size['size'] ?? $headerLen; - - yield $file->seek($headerLen); // Skip version for now - $unserialized = \unserialize((yield $file->read($size - $headerLen)) ?? ''); - yield $file->close(); - - return $unserialized; - } - /** * Deserialize legacy session. * diff --git a/src/danog/MadelineProto/SessionPaths.php b/src/danog/MadelineProto/SessionPaths.php index 0e21fe62..f3056a48 100644 --- a/src/danog/MadelineProto/SessionPaths.php +++ b/src/danog/MadelineProto/SessionPaths.php @@ -19,8 +19,14 @@ namespace danog\MadelineProto; +use Amp\File\StatCache; use Amp\Promise; -use danog\MadelineProto\Ipc\LightState; +use Amp\Success; +use danog\MadelineProto\Ipc\IpcState; + +use function Amp\File\exists; +use function Amp\File\open; +use function Amp\File\rename; /** * Session path information. @@ -48,9 +54,14 @@ class SessionPaths */ private string $ipcStatePath; /** - * Temporary serialization path. + * Light state path. */ - private string $tempPath; + private string $lightStatePath; + /** + * Light state. + */ + private ?LightState $lightState = null; + /** * Construct session info from session name. * @@ -61,11 +72,57 @@ class SessionPaths $session = Tools::absolute($session); $this->legacySessionPath = $session; $this->sessionPath = "$session.safe.php"; + $this->lightStatePath = "$session.lightState.php"; $this->lockPath = "$session.lock"; $this->ipcPath = "$session.ipc"; $this->ipcStatePath = "$session.ipcState.php"; - $this->tempPath = "$session.temp.php"; } + /** + * Serialize object to file. + * + * @param object $object + * @param string $path + * @return \Generator + */ + public function serialize(object $object, string $path): \Generator + { + $file = yield open("$path.temp.php", 'bw+'); + yield $file->write(Serialization::PHP_HEADER); + yield $file->write(\chr(Serialization::VERSION)); + yield $file->write(\serialize($object)); + yield $file->close(); + + yield rename("$path.temp.php", $path); + } + + /** + * Deserialize new object. + * + * @param string $path Object path, defaults to session path + * + * @return \Generator + */ + public function unserialize(string $path = ''): \Generator + { + $path = $path ?: $this->sessionPath; + + StatCache::clear($path); + if (!yield exists($path)) { + return null; + } + $headerLen = \strlen(Serialization::PHP_HEADER) + 1; + + $file = yield open($path, 'rb'); + $size = yield \stat($path); + $size = $size['size'] ?? $headerLen; + + yield $file->seek($headerLen); // Skip version for now + $unserialized = \unserialize((yield $file->read($size - $headerLen)) ?? ''); + yield $file->close(); + + return $unserialized; + } + /** * Get session path. * @@ -116,16 +173,6 @@ class SessionPaths return $this->ipcPath; } - /** - * Get temporary serialization path. - * - * @return string - */ - public function getTempPath(): string - { - return $this->tempPath; - } - /** * Get IPC light state path. * @@ -139,10 +186,61 @@ class SessionPaths /** * Get IPC state. * - * @return Promise + * @return Promise */ public function getIpcState(): Promise { - return Tools::call(Serialization::newUnserialize($this->ipcStatePath)); + return Tools::call($this->unserialize($this->ipcStatePath)); + } + + /** + * Store IPC state. + * + * @return \Generator + */ + public function storeIpcState(IpcState $state): \Generator + { + return $this->serialize($state, $this->getIpcStatePath()); + } + + + /** + * Get light state path. + * + * @return string + */ + public function getLightStatePath(): string + { + return $this->lightStatePath; + } + + /** + * Get light state. + * + * @return Promise + */ + public function getLightState(): Promise + { + if ($this->lightState) { + return new Success($this->lightState); + } + $promise = Tools::call($this->unserialize($this->lightStatePath)); + $promise->onResolve(function (?\Throwable $e, ?LightState $res) { + if ($res) { + $this->lightState = $res; + } + }); + return $promise; + } + + /** + * Store light state. + * + * @return \Generator + */ + public function storeLightState(MTProto $state): \Generator + { + $this->lightState = new LightState($state); + return $this->serialize($this->lightState, $this->getLightStatePath()); } } diff --git a/src/danog/MadelineProto/Settings/Serialization.php b/src/danog/MadelineProto/Settings/Serialization.php index 8da90224..ae884c17 100644 --- a/src/danog/MadelineProto/Settings/Serialization.php +++ b/src/danog/MadelineProto/Settings/Serialization.php @@ -10,6 +10,12 @@ class Serialization extends SettingsAbstract * Serialization interval, in seconds. */ protected int $interval = 30; + /** + * Whether to force full deserialization of instance, without using the IPC server/client. + * + * WARNING: this will cause slow startup if enabled. + */ + protected bool $forceFull = false; public function mergeArray(array $settings): void { @@ -40,4 +46,28 @@ class Serialization extends SettingsAbstract return $this; } + + /** + * Get WARNING: this will cause slow startup if enabled. + * + * @return bool + */ + public function getForceFull(): bool + { + return $this->forceFull; + } + + /** + * Set WARNING: this will cause slow startup if enabled. + * + * @param bool $forceFull WARNING: this will cause slow startup if enabled. + * + * @return self + */ + public function setForceFull(bool $forceFull): self + { + $this->forceFull = $forceFull; + + return $this; + } } diff --git a/src/danog/MadelineProto/Tools.php b/src/danog/MadelineProto/Tools.php index 7edda11e..d537b717 100644 --- a/src/danog/MadelineProto/Tools.php +++ b/src/danog/MadelineProto/Tools.php @@ -19,18 +19,18 @@ namespace danog\MadelineProto; -use Amp\CancellationToken; use Amp\Deferred; use Amp\Failure; use Amp\File\StatCache; use Amp\Loop; -use Amp\NullCancellationToken; use Amp\Promise; use Amp\Success; +use Amp\TimeoutException; use tgseclib\Math\BigInteger; use function Amp\ByteStream\getOutputBufferStream; use function Amp\ByteStream\getStdin; use function Amp\ByteStream\getStdout; +use function Amp\delay; use function Amp\File\exists; use function Amp\File\get; use function Amp\Promise\all; @@ -385,7 +385,25 @@ abstract class Tools extends StrTools */ public static function timeout($promise, int $timeout): Promise { - return timeout(self::call($promise), $timeout); + $promise = self::call($promise); + + $deferred = new Deferred; + + $watcher = Loop::delay($timeout, static function () use (&$deferred) { + $temp = $deferred; // prevent double resolve + $deferred = null; + $temp->fail(new TimeoutException); + }); + Loop::unreference($watcher); + + $promise->onResolve(function () use (&$deferred, $promise, $watcher) { + if ($deferred !== null) { + Loop::cancel($watcher); + $deferred->resolve($promise); + } + }); + + return $deferred->promise(); } /** * Creates an artificial timeout for any `Promise`. @@ -406,7 +424,25 @@ abstract class Tools extends StrTools */ public static function timeoutWithDefault($promise, int $timeout, $default = null): Promise { - return timeoutWithDefault(self::call($promise), $timeout, $default); + $promise = self::call($promise); + + $deferred = new Deferred; + + $watcher = Loop::delay($timeout, static function () use (&$deferred, $default) { + $temp = $deferred; // prevent double resolve + $deferred = null; + $temp->resolve($default); + }); + Loop::unreference($watcher); + + $promise->onResolve(function () use (&$deferred, $promise, $watcher) { + if ($deferred !== null) { + Loop::cancel($watcher); + $deferred->resolve($promise); + } + }); + + return $deferred->promise(); } /** * Convert generator, promise or any other value to a promise. @@ -541,34 +577,35 @@ abstract class Tools extends StrTools * Asynchronously lock a file * Resolves with a callbable that MUST eventually be called in order to release the lock. * - * @param string $file File to lock - * @param integer $operation Locking mode - * @param float $polling Polling interval - * @param CancellationToken $token Cancellation token - * @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails. + * @param string $file File to lock + * @param integer $operation Locking mode + * @param float $polling Polling interval + * @param ?Promise $token Cancellation token + * @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails. * * @return Promise */ - public static function flock(string $file, int $operation, float $polling = 0.1, $token = null, $failureCb = null): Promise + public static function flock(string $file, int $operation, float $polling = 0.1, ?Promise $token = null, $failureCb = null): Promise { return self::call(Tools::flockGenerator($file, $operation, $polling, $token, $failureCb)); } /** * Asynchronously lock a file (internal generator function). * - * @param string $file File to lock - * @param integer $operation Locking mode - * @param float $polling Polling interval - * @param CancellationToken $token Cancellation token - * @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails. + * @param string $file File to lock + * @param integer $operation Locking mode + * @param float $polling Polling interval + * @param ?Promise $token Cancellation token + * @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails. * * @internal Generator function * * @return \Generator */ - public static function flockGenerator(string $file, int $operation, float $polling, $token = null, $failureCb = null): \Generator + public static function flockGenerator(string $file, int $operation, float $polling, ?Promise $token = null, $failureCb = null): \Generator { - $token = $token ?? new NullCancellationToken; + $polling *= 1000; + $polling = (int) $polling; if (!yield exists($file)) { yield \touch($file); StatCache::clear($file); @@ -579,15 +616,15 @@ abstract class Tools extends StrTools $result = \flock($res, $operation); if (!$result) { if ($failureCb) { - Tools::callFork($failureCb()); + $failureCb(); $failureCb = null; } - if ($token->isRequested()) { - return null; - } - yield self::sleep($polling); - if ($token->isRequested()) { - return null; + if ($token) { + if (yield Tools::timeoutWithDefault($token, $polling, false)) { + return; + } + } else { + yield delay($polling); } } } while (!$result); @@ -898,7 +935,7 @@ abstract class Tools extends StrTools public static function absolute(string $file): string { if (($file[0] ?? '') !== '/' && ($file[1] ?? '') !== ':' && !\in_array(\substr($file, 0, 4), ['phar', 'http'])) { - $file = Magic::getcwd().'/'.$file; + $file = Magic::getcwd().DIRECTORY_SEPARATOR.$file; } return $file; }