IPC improvements

This commit is contained in:
Daniil Gentili 2020-09-23 00:57:49 +02:00
parent 1c99fd2658
commit a3a331542b
6 changed files with 33 additions and 17 deletions

View File

@ -20,6 +20,7 @@ namespace danog\MadelineProto;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Amp\Success;
use danog\MadelineProto\Ipc\Client;
use danog\MadelineProto\Ipc\LightState; use danog\MadelineProto\Ipc\LightState;
use function Amp\File\open; use function Amp\File\open;
@ -181,6 +182,9 @@ final class APIWrapper
if ($this->API === null && !$this->gettingApiId) { if ($this->API === null && !$this->gettingApiId) {
return new Success(false); return new Success(false);
} }
if ($this->API instanceof Client) {
return new Success(false);
}
return Tools::callFork((function (): \Generator { return Tools::callFork((function (): \Generator {
if ($this->API) { if ($this->API) {
yield from $this->API->initAsynchronously(); yield from $this->API->initAsynchronously();

View File

@ -92,6 +92,9 @@ abstract class AbstractAPIFactory extends AsyncConstruct
$a->lua =& $b->lua; $a->lua =& $b->lua;
$a->async =& $b->async; $a->async =& $b->async;
$a->methods =& $b->methods; $a->methods =& $b->methods;
if (!$b->inited()) {
$a->setInitPromise($b->initAsynchronously());
}
} }
/** /**
* Enable or disable async. * Enable or disable async.
@ -162,7 +165,6 @@ abstract class AbstractAPIFactory extends AsyncConstruct
public function __call_async(string $name, array $arguments): \Generator public function __call_async(string $name, array $arguments): \Generator
{ {
yield from $this->initAsynchronously(); yield from $this->initAsynchronously();
$lower_name = \strtolower($name); $lower_name = \strtolower($name);
if ($this->namespace !== '' || !isset($this->methods[$lower_name])) { if ($this->namespace !== '' || !isset($this->methods[$lower_name])) {
$name = $this->namespace.$name; $name = $this->namespace.$name;

View File

@ -18,8 +18,8 @@
namespace danog\MadelineProto\Ipc; namespace danog\MadelineProto\Ipc;
use danog\MadelineProto\MTProto;
use danog\MadelineProto\EventHandler; use danog\MadelineProto\EventHandler;
use danog\MadelineProto\MTProto;
/** /**
* Light state. * Light state.
@ -33,7 +33,7 @@ final class LightState
* *
* @var null|class-string<EventHandler> * @var null|class-string<EventHandler>
*/ */
private ?string $eventHandler; private ?string $eventHandler = null;
public function __construct(MTProto $API) public function __construct(MTProto $API)
{ {

View File

@ -73,8 +73,12 @@ class Server extends SignalLoop
} catch (\Throwable $e) { } catch (\Throwable $e) {
Logger::log($e); Logger::log($e);
} }
Logger::log("Starting IPC server $session (web)"); try {
WebRunner::start($session); Logger::log("Starting IPC server $session (web)");
WebRunner::start($session);
} catch (\Throwable $e) {
Logger::log($e);
}
} }
/** /**
* Main loop. * Main loop.

View File

@ -117,7 +117,7 @@ abstract class Serialization
return [null, yield Tools::flock($session->getLockPath(), LOCK_EX, 1)]; return [null, yield Tools::flock($session->getLockPath(), LOCK_EX, 1)];
} }
Logger::log('Waiting for exclusive session lock...'); //Logger::log('Waiting for exclusive session lock...');
$warningId = Loop::delay(1000, static function () use (&$warningId) { $warningId = Loop::delay(1000, static function () use (&$warningId) {
Logger::log("It seems like the session is busy."); Logger::log("It seems like the session is busy.");
if (\defined(\MADELINE_WORKER::class)) { if (\defined(\MADELINE_WORKER::class)) {
@ -143,19 +143,21 @@ abstract class Serialization
$canContinue = false; $canContinue = false;
$cancelFlock->cancel(); $cancelFlock->cancel();
} }
} else {
$lightState = false;
} }
}); });
}); });
Loop::cancel($warningId); Loop::cancel($warningId);
if (!$unlock) { // Canceled, don't have lock if (!$unlock) { // Canceled, don't have lock
return [yield $ipcSocket, null]; return $ipcSocket;
} }
if (!$canContinue) { // Canceled, but have lock already if (!$canContinue) { // Have lock, can't use it
Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR); Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR);
Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR); Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR);
$unlock(); $unlock();
return [yield $ipcSocket, null]; return $ipcSocket;
} }
try { try {
@ -169,9 +171,11 @@ abstract class Serialization
// Unlock and fork // Unlock and fork
$unlock(); $unlock();
Server::startMe($session); Server::startMe($session);
return [$ipcSocket ?? yield from self::tryConnect($session->getIpcPath()), null]; return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath());
} elseif (!\class_exists($class)) { } elseif (!\class_exists($class)) {
return [$ipcSocket ?? yield from self::tryConnect($session->getIpcPath()), null]; Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR);
Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR);
return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath());
} }
} }
@ -185,7 +189,7 @@ abstract class Serialization
if ($isNew) { if ($isNew) {
$unserialized = yield from self::newUnserialize($session->getSessionPath()); $unserialized = yield from self::newUnserialize($session->getSessionPath());
} else { } else {
$unserialized = yield from self::legacyUnserialize($session); $unserialized = yield from self::legacyUnserialize($session->getLegacySessionPath());
} }
if ($unserialized === false) { if ($unserialized === false) {
@ -214,7 +218,7 @@ abstract class Serialization
if ($cancel) { if ($cancel) {
$cancel->cancel(); $cancel->cancel();
} }
return $socket; return [$socket, null];
} catch (\Throwable $e) { } catch (\Throwable $e) {
$e = $e->getMessage(); $e = $e->getMessage();
Logger::log("$e while connecting to IPC socket"); Logger::log("$e while connecting to IPC socket");
@ -247,12 +251,12 @@ abstract class Serialization
/** /**
* Deserialize legacy session. * Deserialize legacy session.
* *
* @param SessionPaths $session * @param string $session
* @return \Generator * @return \Generator
*/ */
private static function legacyUnserialize(SessionPaths $session): \Generator private static function legacyUnserialize(string $session): \Generator
{ {
$tounserialize = yield get($session->getLegacySessionPath()); $tounserialize = yield get($session);
try { try {
$unserialized = \unserialize($tounserialize); $unserialized = \unserialize($tounserialize);

View File

@ -582,9 +582,11 @@ abstract class Tools extends StrTools
Tools::callFork($failureCb()); Tools::callFork($failureCb());
$failureCb = null; $failureCb = null;
} }
if ($token->isRequested()) {
return null;
}
yield self::sleep($polling); yield self::sleep($polling);
if ($token->isRequested()) { if ($token->isRequested()) {
var_dump("was requested pap");
return null; return null;
} }
} }