Implement FastAPI in API

This commit is contained in:
Daniil Gentili 2020-09-22 23:10:56 +02:00
parent bcadbcbe53
commit 1c99fd2658
12 changed files with 486 additions and 123 deletions

View File

@ -39,7 +39,8 @@
"amphp/log": "^1.1",
"danog/loop": "^0.1.0",
"danog/tgseclib": "^3",
"amphp/redis": "^1.0"
"amphp/redis": "^1.0",
"symfony/polyfill-php80": "^1.18"
},
"require-dev": {
"vlucas/phpdotenv": "^3",

View File

@ -19,6 +19,9 @@
namespace danog\MadelineProto;
use Amp\Failure;
use Amp\Ipc\Sync\ChannelledSocket;
use danog\MadelineProto\Ipc\Client;
use danog\MadelineProto\Settings\Logger as SettingsLogger;
/**
@ -39,7 +42,7 @@ class API extends InternalDoc
/**
* Instance of MadelineProto.
*
* @var null|MTProto
* @var null|MTProto|Client
*/
public $API;
@ -107,11 +110,11 @@ class API extends InternalDoc
*/
public function __magic_construct(string $session, $settings = []): void
{
Magic::classExists(true);
$settings = Settings::parseFromLegacy($settings);
$this->session = new SessionPaths($session);
$this->wrapper = new APIWrapper($this, $this->exportNamespace());
Magic::classExists(true);
$this->setInitPromise($this->internalInitAPI($settings));
foreach (\get_class_vars(APIFactory::class) as $key => $var) {
if (\in_array($key, ['namespace', 'API', 'lua', 'async', 'asyncAPIPromise', 'methods'])) {
@ -135,8 +138,16 @@ class API extends InternalDoc
? new SettingsLogger
: $settings->getLogger());
[$unserialized, $this->unlock] = yield from Serialization::legacyUnserialize($this->session);
if ($unserialized) {
[$unserialized, $this->unlock] = yield Tools::timeoutWithDefault(
Serialization::unserialize($this->session),
30000,
new Failure(new \RuntimeException("Could not connect to MadelineProto, please check the logs for more details."))
);
if ($unserialized instanceof ChannelledSocket) {
$this->API = new Client($unserialized, Logger::$default);
$this->APIFactory();
return;
} elseif ($unserialized) {
$unserialized->storage = $unserialized->storage ?? [];
$unserialized->session = $this->session;
APIWrapper::link($this, $unserialized);
@ -216,9 +227,11 @@ class API extends InternalDoc
private function APIFactory(): void
{
if ($this->API && $this->API->inited()) {
foreach ($this->API->getMethodNamespaces() as $namespace) {
if (!$this->{$namespace}) {
$this->{$namespace} = $this->exportNamespace($namespace);
if ($this->API instanceof MTProto) {
foreach ($this->API->getMethodNamespaces() as $namespace) {
if (!$this->{$namespace}) {
$this->{$namespace} = $this->exportNamespace($namespace);
}
}
}
$this->methods = self::getInternalMethodList($this->API);

View File

@ -20,8 +20,9 @@ namespace danog\MadelineProto;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\Ipc\LightState;
use function Amp\File\put;
use function Amp\File\open;
use function Amp\File\rename as renameAsync;
final class APIWrapper
@ -29,9 +30,9 @@ final class APIWrapper
/**
* MTProto instance.
*
* @var ?MTProto
* @var MTProto|null|Client
*/
private ?MTProto $API = null;
private $API = null;
/**
* Session path.
@ -173,7 +174,7 @@ final class APIWrapper
/**
* Serialize session.
*
* @return Promise
* @return Promise<bool>
*/
public function serialize(): Promise
{
@ -185,11 +186,30 @@ final class APIWrapper
yield from $this->API->initAsynchronously();
}
$wrote = yield put($this->session->getTempPath(), \serialize($this));
$file = yield open($this->session->getTempPath(), 'bw+');
yield $file->write(Serialization::PHP_HEADER);
yield $file->write(\chr(Serialization::VERSION));
yield $file->write(\serialize($this));
yield $file->close();
yield renameAsync($this->session->getTempPath(), $this->session->getSessionPath());
if ($this->API) {
$file = yield open($this->session->getTempPath(), 'bw+');
yield $file->write(Serialization::PHP_HEADER);
yield $file->write(\chr(Serialization::VERSION));
yield $file->write(\serialize(new LightState($this->API)));
yield $file->close();
yield renameAsync($this->session->getTempPath(), $this->session->getIpcStatePath());
}
// Truncate legacy session
yield (yield open($this->session->getLegacySessionPath(), 'w'))->close();
Logger::log('Saved session!');
return $wrote;
return true;
})());
}
}

View File

@ -21,6 +21,7 @@ namespace danog\MadelineProto;
use Amp\File\StatCache;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
use danog\MadelineProto\Ipc\Client;
use danog\MadelineProto\Ipc\Server;
@ -127,7 +128,7 @@ class FastAPI extends API
*
* @param string $ipcPath IPC path
*
* @return \Generator<ChannelledSocket|null>
* @return \Generator<int, Promise<ChannelledSocket>, mixed, ChannelledSocket|null>
*/
private function tryConnect(string $ipcPath): \Generator
{

View File

@ -0,0 +1,64 @@
<?php
/**
* IPC light state.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2020 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Ipc;
use danog\MadelineProto\MTProto;
use danog\MadelineProto\EventHandler;
/**
* Light state.
*
* @internal
*/
final class LightState
{
/**
* Event handler class name.
*
* @var null|class-string<EventHandler>
*/
private ?string $eventHandler;
public function __construct(MTProto $API)
{
if ($API->hasEventHandler()) {
$this->eventHandler = \get_class($API->getEventHandler());
}
}
/**
* Check whether we can start IPC.
*
* @return boolean
*/
public function canStartIpc(): bool
{
return !$this->eventHandler || \class_exists($this->eventHandler);
}
/**
* Get event handler class name.
*
* @return null|class-string<EventHandler>
*/
public function getEventHandler(): ?string
{
return $this->eventHandler;
}
}

View File

@ -156,14 +156,14 @@ class Logger
*
* @param SettingsLogger $settings Settings instance
*
* @return void
* @return self
*/
public static function constructorFromSettings(SettingsLogger $settings): void
public static function constructorFromSettings(SettingsLogger $settings): self
{
if (!self::$default) {
// The getLogger function will automatically init the static logger, but we'll do it again anyway
self::$default = new self($settings);
}
return self::$default;
}
/**

View File

@ -199,7 +199,7 @@ class MTProto extends AsyncConstruct implements TLCallback
'msg_resend_req',
'msg_resend_ans_req',
];
const DEFAULT_GETUPDATES_PARAMS = ['offset' => 0, 'limit' => null, 'timeout' => 0];
const DEFAULT_GETUPDATES_PARAMS = ['offset' => 0, 'limit' => null, 'timeout' => 100];
/**
* Instance of wrapper API.
*

View File

@ -28,6 +28,7 @@ use danog\MadelineProto\MTProto;
use danog\MadelineProto\RPCErrorException;
use danog\MadelineProto\Settings;
use danog\MadelineProto\Tools;
/**
* Manages updates.
@ -59,13 +60,11 @@ trait UpdateHandler
public function getUpdates($params = []): \Generator
{
$this->updateHandler = MTProto::GETUPDATES_HANDLER;
$params = \array_merge(MTProto::DEFAULT_GETUPDATES_PARAMS, $params);
$params = MTProto::DEFAULT_GETUPDATES_PARAMS + $params;
if (empty($this->updates)) {
$this->update_deferred = new Deferred();
if (!$params['timeout']) {
$params['timeout'] = 0.001;
}
yield from $this->waitUpdate();
$params['timeout'] *= 1000;
yield Tools::timeoutWithDefault($this->waitUpdate(), $params['timeout'] ?: 100000);
}
if (empty($this->updates)) {
return $this->updates;

View File

@ -19,18 +19,84 @@
namespace danog\MadelineProto;
use Amp\CancellationTokenSource;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Loop;
use Amp\Promise;
use danog\MadelineProto\Ipc\LightState;
use danog\MadelineProto\Ipc\Server;
use function Amp\File\exists;
use function Amp\File\get;
use function Amp\File\open;
use function Amp\File\stat;
use function Amp\Ipc\connect;
/**
* Manages serialization of the MadelineProto instance.
*/
class Serialization
abstract class Serialization
{
/**
* Unserialize legacy session.
* Header for session files.
*/
const PHP_HEADER = '<?php __HALT_COMPILER();';
/**
* Serialization version.
*/
const VERSION = 1;
/**
* Unserialize session.
*
* Logic for deserialization is as follows.
* - If the session is unlocked
* - Try starting IPC server:
* - Fetch light state
* - If don't need event handler
* - Unlock
* - Fork
* - Lock (fork)
* - Deserialize full (fork)
* - Start IPC server (fork)
* - Store IPC state (fork)
* - If need event handler
* - If have event handler class
* - Deserialize full
* - Start IPC server
* - Store IPC state
* - Else Fallthrough
* - Wait for a new IPC state for a maximum of 30 seconds, then throw
* - Execute original request via IPC
*
* - If the session is locked
* - In parallel (concurrent):
* - The IPC server should be running, connect
* - Try starting full session
* - Fetch light state
* - If don't need event handler
* - Wait lock
* - Unlock
* - Fork
* - Lock (fork)
* - Deserialize full (fork)
* - Start IPC server (fork)
* - Store IPC state (fork)
* - If need event handler and have event handler class
* - Wait lock
* - Deserialize full
* - Start IPC server
* - Store IPC state
* - Wait for a new IPC session for a maximum of 30 seconds, then throw
* - Execute original request via IPC
*
*
*
* - If receiving a startAndLoop or setEventHandler request on an IPC session:
* - Shutdown remote IPC server
* - Deserialize full
* - Start IPC server
* - Store IPC state
*
* @param SessionPaths $session Session name
*
@ -38,95 +104,214 @@ class Serialization
*
* @return \Generator
*/
public static function legacyUnserialize(SessionPaths $session): \Generator
public static function unserialize(SessionPaths $session): \Generator
{
if (yield exists($session->getSessionPath())) {
Logger::log('Waiting for exclusive session lock...');
$warningId = Loop::delay(1000, static function () use (&$warningId) {
Logger::log("It seems like the session is busy.");
if (\defined(\MADELINE_WORKER::class)) {
Logger::log("Exiting since we're in a worker");
Magic::shutdown(1);
}
Logger::log("Telegram does not support starting multiple instances of the same session, make sure no other instance of the session is running.");
$warningId = Loop::repeat(5000, fn () => Logger::log('Still waiting for exclusive session lock...'));
Loop::unreference($warningId);
});
// Is new session
$isNew = true;
} elseif (yield exists($session->getLegacySessionPath())) {
// Is old session
$isNew = false;
} else {
// No session exists yet, lock for when we create it
return [null, yield Tools::flock($session->getLockPath(), LOCK_EX, 1)];
}
Logger::log('Waiting for exclusive session lock...');
$warningId = Loop::delay(1000, static function () use (&$warningId) {
Logger::log("It seems like the session is busy.");
if (\defined(\MADELINE_WORKER::class)) {
Logger::log("Exiting since we're in a worker");
Magic::shutdown(1);
}
Logger::log("Telegram does not support starting multiple instances of the same session, make sure no other instance of the session is running.");
$warningId = Loop::repeat(5000, fn () => Logger::log('Still waiting for exclusive session lock...'));
Loop::unreference($warningId);
$unlock = yield Tools::flock($session->getLockPath(), LOCK_EX, 1);
Loop::cancel($warningId);
$tempId = Shutdown::addCallback($unlock = static function () use ($unlock) {
Logger::log("Unlocking exclusive session lock!");
$unlock();
Logger::log("Unlocked exclusive session lock!");
});
Loop::unreference($warningId);
$lightState = null;
$cancelFlock = new CancellationTokenSource;
$canContinue = true;
$ipcSocket = null;
$unlock = yield Tools::flock($session->getLockPath(), LOCK_EX, 1, $cancelFlock->getToken(), static function () use ($session, $cancelFlock, &$canContinue, &$ipcSocket, &$lightState) {
$ipcSocket = Tools::call(self::tryConnect($session->getIpcPath(), $cancelFlock));
$session->getIpcState()->onResolve(static function (?\Throwable $e, ?LightState $res) use ($cancelFlock, &$canContinue, &$lightState) {
if ($res) {
$lightState = $res;
if (!$res->canStartIpc()) {
$canContinue = false;
$cancelFlock->cancel();
}
}
});
Logger::log("Got exclusive session lock!");
});
Loop::cancel($warningId);
$tounserialize = yield get($session->getSessionPath());
if (!$unlock) { // Canceled, don't have lock
return [yield $ipcSocket, null];
}
if (!$canContinue) { // Canceled, but have lock already
Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR);
Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR);
$unlock();
return [yield $ipcSocket, null];
}
Magic::classExists();
try {
/** @var LightState */
$lightState ??= yield $session->getIpcState();
} catch (\Throwable $e) {
}
if ($lightState) {
if (!$class = $lightState->getEventHandler()) {
// Unlock and fork
$unlock();
Server::startMe($session);
return [$ipcSocket ?? yield from self::tryConnect($session->getIpcPath()), null];
} elseif (!\class_exists($class)) {
return [$ipcSocket ?? yield from self::tryConnect($session->getIpcPath()), null];
}
}
$tempId = Shutdown::addCallback($unlock = static function () use ($unlock) {
Logger::log("Unlocking exclusive session lock!");
$unlock();
Logger::log("Unlocked exclusive session lock!");
});
Logger::log("Got exclusive session lock!");
if ($isNew) {
$unserialized = yield from self::newUnserialize($session->getSessionPath());
} else {
$unserialized = yield from self::legacyUnserialize($session);
}
if ($unserialized === false) {
throw new Exception(\danog\MadelineProto\Lang::$current_lang['deserialization_error']);
}
Shutdown::removeCallback($tempId);
return [$unserialized, $unlock];
}
/**
* Try connecting to IPC socket.
*
* @param string $ipcPath IPC path
* @param ?CancellationTokenSource $cancel Cancelation token
*
* @return \Generator<int, Promise|Promise<ChannelledSocket>, mixed, void>
*/
private static function tryConnect(string $ipcPath, ?CancellationTokenSource $cancel = null): \Generator
{
for ($x = 0; $x < 30; $x++) {
Logger::log("Trying to connect to IPC socket...");
try {
$unserialized = \unserialize($tounserialize);
} catch (\danog\MadelineProto\Bug74586Exception $e) {
\class_exists('\\Volatile');
$tounserialize = \str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize);
foreach (['RSA', 'TL\\TLMethods', 'TL\\TLConstructors', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) {
\class_exists('\\danog\\MadelineProto\\'.$class);
}
$unserialized = \danog\Serialization::unserialize($tounserialize);
} catch (\danog\MadelineProto\Exception $e) {
if ($e->getFile() === 'MadelineProto' && $e->getLine() === 1) {
throw $e;
}
if (@\constant("MADELINEPROTO_TEST") === 'pony') {
throw $e;
}
\class_exists('\\Volatile');
foreach (['RSA', 'TL\\TLMethods', 'TL\\TLConstructors', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) {
\class_exists('\\danog\\MadelineProto\\'.$class);
}
$changed = false;
if (\strpos($tounserialize, 'O:26:"danog\\MadelineProto\\Button":') !== false) {
Logger::log("SUBBING BUTTONS!");
$tounserialize = \str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize);
$changed = true;
}
if (\strpos($e->getMessage(), "Erroneous data format for unserializing 'phpseclib\\Math\\BigInteger'") === 0) {
Logger::log("SUBBING BIGINTEGOR!");
$tounserialize = \str_replace('phpseclib\\Math\\BigInteger', 'phpseclib\\Math\\BigIntegor', $tounserialize);
$changed = true;
}
if (\strpos($tounserialize, 'C:25:"phpseclib\\Math\\BigInteger"') !== false) {
Logger::log("SUBBING TGSECLIB old!");
$tounserialize = \str_replace('C:25:"phpseclib\\Math\\BigInteger"', 'C:24:"tgseclib\\Math\\BigInteger"', $tounserialize);
$changed = true;
}
if (\strpos($tounserialize, 'C:26:"phpseclib3\\Math\\BigInteger"') !== false) {
Logger::log("SUBBING TGSECLIB!");
$tounserialize = \str_replace('C:26:"phpseclib3\\Math\\BigInteger"', 'C:24:"tgseclib\\Math\\BigInteger"', $tounserialize);
$changed = true;
}
Logger::log((string) $e, Logger::ERROR);
if (!$changed) {
throw $e;
}
try {
$unserialized = \danog\Serialization::unserialize($tounserialize);
} catch (\Throwable $e) {
$unserialized = \unserialize($tounserialize);
\clearstatcache(true, $ipcPath);
$socket = yield connect($ipcPath);
if ($cancel) {
$cancel->cancel();
}
return $socket;
} catch (\Throwable $e) {
Logger::log((string) $e, Logger::ERROR);
throw $e;
$e = $e->getMessage();
Logger::log("$e while connecting to IPC socket");
}
if ($unserialized instanceof \danog\PlaceHolder) {
$unserialized = \danog\Serialization::unserialize($tounserialize);
}
if ($unserialized === false) {
throw new Exception(\danog\MadelineProto\Lang::$current_lang['deserialization_error']);
}
Shutdown::removeCallback($tempId);
return [$unserialized, $unlock];
yield Tools::sleep(1);
}
}
/**
* @internal Deserialize new object
*
* @param string $path
* @return \Generator
*/
public static function newUnserialize(string $path): \Generator
{
$headerLen = \strlen(self::PHP_HEADER) + 1;
$file = yield open($path, 'rb');
$size = yield stat($path);
$size = $size['size'] ?? $headerLen;
yield $file->seek($headerLen); // Skip version for now
$unserialized = \unserialize((yield $file->read($size - $headerLen)) ?? '');
yield $file->close();
return $unserialized;
}
/**
* Deserialize legacy session.
*
* @param SessionPaths $session
* @return \Generator
*/
private static function legacyUnserialize(SessionPaths $session): \Generator
{
$tounserialize = yield get($session->getLegacySessionPath());
try {
$unserialized = \unserialize($tounserialize);
} catch (\danog\MadelineProto\Bug74586Exception $e) {
\class_exists('\\Volatile');
$tounserialize = \str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize);
foreach (['RSA', 'TL\\TLMethods', 'TL\\TLConstructors', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) {
\class_exists('\\danog\\MadelineProto\\'.$class);
}
$unserialized = \danog\Serialization::unserialize($tounserialize);
} catch (\danog\MadelineProto\Exception $e) {
if ($e->getFile() === 'MadelineProto' && $e->getLine() === 1) {
throw $e;
}
if (@\constant("MADELINEPROTO_TEST") === 'pony') {
throw $e;
}
\class_exists('\\Volatile');
foreach (['RSA', 'TL\\TLMethods', 'TL\\TLConstructors', 'MTProto', 'API', 'DataCenter', 'Connection', 'TL\\Types\\Button', 'TL\\Types\\Bytes', 'APIFactory'] as $class) {
\class_exists('\\danog\\MadelineProto\\'.$class);
}
$changed = false;
if (\strpos($tounserialize, 'O:26:"danog\\MadelineProto\\Button":') !== false) {
Logger::log("SUBBING BUTTONS!");
$tounserialize = \str_replace('O:26:"danog\\MadelineProto\\Button":', 'O:35:"danog\\MadelineProto\\TL\\Types\\Button":', $tounserialize);
$changed = true;
}
if (\strpos($e->getMessage(), "Erroneous data format for unserializing 'phpseclib\\Math\\BigInteger'") === 0) {
Logger::log("SUBBING BIGINTEGOR!");
$tounserialize = \str_replace('phpseclib\\Math\\BigInteger', 'phpseclib\\Math\\BigIntegor', $tounserialize);
$changed = true;
}
if (\strpos($tounserialize, 'C:25:"phpseclib\\Math\\BigInteger"') !== false) {
Logger::log("SUBBING TGSECLIB old!");
$tounserialize = \str_replace('C:25:"phpseclib\\Math\\BigInteger"', 'C:24:"tgseclib\\Math\\BigInteger"', $tounserialize);
$changed = true;
}
if (\strpos($tounserialize, 'C:26:"phpseclib3\\Math\\BigInteger"') !== false) {
Logger::log("SUBBING TGSECLIB!");
$tounserialize = \str_replace('C:26:"phpseclib3\\Math\\BigInteger"', 'C:24:"tgseclib\\Math\\BigInteger"', $tounserialize);
$changed = true;
}
Logger::log((string) $e, Logger::ERROR);
if (!$changed) {
throw $e;
}
try {
$unserialized = \danog\Serialization::unserialize($tounserialize);
} catch (\Throwable $e) {
$unserialized = \unserialize($tounserialize);
}
} catch (\Throwable $e) {
Logger::log((string) $e, Logger::ERROR);
throw $e;
}
if ($unserialized instanceof \danog\PlaceHolder) {
$unserialized = \danog\Serialization::unserialize($tounserialize);
}
return $unserialized;
}
}

View File

@ -19,11 +19,18 @@
namespace danog\MadelineProto;
use Amp\Promise;
use danog\MadelineProto\Ipc\LightState;
/**
* Session path information.
*/
class SessionPaths
{
/**
* Legacy session path.
*/
private string $legacySessionPath;
/**
* Session path.
*/
@ -36,6 +43,10 @@ class SessionPaths
* IPC socket path.
*/
private string $ipcPath;
/**
* IPC light state path.
*/
private string $ipcStatePath;
/**
* Temporary serialization path.
*/
@ -48,10 +59,12 @@ class SessionPaths
public function __construct(string $session)
{
$session = Tools::absolute($session);
$this->sessionPath = $session;
$this->legacySessionPath = $session;
$this->sessionPath = "$session.safe.php";
$this->lockPath = "$session.lock";
$this->ipcPath = "$session.ipc";
$this->tempPath = "$session.temp.session";
$this->ipcStatePath = "$session.ipcState.php";
$this->tempPath = "$session.temp.php";
}
/**
* Get session path.
@ -60,7 +73,17 @@ class SessionPaths
*/
public function __toString(): string
{
return $this->sessionPath;
return $this->legacySessionPath;
}
/**
* Get legacy session path.
*
* @return string
*/
public function getLegacySessionPath(): string
{
return $this->legacySessionPath;
}
/**
@ -102,4 +125,24 @@ class SessionPaths
{
return $this->tempPath;
}
/**
* Get IPC light state path.
*
* @return string
*/
public function getIpcStatePath(): string
{
return $this->ipcStatePath;
}
/**
* Get IPC state.
*
* @return Promise<LightState>
*/
public function getIpcState(): Promise
{
return Tools::call(Serialization::newUnserialize($this->ipcStatePath));
}
}

View File

@ -59,7 +59,7 @@ class Shutdown
* @param callable $callback The callback to set
* @param null|string $id The optional callback ID
*
* @return int The callback ID
* @return int|string The callback ID
*/
public static function addCallback($callback, $id = null)
{
@ -76,7 +76,7 @@ class Shutdown
/**
* Remove a callback from the script shutdown callable list.
*
* @param null|string $id The optional callback ID
* @param null|string|int $id The optional callback ID
*
* @return bool true if the callback was removed correctly, false otherwise
*/

View File

@ -19,10 +19,12 @@
namespace danog\MadelineProto;
use Amp\CancellationToken;
use Amp\Deferred;
use Amp\Failure;
use Amp\File\StatCache;
use Amp\Loop;
use Amp\NullCancellationToken;
use Amp\Promise;
use Amp\Success;
use tgseclib\Math\BigInteger;
@ -36,6 +38,7 @@ use function Amp\Promise\any;
use function Amp\Promise\first;
use function Amp\Promise\some;
use function Amp\Promise\timeout;
use function Amp\Promise\timeoutWithDefault;
use function Amp\Promise\wait;
/**
@ -384,6 +387,27 @@ abstract class Tools extends StrTools
{
return timeout(self::call($promise), $timeout);
}
/**
* Creates an artificial timeout for any `Promise`.
*
* If the promise is resolved before the timeout expires, the result is returned
*
* If the timeout expires before the promise is resolved, a default value is returned
*
* @template TReturn
*
* @param Promise<TReturn>|\Generator $promise Promise to which the timeout is applied.
* @param int $timeout Timeout in milliseconds.
* @param TReturn $default
*
* @return Promise<TReturn>
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
*/
public static function timeoutWithDefault($promise, int $timeout, $default = null): Promise
{
return timeoutWithDefault(self::call($promise), $timeout, $default);
}
/**
* Convert generator, promise or any other value to a promise.
*
@ -517,29 +541,34 @@ abstract class Tools extends StrTools
* Asynchronously lock a file
* Resolves with a callbable that MUST eventually be called in order to release the lock.
*
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param CancellationToken $token Cancellation token
* @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails.
*
* @return Promise<callable>
* @return Promise<?callable>
*/
public static function flock(string $file, int $operation, float $polling = 0.1): Promise
public static function flock(string $file, int $operation, float $polling = 0.1, $token = null, $failureCb = null): Promise
{
return self::call(Tools::flockGenerator($file, $operation, $polling));
return self::call(Tools::flockGenerator($file, $operation, $polling, $token, $failureCb));
}
/**
* Asynchronously lock a file (internal generator function).
*
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param CancellationToken $token Cancellation token
* @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails.
*
* @internal Generator function
*
* @return \Generator
*/
public static function flockGenerator(string $file, int $operation, float $polling): \Generator
public static function flockGenerator(string $file, int $operation, float $polling, $token = null, $failureCb = null): \Generator
{
$token = $token ?? new NullCancellationToken;
if (!yield exists($file)) {
yield \touch($file);
StatCache::clear($file);
@ -549,7 +578,15 @@ abstract class Tools extends StrTools
do {
$result = \flock($res, $operation);
if (!$result) {
if ($failureCb) {
Tools::callFork($failureCb());
$failureCb = null;
}
yield self::sleep($polling);
if ($token->isRequested()) {
var_dump("was requested pap");
return null;
}
}
} while (!$result);
return static function () use (&$res) {
@ -563,13 +600,13 @@ abstract class Tools extends StrTools
/**
* Asynchronously sleep.
*
* @param int $time Number of seconds to sleep for
* @param int|float $time Number of seconds to sleep for
*
* @return Promise
*/
public static function sleep(int $time): Promise
public static function sleep($time): Promise
{
return new \Amp\Delayed($time * 1000);
return new \Amp\Delayed((int) ($time * 1000));
}
/**
* Asynchronously read line.