diff --git a/src/danog/MadelineProto/Ipc/Client.php b/src/danog/MadelineProto/Ipc/Client.php index 2e52f76b..2699f06a 100644 --- a/src/danog/MadelineProto/Ipc/Client.php +++ b/src/danog/MadelineProto/Ipc/Client.php @@ -40,11 +40,19 @@ class Client /** * IPC server socket. */ - private ChannelledSocket $server; + protected ChannelledSocket $server; + /** + * Callback IPC server socket. + */ + private ?ChannelledSocket $serverCallback = null; /** * Requests promise array. */ private array $requests = []; + /** + * Wrappers array. + */ + private array $wrappers = []; /** * Whether to run loop. */ @@ -102,6 +110,9 @@ class Client } else { $promise = $this->requests[$id]; unset($this->requests[$id]); + if (isset($this->wrappers[$id])) { + unset($this->wrappers[$id]); + } if ($payload instanceof ExitFailure) { $promise->fail($payload->getException()); } else { @@ -123,7 +134,7 @@ class Client * * @param callable $callback Async callable to run * - * @return mixed + * @return \Generator */ public function loop(callable $callback): \Generator { @@ -180,14 +191,17 @@ class Client /** * Call function. * - * @param string $function Function name - * @param array $arguments Arguments + * @param string|int $function Function name + * @param array|Wrapper $arguments Arguments * * @return \Generator */ - public function __call(string $function, array $arguments): \Generator + public function __call($function, $arguments): \Generator { $this->requests []= $deferred = new Deferred; + if ($arguments instanceof Wrapper) { + $this->wrappers[count($this->requests) - 1] = $arguments; + } yield $this->server->send([$function, $arguments]); return yield $deferred->promise(); } diff --git a/src/danog/MadelineProto/Ipc/Server.php b/src/danog/MadelineProto/Ipc/Server.php index 40fd8f49..aa8cad73 100644 --- a/src/danog/MadelineProto/Ipc/Server.php +++ b/src/danog/MadelineProto/Ipc/Server.php @@ -51,18 +51,24 @@ class Server extends SignalLoop /** * IPC server. */ - private IpcServer $server; + protected IpcServer $server; + /** + * Callback IPC server. + */ + private ServerCallback $callback; /** * Set IPC path. * - * @param string $path IPC path + * @param SessionPaths $session Session * * @return void */ - public function setIpcPath(string $path): void + public function setIpcPath(SessionPaths $session): void { self::$shutdownDeferred = new Deferred; - $this->server = new IpcServer($path); + $this->server = new IpcServer($session->getIpcPath()); + $this->callback = new ServerCallback($this->API); + $this->callback->setIpcPath($session); } /** * Start IPC server in background. @@ -134,15 +140,16 @@ class Server extends SignalLoop Tools::callFork($this->clientLoop($socket)); } $this->server->close(); + $this->callback->signal(null); } /** * Client handler loop. * * @param ChannelledSocket $socket Client * - * @return \Generator + * @return \Generator|Promise */ - private function clientLoop(ChannelledSocket $socket): \Generator + protected function clientLoop(ChannelledSocket $socket) { $this->API->logger("Accepted IPC client connection!"); @@ -167,13 +174,16 @@ class Server extends SignalLoop * * @param ChannelledSocket $socket Socket * @param integer $id Request ID - * @param array $payload Payload + * @param array|Wrapper $payload Payload * * @return \Generator */ - public function clientRequest(ChannelledSocket $socket, int $id, $payload): \Generator + private function clientRequest(ChannelledSocket $socket, int $id, $payload): \Generator { try { + if ($payload[1] instanceof Wrapper) { + $payload[1] = $this->callback->unwrap($payload[1]); + } $result = $this->API->{$payload[0]}(...$payload[1]); $result = $result instanceof \Generator ? yield from $result : yield $result; } catch (\Throwable $e) { diff --git a/src/danog/MadelineProto/Ipc/ServerCallback.php b/src/danog/MadelineProto/Ipc/ServerCallback.php new file mode 100644 index 00000000..282a8f90 --- /dev/null +++ b/src/danog/MadelineProto/Ipc/ServerCallback.php @@ -0,0 +1,97 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2020 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Ipc; + +use Amp\Ipc\IpcServer; +use Amp\Ipc\Sync\ChannelledSocket; +use Amp\Loop; +use Amp\Promise; +use danog\MadelineProto\Exception; +use danog\MadelineProto\SessionPaths; + +/** + * IPC callback server. + */ +class ServerCallback extends Server +{ + /** + * Timeout watcher list, indexed by socket ID. + * + * @var array + */ + private $watcherList = []; + /** + * Timeout watcher list, indexed by socket ID. + * + * @var array + */ + private $socketList = []; + /** + * Counter. + */ + private int $id = 0; + /** + * Set IPC path. + * + * @param SessionPaths $session Session + * + * @return void + */ + public function setIpcPath(SessionPaths $session): void + { + $this->server = new IpcServer($session->getIpcCallbackPath()); + } + /** + * Client handler loop. + * + * @param ChannelledSocket $socket Client + * + * @return Promise + */ + protected function clientLoop(ChannelledSocket $socket) + { + $this->API->logger("Accepted IPC callback connection!"); + $id = $this->id++; + $this->socketList[$id] = $socket; + $this->watcherList[$id] = Loop::delay(30*1000, function () use ($id) { + unset($this->watcherList[$id], $this->socketList[$id]); + }); + + return $socket->send($id); + } + + + /** + * Unwrap value + * + * @param Wrapper $wrapper + * @return mixed + */ + protected function unwrap(Wrapper $wrapper) + { + $id = $wrapper->getRemoteId(); + if (!isset($this->socketList[$id])) { + throw new Exception("IPC timeout, could not find callback socket!"); + } + $socket = $this->socketList[$id]; + Loop::cancel($this->watcherList[$id]); + unset($this->watcherList[$id], $this->socketList[$id]); + return $wrapper->unwrap($socket); + } +} diff --git a/src/danog/MadelineProto/Ipc/Wrapper.php b/src/danog/MadelineProto/Ipc/Wrapper.php new file mode 100644 index 00000000..a73d5993 --- /dev/null +++ b/src/danog/MadelineProto/Ipc/Wrapper.php @@ -0,0 +1,210 @@ +, array})[] + */ + private array $callbackIds = []; + /** + * Callback ID. + */ + private int $id = 0; + /** + * Remote socket ID. + */ + private int $remoteId = 0; + /** + * Logger instance. + */ + private Logger $logger; + /** + * Constructor. + * + * @param mixed $data Payload data + * @param string $ipc IPC URI + * + * @return \Generator + */ + public static function create(&$data, string $ipc, Logger $logger): \Generator + { + $instance = new self; + $instance->data = &$data; + $instance->server = yield connect($ipc); + $instance->remoteId = yield $instance->server->receive(); + $instance->logger = $logger; + Tools::callFork($instance->receiverLoop()); + return $instance; + } + private function __construct() + { + } + /** + * Serialization function. + * + * @return array + */ + public function __sleep(): array + { + return ['data', 'callbackIds']; + } + /** + * Wrap a certain callback object. + * + * @param object|callable $callback Object to wrap + * + * @param-out int $callback Callback ID + * + * @return void + */ + public function wrap(&$callback): void + { + if (\is_object($callback)) { + $ids = []; + foreach (\get_class_methods($callback) as $method) { + $id = $this->id++; + $this->callbacks[$id] = [$callback, $method]; + $ids[$method] = $id; + } + $callback = $ids; + $this->callbackIds[] = &$callback; + } else { + $id = $this->id++; + $this->callbacks[$id] = self::copy($callback); + $class = Obj::class; + if ($callback instanceof ByteStreamInputStream) { + $class = InputStream::class; + } else if ($callback instanceof ByteStreamOutputStream) { + $class = OutputStream::class; + } + if ($class !== Obj::class && method_exists($callback, 'seek')) { + $class = "Seekable$class"; + } + $callback = [$class, $id]; // Will be re-filled later + $this->callbackIds[] = &$callback; + } + } + /** + * Get copy of data. + * + * @param mixed $data + * @return mixed + */ + private static function copy($data) + { + return $data; + } + /** + * Receiver loop. + * + * @return \Generator + */ + private function receiverLoop(): \Generator + { + $id = 0; + $payload = null; + try { + while ($payload = yield $this->server->receive()) { + Tools::callFork($this->clientRequest($id++, $payload)); + } + } finally { + yield $this->server->disconnect(); + } + } + + /** + * Handle client request. + * + * @param integer $id Request ID + * @param array $payload Payload + * + * @return \Generator + */ + private function clientRequest(int $id, $payload): \Generator + { + try { + $result = $this->callbacks[$payload[0]](...$payload[1]); + $result = $result instanceof \Generator ? yield from $result : yield $result; + } catch (\Throwable $e) { + $this->logger->logger("Got error while calling reverse IPC method: $e", Logger::ERROR); + $result = new ExitFailure($e); + } + try { + yield $this->server->send([$id, $result]); + } catch (\Throwable $e) { + $this->logger->logger("Got error while trying to send result of reverse method: $e", Logger::ERROR); + try { + yield $this->server->send([$id, new ExitFailure($e)]); + } catch (\Throwable $e) { + $this->logger->logger("Got error while trying to send error of error of reverse method: $e", Logger::ERROR); + } + } + } + /** + * Get remote socket ID. + * + * @internal + * + * @return int + */ + public function getRemoteId(): int + { + return $this->remoteId; + } + + /** + * Set socket and unwrap data. + * + * @param ChannelledSocket $server Socket. + * + * @internal + * + * @return mixed + */ + public function unwrap(ChannelledSocket $server) + { + $this->server = $server; + Tools::callFork($this->loopInternal()); + + foreach ($this->callbackIds as &$id) { + if (\is_int($id)) { + $id = fn (...$args): \Generator => $this->__call($id, $args); + } else { + [$class, $ids] = $id; + $id = new $class($this, $ids); + } + } + return $this->data; + } +} diff --git a/src/danog/MadelineProto/Ipc/Wrapper/InputStream.php b/src/danog/MadelineProto/Ipc/Wrapper/InputStream.php new file mode 100644 index 00000000..d5cb29c1 --- /dev/null +++ b/src/danog/MadelineProto/Ipc/Wrapper/InputStream.php @@ -0,0 +1,25 @@ + + * + * @throws PendingReadError Thrown if another read operation is still pending. + */ + public function read(): Promise + { + return Tools::call($this->__call('read')); + } +} diff --git a/src/danog/MadelineProto/Ipc/Wrapper/Obj.php b/src/danog/MadelineProto/Ipc/Wrapper/Obj.php new file mode 100644 index 00000000..be5b5b8e --- /dev/null +++ b/src/danog/MadelineProto/Ipc/Wrapper/Obj.php @@ -0,0 +1,45 @@ + + */ + private array $methods = []; + /** + * Wrapper + */ + private Wrapper $wrapper; + /** + * Constructor + * + * @param Wrapper $wrapper + * @param array $methods + */ + public function __construct(Wrapper $wrapper, array $methods) + { + $this->wrapper = $wrapper; + $this->methods = $methods; + } + /** + * Call method. + * + * @param string $name + * @param array $arguments + * + * @return \Generator + */ + public function __call(string $name, array $arguments = []): \Generator + { + return $this->wrapper->__call($this->methods[$name], $arguments); + } +} diff --git a/src/danog/MadelineProto/Ipc/Wrapper/OutputStream.php b/src/danog/MadelineProto/Ipc/Wrapper/OutputStream.php new file mode 100644 index 00000000..e5e8630e --- /dev/null +++ b/src/danog/MadelineProto/Ipc/Wrapper/OutputStream.php @@ -0,0 +1,43 @@ +__call('write', [$data])); + } + + /** + * Marks the stream as no longer writable. Optionally writes a final data chunk before. Note that this is not the + * same as forcefully closing the stream. This method waits for all pending writes to complete before closing the + * stream. Socket streams implementing this interface should only close the writable side of the stream. + * + * @param string $finalData Bytes to write. + * + * @return Promise Succeeds once the data has been successfully written to the stream. + * + * @throws ClosedException If the stream has already been closed. + * @throws StreamException If writing to the stream fails. + */ + public function end(string $finalData = ""): Promise + { + return Tools::call($this->__call('write', [$finalData])); + } +} diff --git a/src/danog/MadelineProto/Ipc/Wrapper/SeekableInputStream.php b/src/danog/MadelineProto/Ipc/Wrapper/SeekableInputStream.php new file mode 100644 index 00000000..f0adf543 --- /dev/null +++ b/src/danog/MadelineProto/Ipc/Wrapper/SeekableInputStream.php @@ -0,0 +1,8 @@ + New offset position. + */ + public function seek(int $position, int $whence = \SEEK_SET): Promise + { + return Tools::call($this->__call('seek', [$position, $whence])); + } +} diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 93e2c2d2..f7a3f0bb 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -32,6 +32,7 @@ use danog\MadelineProto\Db\DbPropertiesFactory; use danog\MadelineProto\Db\DbPropertiesTrait; use danog\MadelineProto\Db\MemoryArray; use danog\MadelineProto\Ipc\Server; +use danog\MadelineProto\Ipc\ServerCallback; use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal; use danog\MadelineProto\Loop\Update\FeedLoop; use danog\MadelineProto\Loop\Update\SeqLoop; @@ -206,7 +207,7 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Instance of wrapper API. * - * @var null|APIWrapper + * @var APIWrapper */ public $wrapper; /** @@ -825,7 +826,7 @@ class MTProto extends AsyncConstruct implements TLCallback } if (!$this->ipcServer) { $this->ipcServer = new Server($this); - $this->ipcServer->setIpcPath($this->wrapper->getIpcPath()); + $this->ipcServer->setIpcPath($this->wrapper->session); } $this->callCheckerLoop->start(); $this->serializeLoop->start(); diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 18584e34..6fa8fff9 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -1074,7 +1074,7 @@ trait Files */ public function downloadToStream($messageMedia, $stream, $cb = null, int $offset = 0, int $end = -1): \Generator { - $messageMedia = (yield from $this->getDownloadInfo($messageMedia)); + $messageMedia = yield from $this->getDownloadInfo($messageMedia); if (\is_object($stream) && $stream instanceof FileCallbackInterface) { $cb = $stream; $stream = $stream->getFile(); diff --git a/src/danog/MadelineProto/SessionPaths.php b/src/danog/MadelineProto/SessionPaths.php index f3056a48..af089a8f 100644 --- a/src/danog/MadelineProto/SessionPaths.php +++ b/src/danog/MadelineProto/SessionPaths.php @@ -49,6 +49,10 @@ class SessionPaths * IPC socket path. */ private string $ipcPath; + /** + * IPC callback socket path. + */ + private string $ipcCallbackPath; /** * IPC light state path. */ @@ -75,6 +79,7 @@ class SessionPaths $this->lightStatePath = "$session.lightState.php"; $this->lockPath = "$session.lock"; $this->ipcPath = "$session.ipc"; + $this->ipcCallbackPath = "$session.callback.ipc"; $this->ipcStatePath = "$session.ipcState.php"; } /** @@ -243,4 +248,14 @@ class SessionPaths $this->lightState = new LightState($state); return $this->serialize($this->lightState, $this->getLightStatePath()); } + + /** + * Get IPC callback socket path. + * + * @return string + */ + public function getIpcCallbackPath(): string + { + return $this->ipcCallbackPath; + } }