Final fixes

This commit is contained in:
Daniil Gentili 2020-09-24 11:45:20 +02:00
parent a3a331542b
commit bc13e61526
20 changed files with 557 additions and 206 deletions

View File

@ -5,8 +5,7 @@ $config->getFinder()
->in(__DIR__ . '/src')
->in(__DIR__ . '/tests')
->in(__DIR__ . '/examples')
->in(__DIR__ . '/tools')
->in(__DIR__);
->in(__DIR__ . '/tools');
$cacheDir = getenv('TRAVIS') ? getenv('HOME') . '/.php-cs-fixer' : __DIR__;

View File

@ -19,10 +19,11 @@
namespace danog\MadelineProto;
use Amp\Failure;
use Amp\Ipc\Sync\ChannelledSocket;
use danog\MadelineProto\Ipc\Client;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\Settings\Logger as SettingsLogger;
use danog\MadelineProto\Settings\Serialization as SettingsSerialization;
/**
* Main API wrapper for MadelineProto.
@ -92,13 +93,6 @@ class API extends InternalDoc
*/
private $wrapper;
/**
* Global session unlock callback.
*
* @var ?callable
*/
private $unlock;
/**
* Magic constructor function.
@ -128,44 +122,20 @@ class API extends InternalDoc
/**
* Async constructor function.
*
* @param Settings|SettingsEmpty $settings Settings
* @param Settings|SettingsEmpty|SettingsSerialization $settings Settings
*
* @return \Generator
*/
private function internalInitAPI(SettingsAbstract $settings): \Generator
{
Logger::constructorFromSettings($settings instanceof SettingsEmpty
? new SettingsLogger
: $settings->getLogger());
Logger::constructorFromSettings($settings instanceof Settings
? $settings->getLogger()
: new SettingsLogger);
[$unserialized, $this->unlock] = yield Tools::timeoutWithDefault(
Serialization::unserialize($this->session),
30000,
new Failure(new \RuntimeException("Could not connect to MadelineProto, please check the logs for more details."))
);
if ($unserialized instanceof ChannelledSocket) {
$this->API = new Client($unserialized, Logger::$default);
$this->APIFactory();
return;
} elseif ($unserialized) {
$unserialized->storage = $unserialized->storage ?? [];
$unserialized->session = $this->session;
APIWrapper::link($this, $unserialized);
APIWrapper::link($this->wrapper, $this);
AbstractAPIFactory::link($this->wrapper->getFactory(), $this);
if (isset($this->API)) {
$this->storage = $this->API->storage ?? $this->storage;
unset($unserialized);
yield from $this->API->wakeup($settings, $this->wrapper);
$this->APIFactory();
$this->logger->logger(Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
return;
}
if (yield from $this->connectToMadelineProto($settings)) {
return; // OK
}
if ($settings instanceof SettingsEmpty) {
if (!$settings instanceof Settings) {
$settings = new Settings;
}
@ -185,6 +155,61 @@ class API extends InternalDoc
$this->logger->logger(Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
}
/**
* Connect to MadelineProto.
*
* @param Settings|SettingsEmpty $settings Settings
* @param bool $forceFull Whether to force full initialization
*
* @return \Generator
*/
protected function connectToMadelineProto(SettingsAbstract $settings, bool $forceFull = false): \Generator
{
if ($settings instanceof SettingsSerialization) {
$forceFull = $forceFull || $settings->getForceFull();
} elseif ($settings instanceof Settings) {
$forceFull = $forceFull || $settings->getSerialization()->getForceFull();
}
[$unserialized, $this->unlock] = yield Tools::timeoutWithDefault(
Serialization::unserialize($this->session, $forceFull),
30000,
[0, null]
);
if ($unserialized === 0) {
// Timeout
throw new \RuntimeException("Could not connect to MadelineProto, please check the logs for more details.");
} elseif ($unserialized instanceof \Throwable) {
// IPC server error, try fetching full session
return yield from $this->connectToMadelineProto($settings, true);
} elseif ($unserialized instanceof ChannelledSocket) {
// Success, IPC client
$this->API = new Client($unserialized, Logger::$default);
$this->APIFactory();
return true;
} elseif ($unserialized) {
// Success, full session
$unserialized->storage = $unserialized->storage ?? [];
$unserialized->session = $this->session;
APIWrapper::link($this, $unserialized);
APIWrapper::link($this->wrapper, $this);
AbstractAPIFactory::link($this->wrapper->getFactory(), $this);
if (isset($this->API)) {
$this->storage = $this->API->storage ?? $this->storage;
unset($unserialized);
if ($settings instanceof SettingsSerialization) {
$settings = new SettingsEmpty;
}
yield from $this->API->wakeup($settings, $this->wrapper);
$this->APIFactory();
$this->logger->logger(Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
return true;
}
}
return false;
}
/**
* Wakeup function.
*
@ -303,15 +328,24 @@ class API extends InternalDoc
{
$errors = [];
$this->async(true);
if ($this->API instanceof Client) {
yield $this->API->stopIpcServer();
yield $this->API->disconnect();
yield from $this->connectToMadelineProto(new SettingsEmpty, true);
}
$started = false;
while (true) {
try {
yield $this->start();
$started = true;
yield $this->setEventHandler($eventHandler);
return yield from $this->API->loop();
} catch (\Throwable $e) {
$errors = [\time() => $errors[\time()] ?? 0];
$errors[\time()]++;
if ($errors[\time()] > 100 && !$this->inited()) {
if ($errors[\time()] > 100 && (!$this->inited() || !$started)) {
$this->logger->logger("More than 100 errors in a second and not inited, exiting!", Logger::FATAL_ERROR);
return;
}

View File

@ -21,10 +21,8 @@ namespace danog\MadelineProto;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\Ipc\Client;
use danog\MadelineProto\Ipc\LightState;
use function Amp\File\open;
use function Amp\File\rename as renameAsync;
final class APIWrapper
{
@ -190,22 +188,10 @@ final class APIWrapper
yield from $this->API->initAsynchronously();
}
$file = yield open($this->session->getTempPath(), 'bw+');
yield $file->write(Serialization::PHP_HEADER);
yield $file->write(\chr(Serialization::VERSION));
yield $file->write(\serialize($this));
yield $file->close();
yield renameAsync($this->session->getTempPath(), $this->session->getSessionPath());
yield from $this->session->serialize($this, $this->session->getSessionPath());
if ($this->API) {
$file = yield open($this->session->getTempPath(), 'bw+');
yield $file->write(Serialization::PHP_HEADER);
yield $file->write(\chr(Serialization::VERSION));
yield $file->write(\serialize(new LightState($this->API)));
yield $file->close();
yield renameAsync($this->session->getTempPath(), $this->session->getIpcStatePath());
yield from $this->session->storeLightState($this->API);
}

View File

@ -20,6 +20,7 @@
namespace danog\MadelineProto;
use danog\MadelineProto\Async\AsyncConstruct;
use danog\MadelineProto\Ipc\Client;
abstract class AbstractAPIFactory extends AsyncConstruct
{
@ -36,7 +37,7 @@ abstract class AbstractAPIFactory extends AsyncConstruct
*
* @internal
*
* @var MTProto
* @var MTProto|Client
*/
public $API;
/**
@ -61,6 +62,12 @@ abstract class AbstractAPIFactory extends AsyncConstruct
* @var string[]
*/
protected array $methods = [];
/**
* Main API instance.
*/
private API $mainAPI;
/**
* Export APIFactory instance with the specified namespace.
*
@ -92,6 +99,11 @@ abstract class AbstractAPIFactory extends AsyncConstruct
$a->lua =& $b->lua;
$a->async =& $b->async;
$a->methods =& $b->methods;
if ($b instanceof API) {
$a->mainAPI = $b;
} else {
$a->mainAPI =& $b->mainAPI;
}
if (!$b->inited()) {
$a->setInitPromise($b->initAsynchronously());
}
@ -173,6 +185,18 @@ abstract class AbstractAPIFactory extends AsyncConstruct
$args = isset($arguments[0]) && \is_array($arguments[0]) ? $arguments[0] : [];
return yield from $this->API->methodCallAsyncRead($name, $args, $aargs);
}
if ($this->API instanceof Client
&& ($lower_name === 'seteventhandler'
|| ($lower_name === 'loop' && !isset($arguments[0])))
) {
yield $this->API->stopIpcServer();
yield $this->API->disconnect();
if ($this instanceof API) {
yield from $this->connectToMadelineProto(new SettingsEmpty, true);
} else {
yield from $this->mainAPI->connectToMadelineProto(new SettingsEmpty, true);
}
}
$res = $this->methods[$lower_name](...$arguments);
return $res instanceof \Generator ? yield from $res : yield $res;
}

View File

@ -20,6 +20,8 @@ namespace danog\MadelineProto\Ipc;
use Amp\Deferred;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\API;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Logger;
@ -118,6 +120,25 @@ class Client
Tools::wait($this->server->disconnect());
}
}
/**
* Disconnect cleanly from main instance.
*
* @return Promise
*/
public function disconnect(): Promise
{
return isset($this->server) ? $this->server->disconnect() : new Success();
}
/**
* Stop IPC server instance.
*
* @internal
*/
public function stopIpcServer(): \Generator
{
yield $this->server->send(Server::SHUTDOWN);
//yield $this->disconnect();
}
/**
* Call function.
*

View File

@ -0,0 +1,64 @@
<?php
namespace danog\MadelineProto\Ipc;
/**
* IPC state class.
*/
final class IpcState
{
/**
* Startup time.
*/
private float $startupTime;
/**
* Startup ID.
*/
private int $startupId;
/**
* Exception.
*/
private ?\Throwable $exception;
/**
* Construct.
*
* @param integer $startupId
* @param \Throwable $exception
*/
public function __construct(int $startupId, \Throwable $exception = null)
{
$this->startupTime = \microtime(true);
$this->startupId = $startupId;
$this->exception = $exception;
}
/**
* Get startup time.
*
* @return float
*/
public function getStartupTime(): float
{
return $this->startupTime;
}
/**
* Get startup ID.
*
* @return int
*/
public function getStartupId(): int
{
return $this->startupId;
}
/**
* Get exception.
*
* @return ?\Throwable
*/
public function getException(): ?\Throwable
{
return $this->exception;
}
}

View File

@ -2,18 +2,28 @@
namespace danog\MadelineProto\Ipc\Runner;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Tools;
final class ProcessRunner extends RunnerAbstract
{
/** @var string|null Cached path to located PHP binary. */
private static $binaryPath;
/**
* Resources.
*/
private static array $resources = [];
/**
* Runner.
*
* @param string $session Session path
*
* @return void
*/
public static function start(string $session): void
public static function start(string $session, int $request): void
{
$request = Tools::randomInt();
if (\PHP_SAPI === "cli") {
$binary = \PHP_BINARY;
} else {
@ -29,15 +39,16 @@ final class ProcessRunner extends RunnerAbstract
$runner = self::getScriptPath();
$command = \implode(" ", [
'nohup',
\escapeshellarg($binary),
self::formatOptions($options),
$runner,
'madeline-ipc',
\escapeshellarg($session),
'&>/dev/null &'
$request
]);
\proc_close(\proc_open($command, [], $foo));
Logger::log("Starting process with $command");
self::$resources []= \proc_open($command, [], $foo);
}
private static function locateBinary(): string
{

View File

@ -60,8 +60,9 @@ abstract class RunnerAbstract
* Runner.
*
* @param string $session Session path
* @param int $startup ID
*
* @return void
*/
abstract public static function start(string $session): void;
abstract public static function start(string $session, int $startupId): void;
}

View File

@ -2,7 +2,6 @@
namespace danog\MadelineProto\Ipc\Runner;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Parallel\Context\ContextException;
use danog\MadelineProto\Magic;
@ -15,21 +14,18 @@ final class WebRunner extends RunnerAbstract
* Resources.
*/
private static array $resources = [];
/**
* Socket.
*
* @var ResourceOutputStream
*/
private $res;
/**
* Start.
*
* @param string $session Session path
*
* @return void
*/
public static function start(string $session): void
public static function start(string $session, int $id): void
{
if (!isset($_SERVER['SERVER_NAME'])) {
throw new ContextException("Could not initialize web runner!");
return;
}
if (!self::$runPath) {
@ -79,7 +75,7 @@ final class WebRunner extends RunnerAbstract
}
$params = [
'argv' => ['madeline-ipc', $session],
'argv' => ['madeline-ipc', $session, $id],
'cwd' => Magic::getcwd()
];

View File

@ -16,12 +16,13 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
use Amp\Deferred;
use danog\MadelineProto\API;
use danog\MadelineProto\Ipc\IpcState;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Magic;
use danog\MadelineProto\SessionPaths;
use danog\MadelineProto\Settings;
use danog\MadelineProto\Tools;
(static function (): void {
@ -48,6 +49,13 @@ use danog\MadelineProto\Tools;
\define(\MADELINE_WORKER_TYPE::class, \array_shift($arguments));
\define(\MADELINE_WORKER_ARGS::class, $arguments);
}
if (\defined(\SIGHUP::class)) {
try {
\pcntl_signal(SIGHUP, fn () => null);
} catch (\Throwable $e) {
}
}
if (!\class_exists(API::class)) {
$paths = [
\dirname(__DIR__, 7)."/autoload.php",
@ -82,29 +90,32 @@ use danog\MadelineProto\Tools;
}
\define(\MADELINE_WORKER::class, 1);
$runnerId = \MADELINE_WORKER_ARGS[1];
$session = new SessionPaths($ipcPath);
try {
Magic::classExists();
Magic::$script_cwd = $_GET['cwd'] ?? Magic::getcwd();
$API = new API($ipcPath);
$API = new API($ipcPath, (new Settings)->getSerialization()->setForceFull(true));
$API->init();
if ($API->hasEventHandler()) {
unset($API);
\gc_collect_cycles();
Logger::log("Session has event handler, can't start IPC server like this!");
$ipc = (new SessionPaths($ipcPath))->getIpcPath();
@\unlink($ipc);
\file_put_contents($ipc, Server::EVENT_HANDLER);
} else {
$API->initSelfRestart();
Tools::wait((new Deferred)->promise());
$API->initSelfRestart();
Tools::wait($session->storeIpcState(new IpcState($runnerId)));
while (true) {
try {
Tools::wait(Server::waitShutdown());
return;
} catch (\Throwable $e) {
Logger::log((string) $e, Logger::FATAL_ERROR);
Tools::wait($API->report("Surfaced: $e"));
}
}
} catch (\Throwable $e) {
Logger::log("Got exception $e in IPC server, exiting...", Logger::FATAL_ERROR);
\trigger_error("Got exception $e in IPC server, exiting...", E_USER_ERROR);
if ($e->getMessage() === 'Not inited!') {
$ipc = (new SessionPaths($ipcPath))->getIpcPath();
@\unlink($ipc);
\file_put_contents($ipc, Server::NOT_INITED);
$ipc = Tools::wait($session->getIpcState());
if (!($ipc && $ipc->getRunnerId() === $runnerId && !$ipc->getException())) {
Tools::wait($session->storeIpcState(new IpcState($runnerId, $e)));
}
}
}

View File

@ -18,13 +18,16 @@
namespace danog\MadelineProto\Ipc;
use Amp\Deferred;
use Amp\Ipc\IpcServer;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
use danog\Loop\SignalLoop;
use danog\MadelineProto\Ipc\Runner\ProcessRunner;
use danog\MadelineProto\Ipc\Runner\WebRunner;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\InternalLoop;
use danog\MadelineProto\SessionPaths;
use danog\MadelineProto\Tools;
/**
@ -34,13 +37,17 @@ class Server extends SignalLoop
{
use InternalLoop;
/**
* Session not initialized, should initialize.
* Shutdown server.
*/
const NOT_INITED = 'not inited';
const SHUTDOWN = 0;
/**
* Session uses event handler, should start from main event handler file.
* Boolean to shut down worker, if started.
*/
const EVENT_HANDLER = 'event';
private static bool $shutdown = false;
/**
* Deferred to shut down worker, if started.
*/
private static ?Deferred $shutdownDeferred = null;
/**
* IPC server.
*/
@ -54,31 +61,67 @@ class Server extends SignalLoop
*/
public function setIpcPath(string $path): void
{
self::$shutdownDeferred = new Deferred;
$this->server = new IpcServer($path);
}
/**
* Start IPC server in background.
*
* @param string $session Session path
* @param SessionPaths $session Session path
*
* @return void
* @return Promise
*/
public static function startMe(string $session): void
public static function startMe(SessionPaths $session): Promise
{
$id = Tools::randomInt();
try {
Logger::log("Starting IPC server $session (process)");
ProcessRunner::start($session);
WebRunner::start($session);
return;
ProcessRunner::start($session, $id);
WebRunner::start($session, $id);
return Tools::call(self::monitor($session, $id));
} catch (\Throwable $e) {
Logger::log($e);
}
try {
Logger::log("Starting IPC server $session (web)");
WebRunner::start($session);
WebRunner::start($session, $id);
} catch (\Throwable $e) {
Logger::log($e);
}
return Tools::call(self::monitor($session, $id));
}
/**
* Monitor session.
*
* @param SessionPaths $session
* @param int $id
*
* @return \Generator
*/
private static function monitor(SessionPaths $session, int $id): \Generator
{
while (true) {
$state = yield $session->getIpcState();
if ($state && $state->getStartupId() === $id) {
if ($e = $state->getException()) {
Logger::log("IPC server got exception $e");
return $e;
}
Logger::log("IPC server started successfully!");
return true;
}
yield Tools::sleep(1);
}
return false;
}
/**
* Wait for shutdown.
*
* @return Promise
*/
public static function waitShutdown(): Promise
{
return self::$shutdownDeferred->promise();
}
/**
* Main loop.
@ -104,11 +147,19 @@ class Server extends SignalLoop
$this->API->logger("Accepted IPC client connection!");
$id = 0;
$payload = null;
try {
while ($payload = yield $socket->receive()) {
Tools::callFork($this->clientRequest($socket, $id++, $payload));
}
} catch (\Throwable $e) {
} finally {
yield $socket->disconnect();
if ($payload === self::SHUTDOWN) {
$this->signal(null);
if (self::$shutdownDeferred) {
self::$shutdownDeferred->resolve();
}
}
}
}
/**

View File

@ -16,10 +16,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Ipc;
use danog\MadelineProto\EventHandler;
use danog\MadelineProto\MTProto;
namespace danog\MadelineProto;
/**
* Light state.

View File

@ -160,10 +160,7 @@ class Logger
*/
public static function constructorFromSettings(SettingsLogger $settings): self
{
if (!self::$default) {
self::$default = new self($settings);
}
return self::$default;
return self::$default = new self($settings);
}
/**

View File

@ -22,6 +22,7 @@ namespace danog\MadelineProto;
use Amp\Dns\Resolver;
use Amp\File\StatCache;
use Amp\Http\Client\HttpClient;
use Amp\Loop;
use Amp\Promise;
use Closure;
use danog\MadelineProto\Async\AsyncConstruct;
@ -1604,11 +1605,12 @@ class MTProto extends AsyncConstruct implements TLCallback
/**
* Report an error to the previously set peer.
*
* @param string $message Error to report
* @param string $message Error to report
* @param string $parseMode Parse mode
*
* @return \Generator
*/
public function report(string $message): \Generator
public function report(string $message, string $parseMode = ''): \Generator
{
if (!$this->reportDest) {
return;
@ -1640,7 +1642,7 @@ class MTProto extends AsyncConstruct implements TLCallback
$sent = true;
foreach ($this->reportDest as $id) {
try {
yield from $this->methodCallAsyncRead('messages.sendMessage', ['peer' => $id, 'message' => $message]);
yield from $this->methodCallAsyncRead('messages.sendMessage', ['peer' => $id, 'message' => $message, 'parse_mode' => $parseMode]);
if ($file) {
yield from $this->methodCallAsyncRead('messages.sendMedia', ['peer' => $id, 'media' => $file]);
}

View File

@ -481,7 +481,7 @@ trait ResponseHandler
if (isset($response['_']) && !$this->isCdn() && $this->API->getTL()->getConstructors()->findByPredicate($response['_'])['type'] === 'Updates') {
$body = [];
if (isset($request['body']['peer'])) {
$body['peer'] = $this->API->getID($request['body']['peer']);
$body['peer'] = \is_string($request['body']['peer']) ? $request['body']['peer'] : $this->API->getId($request['body']['peer']);
}
if (isset($request['body']['message'])) {
$body['message'] = (string) $request['body']['message'];

View File

@ -1052,9 +1052,13 @@ trait PeerHandler
*/
public function resolveUsername(string $username): \Generator
{
$username = \str_replace('@', '', $username);
if (!$username) {
return false;
}
try {
$this->caching_simple_username[$username] = true;
$res = yield from $this->methodCallAsyncRead('contacts.resolveUsername', ['username' => \str_replace('@', '', $username)], ['datacenter' => $this->datacenter->curdc]);
$res = yield from $this->methodCallAsyncRead('contacts.resolveUsername', ['username' => $username], ['datacenter' => $this->datacenter->curdc]);
} catch (\danog\MadelineProto\RPCErrorException $e) {
$this->logger->logger('Username resolution failed with error '.$e->getMessage(), \danog\MadelineProto\Logger::ERROR);
if (\strpos($e->rpc, 'FLOOD_WAIT_') === 0 || $e->rpc === 'AUTH_KEY_UNREGISTERED' || $e->rpc === 'USERNAME_INVALID') {

View File

@ -19,17 +19,14 @@
namespace danog\MadelineProto;
use Amp\CancellationTokenSource;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use danog\MadelineProto\Ipc\LightState;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\MTProtoSession\Session;
use function Amp\File\exists;
use function Amp\File\get;
use function Amp\File\open;
use function Amp\File\stat;
use function Amp\Ipc\connect;
/**
@ -98,13 +95,14 @@ abstract class Serialization
* - Start IPC server
* - Store IPC state
*
* @param SessionPaths $session Session name
* @param SessionPaths $session Session name
* @param bool $forceFull Whether to force full session deserialization
*
* @internal
*
* @return \Generator
*/
public static function unserialize(SessionPaths $session): \Generator
public static function unserialize(SessionPaths $session, bool $forceFull = false): \Generator
{
if (yield exists($session->getSessionPath())) {
// Is new session
@ -131,17 +129,18 @@ abstract class Serialization
Loop::unreference($warningId);
$lightState = null;
$cancelFlock = new CancellationTokenSource;
$cancelFlock = new Deferred;
$cancelIpc = new Deferred;
$canContinue = true;
$ipcSocket = null;
$unlock = yield Tools::flock($session->getLockPath(), LOCK_EX, 1, $cancelFlock->getToken(), static function () use ($session, $cancelFlock, &$canContinue, &$ipcSocket, &$lightState) {
$ipcSocket = Tools::call(self::tryConnect($session->getIpcPath(), $cancelFlock));
$session->getIpcState()->onResolve(static function (?\Throwable $e, ?LightState $res) use ($cancelFlock, &$canContinue, &$lightState) {
$unlock = yield from Tools::flockGenerator($session->getLockPath(), LOCK_EX, 1, $cancelFlock->promise(), $forceFull ? null : static function () use ($session, $cancelFlock, $cancelIpc, &$canContinue, &$ipcSocket, &$lightState) {
$ipcSocket = Tools::call(self::tryConnect($session->getIpcPath(), $cancelIpc->promise(), $cancelFlock));
$session->getLightState()->onResolve(static function (?\Throwable $e, ?LightState $res) use ($cancelFlock, &$canContinue, &$lightState) {
if ($res) {
$lightState = $res;
if (!$res->canStartIpc()) {
$canContinue = false;
$cancelFlock->cancel();
$cancelFlock->resolve(true);
}
} else {
$lightState = false;
@ -154,28 +153,32 @@ abstract class Serialization
return $ipcSocket;
}
if (!$canContinue) { // Have lock, can't use it
Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR);
Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR);
Logger::log("Session has event handler, but it's not started.", Logger::ERROR);
Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR);
Logger::log("Please start the event handler or unset it to use the IPC server.", Logger::ERROR);
$unlock();
return $ipcSocket;
}
try {
/** @var LightState */
$lightState ??= yield $session->getIpcState();
$lightState ??= yield $session->getLightState();
} catch (\Throwable $e) {
}
if ($lightState) {
if ($lightState && !$forceFull) {
if (!$class = $lightState->getEventHandler()) {
// Unlock and fork
$unlock();
Server::startMe($session);
return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath());
$cancelIpc->resolve(Server::startMe($session));
return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath(), $cancelIpc->promise());
} elseif (!\class_exists($class)) {
Logger::log("IPC WARNING: Session has event handler, but it's not started, and we don't have access to the class, so we can't start it.", Logger::ERROR);
Logger::log("IPC WARNING: Please start the event handler or unset it to use the IPC server.", Logger::ERROR);
return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath());
// Have lock, can't use it
$unlock();
Logger::log("Session has event handler, but it's not started.", Logger::ERROR);
Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR);
Logger::log("Please start the event handler or unset it to use the IPC server.", Logger::ERROR);
return $ipcSocket ?? yield from self::tryConnect($session->getIpcPath(), $cancelIpc->promise());
}
}
@ -187,7 +190,7 @@ abstract class Serialization
Logger::log("Got exclusive session lock!");
if ($isNew) {
$unserialized = yield from self::newUnserialize($session->getSessionPath());
$unserialized = yield from $session->unserialize();
} else {
$unserialized = yield from self::legacyUnserialize($session->getLegacySessionPath());
}
@ -203,51 +206,36 @@ abstract class Serialization
/**
* Try connecting to IPC socket.
*
* @param string $ipcPath IPC path
* @param ?CancellationTokenSource $cancel Cancelation token
* @param string $ipcPath IPC path
* @param Promise $cancelConnect Cancelation token (triggers cancellation of connection)
* @param ?Deferred $cancelFull Cancelation token source (can trigger cancellation of full unserialization)
*
* @return \Generator<int, Promise|Promise<ChannelledSocket>, mixed, void>
* @return \Generator
*/
private static function tryConnect(string $ipcPath, ?CancellationTokenSource $cancel = null): \Generator
private static function tryConnect(string $ipcPath, Promise $cancelConnect, ?Deferred $cancelFull = null): \Generator
{
for ($x = 0; $x < 30; $x++) {
Logger::log("Trying to connect to IPC socket...");
try {
\clearstatcache(true, $ipcPath);
$socket = yield connect($ipcPath);
if ($cancel) {
$cancel->cancel();
if ($cancelFull) {
$cancelFull->resolve(true);
}
return [$socket, null];
} catch (\Throwable $e) {
$e = $e->getMessage();
Logger::log("$e while connecting to IPC socket");
}
yield Tools::sleep(1);
if ($res = yield Tools::timeoutWithDefault($cancelConnect, 1000, null)) {
if ($res instanceof \Throwable) {
return [$res, null];
}
$cancelConnect = (new Deferred)->promise();
}
}
}
/**
* @internal Deserialize new object
*
* @param string $path
* @return \Generator
*/
public static function newUnserialize(string $path): \Generator
{
$headerLen = \strlen(self::PHP_HEADER) + 1;
$file = yield open($path, 'rb');
$size = yield stat($path);
$size = $size['size'] ?? $headerLen;
yield $file->seek($headerLen); // Skip version for now
$unserialized = \unserialize((yield $file->read($size - $headerLen)) ?? '');
yield $file->close();
return $unserialized;
}
/**
* Deserialize legacy session.
*

View File

@ -19,8 +19,14 @@
namespace danog\MadelineProto;
use Amp\File\StatCache;
use Amp\Promise;
use danog\MadelineProto\Ipc\LightState;
use Amp\Success;
use danog\MadelineProto\Ipc\IpcState;
use function Amp\File\exists;
use function Amp\File\open;
use function Amp\File\rename;
/**
* Session path information.
@ -48,9 +54,14 @@ class SessionPaths
*/
private string $ipcStatePath;
/**
* Temporary serialization path.
* Light state path.
*/
private string $tempPath;
private string $lightStatePath;
/**
* Light state.
*/
private ?LightState $lightState = null;
/**
* Construct session info from session name.
*
@ -61,11 +72,57 @@ class SessionPaths
$session = Tools::absolute($session);
$this->legacySessionPath = $session;
$this->sessionPath = "$session.safe.php";
$this->lightStatePath = "$session.lightState.php";
$this->lockPath = "$session.lock";
$this->ipcPath = "$session.ipc";
$this->ipcStatePath = "$session.ipcState.php";
$this->tempPath = "$session.temp.php";
}
/**
* Serialize object to file.
*
* @param object $object
* @param string $path
* @return \Generator
*/
public function serialize(object $object, string $path): \Generator
{
$file = yield open("$path.temp.php", 'bw+');
yield $file->write(Serialization::PHP_HEADER);
yield $file->write(\chr(Serialization::VERSION));
yield $file->write(\serialize($object));
yield $file->close();
yield rename("$path.temp.php", $path);
}
/**
* Deserialize new object.
*
* @param string $path Object path, defaults to session path
*
* @return \Generator
*/
public function unserialize(string $path = ''): \Generator
{
$path = $path ?: $this->sessionPath;
StatCache::clear($path);
if (!yield exists($path)) {
return null;
}
$headerLen = \strlen(Serialization::PHP_HEADER) + 1;
$file = yield open($path, 'rb');
$size = yield \stat($path);
$size = $size['size'] ?? $headerLen;
yield $file->seek($headerLen); // Skip version for now
$unserialized = \unserialize((yield $file->read($size - $headerLen)) ?? '');
yield $file->close();
return $unserialized;
}
/**
* Get session path.
*
@ -116,16 +173,6 @@ class SessionPaths
return $this->ipcPath;
}
/**
* Get temporary serialization path.
*
* @return string
*/
public function getTempPath(): string
{
return $this->tempPath;
}
/**
* Get IPC light state path.
*
@ -139,10 +186,61 @@ class SessionPaths
/**
* Get IPC state.
*
* @return Promise<LightState>
* @return Promise<?IpcState>
*/
public function getIpcState(): Promise
{
return Tools::call(Serialization::newUnserialize($this->ipcStatePath));
return Tools::call($this->unserialize($this->ipcStatePath));
}
/**
* Store IPC state.
*
* @return \Generator
*/
public function storeIpcState(IpcState $state): \Generator
{
return $this->serialize($state, $this->getIpcStatePath());
}
/**
* Get light state path.
*
* @return string
*/
public function getLightStatePath(): string
{
return $this->lightStatePath;
}
/**
* Get light state.
*
* @return Promise<LightState>
*/
public function getLightState(): Promise
{
if ($this->lightState) {
return new Success($this->lightState);
}
$promise = Tools::call($this->unserialize($this->lightStatePath));
$promise->onResolve(function (?\Throwable $e, ?LightState $res) {
if ($res) {
$this->lightState = $res;
}
});
return $promise;
}
/**
* Store light state.
*
* @return \Generator
*/
public function storeLightState(MTProto $state): \Generator
{
$this->lightState = new LightState($state);
return $this->serialize($this->lightState, $this->getLightStatePath());
}
}

View File

@ -10,6 +10,12 @@ class Serialization extends SettingsAbstract
* Serialization interval, in seconds.
*/
protected int $interval = 30;
/**
* Whether to force full deserialization of instance, without using the IPC server/client.
*
* WARNING: this will cause slow startup if enabled.
*/
protected bool $forceFull = false;
public function mergeArray(array $settings): void
{
@ -40,4 +46,28 @@ class Serialization extends SettingsAbstract
return $this;
}
/**
* Get WARNING: this will cause slow startup if enabled.
*
* @return bool
*/
public function getForceFull(): bool
{
return $this->forceFull;
}
/**
* Set WARNING: this will cause slow startup if enabled.
*
* @param bool $forceFull WARNING: this will cause slow startup if enabled.
*
* @return self
*/
public function setForceFull(bool $forceFull): self
{
$this->forceFull = $forceFull;
return $this;
}
}

View File

@ -19,18 +19,18 @@
namespace danog\MadelineProto;
use Amp\CancellationToken;
use Amp\Deferred;
use Amp\Failure;
use Amp\File\StatCache;
use Amp\Loop;
use Amp\NullCancellationToken;
use Amp\Promise;
use Amp\Success;
use Amp\TimeoutException;
use tgseclib\Math\BigInteger;
use function Amp\ByteStream\getOutputBufferStream;
use function Amp\ByteStream\getStdin;
use function Amp\ByteStream\getStdout;
use function Amp\delay;
use function Amp\File\exists;
use function Amp\File\get;
use function Amp\Promise\all;
@ -385,7 +385,25 @@ abstract class Tools extends StrTools
*/
public static function timeout($promise, int $timeout): Promise
{
return timeout(self::call($promise), $timeout);
$promise = self::call($promise);
$deferred = new Deferred;
$watcher = Loop::delay($timeout, static function () use (&$deferred) {
$temp = $deferred; // prevent double resolve
$deferred = null;
$temp->fail(new TimeoutException);
});
Loop::unreference($watcher);
$promise->onResolve(function () use (&$deferred, $promise, $watcher) {
if ($deferred !== null) {
Loop::cancel($watcher);
$deferred->resolve($promise);
}
});
return $deferred->promise();
}
/**
* Creates an artificial timeout for any `Promise`.
@ -406,7 +424,25 @@ abstract class Tools extends StrTools
*/
public static function timeoutWithDefault($promise, int $timeout, $default = null): Promise
{
return timeoutWithDefault(self::call($promise), $timeout, $default);
$promise = self::call($promise);
$deferred = new Deferred;
$watcher = Loop::delay($timeout, static function () use (&$deferred, $default) {
$temp = $deferred; // prevent double resolve
$deferred = null;
$temp->resolve($default);
});
Loop::unreference($watcher);
$promise->onResolve(function () use (&$deferred, $promise, $watcher) {
if ($deferred !== null) {
Loop::cancel($watcher);
$deferred->resolve($promise);
}
});
return $deferred->promise();
}
/**
* Convert generator, promise or any other value to a promise.
@ -541,34 +577,35 @@ abstract class Tools extends StrTools
* Asynchronously lock a file
* Resolves with a callbable that MUST eventually be called in order to release the lock.
*
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param CancellationToken $token Cancellation token
* @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails.
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param ?Promise $token Cancellation token
* @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails.
*
* @return Promise<?callable>
*/
public static function flock(string $file, int $operation, float $polling = 0.1, $token = null, $failureCb = null): Promise
public static function flock(string $file, int $operation, float $polling = 0.1, ?Promise $token = null, $failureCb = null): Promise
{
return self::call(Tools::flockGenerator($file, $operation, $polling, $token, $failureCb));
}
/**
* Asynchronously lock a file (internal generator function).
*
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param CancellationToken $token Cancellation token
* @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails.
* @param string $file File to lock
* @param integer $operation Locking mode
* @param float $polling Polling interval
* @param ?Promise $token Cancellation token
* @param ?callable $failureCb Failure callback, called only once if the first locking attempt fails.
*
* @internal Generator function
*
* @return \Generator
*/
public static function flockGenerator(string $file, int $operation, float $polling, $token = null, $failureCb = null): \Generator
public static function flockGenerator(string $file, int $operation, float $polling, ?Promise $token = null, $failureCb = null): \Generator
{
$token = $token ?? new NullCancellationToken;
$polling *= 1000;
$polling = (int) $polling;
if (!yield exists($file)) {
yield \touch($file);
StatCache::clear($file);
@ -579,15 +616,15 @@ abstract class Tools extends StrTools
$result = \flock($res, $operation);
if (!$result) {
if ($failureCb) {
Tools::callFork($failureCb());
$failureCb();
$failureCb = null;
}
if ($token->isRequested()) {
return null;
}
yield self::sleep($polling);
if ($token->isRequested()) {
return null;
if ($token) {
if (yield Tools::timeoutWithDefault($token, $polling, false)) {
return;
}
} else {
yield delay($polling);
}
}
} while (!$result);
@ -898,7 +935,7 @@ abstract class Tools extends StrTools
public static function absolute(string $file): string
{
if (($file[0] ?? '') !== '/' && ($file[1] ?? '') !== ':' && !\in_array(\substr($file, 0, 4), ['phar', 'http'])) {
$file = Magic::getcwd().'/'.$file;
$file = Magic::getcwd().DIRECTORY_SEPARATOR.$file;
}
return $file;
}