Implement IPC callbacks

This commit is contained in:
Daniil Gentili 2020-09-25 19:17:16 +02:00
parent 3d9d975f79
commit 2b387b5c0f
13 changed files with 519 additions and 16 deletions

View File

@ -40,11 +40,19 @@ class Client
/**
* IPC server socket.
*/
private ChannelledSocket $server;
protected ChannelledSocket $server;
/**
* Callback IPC server socket.
*/
private ?ChannelledSocket $serverCallback = null;
/**
* Requests promise array.
*/
private array $requests = [];
/**
* Wrappers array.
*/
private array $wrappers = [];
/**
* Whether to run loop.
*/
@ -102,6 +110,9 @@ class Client
} else {
$promise = $this->requests[$id];
unset($this->requests[$id]);
if (isset($this->wrappers[$id])) {
unset($this->wrappers[$id]);
}
if ($payload instanceof ExitFailure) {
$promise->fail($payload->getException());
} else {
@ -123,7 +134,7 @@ class Client
*
* @param callable $callback Async callable to run
*
* @return mixed
* @return \Generator
*/
public function loop(callable $callback): \Generator
{
@ -180,14 +191,17 @@ class Client
/**
* Call function.
*
* @param string $function Function name
* @param array $arguments Arguments
* @param string|int $function Function name
* @param array|Wrapper $arguments Arguments
*
* @return \Generator
*/
public function __call(string $function, array $arguments): \Generator
public function __call($function, $arguments): \Generator
{
$this->requests []= $deferred = new Deferred;
if ($arguments instanceof Wrapper) {
$this->wrappers[count($this->requests) - 1] = $arguments;
}
yield $this->server->send([$function, $arguments]);
return yield $deferred->promise();
}

View File

@ -51,18 +51,24 @@ class Server extends SignalLoop
/**
* IPC server.
*/
private IpcServer $server;
protected IpcServer $server;
/**
* Callback IPC server.
*/
private ServerCallback $callback;
/**
* Set IPC path.
*
* @param string $path IPC path
* @param SessionPaths $session Session
*
* @return void
*/
public function setIpcPath(string $path): void
public function setIpcPath(SessionPaths $session): void
{
self::$shutdownDeferred = new Deferred;
$this->server = new IpcServer($path);
$this->server = new IpcServer($session->getIpcPath());
$this->callback = new ServerCallback($this->API);
$this->callback->setIpcPath($session);
}
/**
* Start IPC server in background.
@ -134,15 +140,16 @@ class Server extends SignalLoop
Tools::callFork($this->clientLoop($socket));
}
$this->server->close();
$this->callback->signal(null);
}
/**
* Client handler loop.
*
* @param ChannelledSocket $socket Client
*
* @return \Generator
* @return \Generator|Promise
*/
private function clientLoop(ChannelledSocket $socket): \Generator
protected function clientLoop(ChannelledSocket $socket)
{
$this->API->logger("Accepted IPC client connection!");
@ -167,13 +174,16 @@ class Server extends SignalLoop
*
* @param ChannelledSocket $socket Socket
* @param integer $id Request ID
* @param array $payload Payload
* @param array|Wrapper $payload Payload
*
* @return \Generator
*/
public function clientRequest(ChannelledSocket $socket, int $id, $payload): \Generator
private function clientRequest(ChannelledSocket $socket, int $id, $payload): \Generator
{
try {
if ($payload[1] instanceof Wrapper) {
$payload[1] = $this->callback->unwrap($payload[1]);
}
$result = $this->API->{$payload[0]}(...$payload[1]);
$result = $result instanceof \Generator ? yield from $result : yield $result;
} catch (\Throwable $e) {

View File

@ -0,0 +1,97 @@
<?php
/**
* IPC callback server.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2020 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Ipc;
use Amp\Ipc\IpcServer;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Loop;
use Amp\Promise;
use danog\MadelineProto\Exception;
use danog\MadelineProto\SessionPaths;
/**
* IPC callback server.
*/
class ServerCallback extends Server
{
/**
* Timeout watcher list, indexed by socket ID.
*
* @var array<int, string>
*/
private $watcherList = [];
/**
* Timeout watcher list, indexed by socket ID.
*
* @var array<int, ChannelledSocket>
*/
private $socketList = [];
/**
* Counter.
*/
private int $id = 0;
/**
* Set IPC path.
*
* @param SessionPaths $session Session
*
* @return void
*/
public function setIpcPath(SessionPaths $session): void
{
$this->server = new IpcServer($session->getIpcCallbackPath());
}
/**
* Client handler loop.
*
* @param ChannelledSocket $socket Client
*
* @return Promise
*/
protected function clientLoop(ChannelledSocket $socket)
{
$this->API->logger("Accepted IPC callback connection!");
$id = $this->id++;
$this->socketList[$id] = $socket;
$this->watcherList[$id] = Loop::delay(30*1000, function () use ($id) {
unset($this->watcherList[$id], $this->socketList[$id]);
});
return $socket->send($id);
}
/**
* Unwrap value
*
* @param Wrapper $wrapper
* @return mixed
*/
protected function unwrap(Wrapper $wrapper)
{
$id = $wrapper->getRemoteId();
if (!isset($this->socketList[$id])) {
throw new Exception("IPC timeout, could not find callback socket!");
}
$socket = $this->socketList[$id];
Loop::cancel($this->watcherList[$id]);
unset($this->watcherList[$id], $this->socketList[$id]);
return $wrapper->unwrap($socket);
}
}

View File

@ -0,0 +1,210 @@
<?php
namespace danog\MadelineProto\Ipc;
use Amp\ByteStream\InputStream as ByteStreamInputStream;
use Amp\ByteStream\OutputStream as ByteStreamOutputStream;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitFailure;
use danog\MadelineProto\Ipc\Wrapper\InputStream;
use danog\MadelineProto\Ipc\Wrapper\Obj;
use danog\MadelineProto\Ipc\Wrapper\OutputStream;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Tools;
use function Amp\Ipc\connect;
/**
* Callback payload wrapper.
*/
class Wrapper extends Client
{
/**
* Payload data.
*
* @var mixed
*/
private $data;
/**
* Callbacks.
*
* @var callable[]
*/
private array $callbacks = [];
/**
* Callbacks IDs.
*
* @var (int|array{0: class-string<Obj>, array<string, int>})[]
*/
private array $callbackIds = [];
/**
* Callback ID.
*/
private int $id = 0;
/**
* Remote socket ID.
*/
private int $remoteId = 0;
/**
* Logger instance.
*/
private Logger $logger;
/**
* Constructor.
*
* @param mixed $data Payload data
* @param string $ipc IPC URI
*
* @return \Generator
*/
public static function create(&$data, string $ipc, Logger $logger): \Generator
{
$instance = new self;
$instance->data = &$data;
$instance->server = yield connect($ipc);
$instance->remoteId = yield $instance->server->receive();
$instance->logger = $logger;
Tools::callFork($instance->receiverLoop());
return $instance;
}
private function __construct()
{
}
/**
* Serialization function.
*
* @return array
*/
public function __sleep(): array
{
return ['data', 'callbackIds'];
}
/**
* Wrap a certain callback object.
*
* @param object|callable $callback Object to wrap
*
* @param-out int $callback Callback ID
*
* @return void
*/
public function wrap(&$callback): void
{
if (\is_object($callback)) {
$ids = [];
foreach (\get_class_methods($callback) as $method) {
$id = $this->id++;
$this->callbacks[$id] = [$callback, $method];
$ids[$method] = $id;
}
$callback = $ids;
$this->callbackIds[] = &$callback;
} else {
$id = $this->id++;
$this->callbacks[$id] = self::copy($callback);
$class = Obj::class;
if ($callback instanceof ByteStreamInputStream) {
$class = InputStream::class;
} else if ($callback instanceof ByteStreamOutputStream) {
$class = OutputStream::class;
}
if ($class !== Obj::class && method_exists($callback, 'seek')) {
$class = "Seekable$class";
}
$callback = [$class, $id]; // Will be re-filled later
$this->callbackIds[] = &$callback;
}
}
/**
* Get copy of data.
*
* @param mixed $data
* @return mixed
*/
private static function copy($data)
{
return $data;
}
/**
* Receiver loop.
*
* @return \Generator
*/
private function receiverLoop(): \Generator
{
$id = 0;
$payload = null;
try {
while ($payload = yield $this->server->receive()) {
Tools::callFork($this->clientRequest($id++, $payload));
}
} finally {
yield $this->server->disconnect();
}
}
/**
* Handle client request.
*
* @param integer $id Request ID
* @param array $payload Payload
*
* @return \Generator
*/
private function clientRequest(int $id, $payload): \Generator
{
try {
$result = $this->callbacks[$payload[0]](...$payload[1]);
$result = $result instanceof \Generator ? yield from $result : yield $result;
} catch (\Throwable $e) {
$this->logger->logger("Got error while calling reverse IPC method: $e", Logger::ERROR);
$result = new ExitFailure($e);
}
try {
yield $this->server->send([$id, $result]);
} catch (\Throwable $e) {
$this->logger->logger("Got error while trying to send result of reverse method: $e", Logger::ERROR);
try {
yield $this->server->send([$id, new ExitFailure($e)]);
} catch (\Throwable $e) {
$this->logger->logger("Got error while trying to send error of error of reverse method: $e", Logger::ERROR);
}
}
}
/**
* Get remote socket ID.
*
* @internal
*
* @return int
*/
public function getRemoteId(): int
{
return $this->remoteId;
}
/**
* Set socket and unwrap data.
*
* @param ChannelledSocket $server Socket.
*
* @internal
*
* @return mixed
*/
public function unwrap(ChannelledSocket $server)
{
$this->server = $server;
Tools::callFork($this->loopInternal());
foreach ($this->callbackIds as &$id) {
if (\is_int($id)) {
$id = fn (...$args): \Generator => $this->__call($id, $args);
} else {
[$class, $ids] = $id;
$id = new $class($this, $ids);
}
}
return $this->data;
}
}

View File

@ -0,0 +1,25 @@
<?php
namespace danog\MadelineProto\Ipc\Wrapper;
use Amp\ByteStream\InputStream as AmpInputStream;
use Amp\Promise;
use danog\MadelineProto\Ipc\Wrapper\Obj;
use danog\MadelineProto\Tools;
class InputStream extends Obj implements AmpInputStream
{
/**
* Reads data from the stream.
*
* @return Promise Resolves with a string when new data is available or `null` if the stream has closed.
*
* @psalm-return Promise<string|null>
*
* @throws PendingReadError Thrown if another read operation is still pending.
*/
public function read(): Promise
{
return Tools::call($this->__call('read'));
}
}

View File

@ -0,0 +1,45 @@
<?php
namespace danog\MadelineProto\Ipc\Wrapper;
use danog\MadelineProto\Ipc\Wrapper;
/**
* Generic callback wrapper object.
*/
class Obj
{
/**
* Method list.
*
* @var array<string, int>
*/
private array $methods = [];
/**
* Wrapper
*/
private Wrapper $wrapper;
/**
* Constructor
*
* @param Wrapper $wrapper
* @param array $methods
*/
public function __construct(Wrapper $wrapper, array $methods)
{
$this->wrapper = $wrapper;
$this->methods = $methods;
}
/**
* Call method.
*
* @param string $name
* @param array $arguments
*
* @return \Generator
*/
public function __call(string $name, array $arguments = []): \Generator
{
return $this->wrapper->__call($this->methods[$name], $arguments);
}
}

View File

@ -0,0 +1,43 @@
<?php
namespace danog\MadelineProto\Ipc\Wrapper;
use Amp\ByteStream\OutputStream as AmpOutputStream;
use Amp\Promise;
use danog\MadelineProto\Ipc\Wrapper\Obj;
use danog\MadelineProto\Tools;
class OutputStream extends Obj implements AmpOutputStream
{
/**
* Writes data to the stream.
*
* @param string $data Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
* @throws StreamException If writing to the stream fails.
*/
public function write(string $data): Promise
{
return Tools::call($this->__call('write', [$data]));
}
/**
* Marks the stream as no longer writable. Optionally writes a final data chunk before. Note that this is not the
* same as forcefully closing the stream. This method waits for all pending writes to complete before closing the
* stream. Socket streams implementing this interface should only close the writable side of the stream.
*
* @param string $finalData Bytes to write.
*
* @return Promise Succeeds once the data has been successfully written to the stream.
*
* @throws ClosedException If the stream has already been closed.
* @throws StreamException If writing to the stream fails.
*/
public function end(string $finalData = ""): Promise
{
return Tools::call($this->__call('write', [$finalData]));
}
}

View File

@ -0,0 +1,8 @@
<?php
namespace danog\MadelineProto\Ipc\Wrapper;
class SeekableInputStream extends InputStream
{
use SeekableTrait;
}

View File

@ -0,0 +1,8 @@
<?php
namespace danog\MadelineProto\Ipc\Wrapper;
class SeekableOutputStream extends OutputStream
{
use SeekableTrait;
}

View File

@ -0,0 +1,27 @@
<?php
namespace danog\MadelineProto\Ipc\Wrapper;
use Amp\Promise;
use danog\MadelineProto\Tools;
trait SeekableTrait
{
/**
* Set the handle's internal pointer position.
*
* $whence values:
*
* SEEK_SET - Set position equal to offset bytes.
* SEEK_CUR - Set position to current location plus offset.
* SEEK_END - Set position to end-of-file plus offset.
*
* @param int $position
* @param int $whence
* @return \Amp\Promise<int> New offset position.
*/
public function seek(int $position, int $whence = \SEEK_SET): Promise
{
return Tools::call($this->__call('seek', [$position, $whence]));
}
}

View File

@ -32,6 +32,7 @@ use danog\MadelineProto\Db\DbPropertiesFactory;
use danog\MadelineProto\Db\DbPropertiesTrait;
use danog\MadelineProto\Db\MemoryArray;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\Ipc\ServerCallback;
use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal;
use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\SeqLoop;
@ -206,7 +207,7 @@ class MTProto extends AsyncConstruct implements TLCallback
/**
* Instance of wrapper API.
*
* @var null|APIWrapper
* @var APIWrapper
*/
public $wrapper;
/**
@ -825,7 +826,7 @@ class MTProto extends AsyncConstruct implements TLCallback
}
if (!$this->ipcServer) {
$this->ipcServer = new Server($this);
$this->ipcServer->setIpcPath($this->wrapper->getIpcPath());
$this->ipcServer->setIpcPath($this->wrapper->session);
}
$this->callCheckerLoop->start();
$this->serializeLoop->start();

View File

@ -1074,7 +1074,7 @@ trait Files
*/
public function downloadToStream($messageMedia, $stream, $cb = null, int $offset = 0, int $end = -1): \Generator
{
$messageMedia = (yield from $this->getDownloadInfo($messageMedia));
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
$cb = $stream;
$stream = $stream->getFile();

View File

@ -49,6 +49,10 @@ class SessionPaths
* IPC socket path.
*/
private string $ipcPath;
/**
* IPC callback socket path.
*/
private string $ipcCallbackPath;
/**
* IPC light state path.
*/
@ -75,6 +79,7 @@ class SessionPaths
$this->lightStatePath = "$session.lightState.php";
$this->lockPath = "$session.lock";
$this->ipcPath = "$session.ipc";
$this->ipcCallbackPath = "$session.callback.ipc";
$this->ipcStatePath = "$session.ipcState.php";
}
/**
@ -243,4 +248,14 @@ class SessionPaths
$this->lightState = new LightState($state);
return $this->serialize($this->lightState, $this->getLightStatePath());
}
/**
* Get IPC callback socket path.
*
* @return string
*/
public function getIpcCallbackPath(): string
{
return $this->ipcCallbackPath;
}
}