IPC fixes

This commit is contained in:
Daniil Gentili 2020-07-12 00:17:47 +02:00
parent 699f0660e0
commit 2463fd7261
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
8 changed files with 75 additions and 37 deletions

View File

@ -88,7 +88,7 @@ class MyEventHandler extends EventHandler
}
$settings = [
'logger' => [
'logger_level' => Logger::VERBOSE
'logger_level' => Logger::ULTRA_VERBOSE
],
'serialization' => [
'serialization_interval' => 30,

View File

@ -20,12 +20,15 @@
namespace danog\MadelineProto;
use Amp\File\StatCache;
use Amp\Ipc\Sync\ChannelledSocket;
use danog\MadelineProto\Ipc\Client;
use danog\MadelineProto\Ipc\Server;
use function Amp\File\exists;
use function Amp\File\get;
use function Amp\File\isfile;
use function Amp\File\unlink;
use function Amp\Ipc\connect;
/**
* IPC API wrapper for MadelineProto.
@ -65,25 +68,29 @@ class FastAPI extends API
{
$this->logger = Logger::constructorFromSettings($settings);
$session = new SessionPaths($session);
yield from $this->checkInit($session, $settings);
if (!(yield exists($session->getIpcPath()))) {
if (!$client = yield from $this->checkInit($session, $settings)) {
try {
yield unlink($session->getIpcPath());
} catch (\Throwable $e) {
}
StatCache::clear($session->getIpcPath());
yield from Server::startMe($session);
$inited = false;
for ($x = 0; $x < 3; $x++) {
$this->logger->logger("Waiting for IPC server to start...");
yield Tools::sleep(0.1);
if (yield from $this->checkInit($session, $settings)) {
yield Tools::sleep(1);
StatCache::clear($session->getIpcPath());
if ($client = yield from $this->checkInit($session, $settings)) {
$inited = true;
break;
}
yield from Server::startMe($session);
}
if (!$inited) {
if (!$client) {
throw new Exception("The IPC server isn't running, please check logs!");
}
}
$this->API = new Client($session->getIpcPath(), $this->logger);
yield from $this->API->initAsynchronously();
$this->API = new Client($client, $this->logger);
$this->methods = self::getInternalMethodList($this->API, MTProto::class);
$this->logger->logger(Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
}
@ -102,15 +109,34 @@ class FastAPI extends API
if (!(yield exists($session->getSessionPath()))
|| (yield exists($session->getIpcPath())
&& yield isfile($session->getIpcPath())
&& yield get($session->getIpcPath()) === 'not inited')
&& yield get($session->getIpcPath()) === Server::NOT_INITED)
) { // Should init API ID|session
Logger::log("Session not initialized, initializing it now...");
$API = new API($session->getSessionPath(), $settings);
yield from $API->initAsynchronously();
unset($API);
return false; // Should start IPC server
while (\gc_collect_cycles());
return null; // Should start IPC server
}
return yield from $this->tryConnect($session->getIpcPath());
}
/**
* Try connecting to IPC socket.
*
* @param string $ipcPath IPC path
*
* @return \Generator<ChannelledSocket|null>
*/
private function tryConnect(string $ipcPath): \Generator
{
Logger::log("Trying to connect to IPC socket...");
try {
return yield connect($ipcPath);
} catch (\Throwable $e) {
$e = $e->getMessage();
Logger::log("$e while connecting to IPC socket");
return null;
}
return true; // All good, IPC server is running
}
/**
* Start MadelineProto and the event handler (enables async).

View File

@ -31,7 +31,7 @@ use function Amp\Ipc\connect;
/**
* IPC client.
*/
class Client extends AsyncConstruct
class Client
{
use \danog\MadelineProto\Wrappers\Start;
use \danog\MadelineProto\Wrappers\Templates;
@ -51,27 +51,13 @@ class Client extends AsyncConstruct
/**
* Constructor function.
*
* @param string $ipcPath IPC socket path
* @param Logger $logger Logger
* @param ChannelledSocket $socket IPC client socket
* @param Logger $logger Logger
*/
public function __construct(string $ipcPath, Logger $logger)
{
$this->setInitPromise($this->__construct_async($ipcPath, $logger));
}
/**
* Constructor function.
*
* @param string $ipcPath IPC socket path
* @param Logger $logger Logger
*
* @return \Generator
*/
public function __construct_async(string $ipcPath, Logger $logger): \Generator
public function __construct(ChannelledSocket $server, Logger $logger)
{
$this->logger = $logger;
$this->logger("Connecting to IPC server...");
$this->server = yield connect($ipcPath);
$this->logger("Connected to IPC server!");
$this->server = $server;
Tools::callFork($this->loop());
}
/**

View File

@ -4,6 +4,7 @@ namespace danog\MadelineProto\Ipc\Runner;
use Amp\Process\Process as BaseProcess;
use Amp\Promise;
use danog\MadelineProto\Magic;
final class ProcessRunner extends RunnerAbstract
{
@ -42,9 +43,9 @@ final class ProcessRunner extends RunnerAbstract
\escapeshellarg($session),
'&'
]);
\var_dumP($command);
var_dump($command);
$this->process = new BaseProcess($command);
$this->process = new BaseProcess($command, Magic::getcwd());
}
private static function locateBinary(): string
{

View File

@ -5,6 +5,7 @@ namespace danog\MadelineProto\Ipc\Runner;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Parallel\Context\ContextException;
use Amp\Promise;
use danog\MadelineProto\Magic;
final class WebRunner extends RunnerAbstract
{
@ -79,7 +80,8 @@ final class WebRunner extends RunnerAbstract
}
$this->params = [
'argv' => ['pony', 'madeline-ipc', $session]
'argv' => ['pony', 'madeline-ipc', $session],
'cwd' => Magic::getcwd()
];
}

View File

@ -18,7 +18,9 @@
use Amp\Deferred;
use danog\MadelineProto\API;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Magic;
use danog\MadelineProto\SessionPaths;
use danog\MadelineProto\Tools;
@ -64,12 +66,25 @@ use danog\MadelineProto\Tools;
\trigger_error("IPC session $ipcPath does not exist!", E_USER_ERROR);
exit(1);
}
if (\function_exists(\cli_set_process_title::class)) {
@\cli_set_process_title("MadelineProto worker $ipcPath");
}
if (isset($_GET['cwd'])) {
@\chdir($_GET['cwd']);
}
\define(\MADELINE_WORKER::class, 1);
try {
Magic::classExists();
Magic::$script_cwd = $_GET['cwd'] ?? Magic::getcwd();
$API = new API($ipcPath);
if ($API->hasEventHandler()) {
$API->startAndLoop(\get_class($API->getEventHandler()));
unset($API);
\gc_collect_cycles();
Logger::log("Session has event handler, can't start IPC server like this!");
$ipc = (new SessionPaths($ipcPath))->getIpcPath();
@\unlink($ipc);
\file_put_contents($ipc, Server::EVENT_HANDLER);
} else {
$API->initSelfRestart();
Tools::wait((new Deferred)->promise());
@ -80,7 +95,7 @@ use danog\MadelineProto\Tools;
if ($e->getMessage() === 'Not inited!') {
$ipc = (new SessionPaths($ipcPath))->getIpcPath();
@\unlink($ipc);
\file_put_contents($ipc, 'not inited');
\file_put_contents($ipc, Server::NOT_INITED);
}
}
}

View File

@ -31,6 +31,14 @@ use danog\MadelineProto\Tools;
*/
class Server extends SignalLoop
{
/**
* Session not initialized, should initialize
*/
const NOT_INITED = 'not inited';
/**
* Session uses event handler, should start from main event handler file
*/
const EVENT_HANDLER = 'event';
/**
* IPC server.
*/

View File

@ -275,7 +275,7 @@ class Magic
}
self::$initedLight = true;
if ($light) {
\define('AMP_WORKER', true);
if (!\defined(\AMP_WORKER::class)) \define(\AMP_WORKER::class, true);
return;
}
}
@ -327,7 +327,7 @@ class Magic
if ((PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg') && !(\class_exists(\Phar::class) && \Phar::running())) {
try {
$back = \debug_backtrace(0);
\define('AMP_WORKER', 1);
if (!defined('AMP_WORKER')) \define('AMP_WORKER', 1);
$promise = \Amp\File\get(\end($back)['file']);
do {
try {