Implement downloadTo* and uploadFrom* via IPC

This commit is contained in:
Daniil Gentili 2020-09-25 20:49:51 +02:00
parent 71b4efbbd3
commit bb003f0df8
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
17 changed files with 708 additions and 444 deletions

View File

@ -18,53 +18,29 @@
namespace danog\MadelineProto\Ipc;
use Amp\Deferred;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
use danog\MadelineProto\API;
use danog\MadelineProto\Exception;
use danog\MadelineProto\FileCallbackInterface;
use danog\MadelineProto\Logger;
use danog\MadelineProto\MTProtoTools\FilesLogic;
use danog\MadelineProto\SessionPaths;
use danog\MadelineProto\Tools;
use function Amp\Ipc\connect;
/**
* IPC client.
*/
class Client
class Client extends ClientAbstract
{
use \danog\MadelineProto\Wrappers\Start;
use \danog\MadelineProto\Wrappers\Templates;
use FilesLogic;
/**
* IPC server socket.
*/
protected ChannelledSocket $server;
/**
* Callback IPC server socket.
*/
private ?ChannelledSocket $serverCallback = null;
/**
* Requests promise array.
*/
private array $requests = [];
/**
* Wrappers array.
*/
private array $wrappers = [];
/**
* Whether to run loop.
*/
private bool $run = true;
/**
* Session.
*/
private SessionPaths $session;
/**
* Logger instance.
*/
public Logger $logger;
protected SessionPaths $session;
/**
* Constructor function.
*
@ -79,56 +55,6 @@ class Client
$this->session = $session;
Tools::callFork($this->loopInternal());
}
/**
* Logger.
*
* @param string $param Parameter
* @param int $level Logging level
* @param string $file File where the message originated
*
* @return void
*/
public function logger($param, int $level = Logger::NOTICE, string $file = ''): void
{
if ($file === null) {
$file = \basename(\debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php');
}
isset($this->logger) ? $this->logger->logger($param, $level, $file) : Logger::$default->logger($param, $level, $file);
}
/**
* Main loop.
*
* @return \Generator
*/
private function loopInternal(): \Generator
{
while ($this->run) {
while ($payload = yield $this->server->receive()) {
[$id, $payload] = $payload;
if (!isset($this->requests[$id])) {
Logger::log("Got response for non-existing ID $id!");
} else {
$promise = $this->requests[$id];
unset($this->requests[$id]);
if (isset($this->wrappers[$id])) {
unset($this->wrappers[$id]);
}
if ($payload instanceof ExitFailure) {
$promise->fail($payload->getException());
} else {
$promise->resolve($payload);
}
unset($promise);
}
}
if ($this->run) {
$this->logger("Reconnecting to IPC server!");
yield $this->server->disconnect();
Server::startMe($this->session);
$this->server = yield connect($this->session->getIpcPath());
}
}
}
/**
* Run the provided async callable.
*
@ -147,18 +73,7 @@ class Client
*/
public function unreference(): void
{
$this->run = false;
Tools::wait($this->server->disconnect());
}
/**
* Disconnect cleanly from main instance.
*
* @return Promise
*/
public function disconnect(): Promise
{
$this->run = false;
return $this->server->disconnect();
Tools::wait($this->disconnect());
}
/**
* Stop IPC server instance.
@ -188,22 +103,145 @@ class Client
{
return true;
}
/**
* Call function.
* Upload file from URL.
*
* @param string|int $function Function name
* @param array|Wrapper $arguments Arguments
* @param string|FileCallbackInterface $url URL of file
* @param integer $size Size of file
* @param string $fileName File name
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return \Generator
*/
public function __call($function, $arguments): \Generator
public function uploadFromUrl($url, int $size = 0, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
{
$this->requests []= $deferred = new Deferred;
if ($arguments instanceof Wrapper) {
$this->wrappers[count($this->requests) - 1] = $arguments;
if (\is_object($url) && $url instanceof FileCallbackInterface) {
$cb = $url;
$url = $url->getFile();
}
yield $this->server->send([$function, $arguments]);
return yield $deferred->promise();
$params = [$url, $size, $fileName, &$cb, $encrypted];
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
$wrapper->wrap($cb, false);
return yield from $this->__call('uploadFromUrl', $wrapper);
}
/**
* Upload file from callable.
*
* The callable must accept two parameters: int $offset, int $size
* The callable must return a string with the contest of the file at the specified offset and size.
*
* @param mixed $callable Callable
* @param integer $size File size
* @param string $mime Mime type
* @param string $fileName File name
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param boolean $seekable Whether chunks can be fetched out of order
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return \Generator<array>
*/
public function uploadFromCallable(callable $callable, int $size, string $mime, string $fileName = '', $cb = null, bool $seekable = true, bool $encrypted = false): \Generator
{
if (\is_object($callable) && $callable instanceof FileCallbackInterface) {
$cb = $callable;
$callable = $callable->getFile();
}
$params = [&$callable, $size, $mime, $fileName, &$cb, $seekable, $encrypted];
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
$wrapper->wrap($cb, false);
$wrapper->wrap($callable, false);
return yield from $this->__call('uploadFromCallable', $wrapper);
}
/**
* Reupload telegram file.
*
* @param mixed $media Telegram file
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return \Generator<array>
*/
public function uploadFromTgfile($media, $cb = null, bool $encrypted = false): \Generator
{
if (\is_object($media) && $media instanceof FileCallbackInterface) {
$cb = $media;
$media = $media->getFile();
}
$params = [$media, &$cb, $encrypted];
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
$wrapper->wrap($cb, false);
return yield from $this->__call('uploadFromTgfile', $wrapper);
}
/**
* Download file to directory.
*
* @param mixed $messageMedia File to download
* @param string|FileCallbackInterface $dir Directory where to download the file
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
*
* @return \Generator<string> Downloaded file path
*/
public function downloadToDir($messageMedia, $dir, $cb = null): \Generator
{
if (\is_object($dir) && $dir instanceof FileCallbackInterface) {
$cb = $dir;
$dir = $dir->getFile();
}
$params = [$messageMedia, $dir, &$cb];
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
$wrapper->wrap($cb, false);
return yield from $this->__call('downloadToDir', $wrapper);
}
/**
* Download file.
*
* @param mixed $messageMedia File to download
* @param string|FileCallbackInterface $file Downloaded file path
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
*
* @return \Generator<string> Downloaded file path
*/
public function downloadToFile($messageMedia, $file, $cb = null): \Generator
{
if (\is_object($file) && $file instanceof FileCallbackInterface) {
$cb = $file;
$file = $file->getFile();
}
$params = [$messageMedia, $file, &$cb];
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
$wrapper->wrap($cb, false);
return yield from $this->__call('downloadToFile', $wrapper);
}
/**
* Download file to callable.
* The callable must accept two parameters: string $payload, int $offset
* The callable will be called (possibly out of order, depending on the value of $seekable).
* The callable should return the number of written bytes.
*
* @param mixed $messageMedia File to download
* @param callable|FileCallbackInterface $callable Chunk callback
* @param callable $cb Status callback (DEPRECATED, use FileCallbackInterface)
* @param bool $seekable Whether the callable can be called out of order
* @param int $offset Offset where to start downloading
* @param int $end Offset where to stop downloading (inclusive)
* @param int $part_size Size of each chunk
*
* @return \Generator<bool>
*/
public function downloadToCallable($messageMedia, callable $callable, $cb = null, bool $seekable = true, int $offset = 0, int $end = -1, int $part_size = null): \Generator
{
$messageMedia = (yield from $this->getDownloadInfo($messageMedia));
if (\is_object($callable) && $callable instanceof FileCallbackInterface) {
$cb = $callable;
$callable = $callable->getFile();
}
$params = [$messageMedia, &$callable, &$cb, $seekable, $offset, $end, $part_size, ];
$wrapper = yield from Wrapper::create($params, $this->session, $this->logger);
$wrapper->wrap($callable, false);
$wrapper->wrap($cb, false);
return yield from $this->__call('downloadToCallable', $wrapper);
}
/**
* Placeholder.

View File

@ -0,0 +1,143 @@
<?php
/**
* API wrapper module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2020 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Ipc;
use Amp\Deferred;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Promise;
use danog\MadelineProto\Logger;
use function Amp\Ipc\connect;
/**
* IPC client.
*/
abstract class ClientAbstract
{
/**
* IPC server socket.
*/
protected ChannelledSocket $server;
/**
* Requests promise array.
*
* @var Deferred[]
*/
private array $requests = [];
/**
* Wrappers array.
*
* @var Wrapper[]
*/
private array $wrappers = [];
/**
* Whether to run loop.
*/
protected bool $run = true;
/**
* Logger instance.
*/
public Logger $logger;
protected function __construct()
{
}
/**
* Logger.
*
* @param string $param Parameter
* @param int $level Logging level
* @param string $file File where the message originated
*
* @return void
*/
public function logger($param, int $level = Logger::NOTICE, string $file = ''): void
{
if ($file === null) {
$file = \basename(\debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1)[0]['file'], '.php');
}
isset($this->logger) ? $this->logger->logger($param, $level, $file) : Logger::$default->logger($param, $level, $file);
}
/**
* Main loop.
*
* @return \Generator
*/
protected function loopInternal(): \Generator
{
do {
while ($payload = yield $this->server->receive()) {
[$id, $payload] = $payload;
if (!isset($this->requests[$id])) {
Logger::log("Got response for non-existing ID $id!");
} else {
$promise = $this->requests[$id];
unset($this->requests[$id]);
if (isset($this->wrappers[$id])) {
yield $this->wrappers[$id]->disconnect();
unset($this->wrappers[$id]);
}
if ($payload instanceof ExitFailure) {
$promise->fail($payload->getException());
} else {
$promise->resolve($payload);
}
unset($promise);
}
}
if ($this->run) {
$this->logger("Reconnecting to IPC server!");
yield $this->server->disconnect();
if ($this instanceof Client) {
Server::startMe($this->session);
$this->server = yield connect($this->session->getIpcPath());
} else {
return;
}
}
} while ($this->run);
}
/**
* Disconnect cleanly from main instance.
*
* @return Promise
*/
public function disconnect(): Promise
{
$this->run = false;
return $this->server->disconnect();
}
/**
* Call function.
*
* @param string|int $function Function name
* @param array|Wrapper $arguments Arguments
*
* @return \Generator
*/
public function __call($function, $arguments): \Generator
{
$this->requests []= $deferred = new Deferred;
if ($arguments instanceof Wrapper) {
$this->wrappers[\count($this->requests) - 1] = $arguments;
}
yield $this->server->send([$function, $arguments]);
return yield $deferred->promise();
}
}

View File

@ -3,7 +3,6 @@
namespace danog\MadelineProto\Ipc\Runner;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Tools;
final class ProcessRunner extends RunnerAbstract
{

View File

@ -70,6 +70,10 @@ class Server extends SignalLoop
$this->callback = new ServerCallback($this->API);
$this->callback->setIpcPath($session);
}
public function start(): bool
{
return $this instanceof ServerCallback ? parent::start() : $this->callback->start() && parent::start();
}
/**
* Start IPC server in background.
*
@ -140,7 +144,7 @@ class Server extends SignalLoop
Tools::callFork($this->clientLoop($socket));
}
$this->server->close();
$this->callback->signal(null);
if (isset($this->callback)) $this->callback->signal(null);
}
/**
* Client handler loop.
@ -182,13 +186,18 @@ class Server extends SignalLoop
{
try {
if ($payload[1] instanceof Wrapper) {
$payload[1] = $this->callback->unwrap($payload[1]);
$wrapper = $payload[1];
$payload[1] = $this->callback->unwrap($wrapper);
}
$result = $this->API->{$payload[0]}(...$payload[1]);
$result = $result instanceof \Generator ? yield from $result : yield $result;
} catch (\Throwable $e) {
$this->API->logger("Got error while calling IPC method: $e", Logger::ERROR);
$result = new ExitFailure($e);
} finally {
if (isset($wrapper)) {
yield $wrapper->disconnect();
}
}
try {
yield $socket->send([$id, $result]);

View File

@ -66,8 +66,8 @@ class ServerCallback extends Server
*/
protected function clientLoop(ChannelledSocket $socket)
{
$this->API->logger("Accepted IPC callback connection!");
$id = $this->id++;
$this->API->logger("Accepted IPC callback connection, assigning ID $id!");
$this->socketList[$id] = $socket;
$this->watcherList[$id] = Loop::delay(30*1000, function () use ($id) {
unset($this->watcherList[$id], $this->socketList[$id]);
@ -78,7 +78,7 @@ class ServerCallback extends Server
/**
* Unwrap value
* Unwrap value.
*
* @param Wrapper $wrapper
* @return mixed

View File

@ -6,10 +6,12 @@ use Amp\ByteStream\InputStream as ByteStreamInputStream;
use Amp\ByteStream\OutputStream as ByteStreamOutputStream;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitFailure;
use Amp\Promise;
use danog\MadelineProto\Ipc\Wrapper\InputStream;
use danog\MadelineProto\Ipc\Wrapper\Obj;
use danog\MadelineProto\Ipc\Wrapper\OutputStream;
use danog\MadelineProto\Logger;
use danog\MadelineProto\SessionPaths;
use danog\MadelineProto\Tools;
use function Amp\Ipc\connect;
@ -17,7 +19,7 @@ use function Amp\Ipc\connect;
/**
* Callback payload wrapper.
*/
class Wrapper extends Client
class Wrapper extends ClientAbstract
{
/**
* Payload data.
@ -45,31 +47,31 @@ class Wrapper extends Client
* Remote socket ID.
*/
private int $remoteId = 0;
/**
* Logger instance.
*/
private Logger $logger;
/**
* Constructor.
*
* @param mixed $data Payload data
* @param string $ipc IPC URI
* @param mixed $data Payload data
* @param SessionPaths $ipc IPC URI
*
* @return \Generator
* @return \Generator<int, Promise<ChannelledSocket>|Promise<mixed>, mixed, Wrapper>
*/
public static function create(&$data, string $ipc, Logger $logger): \Generator
public static function create(&$data, SessionPaths $session, Logger $logger): \Generator
{
$instance = new self;
$instance->data = &$data;
$instance->server = yield connect($ipc);
$instance->remoteId = yield $instance->server->receive();
$instance->logger = $logger;
$instance->run = false;
$logger->logger("Connecting to callback IPC server...");
$instance->server = yield connect($session->getIpcCallbackPath());
$logger->logger("Connected to callback IPC server!");
$instance->remoteId = yield $instance->server->receive();
$logger->logger("Got ID {$instance->remoteId} from callback IPC server!");
Tools::callFork($instance->receiverLoop());
return $instance;
}
private function __construct()
{
}
/**
* Serialization function.
*
@ -77,41 +79,42 @@ class Wrapper extends Client
*/
public function __sleep(): array
{
return ['data', 'callbackIds'];
return ['data', 'callbackIds', 'remoteId'];
}
/**
* Wrap a certain callback object.
*
* @param object|callable $callback Object to wrap
* @param object|callable $callback Callback to wrap
* @param bool $wrapObjects Whether to wrap object methods, too
*
* @param-out int $callback Callback ID
*
* @return void
*/
public function wrap(&$callback): void
public function wrap(&$callback, bool $wrapObjects = true): void
{
if (\is_object($callback)) {
if (\is_object($callback) && $wrapObjects) {
$ids = [];
foreach (\get_class_methods($callback) as $method) {
$id = $this->id++;
$this->callbacks[$id] = [$callback, $method];
$ids[$method] = $id;
}
$callback = $ids;
$this->callbackIds[] = &$callback;
} else {
$id = $this->id++;
$this->callbacks[$id] = self::copy($callback);
$class = Obj::class;
if ($callback instanceof ByteStreamInputStream) {
$class = InputStream::class;
} else if ($callback instanceof ByteStreamOutputStream) {
} elseif ($callback instanceof ByteStreamOutputStream) {
$class = OutputStream::class;
}
if ($class !== Obj::class && method_exists($callback, 'seek')) {
if ($class !== Obj::class && \method_exists($callback, 'seek')) {
$class = "Seekable$class";
}
$callback = [$class, $id]; // Will be re-filled later
$callback = [$class, $ids]; // Will be re-filled later
$this->callbackIds[] = &$callback;
} elseif (\is_callable($callback)) {
$id = $this->id++;
$this->callbacks[$id] = self::copy($callback);
$callback = $id;
$this->callbackIds[] = &$callback;
}
}

View File

@ -0,0 +1,31 @@
<?php
namespace danog\MadelineProto\Ipc\Wrapper;
use danog\MadelineProto\FileCallbackInterface;
class FileCallback extends Obj implements FileCallbackInterface
{
/**
* Get file.
*
* @return mixed
*/
public function getFile()
{
return $this->__call('getFile');
}
/**
* Invoke callback.
*
* @param int $percent Percent
* @param int $speed Speed in mbps
* @param int $time Time
*
* @return mixed
*/
public function __invoke(...$args)
{
return $this->__call('__invoke', $args);
}
}

View File

@ -4,7 +4,6 @@ namespace danog\MadelineProto\Ipc\Wrapper;
use Amp\ByteStream\InputStream as AmpInputStream;
use Amp\Promise;
use danog\MadelineProto\Ipc\Wrapper\Obj;
use danog\MadelineProto\Tools;
class InputStream extends Obj implements AmpInputStream

View File

@ -16,11 +16,11 @@ class Obj
*/
private array $methods = [];
/**
* Wrapper
* Wrapper.
*/
private Wrapper $wrapper;
/**
* Constructor
* Constructor.
*
* @param Wrapper $wrapper
* @param array $methods

View File

@ -4,7 +4,6 @@ namespace danog\MadelineProto\Ipc\Wrapper;
use Amp\ByteStream\OutputStream as AmpOutputStream;
use Amp\Promise;
use danog\MadelineProto\Ipc\Wrapper\Obj;
use danog\MadelineProto\Tools;
class OutputStream extends Obj implements AmpOutputStream

View File

@ -2,7 +2,7 @@
namespace danog\MadelineProto\Ipc\Wrapper;
class SeekableInputStream extends InputStream
class SeekableInputStream extends InputStream
{
use SeekableTrait;
}
}

View File

@ -2,7 +2,7 @@
namespace danog\MadelineProto\Ipc\Wrapper;
class SeekableOutputStream extends OutputStream
class SeekableOutputStream extends OutputStream
{
use SeekableTrait;
}
}

View File

@ -0,0 +1,18 @@
<?php
namespace danog\MadelineProto\Ipc\Wrapper;
use danog\MadelineProto\Ipc\Wrapper;
trait WrapMethodTrait
{
abstract public function __call($name, $args);
public function wrap(...$args): \Generator
{
$new = yield from Wrapper::create($args, $this->session->getIpcCallbackPath(), $this->logger);
foreach ($args as &$arg) {
$new->wrap($arg);
}
return $this->__call(__FUNCTION__, $new);
}
}

View File

@ -32,7 +32,6 @@ use danog\MadelineProto\Db\DbPropertiesFactory;
use danog\MadelineProto\Db\DbPropertiesTrait;
use danog\MadelineProto\Db\MemoryArray;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\Ipc\ServerCallback;
use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal;
use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\SeqLoop;

View File

@ -19,30 +19,16 @@
namespace danog\MadelineProto\MTProtoTools;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
use Amp\Deferred;
use Amp\File\BlockingFile;
use Amp\File\Handle;
use Amp\File\StatCache as StatCacheAsync;
use Amp\Http\Client\Request;
use Amp\Http\Server\Request as ServerRequest;
use Amp\Http\Server\Response;
use Amp\Http\Status;
use Amp\Producer;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\Exception;
use danog\MadelineProto\FileCallbackInterface;
use danog\MadelineProto\Settings;
use danog\MadelineProto\Stream\Common\BufferedRawStream;
use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\Transport\PremadeStream;
use danog\MadelineProto\Tools;
use tgseclib\Crypt\AES;
@ -61,55 +47,7 @@ use function Amp\Promise\all;
*/
trait Files
{
/**
* Upload file.
*
* @param FileCallbackInterface|string|array $file File, URL or Telegram file to upload
* @param string $fileName File name
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return \Generator<array>
*/
public function upload($file, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
{
if (\is_object($file) && $file instanceof FileCallbackInterface) {
$cb = $file;
$file = $file->getFile();
}
if (\is_string($file) || \is_object($file) && \method_exists($file, '__toString')) {
if (\filter_var($file, FILTER_VALIDATE_URL)) {
return yield from $this->uploadFromUrl($file, 0, $fileName, $cb, $encrypted);
}
} elseif (\is_array($file)) {
return yield from $this->uploadFromTgfile($file, $cb, $encrypted);
}
if (\is_resource($file) || (\is_object($file) && $file instanceof InputStream)) {
return yield from $this->uploadFromStream($file, 0, '', $fileName, $cb, $encrypted);
}
if (!$this->settings->getFiles()->getAllowAutomaticUpload()) {
return yield from $this->uploadFromUrl($file, 0, $fileName, $cb, $encrypted);
}
$file = Tools::absolute($file);
if (!yield exists($file)) {
throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['file_not_exist']);
}
if (empty($fileName)) {
$fileName = \basename($file);
}
StatCacheAsync::clear($file);
$size = (yield statAsync($file))['size'];
if ($size > 512 * 1024 * 4000) {
throw new \danog\MadelineProto\Exception('Given file is too big!');
}
$stream = yield open($file, 'rb');
$mime = $this->getMimeFromFile($file);
try {
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
} finally {
yield $stream->close();
}
}
use FilesLogic;
/**
* Upload file from URL.
*
@ -153,90 +91,6 @@ trait Files
}
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
}
/**
* Upload file from stream.
*
* @param mixed $stream PHP resource or AMPHP async stream
* @param integer $size File size
* @param string $mime Mime type
* @param string $fileName File name
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return array
*/
public function uploadFromStream($stream, int $size, string $mime, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
{
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
$cb = $stream;
$stream = $stream->getFile();
}
/* @var $stream \Amp\ByteStream\OutputStream */
if (!\is_object($stream)) {
$stream = new ResourceInputStream($stream);
}
if (!$stream instanceof InputStream) {
throw new Exception("Invalid stream provided");
}
$seekable = false;
if (\method_exists($stream, 'seek')) {
try {
yield $stream->seek(0);
$seekable = true;
} catch (StreamException $e) {
}
}
$created = false;
if ($stream instanceof Handle) {
$callable = static function (int $offset, int $size) use ($stream, $seekable): \Generator {
if ($seekable) {
while ($stream->tell() !== $offset) {
yield $stream->seek($offset);
}
}
return yield $stream->read($size);
};
} else {
if (!$stream instanceof BufferedRawStream) {
$ctx = (new ConnectionContext())->addStream(PremadeStream::class, $stream)->addStream(SimpleBufferedRawStream::class);
$stream = (yield from $ctx->getStream());
$created = true;
}
$callable = static function (int $offset, int $size) use ($stream): \Generator {
$reader = yield $stream->getReadBuffer($l);
try {
return yield $reader->bufferRead($size);
} catch (\danog\MadelineProto\NothingInTheSocketException $e) {
$reader = yield $stream->getReadBuffer($size);
return yield $reader->bufferRead($size);
}
};
$seekable = false;
}
if (!$size && $seekable && \method_exists($stream, 'tell')) {
yield $stream->seek(0, \SEEK_END);
$size = yield $stream->tell();
yield $stream->seek(0);
} elseif (!$size) {
$this->logger->logger("No content length for stream, caching first");
$body = $stream;
$stream = new BlockingFile(\fopen('php://temp', 'r+b'), 'php://temp', 'r+b');
while (null !== ($chunk = yield $body->read())) {
yield $stream->write($chunk);
}
$size = $stream->tell();
if (!$size) {
throw new Exception('Wrong size!');
}
yield $stream->seek(0);
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
}
$res = (yield from $this->uploadFromCallable($callable, $size, $mime, $fileName, $cb, $seekable, $encrypted));
if ($created) {
$stream->disconnect();
}
return $res;
}
/**
* Upload file from callable.
*
@ -365,19 +219,6 @@ trait Files
//\hash_final($ctx);
return $constructor;
}
/**
* Upload file to secret chat.
*
* @param FileCallbackInterface|string|array $file File, URL or Telegram file to upload
* @param string $fileName File name
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
*
* @return \Generator<array>
*/
public function uploadEncrypted($file, string $fileName = '', $cb = null): \Generator
{
return $this->upload($file, $fileName, $cb, true);
}
/**
* Reupload telegram file.
*
@ -907,106 +748,6 @@ trait Files
throw new \danog\MadelineProto\Exception('Invalid constructor provided: '.$messageMedia['_']);
}
}
/**
* Download file to browser.
*
* Supports HEAD requests and content-ranges for parallel and resumed downloads.
*
* @param array|string $messageMedia File to download
* @param callable $cb Status callback (can also use FileCallback)
*
* @return \Generator
*/
public function downloadToBrowser($messageMedia, callable $cb = null): \Generator
{
if (\is_object($messageMedia) && $messageMedia instanceof FileCallbackInterface) {
$cb = $messageMedia;
$messageMedia = $messageMedia->getFile();
}
$headers = [];
if (isset($_SERVER['HTTP_RANGE'])) {
$headers['range'] = $_SERVER['HTTP_RANGE'];
}
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
$result = ResponseInfo::parseHeaders(
$_SERVER['REQUEST_METHOD'],
$headers,
$messageMedia
);
foreach ($result->getHeaders() as $key => $value) {
if (\is_array($value)) {
foreach ($value as $subValue) {
\header("$key: $subValue", false);
}
} else {
\header("$key: $value");
}
}
\http_response_code($result->getCode());
if (!\in_array($result->getCode(), [Status::OK, Status::PARTIAL_CONTENT])) {
yield Tools::echo($result->getCodeExplanation());
} elseif ($result->shouldServe()) {
if (\ob_get_level()) {
\ob_end_flush();
\ob_implicit_flush();
}
yield from $this->downloadToStream($messageMedia, \fopen('php://output', 'w'), $cb, ...$result->getServeRange());
}
}
/**
* Download file to amphp/http-server response.
*
* Supports HEAD requests and content-ranges for parallel and resumed downloads.
*
* @param array|string $messageMedia File to download
* @param ServerRequest $request Request
* @param callable $cb Status callback (can also use FileCallback)
*
* @return \Generator<Response> Returned response
*/
public function downloadToResponse($messageMedia, ServerRequest $request, callable $cb = null): \Generator
{
if (\is_object($messageMedia) && $messageMedia instanceof FileCallbackInterface) {
$cb = $messageMedia;
$messageMedia = $messageMedia->getFile();
}
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
$result = ResponseInfo::parseHeaders(
$request->getMethod(),
\array_map(fn (array $headers) => $headers[0], $request->getHeaders()),
$messageMedia
);
$body = null;
if ($result->shouldServe()) {
$body = new IteratorStream(
new Producer(
function (callable $emit) use (&$messageMedia, &$cb, &$result) {
$emit = static function (string $payload) use ($emit): \Generator {
yield $emit($payload);
return \strlen($payload);
};
yield Tools::call($this->downloadToCallable($messageMedia, $emit, $cb, false, ...$result->getServeRange()));
}
)
);
} elseif (!\in_array($result->getCode(), [Status::OK, Status::PARTIAL_CONTENT])) {
$body = $result->getCodeExplanation();
}
$response = new Response($result->getCode(), $result->getHeaders(), $body);
if ($result->shouldServe() && !empty($result->getHeaders()['Content-Length'])) {
$response->setHeader('content-length', $result->getHeaders()['Content-Length']);
}
return $response;
}
/**
* Download file to directory.
*
@ -1061,49 +802,6 @@ trait Files
}
return $file;
}
/**
* Download file to stream.
*
* @param mixed $messageMedia File to download
* @param mixed|FileCallbackInterface $stream Stream where to download file
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param int $offset Offset where to start downloading
* @param int $end Offset where to end download
*
* @return \Generator<bool>
*/
public function downloadToStream($messageMedia, $stream, $cb = null, int $offset = 0, int $end = -1): \Generator
{
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
$cb = $stream;
$stream = $stream->getFile();
}
/** @var $stream \Amp\ByteStream\OutputStream */
if (!\is_object($stream)) {
$stream = new ResourceOutputStream($stream);
}
if (!$stream instanceof OutputStream) {
throw new Exception("Invalid stream provided");
}
$seekable = false;
if (\method_exists($stream, 'seek')) {
try {
yield $stream->seek($offset);
$seekable = true;
} catch (StreamException $e) {
}
}
$callable = static function (string $payload, int $offset) use ($stream, $seekable): \Generator {
if ($seekable) {
while ($stream->tell() !== $offset) {
yield $stream->seek($offset);
}
}
return yield $stream->write($payload);
};
return yield from $this->downloadToCallable($messageMedia, $callable, $cb, $seekable, $offset, $end);
}
/**
* Download file to callable.
* The callable must accept two parameters: string $payload, int $offset

View File

@ -0,0 +1,328 @@
<?php
namespace danog\MadelineProto\MTProtoTools;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
use Amp\File\BlockingFile;
use Amp\File\Handle;
use Amp\File\StatCache as StatCacheAsync;
use Amp\Http\Client\Request;
use Amp\Http\Server\Request as ServerRequest;
use Amp\Http\Server\Response;
use Amp\Http\Status;
use Amp\Producer;
use danog\MadelineProto\Exception;
use danog\MadelineProto\FileCallbackInterface;
use danog\MadelineProto\Stream\Common\BufferedRawStream;
use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\Transport\PremadeStream;
use danog\MadelineProto\Tools;
use function Amp\File\exists;
use function Amp\File\open;
use function Amp\File\stat as statAsync;
trait FilesLogic
{
/**
* Download file to browser.
*
* Supports HEAD requests and content-ranges for parallel and resumed downloads.
*
* @param array|string $messageMedia File to download
* @param callable $cb Status callback (can also use FileCallback)
*
* @return \Generator
*/
public function downloadToBrowser($messageMedia, callable $cb = null): \Generator
{
if (\is_object($messageMedia) && $messageMedia instanceof FileCallbackInterface) {
$cb = $messageMedia;
$messageMedia = $messageMedia->getFile();
}
$headers = [];
if (isset($_SERVER['HTTP_RANGE'])) {
$headers['range'] = $_SERVER['HTTP_RANGE'];
}
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
$result = ResponseInfo::parseHeaders(
$_SERVER['REQUEST_METHOD'],
$headers,
$messageMedia
);
foreach ($result->getHeaders() as $key => $value) {
if (\is_array($value)) {
foreach ($value as $subValue) {
\header("$key: $subValue", false);
}
} else {
\header("$key: $value");
}
}
\http_response_code($result->getCode());
if (!\in_array($result->getCode(), [Status::OK, Status::PARTIAL_CONTENT])) {
yield Tools::echo($result->getCodeExplanation());
} elseif ($result->shouldServe()) {
if (\ob_get_level()) {
\ob_end_flush();
\ob_implicit_flush();
}
yield from $this->downloadToStream($messageMedia, \fopen('php://output', 'w'), $cb, ...$result->getServeRange());
}
}
/**
* Download file to stream.
*
* @param mixed $messageMedia File to download
* @param mixed|FileCallbackInterface $stream Stream where to download file
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param int $offset Offset where to start downloading
* @param int $end Offset where to end download
*
* @return \Generator<bool>
*/
public function downloadToStream($messageMedia, $stream, $cb = null, int $offset = 0, int $end = -1): \Generator
{
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
$cb = $stream;
$stream = $stream->getFile();
}
/** @var $stream \Amp\ByteStream\OutputStream */
if (!\is_object($stream)) {
$stream = new ResourceOutputStream($stream);
}
if (!$stream instanceof OutputStream) {
throw new Exception("Invalid stream provided");
}
$seekable = false;
if (\method_exists($stream, 'seek')) {
try {
yield $stream->seek($offset);
$seekable = true;
} catch (StreamException $e) {
}
}
$callable = static function (string $payload, int $offset) use ($stream, $seekable): \Generator {
if ($seekable) {
while ($stream->tell() !== $offset) {
yield $stream->seek($offset);
}
}
return yield $stream->write($payload);
};
return yield from $this->downloadToCallable($messageMedia, $callable, $cb, $seekable, $offset, $end);
}
/**
* Download file to amphp/http-server response.
*
* Supports HEAD requests and content-ranges for parallel and resumed downloads.
*
* @param array|string $messageMedia File to download
* @param ServerRequest $request Request
* @param callable $cb Status callback (can also use FileCallback)
*
* @return \Generator<Response> Returned response
*/
public function downloadToResponse($messageMedia, ServerRequest $request, callable $cb = null): \Generator
{
if (\is_object($messageMedia) && $messageMedia instanceof FileCallbackInterface) {
$cb = $messageMedia;
$messageMedia = $messageMedia->getFile();
}
$messageMedia = yield from $this->getDownloadInfo($messageMedia);
$result = ResponseInfo::parseHeaders(
$request->getMethod(),
\array_map(fn (array $headers) => $headers[0], $request->getHeaders()),
$messageMedia
);
$body = null;
if ($result->shouldServe()) {
$body = new IteratorStream(
new Producer(
function (callable $emit) use (&$messageMedia, &$cb, &$result) {
$emit = static function (string $payload) use ($emit): \Generator {
yield $emit($payload);
return \strlen($payload);
};
yield Tools::call($this->downloadToCallable($messageMedia, $emit, $cb, false, ...$result->getServeRange()));
}
)
);
} elseif (!\in_array($result->getCode(), [Status::OK, Status::PARTIAL_CONTENT])) {
$body = $result->getCodeExplanation();
}
$response = new Response($result->getCode(), $result->getHeaders(), $body);
if ($result->shouldServe() && !empty($result->getHeaders()['Content-Length'])) {
$response->setHeader('content-length', $result->getHeaders()['Content-Length']);
}
return $response;
}
/**
* Upload file to secret chat.
*
* @param FileCallbackInterface|string|array $file File, URL or Telegram file to upload
* @param string $fileName File name
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
*
* @return \Generator<array>
*/
public function uploadEncrypted($file, string $fileName = '', $cb = null): \Generator
{
return $this->upload($file, $fileName, $cb, true);
}
/**
* Upload file.
*
* @param FileCallbackInterface|string|array $file File, URL or Telegram file to upload
* @param string $fileName File name
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return \Generator<array>
*/
public function upload($file, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
{
if (\is_object($file) && $file instanceof FileCallbackInterface) {
$cb = $file;
$file = $file->getFile();
}
if (\is_string($file) || \is_object($file) && \method_exists($file, '__toString')) {
if (\filter_var($file, FILTER_VALIDATE_URL)) {
return yield from $this->uploadFromUrl($file, 0, $fileName, $cb, $encrypted);
}
} elseif (\is_array($file)) {
return yield from $this->uploadFromTgfile($file, $cb, $encrypted);
}
if (\is_resource($file) || (\is_object($file) && $file instanceof InputStream)) {
return yield from $this->uploadFromStream($file, 0, '', $fileName, $cb, $encrypted);
}
if (!$this->settings->getFiles()->getAllowAutomaticUpload()) {
return yield from $this->uploadFromUrl($file, 0, $fileName, $cb, $encrypted);
}
$file = Tools::absolute($file);
if (!yield exists($file)) {
throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['file_not_exist']);
}
if (empty($fileName)) {
$fileName = \basename($file);
}
StatCacheAsync::clear($file);
$size = (yield statAsync($file))['size'];
if ($size > 512 * 1024 * 4000) {
throw new \danog\MadelineProto\Exception('Given file is too big!');
}
$stream = yield open($file, 'rb');
$mime = $this->getMimeFromFile($file);
try {
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
} finally {
yield $stream->close();
}
}
/**
* Upload file from stream.
*
* @param mixed $stream PHP resource or AMPHP async stream
* @param integer $size File size
* @param string $mime Mime type
* @param string $fileName File name
* @param callable $cb Callback (DEPRECATED, use FileCallbackInterface)
* @param boolean $encrypted Whether to encrypt file for secret chats
*
* @return array
*/
public function uploadFromStream($stream, int $size, string $mime, string $fileName = '', $cb = null, bool $encrypted = false): \Generator
{
if (\is_object($stream) && $stream instanceof FileCallbackInterface) {
$cb = $stream;
$stream = $stream->getFile();
}
/* @var $stream \Amp\ByteStream\OutputStream */
if (!\is_object($stream)) {
$stream = new ResourceInputStream($stream);
}
if (!$stream instanceof InputStream) {
throw new Exception("Invalid stream provided");
}
$seekable = false;
if (\method_exists($stream, 'seek')) {
try {
yield $stream->seek(0);
$seekable = true;
} catch (StreamException $e) {
}
}
$created = false;
if ($stream instanceof Handle) {
$callable = static function (int $offset, int $size) use ($stream, $seekable): \Generator {
if ($seekable) {
while ($stream->tell() !== $offset) {
yield $stream->seek($offset);
}
}
return yield $stream->read($size);
};
} else {
if (!$stream instanceof BufferedRawStream) {
$ctx = (new ConnectionContext())->addStream(PremadeStream::class, $stream)->addStream(SimpleBufferedRawStream::class);
$stream = (yield from $ctx->getStream());
$created = true;
}
$callable = static function (int $offset, int $size) use ($stream): \Generator {
$reader = yield $stream->getReadBuffer($l);
try {
return yield $reader->bufferRead($size);
} catch (\danog\MadelineProto\NothingInTheSocketException $e) {
$reader = yield $stream->getReadBuffer($size);
return yield $reader->bufferRead($size);
}
};
$seekable = false;
}
if (!$size && $seekable && \method_exists($stream, 'tell')) {
yield $stream->seek(0, \SEEK_END);
$size = yield $stream->tell();
yield $stream->seek(0);
} elseif (!$size) {
$this->logger->logger("No content length for stream, caching first");
$body = $stream;
$stream = new BlockingFile(\fopen('php://temp', 'r+b'), 'php://temp', 'r+b');
while (null !== ($chunk = yield $body->read())) {
yield $stream->write($chunk);
}
$size = $stream->tell();
if (!$size) {
throw new Exception('Wrong size!');
}
yield $stream->seek(0);
return yield from $this->uploadFromStream($stream, $size, $mime, $fileName, $cb, $encrypted);
}
$res = (yield from $this->uploadFromCallable($callable, $size, $mime, $fileName, $cb, $seekable, $encrypted));
if ($created) {
$stream->disconnect();
}
return $res;
}
}

View File

@ -22,7 +22,6 @@ namespace danog\MadelineProto;
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use danog\MadelineProto\Db\DbArray;
use danog\MadelineProto\Db\DriverArray;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\MTProtoSession\Session;
@ -225,6 +224,7 @@ abstract class Serialization
try {
\clearstatcache(true, $ipcPath);
$socket = yield connect($ipcPath);
Logger::log("Connected to IPC socket!");
if ($cancelFull) {
$cancelFull->resolve(true);
}