Final IPC improvements

This commit is contained in:
Daniil Gentili 2020-09-24 20:49:34 +02:00
parent 7a550311ac
commit 164000b591
14 changed files with 240 additions and 95 deletions

View File

@ -85,7 +85,6 @@ class API extends InternalDoc
*/
private bool $destructing = false;
/**
* API wrapper (to avoid circular references).
*
@ -93,6 +92,12 @@ class API extends InternalDoc
*/
private $wrapper;
/**
* Unlock callback.
*
* @var ?callable
*/
private $unlock = null;
/**
* Magic constructor function.
@ -154,7 +159,19 @@ class API extends InternalDoc
$this->APIFactory();
$this->logger->logger(Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
}
/**
* Reconnect to full instance.
*
* @return \Generator
*/
protected function reconnectFull(): \Generator
{
if ($this->API instanceof Client) {
yield $this->API->stopIpcServer();
yield $this->API->disconnect();
yield from $this->connectToMadelineProto(new SettingsEmpty, true);
}
}
/**
* Connect to MadelineProto.
*
@ -184,7 +201,7 @@ class API extends InternalDoc
return yield from $this->connectToMadelineProto($settings, true);
} elseif ($unserialized instanceof ChannelledSocket) {
// Success, IPC client
$this->API = new Client($unserialized, Logger::$default);
$this->API = new Client($unserialized, $this->session->getIpcPath(), Logger::$default);
$this->APIFactory();
return true;
} elseif ($unserialized) {

View File

@ -101,6 +101,10 @@ abstract class AbstractAPIFactory extends AsyncConstruct
$a->methods =& $b->methods;
if ($b instanceof API) {
$a->mainAPI = $b;
$b->mainAPI = $b;
} elseif ($a instanceof API) {
$a->mainAPI = $a;
$b->mainAPI = $a;
} else {
$a->mainAPI =& $b->mainAPI;
}
@ -185,17 +189,10 @@ abstract class AbstractAPIFactory extends AsyncConstruct
$args = isset($arguments[0]) && \is_array($arguments[0]) ? $arguments[0] : [];
return yield from $this->API->methodCallAsyncRead($name, $args, $aargs);
}
if ($this->API instanceof Client
&& ($lower_name === 'seteventhandler'
|| ($lower_name === 'loop' && !isset($arguments[0])))
if ($lower_name === 'seteventhandler'
|| ($lower_name === 'loop' && !isset($arguments[0]))
) {
yield $this->API->stopIpcServer();
yield $this->API->disconnect();
if ($this instanceof API) {
yield from $this->connectToMadelineProto(new SettingsEmpty, true);
} else {
yield from $this->mainAPI->connectToMadelineProto(new SettingsEmpty, true);
}
yield from $this->mainAPI->reconnectFull();
}
$res = $this->methods[$lower_name](...$arguments);
return $res instanceof \Generator ? yield from $res : yield $res;

View File

@ -20,6 +20,7 @@
namespace danog\MadelineProto;
use Amp\Promise;
use danog\MadelineProto\Settings\TLSchema;
use danog\MadelineProto\TL\TL;
use danog\MadelineProto\TL\TLCallback;
use phpDocumentor\Reflection\DocBlockFactory;
@ -37,7 +38,9 @@ class AnnotationsBuilder
$this->logger = $logger;
}
});
$this->TL->init($settings['tl_schema']);
$tlSchema = new TLSchema;
$tlSchema->mergeArray($settings);
$this->TL->init($tlSchema);
$this->settings = $settings;
$this->output = $output;
}

View File

@ -50,7 +50,7 @@ class DocsBuilder
}
});
$new = new TLSchema;
$new->mergeArray($settings['tl_schema']);
$new->mergeArray($settings);
$this->TL->init($new);
if (isset($settings['tl_schema']['td']) && !isset($settings['tl_schema']['telegram'])) {
$this->td = true;

View File

@ -4568,12 +4568,11 @@ class InternalDoc extends APIFactory
/**
* Cleanup memory and session file.
*
* @return self
* @return void
*/
public function cleanup(): \danog\MadelineProto\API
public function cleanup(): void
{
$this->API->cleanup();
return $this;
}
/**
* Close connection with client, connected via web.
@ -4848,15 +4847,17 @@ class InternalDoc extends APIFactory
* Asynchronously lock a file
* Resolves with a callbable that MUST eventually be called in order to release the lock.
*
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param ?Promise $token Cancellation token
* @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails.
*
* @return Promise<callable>
* @return Promise<?callable>
*/
public function flock(string $file, int $operation, float $polling = 0.1)
public function flock(string $file, int $operation, float $polling = 0.1, ?\Amp\Promise $token = null, $failureCb = null)
{
return \danog\MadelineProto\Tools::flock($file, $operation, $polling);
return \danog\MadelineProto\Tools::flock($file, $operation, $polling, $token, $failureCb);
}
/**
* Convert bot API channel ID to MTProto channel ID.
@ -5247,11 +5248,11 @@ class InternalDoc extends APIFactory
return \danog\MadelineProto\MTProto::getSessionId($madelineProto);
}
/**
* Return current settings array.
* Return current settings.
*
* @return Settings
*/
public function getSettings(): Settings
public function getSettings(): \danog\MadelineProto\Settings
{
return $this->API->getSettings();
}
@ -5326,6 +5327,21 @@ class InternalDoc extends APIFactory
{
return $this->API->hasSecretChat($chat);
}
/**
* Checks private property exists in an object.
*
* @param object $obj Object
* @param string $var Attribute name
*
* @psalm-suppress InvalidScope
*
* @return bool
* @access public
*/
public function hasVar($obj, string $var): bool
{
return \danog\MadelineProto\Tools::hasVar($obj, $var);
}
/**
* Import authorization.
*
@ -5348,7 +5364,13 @@ class InternalDoc extends APIFactory
{
return \danog\MadelineProto\Tools::inflateStripped($stripped);
}
/**
* Initialize database instance.
*
* @param MTProto $MadelineProto
* @param boolean $reset
* @return \Generator
*/
public function initDb(\danog\MadelineProto\MTProto $MadelineProto, bool $reset = false, array $extra = [])
{
return $this->__call(__FUNCTION__, [$MadelineProto, $reset, $extra]);
@ -5418,7 +5440,7 @@ class InternalDoc extends APIFactory
/**
* Start MadelineProto's update handling loop, or run the provided async callable.
*
* @param callable $callback Async callable to run
* @param callable|null $callback Async callable to run
*
* @return mixed
*/
@ -5654,13 +5676,14 @@ class InternalDoc extends APIFactory
/**
* Report an error to the previously set peer.
*
* @param string $message Error to report
* @param string $message Error to report
* @param string $parseMode Parse mode
*
* @return \Generator
*/
public function report(string $message, array $extra = [])
public function report(string $message, string $parseMode = '', array $extra = [])
{
return $this->__call(__FUNCTION__, [$message, $extra]);
return $this->__call(__FUNCTION__, [$message, $parseMode, $extra]);
}
/**
* Request VoIP call.
@ -5774,13 +5797,13 @@ class InternalDoc extends APIFactory
/**
* Set event handler.
*
* @param string|EventHandler $event_handler Event handler
* @param class-string<EventHandler> $eventHandler Event handler
*
* @return \Generator
*/
public function setEventHandler($event_handler, array $extra = [])
public function setEventHandler(string $eventHandler, array $extra = [])
{
return $this->__call(__FUNCTION__, [$event_handler, $extra]);
return $this->__call(__FUNCTION__, [$eventHandler, $extra]);
}
/**
* Set NOOP update handler, ignoring all updates.
@ -5841,6 +5864,13 @@ class InternalDoc extends APIFactory
{
$this->API->setWebhook($hook_url, $pem_path);
}
/**
* Set API wrapper needed for triggering serialization functions.
*/
public function setWrapper(\danog\MadelineProto\APIWrapper $wrapper): void
{
$this->API->setWrapper($wrapper);
}
/**
* Setup logger.
*
@ -5853,11 +5883,11 @@ class InternalDoc extends APIFactory
/**
* Asynchronously sleep.
*
* @param int $time Number of seconds to sleep for
* @param int|float $time Number of seconds to sleep for
*
* @return Promise
*/
public function sleep(int $time)
public function sleep($time)
{
return \danog\MadelineProto\Tools::sleep($time);
}
@ -5937,6 +5967,27 @@ class InternalDoc extends APIFactory
{
return \danog\MadelineProto\Tools::timeout($promise, $timeout);
}
/**
* Creates an artificial timeout for any `Promise`.
*
* If the promise is resolved before the timeout expires, the result is returned
*
* If the timeout expires before the promise is resolved, a default value is returned
*
* @template TReturn
*
* @param Promise<TReturn>|\Generator $promise Promise to which the timeout is applied.
* @param int $timeout Timeout in milliseconds.
* @param TReturn $default
*
* @return Promise<TReturn>
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
*/
public function timeoutWithDefault($promise, int $timeout, $default = null)
{
return \danog\MadelineProto\Tools::timeoutWithDefault($promise, $timeout, $default);
}
/**
* Convert to camelCase.
*
@ -6063,12 +6114,12 @@ class InternalDoc extends APIFactory
/**
* Parse, update and store settings.
*
* @param array $settings Settings
* @param bool $reinit Whether to reinit the instance
* @param Settings|SettingsEmpty $settings Settings
* @param bool $reinit Whether to reinit the instance
*
* @return void
* @return \Generator
*/
public function updateSettings(array $settings, bool $reinit = true, array $extra = [])
public function updateSettings(\danog\MadelineProto\SettingsAbstract $settings, bool $reinit = true, array $extra = [])
{
return $this->__call(__FUNCTION__, [$settings, $reinit, $extra]);
}

View File

@ -21,12 +21,13 @@ namespace danog\MadelineProto\Ipc;
use Amp\Deferred;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\API;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Tools;
use function Amp\Ipc\connect;
/**
* IPC client.
*/
@ -43,6 +44,14 @@ class Client
* Requests promise array.
*/
private array $requests = [];
/**
* Whether to run loop.
*/
private bool $run = true;
/**
* IPC path.
*/
private string $ipcPath;
/**
* Logger instance.
*/
@ -50,13 +59,15 @@ class Client
/**
* Constructor function.
*
* @param ChannelledSocket $socket IPC client socket
* @param Logger $logger Logger
* @param ChannelledSocket $socket IPC client socket
* @param string $ipcPath IPC socket path
* @param Logger $logger Logger
*/
public function __construct(ChannelledSocket $server, Logger $logger)
public function __construct(ChannelledSocket $server, string $ipcPath, Logger $logger)
{
$this->logger = $logger;
$this->server = $server;
$this->ipcPath = $ipcPath;
Tools::callFork($this->loopInternal());
}
/**
@ -82,19 +93,26 @@ class Client
*/
private function loopInternal(): \Generator
{
while ($payload = yield $this->server->receive()) {
[$id, $payload] = $payload;
if (!isset($this->requests[$id])) {
Logger::log("Got response for non-existing ID $id!");
} else {
$promise = $this->requests[$id];
unset($this->requests[$id]);
if ($payload instanceof ExitFailure) {
$promise->fail($payload->getException());
while ($this->run) {
while ($payload = yield $this->server->receive()) {
[$id, $payload] = $payload;
if (!isset($this->requests[$id])) {
Logger::log("Got response for non-existing ID $id!");
} else {
$promise->resolve($payload);
$promise = $this->requests[$id];
unset($this->requests[$id]);
if ($payload instanceof ExitFailure) {
$promise->fail($payload->getException());
} else {
$promise->resolve($payload);
}
unset($promise);
}
unset($promise);
}
if ($this->run) {
$this->logger("Reconnecting to IPC server!");
yield $this->server->disconnect();
$this->server = yield connect($this->ipcPath);
}
}
}
@ -116,9 +134,8 @@ class Client
*/
public function unreference(): void
{
if (isset($this->server)) {
Tools::wait($this->server->disconnect());
}
$this->run = false;
Tools::wait($this->server->disconnect());
}
/**
* Disconnect cleanly from main instance.
@ -127,7 +144,8 @@ class Client
*/
public function disconnect(): Promise
{
return isset($this->server) ? $this->server->disconnect() : new Success();
$this->run = false;
return $this->server->disconnect();
}
/**
* Stop IPC server instance.
@ -137,7 +155,6 @@ class Client
public function stopIpcServer(): \Generator
{
yield $this->server->send(Server::SHUTDOWN);
//yield $this->disconnect();
}
/**
* Call function.

View File

@ -154,12 +154,12 @@ class Server extends SignalLoop
}
} finally {
yield $socket->disconnect();
if ($payload === self::SHUTDOWN) {
/*if ($payload === self::SHUTDOWN) {
$this->signal(null);
if (self::$shutdownDeferred) {
self::$shutdownDeferred->resolve();
}
}
}*/
}
}
/**

View File

@ -1148,7 +1148,12 @@ class MTProto extends AsyncConstruct implements TLCallback
}
} else {
if (!isset($this->settings)) {
$this->settings = $settings;
if ($settings instanceof Settings) {
$this->settings = $settings;
} else {
$this->settings = new Settings;
$this->settings->merge($settings);
}
} else {
$this->settings->merge($settings);
}

View File

@ -112,7 +112,7 @@ abstract class Serialization
$isNew = false;
} else {
// No session exists yet, lock for when we create it
return [null, yield Tools::flock($session->getLockPath(), LOCK_EX, 1)];
return [null, yield from Tools::flockGenerator($session->getLockPath(), LOCK_EX, 1)];
}
//Logger::log('Waiting for exclusive session lock...');

View File

@ -155,6 +155,29 @@ class Settings extends SettingsAbstract
public function merge(SettingsAbstract $settings): void
{
if (!$settings instanceof self) {
if ($settings instanceof AppInfo) {
$this->appInfo->merge($settings);
} elseif ($settings instanceof Auth) {
$this->auth->merge($settings);
} elseif ($settings instanceof Connection) {
$this->connection->merge($settings);
} elseif ($settings instanceof Files) {
$this->files->merge($settings);
} elseif ($settings instanceof Logger) {
$this->logger->merge($settings);
} elseif ($settings instanceof Peer) {
$this->peer->merge($settings);
} elseif ($settings instanceof Pwr) {
$this->pwr->merge($settings);
} elseif ($settings instanceof RPC) {
$this->rpc->merge($settings);
} elseif ($settings instanceof SecretChats) {
$this->secretChats->merge($settings);
} elseif ($settings instanceof Serialization) {
$this->serialization->merge($settings);
} elseif ($settings instanceof TLSchema) {
$this->schema->merge($settings);
}
return;
}
$this->appInfo->merge($settings->appInfo);

View File

@ -36,17 +36,15 @@ class TLSchema extends SettingsAbstract
if (isset($settings['layer'])) {
$this->setLayer($settings['layer']);
}
if (isset($settings['src'])) {
$src = $settings['src'];
if (isset($src['mtproto'])) {
$this->setMTProtoSchema($src['mtproto']);
}
if (isset($src['telegram'])) {
$this->setAPISchema($src['telegram']);
}
if (isset($src['secret'])) {
$this->setSecretSchema($src['secret']);
}
$src = $settings['src'] ?? $settings;
if (isset($src['mtproto'])) {
$this->setMTProtoSchema($src['mtproto']);
}
if (isset($src['telegram'])) {
$this->setAPISchema($src['telegram']);
}
if (isset($src['secret'])) {
$this->setSecretSchema($src['secret']);
}
}

View File

@ -1078,15 +1078,17 @@ class InternalDoc extends APIFactory
* Asynchronously lock a file
* Resolves with a callbable that MUST eventually be called in order to release the lock.
*
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param ?Promise $token Cancellation token
* @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails.
*
* @return Promise<callable>
* @return Promise<?callable>
*/
public function flock(string $file, int $operation, float $polling = 0.1)
public function flock(string $file, int $operation, float $polling = 0.1, ?\Amp\Promise $token = null, $failureCb = null)
{
return \danog\MadelineProto\Tools::flock($file, $operation, $polling);
return \danog\MadelineProto\Tools::flock($file, $operation, $polling, $token, $failureCb);
}
/**
* Generate MTProto vector hash.
@ -1123,6 +1125,21 @@ class InternalDoc extends APIFactory
{
return \danog\MadelineProto\Tools::getVar($obj, $var);
}
/**
* Checks private property exists in an object.
*
* @param object $obj Object
* @param string $var Attribute name
*
* @psalm-suppress InvalidScope
*
* @return bool
* @access public
*/
public function hasVar($obj, string $var): bool
{
return \danog\MadelineProto\Tools::hasVar($obj, $var);
}
/**
* Inflate stripped photosize to full JPG payload.
*
@ -1359,11 +1376,11 @@ class InternalDoc extends APIFactory
/**
* Asynchronously sleep.
*
* @param int $time Number of seconds to sleep for
* @param int|float $time Number of seconds to sleep for
*
* @return Promise
*/
public function sleep(int $time)
public function sleep($time)
{
return \danog\MadelineProto\Tools::sleep($time);
}
@ -1391,6 +1408,27 @@ class InternalDoc extends APIFactory
{
return \danog\MadelineProto\Tools::timeout($promise, $timeout);
}
/**
* Creates an artificial timeout for any `Promise`.
*
* If the promise is resolved before the timeout expires, the result is returned
*
* If the timeout expires before the promise is resolved, a default value is returned
*
* @template TReturn
*
* @param Promise<TReturn>|\Generator $promise Promise to which the timeout is applied.
* @param int $timeout Timeout in milliseconds.
* @param TReturn $default
*
* @return Promise<TReturn>
*
* @throws \TypeError If $promise is not an instance of \Amp\Promise or \React\Promise\PromiseInterface.
*/
public function timeoutWithDefault($promise, int $timeout, $default = null)
{
return \danog\MadelineProto\Tools::timeoutWithDefault($promise, $timeout, $default);
}
/**
* Convert to camelCase.
*

View File

@ -16,6 +16,7 @@ use danog\MadelineProto\APIFactory;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Magic;
use danog\MadelineProto\MTProto;
use danog\MadelineProto\Settings\Logger as SettingsLogger;
use danog\MadelineProto\TON\API as TONAPI;
use danog\MadelineProto\TON\APIFactory as TONAPIFactory;
use danog\MadelineProto\TON\Lite;
@ -25,7 +26,7 @@ use danog\MadelineProto\TON\Lite;
require 'vendor/autoload.php';
Magic::classExists();
Logger::constructor(1);
Logger::constructorFromSettings(new SettingsLogger);
$logger = Logger::$default;
\set_error_handler(['\danog\MadelineProto\Exception', 'ExceptionErrorHandler']);
@ -78,8 +79,8 @@ $doc = new \danog\MadelineProto\AnnotationsBuilder(
$logger,
[
'tl_schema' => [
'lite_api' => "$d/schemas/TON/lite_api.tl",
'ton_api' => "$d/schemas/TON/ton_api.tl",
'telegram' => "$d/schemas/TON/lite_api.tl",
'mtproto' => "$d/schemas/TON/ton_api.tl",
//'tonlib_api' => "$d/schemas/TON/tonlib_api.tl",
]
],

View File

@ -18,27 +18,22 @@ function layerUpgrade(int $layer): void
\copy("schemas/$schema.tl", "src/danog/MadelineProto/$schema.tl");
}
$doc = \file_get_contents('src/danog/MadelineProto/MTProto.php');
\preg_match("/'layer' => (\d+)/", $doc, $matches);
$doc = \file_get_contents('src/danog/MadelineProto/Settings/TLSchema.php');
\preg_match("/layer = (\d+)/", $doc, $matches);
$prevLayer = (int) $matches[1];
if ($prevLayer === $layer) {
return;
}
\preg_match_all("/const V = (\d+)/", $doc, $matches);
$prevVersion = $matches[1][1];
$version = $prevVersion + 1;
$doc = \str_replace(
[
"'layer' => $prevLayer",
"layer = $prevLayer",
"TL_telegram_$prevLayer",
"const V = $prevVersion"
],
[
"'layer' => $layer",
"layer = $layer",
"TL_telegram_$layer",
"const V = $version"
],
$doc
);