Implement downloadTo* and uploadFrom* via IPC
This commit is contained in:
parent
2b387b5c0f
commit
d558d17d1f
@ -18,53 +18,29 @@
|
||||
|
||||
namespace danog\MadelineProto\Ipc;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Ipc\Sync\ChannelledSocket;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\API;
|
||||
use danog\MadelineProto\Exception;
|
||||
use danog\MadelineProto\FileCallbackInterface;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\MTProtoTools\FilesLogic;
|
||||
use danog\MadelineProto\SessionPaths;
|
||||
use danog\MadelineProto\Tools;
|
||||
|
||||
use function Amp\Ipc\connect;
|
||||
|
||||
/**
|
||||
* IPC client.
|
||||
*/
|
||||
class Client
|
||||
class Client extends ClientAbstract
|
||||
{
|
||||
use \danog\MadelineProto\Wrappers\Start;
|
||||
use \danog\MadelineProto\Wrappers\Templates;
|
||||
use FilesLogic;
|
||||
|
||||
/**
|
||||
* IPC server socket.
|
||||
*/
|
||||
protected ChannelledSocket $server;
|
||||
/**
|
||||
* Callback IPC server socket.
|
||||
*/
|
||||
private ?ChannelledSocket $serverCallback = null;
|
||||
/**
|
||||
* Requests promise array.
|
||||
*/
|
||||
private array $requests = [];
|
||||
/**
|
||||
* Wrappers array.
|
||||
*/
|
||||
private array $wrappers = [];
|
||||
/**
|
||||
* Whether to run loop.
|
||||
*/
|
||||
private bool $run = true;
|
||||
/**
|
||||
* Session.
|
||||
*/
|
||||
private SessionPaths $session;
|
||||
/**
|
||||
* Logger instance.
|
||||
*/
|
||||
public Logger $logger;
|
||||
protected SessionPaths $session;
|
||||
/**
|
||||
* Constructor function.
|
||||
*
|
||||
@ -79,56 +55,6 @@ class Client
|
||||
$this->session = $session;
|
||||
Tools::callFork($this->loopInternal());
|
||||
}
|
||||
/**
|
||||
* Logger.
|
||||
*
|
||||
* @param string $param Parameter
|
||||
* @param int $level Logging level
|
||||
* @param string $file File where the message originated
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function logger($param, int $level = Logger::NOTICE, string $file = ''): void
|
||||
{
|
||||
if ($file === null) {
|
||||
$file = \basename(\debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php');
|
||||
}
|
||||
isset($this->logger) ? $this->logger->logger($param, $level, $file) : Logger::$default->logger($param, $level, $file);
|
||||
}
|
||||
/**
|
||||
* Main loop.
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
private function loopInternal(): \Generator
|
||||
{
|
||||
while ($this->run) {
|
||||
while ($payload = yield $this->server->receive()) {
|
||||
[$id, $payload] = $payload;
|
||||
if (!isset($this->requests[$id])) {
|
||||
Logger::log("Got response for non-existing ID $id!");
|
||||
} else {
|
||||
$promise = $this->requests[$id];
|
||||
unset($this->requests[$id]);
|
||||
if (isset($this->wrappers[$id])) {
|
||||
unset($this->wrappers[$id]);
|
||||
}
|
||||
if ($payload instanceof ExitFailure) {
|
||||
$promise->fail($payload->getException());
|
||||
} else {
|
||||
$promise->resolve($payload);
|
||||
}
|
||||
unset($promise);
|
||||
}
|
||||
}
|
||||
if ($this->run) {
|
||||
$this->logger("Reconnecting to IPC server!");
|
||||
yield $this->server->disconnect();
|
||||
Server::startMe($this->session);
|
||||
$this->server = yield connect($this->session->getIpcPath());
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Run the provided async callable.
|
||||
*
|
||||
@ -147,18 +73,7 @@ class Client
|
||||
*/
|
||||
public function unreference(): void
|
||||
{
|
||||
$this->run = false;
|
||||
Tools::wait($this->server->disconnect());
|
||||
}
|
||||
/**
|
||||
* Disconnect cleanly from main instance.
|
||||
*
|
||||
* @return Promise
|
||||
*/
|
||||
public function disconnect(): Promise
|
||||
{
|
||||
$this->run = false;
|
||||
return $this->server->disconnect();
|
||||
Tools::wait($this->disconnect());
|
||||
}
|
||||
/**
|
||||
* Stop IPC server instance.
|
||||
@ -188,22 +103,145 @@ class Client
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Call function.
|
||||
* Upload file from URL.
|
||||
*
|
||||
* @param string|int $function Function name
|
||||
* @param array|Wrapper $arguments Arguments
|
||||
* @param string|FileCallbackInterface $url URL of file
|
||||
* @param integer $size Size of file
|
||||
* @param string $fileName File name
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param boolean $encrypted Whether to encrypt file for secret chats
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
public function __call($function, $arguments): \Generator
|
||||
public function uploadFromUrl($url, int $size = 0, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
|
||||
{
|
||||
$this->requests []= $deferred = new Deferred;
|
||||
if ($arguments instanceof Wrapper) {
|
||||
$this->wrappers[count($this->requests) - 1] = $arguments;
|
||||
if (\is_object($url) && $url instanceof FileCallbackInterface) {
|
||||
$cb = $url;
|
||||
$url = $url->getFile();
|
||||
}
|
||||
yield $this->server->send([$function, $arguments]);
|
||||
return yield $deferred->promise();
|
||||
$params = [$url, $size, $fileName, &$cb, $encrypted];
|
||||
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
|
||||
$wrapper->wrap($cb, false);
|
||||
return yield from $this->__call('uploadFromUrl', $wrapper);
|
||||
}
|
||||
/**
|
||||
* Upload file from callable.
|
||||
*
|
||||
* The callable must accept two parameters: int $offset, int $size
|
||||
* The callable must return a string with the contest of the file at the specified offset and size.
|
||||
*
|
||||
* @param mixed $callable Callable
|
||||
* @param integer $size File size
|
||||
* @param string $mime Mime type
|
||||
* @param string $fileName File name
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param boolean $seekable Whether chunks can be fetched out of order
|
||||
* @param boolean $encrypted Whether to encrypt file for secret chats
|
||||
*
|
||||
* @return \Generator<array>
|
||||
*/
|
||||
public function uploadFromCallable(callable $callable, int $size, string $mime, string $fileName = '', $cb = null, bool $seekable = true, bool $encrypted = false): \Generator
|
||||
{
|
||||
if (\is_object($callable) && $callable instanceof FileCallbackInterface) {
|
||||
$cb = $callable;
|
||||
$callable = $callable->getFile();
|
||||
}
|
||||
$params = [&$callable, $size, $mime, $fileName, &$cb, $seekable, $encrypted];
|
||||
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
|
||||
$wrapper->wrap($cb, false);
|
||||
$wrapper->wrap($callable, false);
|
||||
return yield from $this->__call('uploadFromCallable', $wrapper);
|
||||
}
|
||||
/**
|
||||
* Reupload telegram file.
|
||||
*
|
||||
* @param mixed $media Telegram file
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param boolean $encrypted Whether to encrypt file for secret chats
|
||||
*
|
||||
* @return \Generator<array>
|
||||
*/
|
||||
public function uploadFromTgfile($media, $cb = null, bool $encrypted = false): \Generator
|
||||
{
|
||||
if (\is_object($media) && $media instanceof FileCallbackInterface) {
|
||||
$cb = $media;
|
||||
$media = $media->getFile();
|
||||
}
|
||||
$params = [$media, &$cb, $encrypted];
|
||||
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
|
||||
$wrapper->wrap($cb, false);
|
||||
return yield from $this->__call('uploadFromTgfile', $wrapper);
|
||||
}
|
||||
/**
|
||||
* Download file to directory.
|
||||
*
|
||||
* @param mixed $messageMedia File to download
|
||||
* @param string|FileCallbackInterface $dir Directory where to download the file
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
*
|
||||
* @return \Generator<string> Downloaded file path
|
||||
*/
|
||||
public function downloadToDir($messageMedia, $dir, $cb = null): \Generator
|
||||
{
|
||||
if (\is_object($dir) && $dir instanceof FileCallbackInterface) {
|
||||
$cb = $dir;
|
||||
$dir = $dir->getFile();
|
||||
}
|
||||
$params = [$messageMedia, $dir, &$cb];
|
||||
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
|
||||
$wrapper->wrap($cb, false);
|
||||
return yield from $this->__call('downloadToDir', $wrapper);
|
||||
}
|
||||
/**
|
||||
* Download file.
|
||||
*
|
||||
* @param mixed $messageMedia File to download
|
||||
* @param string|FileCallbackInterface $file Downloaded file path
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
*
|
||||
* @return \Generator<string> Downloaded file path
|
||||
*/
|
||||
public function downloadToFile($messageMedia, $file, $cb = null): \Generator
|
||||
{
|
||||
if (\is_object($file) && $file instanceof FileCallbackInterface) {
|
||||
$cb = $file;
|
||||
$file = $file->getFile();
|
||||
}
|
||||
$params = [$messageMedia, $file, &$cb];
|
||||
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
|
||||
$wrapper->wrap($cb, false);
|
||||
return yield from $this->__call('downloadToFile', $wrapper);
|
||||
}
|
||||
/**
|
||||
* Download file to callable.
|
||||
* The callable must accept two parameters: string $payload, int $offset
|
||||
* The callable will be called (possibly out of order, depending on the value of $seekable).
|
||||
* The callable should return the number of written bytes.
|
||||
*
|
||||
* @param mixed $messageMedia File to download
|
||||
* @param callable|FileCallbackInterface $callable Chunk callback
|
||||
* @param callable $cb Status callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param bool $seekable Whether the callable can be called out of order
|
||||
* @param int $offset Offset where to start downloading
|
||||
* @param int $end Offset where to stop downloading (inclusive)
|
||||
* @param int $part_size Size of each chunk
|
||||
*
|
||||
* @return \Generator<bool>
|
||||
*/
|
||||
public function downloadToCallable($messageMedia, callable $callable, $cb = null, bool $seekable = true, int $offset = 0, int $end = -1, int $part_size = null): \Generator
|
||||
{
|
||||
$messageMedia = (yield from $this->getDownloadInfo($messageMedia));
|
||||
if (\is_object($callable) && $callable instanceof FileCallbackInterface) {
|
||||
$cb = $callable;
|
||||
$callable = $callable->getFile();
|
||||
}
|
||||
$params = [$messageMedia, &$callable, &$cb, $seekable, $offset, $end, $part_size, ];
|
||||
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
|
||||
$wrapper->wrap($callable, false);
|
||||
$wrapper->wrap($cb, false);
|
||||
return yield from $this->__call('downloadToCallable', $wrapper);
|
||||
}
|
||||
/**
|
||||
* Placeholder.
|
||||
|
143
src/danog/MadelineProto/Ipc/ClientAbstract.php
Normal file
143
src/danog/MadelineProto/Ipc/ClientAbstract.php
Normal file
@ -0,0 +1,143 @@
|
||||
<?php
|
||||
/**
|
||||
* API wrapper module.
|
||||
*
|
||||
* This file is part of MadelineProto.
|
||||
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||
* See the GNU Affero General Public License for more details.
|
||||
* You should have received a copy of the GNU General Public License along with MadelineProto.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* @author Daniil Gentili <daniil@daniil.it>
|
||||
* @copyright 2016-2020 Daniil Gentili <daniil@daniil.it>
|
||||
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
|
||||
*
|
||||
* @link https://docs.madelineproto.xyz MadelineProto documentation
|
||||
*/
|
||||
|
||||
namespace danog\MadelineProto\Ipc;
|
||||
|
||||
use Amp\Deferred;
|
||||
use Amp\Ipc\Sync\ChannelledSocket;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Logger;
|
||||
|
||||
use function Amp\Ipc\connect;
|
||||
|
||||
/**
|
||||
* IPC client.
|
||||
*/
|
||||
abstract class ClientAbstract
|
||||
{
|
||||
/**
|
||||
* IPC server socket.
|
||||
*/
|
||||
protected ChannelledSocket $server;
|
||||
/**
|
||||
* Requests promise array.
|
||||
*
|
||||
* @var Deferred[]
|
||||
*/
|
||||
private array $requests = [];
|
||||
/**
|
||||
* Wrappers array.
|
||||
*
|
||||
* @var Wrapper[]
|
||||
*/
|
||||
private array $wrappers = [];
|
||||
/**
|
||||
* Whether to run loop.
|
||||
*/
|
||||
protected bool $run = true;
|
||||
/**
|
||||
* Logger instance.
|
||||
*/
|
||||
public Logger $logger;
|
||||
|
||||
protected function __construct()
|
||||
{
|
||||
}
|
||||
/**
|
||||
* Logger.
|
||||
*
|
||||
* @param string $param Parameter
|
||||
* @param int $level Logging level
|
||||
* @param string $file File where the message originated
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function logger($param, int $level = Logger::NOTICE, string $file = ''): void
|
||||
{
|
||||
if ($file === null) {
|
||||
$file = \basename(\debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php');
|
||||
}
|
||||
isset($this->logger) ? $this->logger->logger($param, $level, $file) : Logger::$default->logger($param, $level, $file);
|
||||
}
|
||||
/**
|
||||
* Main loop.
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
protected function loopInternal(): \Generator
|
||||
{
|
||||
do {
|
||||
while ($payload = yield $this->server->receive()) {
|
||||
[$id, $payload] = $payload;
|
||||
if (!isset($this->requests[$id])) {
|
||||
Logger::log("Got response for non-existing ID $id!");
|
||||
} else {
|
||||
$promise = $this->requests[$id];
|
||||
unset($this->requests[$id]);
|
||||
if (isset($this->wrappers[$id])) {
|
||||
yield $this->wrappers[$id]->disconnect();
|
||||
unset($this->wrappers[$id]);
|
||||
}
|
||||
if ($payload instanceof ExitFailure) {
|
||||
$promise->fail($payload->getException());
|
||||
} else {
|
||||
$promise->resolve($payload);
|
||||
}
|
||||
unset($promise);
|
||||
}
|
||||
}
|
||||
if ($this->run) {
|
||||
$this->logger("Reconnecting to IPC server!");
|
||||
yield $this->server->disconnect();
|
||||
if ($this instanceof Client) {
|
||||
Server::startMe($this->session);
|
||||
$this->server = yield connect($this->session->getIpcPath());
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} while ($this->run);
|
||||
}
|
||||
/**
|
||||
* Disconnect cleanly from main instance.
|
||||
*
|
||||
* @return Promise
|
||||
*/
|
||||
public function disconnect(): Promise
|
||||
{
|
||||
$this->run = false;
|
||||
return $this->server->disconnect();
|
||||
}
|
||||
/**
|
||||
* Call function.
|
||||
*
|
||||
* @param string|int $function Function name
|
||||
* @param array|Wrapper $arguments Arguments
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
public function __call($function, $arguments): \Generator
|
||||
{
|
||||
$this->requests []= $deferred = new Deferred;
|
||||
if ($arguments instanceof Wrapper) {
|
||||
$this->wrappers[\count($this->requests) - 1] = $arguments;
|
||||
}
|
||||
yield $this->server->send([$function, $arguments]);
|
||||
return yield $deferred->promise();
|
||||
}
|
||||
}
|
@ -3,7 +3,6 @@
|
||||
namespace danog\MadelineProto\Ipc\Runner;
|
||||
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\Tools;
|
||||
|
||||
final class ProcessRunner extends RunnerAbstract
|
||||
{
|
||||
|
@ -70,6 +70,10 @@ class Server extends SignalLoop
|
||||
$this->callback = new ServerCallback($this->API);
|
||||
$this->callback->setIpcPath($session);
|
||||
}
|
||||
public function start(): bool
|
||||
{
|
||||
return $this instanceof ServerCallback ? parent::start() : $this->callback->start() && parent::start();
|
||||
}
|
||||
/**
|
||||
* Start IPC server in background.
|
||||
*
|
||||
@ -140,7 +144,7 @@ class Server extends SignalLoop
|
||||
Tools::callFork($this->clientLoop($socket));
|
||||
}
|
||||
$this->server->close();
|
||||
$this->callback->signal(null);
|
||||
if (isset($this->callback)) $this->callback->signal(null);
|
||||
}
|
||||
/**
|
||||
* Client handler loop.
|
||||
@ -182,13 +186,18 @@ class Server extends SignalLoop
|
||||
{
|
||||
try {
|
||||
if ($payload[1] instanceof Wrapper) {
|
||||
$payload[1] = $this->callback->unwrap($payload[1]);
|
||||
$wrapper = $payload[1];
|
||||
$payload[1] = $this->callback->unwrap($wrapper);
|
||||
}
|
||||
$result = $this->API->{$payload[0]}(...$payload[1]);
|
||||
$result = $result instanceof \Generator ? yield from $result : yield $result;
|
||||
} catch (\Throwable $e) {
|
||||
$this->API->logger("Got error while calling IPC method: $e", Logger::ERROR);
|
||||
$result = new ExitFailure($e);
|
||||
} finally {
|
||||
if (isset($wrapper)) {
|
||||
yield $wrapper->disconnect();
|
||||
}
|
||||
}
|
||||
try {
|
||||
yield $socket->send([$id, $result]);
|
||||
|
@ -66,8 +66,8 @@ class ServerCallback extends Server
|
||||
*/
|
||||
protected function clientLoop(ChannelledSocket $socket)
|
||||
{
|
||||
$this->API->logger("Accepted IPC callback connection!");
|
||||
$id = $this->id++;
|
||||
$this->API->logger("Accepted IPC callback connection, assigning ID $id!");
|
||||
$this->socketList[$id] = $socket;
|
||||
$this->watcherList[$id] = Loop::delay(30*1000, function () use ($id) {
|
||||
unset($this->watcherList[$id], $this->socketList[$id]);
|
||||
@ -78,7 +78,7 @@ class ServerCallback extends Server
|
||||
|
||||
|
||||
/**
|
||||
* Unwrap value
|
||||
* Unwrap value.
|
||||
*
|
||||
* @param Wrapper $wrapper
|
||||
* @return mixed
|
||||
|
@ -6,10 +6,12 @@ use Amp\ByteStream\InputStream as ByteStreamInputStream;
|
||||
use Amp\ByteStream\OutputStream as ByteStreamOutputStream;
|
||||
use Amp\Ipc\Sync\ChannelledSocket;
|
||||
use Amp\Parallel\Sync\ExitFailure;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Ipc\Wrapper\InputStream;
|
||||
use danog\MadelineProto\Ipc\Wrapper\Obj;
|
||||
use danog\MadelineProto\Ipc\Wrapper\OutputStream;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\SessionPaths;
|
||||
use danog\MadelineProto\Tools;
|
||||
|
||||
use function Amp\Ipc\connect;
|
||||
@ -17,7 +19,7 @@ use function Amp\Ipc\connect;
|
||||
/**
|
||||
* Callback payload wrapper.
|
||||
*/
|
||||
class Wrapper extends Client
|
||||
class Wrapper extends ClientAbstract
|
||||
{
|
||||
/**
|
||||
* Payload data.
|
||||
@ -45,31 +47,31 @@ class Wrapper extends Client
|
||||
* Remote socket ID.
|
||||
*/
|
||||
private int $remoteId = 0;
|
||||
/**
|
||||
* Logger instance.
|
||||
*/
|
||||
private Logger $logger;
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param mixed $data Payload data
|
||||
* @param string $ipc IPC URI
|
||||
* @param mixed $data Payload data
|
||||
* @param SessionPaths $ipc IPC URI
|
||||
*
|
||||
* @return \Generator
|
||||
* @return \Generator<int, Promise<ChannelledSocket>|Promise<mixed>, mixed, Wrapper>
|
||||
*/
|
||||
public static function create(&$data, string $ipc, Logger $logger): \Generator
|
||||
public static function create(&$data, SessionPaths $session, Logger $logger): \Generator
|
||||
{
|
||||
$instance = new self;
|
||||
$instance->data = &$data;
|
||||
$instance->server = yield connect($ipc);
|
||||
$instance->remoteId = yield $instance->server->receive();
|
||||
$instance->logger = $logger;
|
||||
$instance->run = false;
|
||||
|
||||
$logger->logger("Connecting to callback IPC server...");
|
||||
$instance->server = yield connect($session->getIpcCallbackPath());
|
||||
$logger->logger("Connected to callback IPC server!");
|
||||
|
||||
$instance->remoteId = yield $instance->server->receive();
|
||||
$logger->logger("Got ID {$instance->remoteId} from callback IPC server!");
|
||||
|
||||
Tools::callFork($instance->receiverLoop());
|
||||
return $instance;
|
||||
}
|
||||
private function __construct()
|
||||
{
|
||||
}
|
||||
/**
|
||||
* Serialization function.
|
||||
*
|
||||
@ -77,41 +79,42 @@ class Wrapper extends Client
|
||||
*/
|
||||
public function __sleep(): array
|
||||
{
|
||||
return ['data', 'callbackIds'];
|
||||
return ['data', 'callbackIds', 'remoteId'];
|
||||
}
|
||||
/**
|
||||
* Wrap a certain callback object.
|
||||
*
|
||||
* @param object|callable $callback Object to wrap
|
||||
* @param object|callable $callback Callback to wrap
|
||||
* @param bool $wrapObjects Whether to wrap object methods, too
|
||||
*
|
||||
* @param-out int $callback Callback ID
|
||||
*
|
||||
* @return void
|
||||
*/
|
||||
public function wrap(&$callback): void
|
||||
public function wrap(&$callback, bool $wrapObjects = true): void
|
||||
{
|
||||
if (\is_object($callback)) {
|
||||
if (\is_object($callback) && $wrapObjects) {
|
||||
$ids = [];
|
||||
foreach (\get_class_methods($callback) as $method) {
|
||||
$id = $this->id++;
|
||||
$this->callbacks[$id] = [$callback, $method];
|
||||
$ids[$method] = $id;
|
||||
}
|
||||
$callback = $ids;
|
||||
$this->callbackIds[] = &$callback;
|
||||
} else {
|
||||
$id = $this->id++;
|
||||
$this->callbacks[$id] = self::copy($callback);
|
||||
$class = Obj::class;
|
||||
if ($callback instanceof ByteStreamInputStream) {
|
||||
$class = InputStream::class;
|
||||
} else if ($callback instanceof ByteStreamOutputStream) {
|
||||
} elseif ($callback instanceof ByteStreamOutputStream) {
|
||||
$class = OutputStream::class;
|
||||
}
|
||||
if ($class !== Obj::class && method_exists($callback, 'seek')) {
|
||||
if ($class !== Obj::class && \method_exists($callback, 'seek')) {
|
||||
$class = "Seekable$class";
|
||||
}
|
||||
$callback = [$class, $id]; // Will be re-filled later
|
||||
$callback = [$class, $ids]; // Will be re-filled later
|
||||
$this->callbackIds[] = &$callback;
|
||||
} elseif (\is_callable($callback)) {
|
||||
$id = $this->id++;
|
||||
$this->callbacks[$id] = self::copy($callback);
|
||||
$callback = $id;
|
||||
$this->callbackIds[] = &$callback;
|
||||
}
|
||||
}
|
||||
|
31
src/danog/MadelineProto/Ipc/Wrapper/FileCallback.php
Normal file
31
src/danog/MadelineProto/Ipc/Wrapper/FileCallback.php
Normal file
@ -0,0 +1,31 @@
|
||||
<?php
|
||||
|
||||
namespace danog\MadelineProto\Ipc\Wrapper;
|
||||
|
||||
use danog\MadelineProto\FileCallbackInterface;
|
||||
|
||||
class FileCallback extends Obj implements FileCallbackInterface
|
||||
{
|
||||
/**
|
||||
* Get file.
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
public function getFile()
|
||||
{
|
||||
return $this->__call('getFile');
|
||||
}
|
||||
/**
|
||||
* Invoke callback.
|
||||
*
|
||||
* @param int $percent Percent
|
||||
* @param int $speed Speed in mbps
|
||||
* @param int $time Time
|
||||
*
|
||||
* @return mixed
|
||||
*/
|
||||
public function __invoke(...$args)
|
||||
{
|
||||
return $this->__call('__invoke', $args);
|
||||
}
|
||||
}
|
@ -4,7 +4,6 @@ namespace danog\MadelineProto\Ipc\Wrapper;
|
||||
|
||||
use Amp\ByteStream\InputStream as AmpInputStream;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Ipc\Wrapper\Obj;
|
||||
use danog\MadelineProto\Tools;
|
||||
|
||||
class InputStream extends Obj implements AmpInputStream
|
||||
|
@ -16,11 +16,11 @@ class Obj
|
||||
*/
|
||||
private array $methods = [];
|
||||
/**
|
||||
* Wrapper
|
||||
* Wrapper.
|
||||
*/
|
||||
private Wrapper $wrapper;
|
||||
/**
|
||||
* Constructor
|
||||
* Constructor.
|
||||
*
|
||||
* @param Wrapper $wrapper
|
||||
* @param array $methods
|
||||
|
@ -4,7 +4,6 @@ namespace danog\MadelineProto\Ipc\Wrapper;
|
||||
|
||||
use Amp\ByteStream\OutputStream as AmpOutputStream;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Ipc\Wrapper\Obj;
|
||||
use danog\MadelineProto\Tools;
|
||||
|
||||
class OutputStream extends Obj implements AmpOutputStream
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace danog\MadelineProto\Ipc\Wrapper;
|
||||
|
||||
class SeekableInputStream extends InputStream
|
||||
class SeekableInputStream extends InputStream
|
||||
{
|
||||
use SeekableTrait;
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace danog\MadelineProto\Ipc\Wrapper;
|
||||
|
||||
class SeekableOutputStream extends OutputStream
|
||||
class SeekableOutputStream extends OutputStream
|
||||
{
|
||||
use SeekableTrait;
|
||||
}
|
||||
}
|
||||
|
18
src/danog/MadelineProto/Ipc/Wrapper/WrapMethodTrait.php
Normal file
18
src/danog/MadelineProto/Ipc/Wrapper/WrapMethodTrait.php
Normal file
@ -0,0 +1,18 @@
|
||||
<?php
|
||||
|
||||
namespace danog\MadelineProto\Ipc\Wrapper;
|
||||
|
||||
use danog\MadelineProto\Ipc\Wrapper;
|
||||
|
||||
trait WrapMethodTrait
|
||||
{
|
||||
abstract public function __call($name, $args);
|
||||
public function wrap(...$args): \Generator
|
||||
{
|
||||
$new = yield from Wrapper::create($args, $this->session->getIpcCallbackPath(), $this->logger);
|
||||
foreach ($args as &$arg) {
|
||||
$new->wrap($arg);
|
||||
}
|
||||
return $this->__call(__FUNCTION__, $new);
|
||||
}
|
||||
}
|
@ -32,7 +32,6 @@ use danog\MadelineProto\Db\DbPropertiesFactory;
|
||||
use danog\MadelineProto\Db\DbPropertiesTrait;
|
||||
use danog\MadelineProto\Db\MemoryArray;
|
||||
use danog\MadelineProto\Ipc\Server;
|
||||
use danog\MadelineProto\Ipc\ServerCallback;
|
||||
use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal;
|
||||
use danog\MadelineProto\Loop\Update\FeedLoop;
|
||||
use danog\MadelineProto\Loop\Update\SeqLoop;
|
||||
|
@ -19,30 +19,16 @@
|
||||
|
||||
namespace danog\MadelineProto\MTProtoTools;
|
||||
|
||||
use Amp\ByteStream\InputStream;
|
||||
use Amp\ByteStream\IteratorStream;
|
||||
use Amp\ByteStream\OutputStream;
|
||||
use Amp\ByteStream\ResourceInputStream;
|
||||
use Amp\ByteStream\ResourceOutputStream;
|
||||
use Amp\ByteStream\StreamException;
|
||||
use Amp\Deferred;
|
||||
use Amp\File\BlockingFile;
|
||||
use Amp\File\Handle;
|
||||
use Amp\File\StatCache as StatCacheAsync;
|
||||
use Amp\Http\Client\Request;
|
||||
use Amp\Http\Server\Request as ServerRequest;
|
||||
use Amp\Http\Server\Response;
|
||||
use Amp\Http\Status;
|
||||
use Amp\Producer;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use danog\MadelineProto\Exception;
|
||||
use danog\MadelineProto\FileCallbackInterface;
|
||||
use danog\MadelineProto\Settings;
|
||||
use danog\MadelineProto\Stream\Common\BufferedRawStream;
|
||||
use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream;
|
||||
use danog\MadelineProto\Stream\ConnectionContext;
|
||||
use danog\MadelineProto\Stream\Transport\PremadeStream;
|
||||
use danog\MadelineProto\Tools;
|
||||
|
||||
use tgseclib\Crypt\AES;
|
||||
@ -61,55 +47,7 @@ use function Amp\Promise\all;
|
||||
*/
|
||||
trait Files
|
||||
{
|
||||
/**
|
||||
* Upload file.
|
||||
*
|
||||
* @param FileCallbackInterface|string|array $file File, URL or Telegram file to upload
|
||||
* @param string $fileName File name
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param boolean $encrypted Whether to encrypt file for secret chats
|
||||
*
|
||||
* @return \Generator<array>
|
||||
*/
|
||||
public function upload($file, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
|
||||
{
|
||||
if (\is_object($file) && $file instanceof FileCallbackInterface) {
|
||||
$cb = $file;
|
||||
$file = $file->getFile();
|
||||
}
|
||||
if (\is_string($file) || \is_object($file) && \method_exists($file, '__toString')) {
|
||||
if (\filter_var($file, FILTER_VALIDATE_URL)) {
|
||||
return yield from $this->uploadFromUrl($file, 0, $fileName, $cb, $encrypted);
|
||||
}
|
||||
} elseif (\is_array($file)) {
|
||||
return yield from $this->uploadFromTgfile($file, $cb, $encrypted);
|
||||
}
|
||||
if (\is_resource($file) || (\is_object($file) && $file instanceof InputStream)) {
|
||||
return yield from $this->uploadFromStream($file, 0, '', $fileName, $cb, $encrypted);
|
||||
}
|
||||
if (!$this->settings->getFiles()->getAllowAutomaticUpload()) {
|
||||
return yield from $this->uploadFromUrl($file, 0, $fileName, $cb, $encrypted);
|
||||
}
|
||||
$file = Tools::absolute($file);
|
||||
if (!yield exists($file)) {
|
||||
throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['file_not_exist']);
|
||||
}
|
||||
if (empty($fileName)) {
|
||||
$fileName = \basename($file);
|
||||
}
|
||||
StatCacheAsync::clear($file);
|
||||
$size = (yield statAsync($file))['size'];
|
||||
if ($size > 512 * 1024 * 4000) {
|
||||
throw new \danog\MadelineProto\Exception('Given file is too big!');
|
||||
}
|
||||
$stream = yield open($file, 'rb');
|
||||
$mime = $this->getMimeFromFile($file);
|
||||
try {
|
||||
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
|
||||
} finally {
|
||||
yield $stream->close();
|
||||
}
|
||||
}
|
||||
use FilesLogic;
|
||||
/**
|
||||
* Upload file from URL.
|
||||
*
|
||||
@ -153,90 +91,6 @@ trait Files
|
||||
}
|
||||
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
|
||||
}
|
||||
/**
|
||||
* Upload file from stream.
|
||||
*
|
||||
* @param mixed $stream PHP resource or AMPHP async stream
|
||||
* @param integer $size File size
|
||||
* @param string $mime Mime type
|
||||
* @param string $fileName File name
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param boolean $encrypted Whether to encrypt file for secret chats
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function uploadFromStream($stream, int $size, string $mime, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
|
||||
{
|
||||
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
|
||||
$cb = $stream;
|
||||
$stream = $stream->getFile();
|
||||
}
|
||||
/* @var $stream \Amp\ByteStream\OutputStream */
|
||||
if (!\is_object($stream)) {
|
||||
$stream = new ResourceInputStream($stream);
|
||||
}
|
||||
if (!$stream instanceof InputStream) {
|
||||
throw new Exception("Invalid stream provided");
|
||||
}
|
||||
$seekable = false;
|
||||
if (\method_exists($stream, 'seek')) {
|
||||
try {
|
||||
yield $stream->seek(0);
|
||||
$seekable = true;
|
||||
} catch (StreamException $e) {
|
||||
}
|
||||
}
|
||||
$created = false;
|
||||
if ($stream instanceof Handle) {
|
||||
$callable = static function (int $offset, int $size) use ($stream, $seekable): \Generator {
|
||||
if ($seekable) {
|
||||
while ($stream->tell() !== $offset) {
|
||||
yield $stream->seek($offset);
|
||||
}
|
||||
}
|
||||
return yield $stream->read($size);
|
||||
};
|
||||
} else {
|
||||
if (!$stream instanceof BufferedRawStream) {
|
||||
$ctx = (new ConnectionContext())->addStream(PremadeStream::class, $stream)->addStream(SimpleBufferedRawStream::class);
|
||||
$stream = (yield from $ctx->getStream());
|
||||
$created = true;
|
||||
}
|
||||
$callable = static function (int $offset, int $size) use ($stream): \Generator {
|
||||
$reader = yield $stream->getReadBuffer($l);
|
||||
try {
|
||||
return yield $reader->bufferRead($size);
|
||||
} catch (\danog\MadelineProto\NothingInTheSocketException $e) {
|
||||
$reader = yield $stream->getReadBuffer($size);
|
||||
return yield $reader->bufferRead($size);
|
||||
}
|
||||
};
|
||||
$seekable = false;
|
||||
}
|
||||
if (!$size && $seekable && \method_exists($stream, 'tell')) {
|
||||
yield $stream->seek(0, \SEEK_END);
|
||||
$size = yield $stream->tell();
|
||||
yield $stream->seek(0);
|
||||
} elseif (!$size) {
|
||||
$this->logger->logger("No content length for stream, caching first");
|
||||
$body = $stream;
|
||||
$stream = new BlockingFile(\fopen('php://temp', 'r+b'), 'php://temp', 'r+b');
|
||||
while (null !== ($chunk = yield $body->read())) {
|
||||
yield $stream->write($chunk);
|
||||
}
|
||||
$size = $stream->tell();
|
||||
if (!$size) {
|
||||
throw new Exception('Wrong size!');
|
||||
}
|
||||
yield $stream->seek(0);
|
||||
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
|
||||
}
|
||||
$res = (yield from $this->uploadFromCallable($callable, $size, $mime, $fileName, $cb, $seekable, $encrypted));
|
||||
if ($created) {
|
||||
$stream->disconnect();
|
||||
}
|
||||
return $res;
|
||||
}
|
||||
/**
|
||||
* Upload file from callable.
|
||||
*
|
||||
@ -365,19 +219,6 @@ trait Files
|
||||
//\hash_final($ctx);
|
||||
return $constructor;
|
||||
}
|
||||
/**
|
||||
* Upload file to secret chat.
|
||||
*
|
||||
* @param FileCallbackInterface|string|array $file File, URL or Telegram file to upload
|
||||
* @param string $fileName File name
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
*
|
||||
* @return \Generator<array>
|
||||
*/
|
||||
public function uploadEncrypted($file, string $fileName = '', $cb = null): \Generator
|
||||
{
|
||||
return $this->upload($file, $fileName, $cb, true);
|
||||
}
|
||||
/**
|
||||
* Reupload telegram file.
|
||||
*
|
||||
@ -907,106 +748,6 @@ trait Files
|
||||
throw new \danog\MadelineProto\Exception('Invalid constructor provided: '.$messageMedia['_']);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Download file to browser.
|
||||
*
|
||||
* Supports HEAD requests and content-ranges for parallel and resumed downloads.
|
||||
*
|
||||
* @param array|string $messageMedia File to download
|
||||
* @param callable $cb Status callback (can also use FileCallback)
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
public function downloadToBrowser($messageMedia, callable $cb = null): \Generator
|
||||
{
|
||||
if (\is_object($messageMedia) && $messageMedia instanceof FileCallbackInterface) {
|
||||
$cb = $messageMedia;
|
||||
$messageMedia = $messageMedia->getFile();
|
||||
}
|
||||
|
||||
$headers = [];
|
||||
if (isset($_SERVER['HTTP_RANGE'])) {
|
||||
$headers['range'] = $_SERVER['HTTP_RANGE'];
|
||||
}
|
||||
|
||||
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
|
||||
$result = ResponseInfo::parseHeaders(
|
||||
$_SERVER['REQUEST_METHOD'],
|
||||
$headers,
|
||||
$messageMedia
|
||||
);
|
||||
|
||||
foreach ($result->getHeaders() as $key => $value) {
|
||||
if (\is_array($value)) {
|
||||
foreach ($value as $subValue) {
|
||||
\header("$key: $subValue", false);
|
||||
}
|
||||
} else {
|
||||
\header("$key: $value");
|
||||
}
|
||||
}
|
||||
\http_response_code($result->getCode());
|
||||
|
||||
if (!\in_array($result->getCode(), [Status::OK, Status::PARTIAL_CONTENT])) {
|
||||
yield Tools::echo($result->getCodeExplanation());
|
||||
} elseif ($result->shouldServe()) {
|
||||
if (\ob_get_level()) {
|
||||
\ob_end_flush();
|
||||
\ob_implicit_flush();
|
||||
}
|
||||
yield from $this->downloadToStream($messageMedia, \fopen('php://output', 'w'), $cb, ...$result->getServeRange());
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Download file to amphp/http-server response.
|
||||
*
|
||||
* Supports HEAD requests and content-ranges for parallel and resumed downloads.
|
||||
*
|
||||
* @param array|string $messageMedia File to download
|
||||
* @param ServerRequest $request Request
|
||||
* @param callable $cb Status callback (can also use FileCallback)
|
||||
*
|
||||
* @return \Generator<Response> Returned response
|
||||
*/
|
||||
public function downloadToResponse($messageMedia, ServerRequest $request, callable $cb = null): \Generator
|
||||
{
|
||||
if (\is_object($messageMedia) && $messageMedia instanceof FileCallbackInterface) {
|
||||
$cb = $messageMedia;
|
||||
$messageMedia = $messageMedia->getFile();
|
||||
}
|
||||
|
||||
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
|
||||
|
||||
$result = ResponseInfo::parseHeaders(
|
||||
$request->getMethod(),
|
||||
\array_map(fn (array $headers) => $headers[0], $request->getHeaders()),
|
||||
$messageMedia
|
||||
);
|
||||
|
||||
$body = null;
|
||||
if ($result->shouldServe()) {
|
||||
$body = new IteratorStream(
|
||||
new Producer(
|
||||
function (callable $emit) use (&$messageMedia, &$cb, &$result) {
|
||||
$emit = static function (string $payload) use ($emit): \Generator {
|
||||
yield $emit($payload);
|
||||
return \strlen($payload);
|
||||
};
|
||||
yield Tools::call($this->downloadToCallable($messageMedia, $emit, $cb, false, ...$result->getServeRange()));
|
||||
}
|
||||
)
|
||||
);
|
||||
} elseif (!\in_array($result->getCode(), [Status::OK, Status::PARTIAL_CONTENT])) {
|
||||
$body = $result->getCodeExplanation();
|
||||
}
|
||||
|
||||
$response = new Response($result->getCode(), $result->getHeaders(), $body);
|
||||
if ($result->shouldServe() && !empty($result->getHeaders()['Content-Length'])) {
|
||||
$response->setHeader('content-length', $result->getHeaders()['Content-Length']);
|
||||
}
|
||||
|
||||
return $response;
|
||||
}
|
||||
/**
|
||||
* Download file to directory.
|
||||
*
|
||||
@ -1061,49 +802,6 @@ trait Files
|
||||
}
|
||||
return $file;
|
||||
}
|
||||
/**
|
||||
* Download file to stream.
|
||||
*
|
||||
* @param mixed $messageMedia File to download
|
||||
* @param mixed|FileCallbackInterface $stream Stream where to download file
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param int $offset Offset where to start downloading
|
||||
* @param int $end Offset where to end download
|
||||
*
|
||||
* @return \Generator<bool>
|
||||
*/
|
||||
public function downloadToStream($messageMedia, $stream, $cb = null, int $offset = 0, int $end = -1): \Generator
|
||||
{
|
||||
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
|
||||
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
|
||||
$cb = $stream;
|
||||
$stream = $stream->getFile();
|
||||
}
|
||||
/** @var $stream \Amp\ByteStream\OutputStream */
|
||||
if (!\is_object($stream)) {
|
||||
$stream = new ResourceOutputStream($stream);
|
||||
}
|
||||
if (!$stream instanceof OutputStream) {
|
||||
throw new Exception("Invalid stream provided");
|
||||
}
|
||||
$seekable = false;
|
||||
if (\method_exists($stream, 'seek')) {
|
||||
try {
|
||||
yield $stream->seek($offset);
|
||||
$seekable = true;
|
||||
} catch (StreamException $e) {
|
||||
}
|
||||
}
|
||||
$callable = static function (string $payload, int $offset) use ($stream, $seekable): \Generator {
|
||||
if ($seekable) {
|
||||
while ($stream->tell() !== $offset) {
|
||||
yield $stream->seek($offset);
|
||||
}
|
||||
}
|
||||
return yield $stream->write($payload);
|
||||
};
|
||||
return yield from $this->downloadToCallable($messageMedia, $callable, $cb, $seekable, $offset, $end);
|
||||
}
|
||||
/**
|
||||
* Download file to callable.
|
||||
* The callable must accept two parameters: string $payload, int $offset
|
||||
|
328
src/danog/MadelineProto/MTProtoTools/FilesLogic.php
Normal file
328
src/danog/MadelineProto/MTProtoTools/FilesLogic.php
Normal file
@ -0,0 +1,328 @@
|
||||
<?php
|
||||
|
||||
namespace danog\MadelineProto\MTProtoTools;
|
||||
|
||||
use Amp\ByteStream\InputStream;
|
||||
use Amp\ByteStream\IteratorStream;
|
||||
use Amp\ByteStream\OutputStream;
|
||||
use Amp\ByteStream\ResourceInputStream;
|
||||
use Amp\ByteStream\ResourceOutputStream;
|
||||
use Amp\ByteStream\StreamException;
|
||||
use Amp\File\BlockingFile;
|
||||
|
||||
use Amp\File\Handle;
|
||||
use Amp\File\StatCache as StatCacheAsync;
|
||||
use Amp\Http\Client\Request;
|
||||
use Amp\Http\Server\Request as ServerRequest;
|
||||
use Amp\Http\Server\Response;
|
||||
use Amp\Http\Status;
|
||||
use Amp\Producer;
|
||||
use danog\MadelineProto\Exception;
|
||||
use danog\MadelineProto\FileCallbackInterface;
|
||||
use danog\MadelineProto\Stream\Common\BufferedRawStream;
|
||||
use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream;
|
||||
use danog\MadelineProto\Stream\ConnectionContext;
|
||||
use danog\MadelineProto\Stream\Transport\PremadeStream;
|
||||
use danog\MadelineProto\Tools;
|
||||
|
||||
|
||||
use function Amp\File\exists;
|
||||
use function Amp\File\open;
|
||||
use function Amp\File\stat as statAsync;
|
||||
|
||||
trait FilesLogic
|
||||
{
|
||||
/**
|
||||
* Download file to browser.
|
||||
*
|
||||
* Supports HEAD requests and content-ranges for parallel and resumed downloads.
|
||||
*
|
||||
* @param array|string $messageMedia File to download
|
||||
* @param callable $cb Status callback (can also use FileCallback)
|
||||
*
|
||||
* @return \Generator
|
||||
*/
|
||||
public function downloadToBrowser($messageMedia, callable $cb = null): \Generator
|
||||
{
|
||||
if (\is_object($messageMedia) && $messageMedia instanceof FileCallbackInterface) {
|
||||
$cb = $messageMedia;
|
||||
$messageMedia = $messageMedia->getFile();
|
||||
}
|
||||
|
||||
$headers = [];
|
||||
if (isset($_SERVER['HTTP_RANGE'])) {
|
||||
$headers['range'] = $_SERVER['HTTP_RANGE'];
|
||||
}
|
||||
|
||||
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
|
||||
$result = ResponseInfo::parseHeaders(
|
||||
$_SERVER['REQUEST_METHOD'],
|
||||
$headers,
|
||||
$messageMedia
|
||||
);
|
||||
|
||||
foreach ($result->getHeaders() as $key => $value) {
|
||||
if (\is_array($value)) {
|
||||
foreach ($value as $subValue) {
|
||||
\header("$key: $subValue", false);
|
||||
}
|
||||
} else {
|
||||
\header("$key: $value");
|
||||
}
|
||||
}
|
||||
\http_response_code($result->getCode());
|
||||
|
||||
if (!\in_array($result->getCode(), [Status::OK, Status::PARTIAL_CONTENT])) {
|
||||
yield Tools::echo($result->getCodeExplanation());
|
||||
} elseif ($result->shouldServe()) {
|
||||
if (\ob_get_level()) {
|
||||
\ob_end_flush();
|
||||
\ob_implicit_flush();
|
||||
}
|
||||
yield from $this->downloadToStream($messageMedia, \fopen('php://output', 'w'), $cb, ...$result->getServeRange());
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Download file to stream.
|
||||
*
|
||||
* @param mixed $messageMedia File to download
|
||||
* @param mixed|FileCallbackInterface $stream Stream where to download file
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param int $offset Offset where to start downloading
|
||||
* @param int $end Offset where to end download
|
||||
*
|
||||
* @return \Generator<bool>
|
||||
*/
|
||||
public function downloadToStream($messageMedia, $stream, $cb = null, int $offset = 0, int $end = -1): \Generator
|
||||
{
|
||||
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
|
||||
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
|
||||
$cb = $stream;
|
||||
$stream = $stream->getFile();
|
||||
}
|
||||
/** @var $stream \Amp\ByteStream\OutputStream */
|
||||
if (!\is_object($stream)) {
|
||||
$stream = new ResourceOutputStream($stream);
|
||||
}
|
||||
if (!$stream instanceof OutputStream) {
|
||||
throw new Exception("Invalid stream provided");
|
||||
}
|
||||
$seekable = false;
|
||||
if (\method_exists($stream, 'seek')) {
|
||||
try {
|
||||
yield $stream->seek($offset);
|
||||
$seekable = true;
|
||||
} catch (StreamException $e) {
|
||||
}
|
||||
}
|
||||
$callable = static function (string $payload, int $offset) use ($stream, $seekable): \Generator {
|
||||
if ($seekable) {
|
||||
while ($stream->tell() !== $offset) {
|
||||
yield $stream->seek($offset);
|
||||
}
|
||||
}
|
||||
return yield $stream->write($payload);
|
||||
};
|
||||
return yield from $this->downloadToCallable($messageMedia, $callable, $cb, $seekable, $offset, $end);
|
||||
}
|
||||
|
||||
/**
|
||||
* Download file to amphp/http-server response.
|
||||
*
|
||||
* Supports HEAD requests and content-ranges for parallel and resumed downloads.
|
||||
*
|
||||
* @param array|string $messageMedia File to download
|
||||
* @param ServerRequest $request Request
|
||||
* @param callable $cb Status callback (can also use FileCallback)
|
||||
*
|
||||
* @return \Generator<Response> Returned response
|
||||
*/
|
||||
public function downloadToResponse($messageMedia, ServerRequest $request, callable $cb = null): \Generator
|
||||
{
|
||||
if (\is_object($messageMedia) && $messageMedia instanceof FileCallbackInterface) {
|
||||
$cb = $messageMedia;
|
||||
$messageMedia = $messageMedia->getFile();
|
||||
}
|
||||
|
||||
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
|
||||
|
||||
$result = ResponseInfo::parseHeaders(
|
||||
$request->getMethod(),
|
||||
\array_map(fn (array $headers) => $headers[0], $request->getHeaders()),
|
||||
$messageMedia
|
||||
);
|
||||
|
||||
$body = null;
|
||||
if ($result->shouldServe()) {
|
||||
$body = new IteratorStream(
|
||||
new Producer(
|
||||
function (callable $emit) use (&$messageMedia, &$cb, &$result) {
|
||||
$emit = static function (string $payload) use ($emit): \Generator {
|
||||
yield $emit($payload);
|
||||
return \strlen($payload);
|
||||
};
|
||||
yield Tools::call($this->downloadToCallable($messageMedia, $emit, $cb, false, ...$result->getServeRange()));
|
||||
}
|
||||
)
|
||||
);
|
||||
} elseif (!\in_array($result->getCode(), [Status::OK, Status::PARTIAL_CONTENT])) {
|
||||
$body = $result->getCodeExplanation();
|
||||
}
|
||||
|
||||
$response = new Response($result->getCode(), $result->getHeaders(), $body);
|
||||
if ($result->shouldServe() && !empty($result->getHeaders()['Content-Length'])) {
|
||||
$response->setHeader('content-length', $result->getHeaders()['Content-Length']);
|
||||
}
|
||||
|
||||
return $response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload file to secret chat.
|
||||
*
|
||||
* @param FileCallbackInterface|string|array $file File, URL or Telegram file to upload
|
||||
* @param string $fileName File name
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
*
|
||||
* @return \Generator<array>
|
||||
*/
|
||||
public function uploadEncrypted($file, string $fileName = '', $cb = null): \Generator
|
||||
{
|
||||
return $this->upload($file, $fileName, $cb, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload file.
|
||||
*
|
||||
* @param FileCallbackInterface|string|array $file File, URL or Telegram file to upload
|
||||
* @param string $fileName File name
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param boolean $encrypted Whether to encrypt file for secret chats
|
||||
*
|
||||
* @return \Generator<array>
|
||||
*/
|
||||
public function upload($file, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
|
||||
{
|
||||
if (\is_object($file) && $file instanceof FileCallbackInterface) {
|
||||
$cb = $file;
|
||||
$file = $file->getFile();
|
||||
}
|
||||
if (\is_string($file) || \is_object($file) && \method_exists($file, '__toString')) {
|
||||
if (\filter_var($file, FILTER_VALIDATE_URL)) {
|
||||
return yield from $this->uploadFromUrl($file, 0, $fileName, $cb, $encrypted);
|
||||
}
|
||||
} elseif (\is_array($file)) {
|
||||
return yield from $this->uploadFromTgfile($file, $cb, $encrypted);
|
||||
}
|
||||
if (\is_resource($file) || (\is_object($file) && $file instanceof InputStream)) {
|
||||
return yield from $this->uploadFromStream($file, 0, '', $fileName, $cb, $encrypted);
|
||||
}
|
||||
if (!$this->settings->getFiles()->getAllowAutomaticUpload()) {
|
||||
return yield from $this->uploadFromUrl($file, 0, $fileName, $cb, $encrypted);
|
||||
}
|
||||
$file = Tools::absolute($file);
|
||||
if (!yield exists($file)) {
|
||||
throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['file_not_exist']);
|
||||
}
|
||||
if (empty($fileName)) {
|
||||
$fileName = \basename($file);
|
||||
}
|
||||
StatCacheAsync::clear($file);
|
||||
$size = (yield statAsync($file))['size'];
|
||||
if ($size > 512 * 1024 * 4000) {
|
||||
throw new \danog\MadelineProto\Exception('Given file is too big!');
|
||||
}
|
||||
$stream = yield open($file, 'rb');
|
||||
$mime = $this->getMimeFromFile($file);
|
||||
try {
|
||||
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
|
||||
} finally {
|
||||
yield $stream->close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload file from stream.
|
||||
*
|
||||
* @param mixed $stream PHP resource or AMPHP async stream
|
||||
* @param integer $size File size
|
||||
* @param string $mime Mime type
|
||||
* @param string $fileName File name
|
||||
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
|
||||
* @param boolean $encrypted Whether to encrypt file for secret chats
|
||||
*
|
||||
* @return array
|
||||
*/
|
||||
public function uploadFromStream($stream, int $size, string $mime, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
|
||||
{
|
||||
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
|
||||
$cb = $stream;
|
||||
$stream = $stream->getFile();
|
||||
}
|
||||
/* @var $stream \Amp\ByteStream\OutputStream */
|
||||
if (!\is_object($stream)) {
|
||||
$stream = new ResourceInputStream($stream);
|
||||
}
|
||||
if (!$stream instanceof InputStream) {
|
||||
throw new Exception("Invalid stream provided");
|
||||
}
|
||||
$seekable = false;
|
||||
if (\method_exists($stream, 'seek')) {
|
||||
try {
|
||||
yield $stream->seek(0);
|
||||
$seekable = true;
|
||||
} catch (StreamException $e) {
|
||||
}
|
||||
}
|
||||
$created = false;
|
||||
if ($stream instanceof Handle) {
|
||||
$callable = static function (int $offset, int $size) use ($stream, $seekable): \Generator {
|
||||
if ($seekable) {
|
||||
while ($stream->tell() !== $offset) {
|
||||
yield $stream->seek($offset);
|
||||
}
|
||||
}
|
||||
return yield $stream->read($size);
|
||||
};
|
||||
} else {
|
||||
if (!$stream instanceof BufferedRawStream) {
|
||||
$ctx = (new ConnectionContext())->addStream(PremadeStream::class, $stream)->addStream(SimpleBufferedRawStream::class);
|
||||
$stream = (yield from $ctx->getStream());
|
||||
$created = true;
|
||||
}
|
||||
$callable = static function (int $offset, int $size) use ($stream): \Generator {
|
||||
$reader = yield $stream->getReadBuffer($l);
|
||||
try {
|
||||
return yield $reader->bufferRead($size);
|
||||
} catch (\danog\MadelineProto\NothingInTheSocketException $e) {
|
||||
$reader = yield $stream->getReadBuffer($size);
|
||||
return yield $reader->bufferRead($size);
|
||||
}
|
||||
};
|
||||
$seekable = false;
|
||||
}
|
||||
if (!$size && $seekable && \method_exists($stream, 'tell')) {
|
||||
yield $stream->seek(0, \SEEK_END);
|
||||
$size = yield $stream->tell();
|
||||
yield $stream->seek(0);
|
||||
} elseif (!$size) {
|
||||
$this->logger->logger("No content length for stream, caching first");
|
||||
$body = $stream;
|
||||
$stream = new BlockingFile(\fopen('php://temp', 'r+b'), 'php://temp', 'r+b');
|
||||
while (null !== ($chunk = yield $body->read())) {
|
||||
yield $stream->write($chunk);
|
||||
}
|
||||
$size = $stream->tell();
|
||||
if (!$size) {
|
||||
throw new Exception('Wrong size!');
|
||||
}
|
||||
yield $stream->seek(0);
|
||||
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
|
||||
}
|
||||
$res = (yield from $this->uploadFromCallable($callable, $size, $mime, $fileName, $cb, $seekable, $encrypted));
|
||||
if ($created) {
|
||||
$stream->disconnect();
|
||||
}
|
||||
return $res;
|
||||
}
|
||||
}
|
@ -22,7 +22,6 @@ namespace danog\MadelineProto;
|
||||
use Amp\Deferred;
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use danog\MadelineProto\Db\DbArray;
|
||||
use danog\MadelineProto\Db\DriverArray;
|
||||
use danog\MadelineProto\Ipc\Server;
|
||||
use danog\MadelineProto\MTProtoSession\Session;
|
||||
@ -225,6 +224,7 @@ abstract class Serialization
|
||||
try {
|
||||
\clearstatcache(true, $ipcPath);
|
||||
$socket = yield connect($ipcPath);
|
||||
Logger::log("Connected to IPC socket!");
|
||||
if ($cancelFull) {
|
||||
$cancelFull->resolve(true);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user