diff --git a/composer.json b/composer.json index 22ec97c6..c2060f4c 100644 --- a/composer.json +++ b/composer.json @@ -39,7 +39,8 @@ "amphp/log": "^1.1", "danog/loop": "^0.1.0", "danog/tgseclib": "^3", - "amphp/redis": "^1.0" + "amphp/redis": "^1.0", + "symfony/polyfill-php80": "^1.18" }, "require-dev": { "vlucas/phpdotenv": "^3", diff --git a/src/danog/MadelineProto/API.php b/src/danog/MadelineProto/API.php index 5a924878..5aef29bb 100644 --- a/src/danog/MadelineProto/API.php +++ b/src/danog/MadelineProto/API.php @@ -19,6 +19,9 @@ namespace danog\MadelineProto; +use Amp\Failure; +use Amp\Ipc\Sync\ChannelledSocket; +use danog\MadelineProto\Ipc\Client; use danog\MadelineProto\Settings\Logger as SettingsLogger; /** @@ -39,7 +42,7 @@ class API extends InternalDoc /** * Instance of MadelineProto. * - * @var null|MTProto + * @var null|MTProto|Client */ public $API; @@ -107,11 +110,11 @@ class API extends InternalDoc */ public function __magic_construct(string $session, $settings = []): void { + Magic::classExists(true); $settings = Settings::parseFromLegacy($settings); $this->session = new SessionPaths($session); $this->wrapper = new APIWrapper($this, $this->exportNamespace()); - Magic::classExists(true); $this->setInitPromise($this->internalInitAPI($settings)); foreach (\get_class_vars(APIFactory::class) as $key => $var) { if (\in_array($key, ['namespace', 'API', 'lua', 'async', 'asyncAPIPromise', 'methods'])) { @@ -135,8 +138,16 @@ class API extends InternalDoc ? new SettingsLogger : $settings->getLogger()); - [$unserialized, $this->unlock] = yield from Serialization::legacyUnserialize($this->session); - if ($unserialized) { + [$unserialized, $this->unlock] = yield Tools::timeoutWithDefault( + Serialization::unserialize($this->session), + 30000, + new Failure(new \RuntimeException("Could not connect to MadelineProto, please check the logs for more details.")) + ); + if ($unserialized instanceof ChannelledSocket) { + $this->API = new Client($unserialized, Logger::$default); + $this->APIFactory(); + return; + } elseif ($unserialized) { $unserialized->storage = $unserialized->storage ?? []; $unserialized->session = $this->session; APIWrapper::link($this, $unserialized); @@ -216,9 +227,11 @@ class API extends InternalDoc private function APIFactory(): void { if ($this->API && $this->API->inited()) { - foreach ($this->API->getMethodNamespaces() as $namespace) { - if (!$this->{$namespace}) { - $this->{$namespace} = $this->exportNamespace($namespace); + if ($this->API instanceof MTProto) { + foreach ($this->API->getMethodNamespaces() as $namespace) { + if (!$this->{$namespace}) { + $this->{$namespace} = $this->exportNamespace($namespace); + } } } $this->methods = self::getInternalMethodList($this->API); diff --git a/src/danog/MadelineProto/APIWrapper.php b/src/danog/MadelineProto/APIWrapper.php index 3612ec0a..1c295466 100644 --- a/src/danog/MadelineProto/APIWrapper.php +++ b/src/danog/MadelineProto/APIWrapper.php @@ -20,8 +20,9 @@ namespace danog\MadelineProto; use Amp\Promise; use Amp\Success; +use danog\MadelineProto\Ipc\LightState; -use function Amp\File\put; +use function Amp\File\open; use function Amp\File\rename as renameAsync; final class APIWrapper @@ -29,9 +30,9 @@ final class APIWrapper /** * MTProto instance. * - * @var ?MTProto + * @var MTProto|null|Client */ - private ?MTProto $API = null; + private $API = null; /** * Session path. @@ -173,7 +174,7 @@ final class APIWrapper /** * Serialize session. * - * @return Promise + * @return Promise */ public function serialize(): Promise { @@ -185,11 +186,30 @@ final class APIWrapper yield from $this->API->initAsynchronously(); } - $wrote = yield put($this->session->getTempPath(), \serialize($this)); + $file = yield open($this->session->getTempPath(), 'bw+'); + yield $file->write(Serialization::PHP_HEADER); + yield $file->write(\chr(Serialization::VERSION)); + yield $file->write(\serialize($this)); + yield $file->close(); + yield renameAsync($this->session->getTempPath(), $this->session->getSessionPath()); + if ($this->API) { + $file = yield open($this->session->getTempPath(), 'bw+'); + yield $file->write(Serialization::PHP_HEADER); + yield $file->write(\chr(Serialization::VERSION)); + yield $file->write(\serialize(new LightState($this->API))); + yield $file->close(); + + yield renameAsync($this->session->getTempPath(), $this->session->getIpcStatePath()); + } + + + // Truncate legacy session + yield (yield open($this->session->getLegacySessionPath(), 'w'))->close(); + Logger::log('Saved session!'); - return $wrote; + return true; })()); } } diff --git a/src/danog/MadelineProto/FastAPI.php b/src/danog/MadelineProto/FastAPI.php index 1c07dee5..93b81299 100644 --- a/src/danog/MadelineProto/FastAPI.php +++ b/src/danog/MadelineProto/FastAPI.php @@ -21,6 +21,7 @@ namespace danog\MadelineProto; use Amp\File\StatCache; use Amp\Ipc\Sync\ChannelledSocket; +use Amp\Promise; use danog\MadelineProto\Ipc\Client; use danog\MadelineProto\Ipc\Server; @@ -127,7 +128,7 @@ class FastAPI extends API * * @param string $ipcPath IPC path * - * @return \Generator + * @return \Generator, mixed, ChannelledSocket|null> */ private function tryConnect(string $ipcPath): \Generator { diff --git a/src/danog/MadelineProto/Ipc/LightState.php b/src/danog/MadelineProto/Ipc/LightState.php new file mode 100644 index 00000000..23d29dd7 --- /dev/null +++ b/src/danog/MadelineProto/Ipc/LightState.php @@ -0,0 +1,64 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2020 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Ipc; + +use danog\MadelineProto\MTProto; +use danog\MadelineProto\EventHandler; + +/** + * Light state. + * + * @internal + */ +final class LightState +{ + /** + * Event handler class name. + * + * @var null|class-string + */ + private ?string $eventHandler; + + public function __construct(MTProto $API) + { + if ($API->hasEventHandler()) { + $this->eventHandler = \get_class($API->getEventHandler()); + } + } + + /** + * Check whether we can start IPC. + * + * @return boolean + */ + public function canStartIpc(): bool + { + return !$this->eventHandler || \class_exists($this->eventHandler); + } + + /** + * Get event handler class name. + * + * @return null|class-string + */ + public function getEventHandler(): ?string + { + return $this->eventHandler; + } +} diff --git a/src/danog/MadelineProto/Logger.php b/src/danog/MadelineProto/Logger.php index 806cade6..7db1188e 100644 --- a/src/danog/MadelineProto/Logger.php +++ b/src/danog/MadelineProto/Logger.php @@ -156,14 +156,14 @@ class Logger * * @param SettingsLogger $settings Settings instance * - * @return void + * @return self */ - public static function constructorFromSettings(SettingsLogger $settings): void + public static function constructorFromSettings(SettingsLogger $settings): self { if (!self::$default) { - // The getLogger function will automatically init the static logger, but we'll do it again anyway self::$default = new self($settings); } + return self::$default; } /** diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index abf9c7fb..1dce31dc 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -199,7 +199,7 @@ class MTProto extends AsyncConstruct implements TLCallback 'msg_resend_req', 'msg_resend_ans_req', ]; - const DEFAULT_GETUPDATES_PARAMS = ['offset' => 0, 'limit' => null, 'timeout' => 0]; + const DEFAULT_GETUPDATES_PARAMS = ['offset' => 0, 'limit' => null, 'timeout' => 100]; /** * Instance of wrapper API. * diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index 0167f4f8..61b41d29 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -28,6 +28,7 @@ use danog\MadelineProto\MTProto; use danog\MadelineProto\RPCErrorException; use danog\MadelineProto\Settings; +use danog\MadelineProto\Tools; /** * Manages updates. @@ -59,13 +60,11 @@ trait UpdateHandler public function getUpdates($params = []): \Generator { $this->updateHandler = MTProto::GETUPDATES_HANDLER; - $params = \array_merge(MTProto::DEFAULT_GETUPDATES_PARAMS, $params); + $params = MTProto::DEFAULT_GETUPDATES_PARAMS + $params; if (empty($this->updates)) { $this->update_deferred = new Deferred(); - if (!$params['timeout']) { - $params['timeout'] = 0.001; - } - yield from $this->waitUpdate(); + $params['timeout'] *= 1000; + yield Tools::timeoutWithDefault($this->waitUpdate(), $params['timeout'] ?: 100000); } if (empty($this->updates)) { return $this->updates; diff --git a/src/danog/MadelineProto/Serialization.php b/src/danog/MadelineProto/Serialization.php index e21a3bc5..330fc36e 100644 --- a/src/danog/MadelineProto/Serialization.php +++ b/src/danog/MadelineProto/Serialization.php @@ -19,18 +19,84 @@ namespace danog\MadelineProto; +use Amp\CancellationTokenSource; +use Amp\Ipc\Sync\ChannelledSocket; use Amp\Loop; +use Amp\Promise; +use danog\MadelineProto\Ipc\LightState; +use danog\MadelineProto\Ipc\Server; use function Amp\File\exists; use function Amp\File\get; +use function Amp\File\open; +use function Amp\File\stat; +use function Amp\Ipc\connect; /** * Manages serialization of the MadelineProto instance. */ -class Serialization +abstract class Serialization { /** - * Unserialize legacy session. + * Header for session files. + */ + const PHP_HEADER = 'getSessionPath())) { - Logger::log('Waiting for exclusive session lock...'); - $warningId = Loop::delay(1000, static function () use (&$warningId) { - Logger::log("It seems like the session is busy."); - if (\defined(\MADELINE_WORKER::class)) { - Logger::log("Exiting since we're in a worker"); - Magic::shutdown(1); - } - Logger::log("Telegram does not support starting multiple instances of the same session, make sure no other instance of the session is running."); - $warningId = Loop::repeat(5000, fn () => Logger::log('Still waiting for exclusive session lock...')); - Loop::unreference($warningId); - }); + // Is new session + $isNew = true; + } elseif (yield exists($session->getLegacySessionPath())) { + // Is old session + $isNew = false; + } else { + // No session exists yet, lock for when we create it + return [null, yield Tools::flock($session->getLockPath(), LOCK_EX, 1)]; + } + + Logger::log('Waiting for exclusive session lock...'); + $warningId = Loop::delay(1000, static function () use (&$warningId) { + Logger::log("It seems like the session is busy."); + if (\defined(\MADELINE_WORKER::class)) { + Logger::log("Exiting since we're in a worker"); + Magic::shutdown(1); + } + Logger::log("Telegram does not support starting multiple instances of the same session, make sure no other instance of the session is running."); + $warningId = Loop::repeat(5000, fn () => Logger::log('Still waiting for exclusive session lock...')); Loop::unreference($warningId); - $unlock = yield Tools::flock($session->getLockPath(), LOCK_EX, 1); - Loop::cancel($warningId); - $tempId = Shutdown::addCallback($unlock = static function () use ($unlock) { - Logger::log("Unlocking exclusive session lock!"); - $unlock(); - Logger::log("Unlocked exclusive session lock!"); + }); + Loop::unreference($warningId); + + $lightState = null; + $cancelFlock = new CancellationTokenSource; + $canContinue = true; + $ipcSocket = null; + $unlock = yield Tools::flock($session->getLockPath(), LOCK_EX, 1, $cancelFlock->getToken(), static function () use ($session, $cancelFlock, &$canContinue, &$ipcSocket, &$lightState) { + $ipcSocket = Tools::call(self::tryConnect($session->getIpcPath(), $cancelFlock)); + $session->getIpcState()->onResolve(static function (?\Throwable $e, ?LightState $res) use ($cancelFlock, &$canContinue, &$lightState) { + if ($res) { + $lightState = $res; + if (!$res->canStartIpc()) { + $canContinue = false; + $cancelFlock->cancel(); + } + } }); - Logger::log("Got exclusive session lock!"); + }); + Loop::cancel($warningId); - $tounserialize = yield get($session->getSessionPath()); + if (!$unlock) { // Canceled, don't have lock + return [yield $ipcSocket, null]; + } + if (!$canContinue) { // Canceled, but have lock already + Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR); + Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR); + $unlock(); + return [yield $ipcSocket, null]; + } - Magic::classExists(); + try { + /** @var LightState */ + $lightState ??= yield $session->getIpcState(); + } catch (\Throwable $e) { + } + + if ($lightState) { + if (!$class = $lightState->getEventHandler()) { + // Unlock and fork + $unlock(); + Server::startMe($session); + return [$ipcSocket ?? yield from self::tryConnect($session->getIpcPath()), null]; + } elseif (!\class_exists($class)) { + return [$ipcSocket ?? yield from self::tryConnect($session->getIpcPath()), null]; + } + } + + $tempId = Shutdown::addCallback($unlock = static function () use ($unlock) { + Logger::log("Unlocking exclusive session lock!"); + $unlock(); + Logger::log("Unlocked exclusive session lock!"); + }); + Logger::log("Got exclusive session lock!"); + + if ($isNew) { + $unserialized = yield from self::newUnserialize($session->getSessionPath()); + } else { + $unserialized = yield from self::legacyUnserialize($session); + } + + if ($unserialized === false) { + throw new Exception(\danog\MadelineProto\Lang::$current_lang['deserialization_error']); + } + + Shutdown::removeCallback($tempId); + return [$unserialized, $unlock]; + } + + /** + * Try connecting to IPC socket. + * + * @param string $ipcPath IPC path + * @param ?CancellationTokenSource $cancel Cancelation token + * + * @return \Generator, mixed, void> + */ + private static function tryConnect(string $ipcPath, ?CancellationTokenSource $cancel = null): \Generator + { + for ($x = 0; $x < 30; $x++) { + Logger::log("Trying to connect to IPC socket..."); try { - $unserialized = \unserialize($tounserialize); - } catch (\danog\MadelineProto\Bug74586Exception $e) { - \class_exists('\\Volatile'); - $tounserialize = \str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize); - foreach (['RSA', 'TL\\TLMethods', 'TL\\TLConstructors', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) { - \class_exists('\\danog\\MadelineProto\\'.$class); - } - $unserialized = \danog\Serialization::unserialize($tounserialize); - } catch (\danog\MadelineProto\Exception $e) { - if ($e->getFile() === 'MadelineProto' && $e->getLine() === 1) { - throw $e; - } - if (@\constant("MADELINEPROTO_TEST") === 'pony') { - throw $e; - } - \class_exists('\\Volatile'); - foreach (['RSA', 'TL\\TLMethods', 'TL\\TLConstructors', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) { - \class_exists('\\danog\\MadelineProto\\'.$class); - } - $changed = false; - if (\strpos($tounserialize, 'O:26:"danog\\MadelineProto\\Button":') !== false) { - Logger::log("SUBBING BUTTONS!"); - $tounserialize = \str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize); - $changed = true; - } - if (\strpos($e->getMessage(), "Erroneous data format for unserializing 'phpseclib\\Math\\BigInteger'") === 0) { - Logger::log("SUBBING BIGINTEGOR!"); - $tounserialize = \str_replace('phpseclib\\Math\\BigInteger', 'phpseclib\\Math\\BigIntegor', $tounserialize); - $changed = true; - } - if (\strpos($tounserialize, 'C:25:"phpseclib\\Math\\BigInteger"') !== false) { - Logger::log("SUBBING TGSECLIB old!"); - $tounserialize = \str_replace('C:25:"phpseclib\\Math\\BigInteger"', 'C:24:"tgseclib\\Math\\BigInteger"', $tounserialize); - $changed = true; - } - if (\strpos($tounserialize, 'C:26:"phpseclib3\\Math\\BigInteger"') !== false) { - Logger::log("SUBBING TGSECLIB!"); - $tounserialize = \str_replace('C:26:"phpseclib3\\Math\\BigInteger"', 'C:24:"tgseclib\\Math\\BigInteger"', $tounserialize); - $changed = true; - } - Logger::log((string) $e, Logger::ERROR); - if (!$changed) { - throw $e; - } - try { - $unserialized = \danog\Serialization::unserialize($tounserialize); - } catch (\Throwable $e) { - $unserialized = \unserialize($tounserialize); + \clearstatcache(true, $ipcPath); + $socket = yield connect($ipcPath); + if ($cancel) { + $cancel->cancel(); } + return $socket; } catch (\Throwable $e) { - Logger::log((string) $e, Logger::ERROR); - throw $e; + $e = $e->getMessage(); + Logger::log("$e while connecting to IPC socket"); } - if ($unserialized instanceof \danog\PlaceHolder) { - $unserialized = \danog\Serialization::unserialize($tounserialize); - } - if ($unserialized === false) { - throw new Exception(\danog\MadelineProto\Lang::$current_lang['deserialization_error']); - } - Shutdown::removeCallback($tempId); - return [$unserialized, $unlock]; + yield Tools::sleep(1); } } + + /** + * @internal Deserialize new object + * + * @param string $path + * @return \Generator + */ + public static function newUnserialize(string $path): \Generator + { + $headerLen = \strlen(self::PHP_HEADER) + 1; + + $file = yield open($path, 'rb'); + $size = yield stat($path); + $size = $size['size'] ?? $headerLen; + + yield $file->seek($headerLen); // Skip version for now + $unserialized = \unserialize((yield $file->read($size - $headerLen)) ?? ''); + yield $file->close(); + + return $unserialized; + } + + /** + * Deserialize legacy session. + * + * @param SessionPaths $session + * @return \Generator + */ + private static function legacyUnserialize(SessionPaths $session): \Generator + { + $tounserialize = yield get($session->getLegacySessionPath()); + + try { + $unserialized = \unserialize($tounserialize); + } catch (\danog\MadelineProto\Bug74586Exception $e) { + \class_exists('\\Volatile'); + $tounserialize = \str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize); + foreach (['RSA', 'TL\\TLMethods', 'TL\\TLConstructors', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) { + \class_exists('\\danog\\MadelineProto\\'.$class); + } + $unserialized = \danog\Serialization::unserialize($tounserialize); + } catch (\danog\MadelineProto\Exception $e) { + if ($e->getFile() === 'MadelineProto' && $e->getLine() === 1) { + throw $e; + } + if (@\constant("MADELINEPROTO_TEST") === 'pony') { + throw $e; + } + \class_exists('\\Volatile'); + foreach (['RSA', 'TL\\TLMethods', 'TL\\TLConstructors', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) { + \class_exists('\\danog\\MadelineProto\\'.$class); + } + $changed = false; + if (\strpos($tounserialize, 'O:26:"danog\\MadelineProto\\Button":') !== false) { + Logger::log("SUBBING BUTTONS!"); + $tounserialize = \str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize); + $changed = true; + } + if (\strpos($e->getMessage(), "Erroneous data format for unserializing 'phpseclib\\Math\\BigInteger'") === 0) { + Logger::log("SUBBING BIGINTEGOR!"); + $tounserialize = \str_replace('phpseclib\\Math\\BigInteger', 'phpseclib\\Math\\BigIntegor', $tounserialize); + $changed = true; + } + if (\strpos($tounserialize, 'C:25:"phpseclib\\Math\\BigInteger"') !== false) { + Logger::log("SUBBING TGSECLIB old!"); + $tounserialize = \str_replace('C:25:"phpseclib\\Math\\BigInteger"', 'C:24:"tgseclib\\Math\\BigInteger"', $tounserialize); + $changed = true; + } + if (\strpos($tounserialize, 'C:26:"phpseclib3\\Math\\BigInteger"') !== false) { + Logger::log("SUBBING TGSECLIB!"); + $tounserialize = \str_replace('C:26:"phpseclib3\\Math\\BigInteger"', 'C:24:"tgseclib\\Math\\BigInteger"', $tounserialize); + $changed = true; + } + Logger::log((string) $e, Logger::ERROR); + if (!$changed) { + throw $e; + } + try { + $unserialized = \danog\Serialization::unserialize($tounserialize); + } catch (\Throwable $e) { + $unserialized = \unserialize($tounserialize); + } + } catch (\Throwable $e) { + Logger::log((string) $e, Logger::ERROR); + throw $e; + } + if ($unserialized instanceof \danog\PlaceHolder) { + $unserialized = \danog\Serialization::unserialize($tounserialize); + } + + return $unserialized; + } } diff --git a/src/danog/MadelineProto/SessionPaths.php b/src/danog/MadelineProto/SessionPaths.php index 500210e4..0e21fe62 100644 --- a/src/danog/MadelineProto/SessionPaths.php +++ b/src/danog/MadelineProto/SessionPaths.php @@ -19,11 +19,18 @@ namespace danog\MadelineProto; +use Amp\Promise; +use danog\MadelineProto\Ipc\LightState; + /** * Session path information. */ class SessionPaths { + /** + * Legacy session path. + */ + private string $legacySessionPath; /** * Session path. */ @@ -36,6 +43,10 @@ class SessionPaths * IPC socket path. */ private string $ipcPath; + /** + * IPC light state path. + */ + private string $ipcStatePath; /** * Temporary serialization path. */ @@ -48,10 +59,12 @@ class SessionPaths public function __construct(string $session) { $session = Tools::absolute($session); - $this->sessionPath = $session; + $this->legacySessionPath = $session; + $this->sessionPath = "$session.safe.php"; $this->lockPath = "$session.lock"; $this->ipcPath = "$session.ipc"; - $this->tempPath = "$session.temp.session"; + $this->ipcStatePath = "$session.ipcState.php"; + $this->tempPath = "$session.temp.php"; } /** * Get session path. @@ -60,7 +73,17 @@ class SessionPaths */ public function __toString(): string { - return $this->sessionPath; + return $this->legacySessionPath; + } + + /** + * Get legacy session path. + * + * @return string + */ + public function getLegacySessionPath(): string + { + return $this->legacySessionPath; } /** @@ -102,4 +125,24 @@ class SessionPaths { return $this->tempPath; } + + /** + * Get IPC light state path. + * + * @return string + */ + public function getIpcStatePath(): string + { + return $this->ipcStatePath; + } + + /** + * Get IPC state. + * + * @return Promise + */ + public function getIpcState(): Promise + { + return Tools::call(Serialization::newUnserialize($this->ipcStatePath)); + } } diff --git a/src/danog/MadelineProto/Shutdown.php b/src/danog/MadelineProto/Shutdown.php index 19a9dc17..16923aeb 100644 --- a/src/danog/MadelineProto/Shutdown.php +++ b/src/danog/MadelineProto/Shutdown.php @@ -59,7 +59,7 @@ class Shutdown * @param callable $callback The callback to set * @param null|string $id The optional callback ID * - * @return int The callback ID + * @return int|string The callback ID */ public static function addCallback($callback, $id = null) { @@ -76,7 +76,7 @@ class Shutdown /** * Remove a callback from the script shutdown callable list. * - * @param null|string $id The optional callback ID + * @param null|string|int $id The optional callback ID * * @return bool true if the callback was removed correctly, false otherwise */ diff --git a/src/danog/MadelineProto/Tools.php b/src/danog/MadelineProto/Tools.php index 04e796c7..b00ff3cb 100644 --- a/src/danog/MadelineProto/Tools.php +++ b/src/danog/MadelineProto/Tools.php @@ -19,10 +19,12 @@ namespace danog\MadelineProto; +use Amp\CancellationToken; use Amp\Deferred; use Amp\Failure; use Amp\File\StatCache; use Amp\Loop; +use Amp\NullCancellationToken; use Amp\Promise; use Amp\Success; use tgseclib\Math\BigInteger; @@ -36,6 +38,7 @@ use function Amp\Promise\any; use function Amp\Promise\first; use function Amp\Promise\some; use function Amp\Promise\timeout; +use function Amp\Promise\timeoutWithDefault; use function Amp\Promise\wait; /** @@ -384,6 +387,27 @@ abstract class Tools extends StrTools { return timeout(self::call($promise), $timeout); } + /** + * Creates an artificial timeout for any `Promise`. + * + * If the promise is resolved before the timeout expires, the result is returned + * + * If the timeout expires before the promise is resolved, a default value is returned + * + * @template TReturn + * + * @param Promise|\Generator $promise Promise to which the timeout is applied. + * @param int $timeout Timeout in milliseconds. + * @param TReturn $default + * + * @return Promise + * + * @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface. + */ + public static function timeoutWithDefault($promise, int $timeout, $default = null): Promise + { + return timeoutWithDefault(self::call($promise), $timeout, $default); + } /** * Convert generator, promise or any other value to a promise. * @@ -517,29 +541,34 @@ abstract class Tools extends StrTools * Asynchronously lock a file * Resolves with a callbable that MUST eventually be called in order to release the lock. * - * @param string $file File to lock - * @param integer $operation Locking mode - * @param float $polling Polling interval + * @param string $file File to lock + * @param integer $operation Locking mode + * @param float $polling Polling interval + * @param CancellationToken $token Cancellation token + * @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails. * - * @return Promise + * @return Promise */ - public static function flock(string $file, int $operation, float $polling = 0.1): Promise + public static function flock(string $file, int $operation, float $polling = 0.1, $token = null, $failureCb = null): Promise { - return self::call(Tools::flockGenerator($file, $operation, $polling)); + return self::call(Tools::flockGenerator($file, $operation, $polling, $token, $failureCb)); } /** * Asynchronously lock a file (internal generator function). * - * @param string $file File to lock - * @param integer $operation Locking mode - * @param float $polling Polling interval + * @param string $file File to lock + * @param integer $operation Locking mode + * @param float $polling Polling interval + * @param CancellationToken $token Cancellation token + * @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails. * * @internal Generator function * * @return \Generator */ - public static function flockGenerator(string $file, int $operation, float $polling): \Generator + public static function flockGenerator(string $file, int $operation, float $polling, $token = null, $failureCb = null): \Generator { + $token = $token ?? new NullCancellationToken; if (!yield exists($file)) { yield \touch($file); StatCache::clear($file); @@ -549,7 +578,15 @@ abstract class Tools extends StrTools do { $result = \flock($res, $operation); if (!$result) { + if ($failureCb) { + Tools::callFork($failureCb()); + $failureCb = null; + } yield self::sleep($polling); + if ($token->isRequested()) { + var_dump("was requested pap"); + return null; + } } } while (!$result); return static function () use (&$res) { @@ -563,13 +600,13 @@ abstract class Tools extends StrTools /** * Asynchronously sleep. * - * @param int $time Number of seconds to sleep for + * @param int|float $time Number of seconds to sleep for * * @return Promise */ - public static function sleep(int $time): Promise + public static function sleep($time): Promise { - return new \Amp\Delayed($time * 1000); + return new \Amp\Delayed((int) ($time * 1000)); } /** * Asynchronously read line.