From 7dbf529bb16ec1d06f2f62c9fc43f41c676a533a Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 24 Jun 2019 18:55:37 +0200 Subject: [PATCH] Implement parallelized upload --- .../MadelineProto/MTProtoTools/Files.php | 269 ++++++++++++++---- .../Stream/Common/BufferedRawStream.php | 5 +- .../Stream/Transport/DefaultStream.php | 2 +- 3 files changed, 219 insertions(+), 57 deletions(-) diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index f5f21197..5f85ab9f 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -19,9 +19,12 @@ namespace danog\MadelineProto\MTProtoTools; +use Amp\ByteStream\InputStream; use Amp\ByteStream\OutputStream; use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\StreamException; +use Amp\Deferred; +use Amp\File\Handle; use Amp\File\StatCache; use Amp\Promise; use Amp\Success; @@ -30,8 +33,13 @@ use danog\MadelineProto\Exception; use danog\MadelineProto\FileCallbackInterface; use danog\MadelineProto\Logger; use danog\MadelineProto\RPCErrorException; +use danog\MadelineProto\Stream\Common\BufferedRawStream; +use danog\MadelineProto\Stream\ConnectionContext; +use danog\MadelineProto\Stream\Transport\PremadeStream; +use function Amp\File\exists; use function Amp\File\open; use function Amp\File\stat; +use function Amp\File\touch; use function Amp\Promise\all; /** @@ -39,45 +47,112 @@ use function Amp\Promise\all; */ trait Files { - public function upload_async($file, $file_name = '', $cb = null, $encrypted = false): \Generator + public function upload_async($file, $file_name = '', $cb = null, $encrypted = false) { - if (is_object($file)) { - if (!$file instanceof FileCallbackInterface) { - throw new \danog\MadelineProto\Exception('Provided object does not implement FileCallbackInterface'); - } + if (is_object($file) && $file instanceof FileCallbackInterface) { $cb = $file; $file = $file->getFile(); } + if (is_string($file) || (is_object($file) && method_exists($file, '__toString'))) { + if (filter_var($file, FILTER_VALIDATE_URL)) { + return yield $this->upload_from_url_async($file); + } + } else if (is_array($file)) { + return yield $this->upload_from_tgfile($file, $cb, $encrypted); + } $file = \danog\MadelineProto\Absolute::absolute($file); - if (!file_exists($file)) { + if (!yield exists($file)) { throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['file_not_exist']); } if (empty($file_name)) { $file_name = basename($file); } - $datacenter = $this->settings['connection_settings']['default_dc']; - if (isset($this->datacenter->sockets[$datacenter.'_media'])) { - $datacenter .= '_media'; - } - $file_size = filesize($file); - if ($file_size > 512 * 1024 * 3000) { + + StatCache::clear($file); + + $size = (yield stat($file))['size']; + if ($size > 512 * 1024 * 3000) { throw new \danog\MadelineProto\Exception('Given file is too big!'); } - if ($cb === null) { - $cb = function ($percent) { - $this->logger->logger('Upload status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); + + $stream = yield open($file, 'rb'); + $mime = $this->get_mime_from_file($file); + + try { + return yield $this->upload_from_stream_async($stream, $size, $stream, $mime, $cb, $encrypted); + } finally { + yield $stream->close(); + } + } + public function upload_from_url_async($url, int $size = 0, string $file_name = '', $cb = null, bool $encrypted = false) + { + if (is_object($url) && $url instanceof FileCallbackInterface) { + $cb = $url; + $url = $url->getFile(); + } + /** @var $response \Amp\Artax\Response */ + $response = yield $this->datacenter->getHTTPClient()->request($url); + if (200 !== $status = $response->getStatusCode) { + throw new Exception("Wrong status code: $status"); + } + $mime = trim(explode(';', $response->getHeader('content-type') ?? 'application/octet-stream')[0]); + $size = $response->getHeader('content-length') ?? $size; + if (!$size) { + throw new Exception('Wrong size'); + } + + $body = $response->getBody(); + + return yield $this->upload_from_stream_async($body, $size, $mime, $file_name, $cb, $encrypted); + } + public function upload_from_stream_async($stream, int $size, string $mime, string $file_name = '', $cb = null, bool $encrypted = false) + { + if (is_object($stream) && $stream instanceof FileCallbackInterface) { + $cb = $stream; + $stream = $stream->getFile(); + } + + /** @var $stream \Amp\ByteStream\OutputStream */ + if (!is_object($stream)) { + $stream = new ResourceOutputStream($stream); + } + if (!$stream instanceof InputStream) { + throw new Exception("Invalid stream provided"); + } + $seekable = false; + if (method_exists($stream, 'seek')) { + try { + yield $stream->seek(0); + $seekable = true; + } catch (StreamException $e) { + } + } + + if ($stream instanceof Handle || $stream instanceof BufferedRawStream) { + $callable = static function (int $offset, int $size) use ($stream, $seekable) { + if ($seekable) { + while ($stream->tell() !== $offset) { + yield $stream->seek($offset); + } + } + return yield $stream->read($size); + }; + } else { + $ctx = (new ConnectionContext) + ->addStream(PremadeStream::getName(), $stream) + ->addStream(BufferedRawStream); + + $stream = yield $ctx->getStream(); + + $callable = static function (int $offset, int $size) use ($stream) { + return yield $stream->read($size); }; } - $f = fopen($file, 'r'); - - $seekable = stream_get_meta_data($f)['seekable']; - if ($seekable) { - fseek($f, 0); - } + return yield $this->upload_from_callable_async($callable, $size, $mime, $file_name, $cb, $seekable, $encrypted); } - public function upload_from_callable($callable, $size, $file_name = '', $cb = null, $encrypted = false) + public function upload_from_callable_async($callable, int $size, string $mime, string $file_name = '', $cb = null, bool $refetchable = true, bool $encrypted = false) { if (is_object($callable) && $callable instanceof FileCallbackInterface) { $cb = $callable; @@ -86,8 +161,20 @@ trait Files if (!is_callable($callable)) { throw new Exception('Invalid callable provided'); } + if ($cb === null) { + $cb = function ($percent) { + $this->logger->logger('Upload status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); + }; + } + + $datacenter = $this->settings['connection_settings']['default_dc']; + if (isset($this->datacenter->sockets[$datacenter.'_media'])) { + $datacenter .= '_media'; + } $part_size = $this->settings['upload']['part_size']; + $parallel_chunks = $this->settings['upload']['parallel_chunks'] ? $this->settings['upload']['parallel_chunks'] : 3000; + $part_total_num = (int) ceil($size / $part_size); $part_num = 0; $method = $size > 10 * 1024 * 1024 ? 'upload.saveBigFilePart' : 'upload.saveFilePart'; @@ -104,40 +191,64 @@ trait Files $ige->setIV($iv); $ige->setKey($key); $ige->enableContinuousBuffer(); - $parallelize = false; + $refetchable = false; } $ctx = hash_init('md5'); $promises = []; - $cur_part_num = 0; + $cb = function () use ($cb, $part_total_num) { + static $cur = 0; + $cur++; + $this->callFork($cb($cur * 100 / $part_total_num)); + }; + + $start = microtime(true); while ($part_num < $part_total_num) { - $t = microtime(true); $read_deferred = yield $this->method_call_async_write( $method, new AsyncParameters( - static function () use ($file_id, $part_num, $part_total_num, $part_size, $f, $ctx, $ige, $seekable) { - if ($seekable) { - fseek($f, $part_num * $part_size); - } elseif (ftell($f) !== $part_num * $part_size) { - throw new \danog\MadelineProto\Exception('Wrong position!'); - } + static function () use ($file_id, $part_num, $part_total_num, $part_size, $callable, $ctx, $ige) { + static $fetched = false; + $already_fetched = $fetched; + $fetched = true; - $bytes = stream_get_contents($f, $part_size); + $bytes = yield $callable($part_num * $part_size, $part_size); + if (!$already_fetched) { + hash_update($ctx, $bytes); + } if ($ige) { $bytes = $ige->encrypt(str_pad($bytes, $part_size, chr(0))); } - hash_update($ctx, $bytes); return ['file_id' => $file_id, 'file_part' => $part_num, 'file_total_parts' => $part_total_num, 'bytes' => $bytes]; }, - $seekable + $refetchable ), - ['heavy' => true, 'file' => true, 'datacenter' => $datacenter] + ['heavy' => true, 'file' => true, 'datacenter' => &$datacenter] ); - $this->callFork($cb(ftell($f) * 100 / $file_size)); - $this->logger->logger('Speed for chunk: '.(($part_size * 8 / 1000000) / (microtime(true) - $t))); + $read_deferred->onResolve(static function ($e, $res) use ($cb) { + if ($res) { + $cb(); + } + }); + $part_num++; $promises[] = $read_deferred->promise(); + + if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this if every second) + $result = yield $this->all($promises); + foreach ($result as $kkey => $result) { + if (!$result) { + throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed'); + } + } + $promises = []; + + $time = microtime(true) - $start; + $speed = (int) (($size * 8) / $time) / 1000000; + $this->logger->logger("Partial download time: $time"); + $this->logger->logger("Partial download speed: $speed mbps"); + } } $result = yield all($promises); @@ -146,18 +257,18 @@ trait Files throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed'); } } + $time = microtime(true) - $start; + $speed = (int) (($size * 8) / $time) / 1000000; + $this->logger->logger("Total download time: $time"); + $this->logger->logger("Total download speed: $speed mbps"); - $constructor = ['_' => $constructor, 'id' => $file_id, 'parts' => $part_total_num, 'name' => $file_name, 'mime_type' => $this->get_mime_from_file($file)]; + $constructor = ['_' => $constructor, 'id' => $file_id, 'parts' => $part_total_num, 'name' => $file_name, 'mime_type' => $mime]; if ($encrypted === true) { $constructor['key_fingerprint'] = $fingerprint; $constructor['key'] = $key; $constructor['iv'] = $iv; } - - fclose($f); - clearstatcache(); - - $this->logger->logger('Speed: '.(($file_size * 8) / (microtime(true) - $t) / 1000000)); + $constructor['md5_checksum'] = hash_final($ctx); return $constructor; } @@ -167,7 +278,56 @@ trait Files return $this->upload_async($file, $file_name, $cb, true); } - public function gen_all_file_async($media, $regenerate) + public function upload_from_tgfile_async($media, $cb = null, $encrypted = false) + { + if (is_object($media) && $media instanceof FileCallbackInterface) { + $cb = $media; + $media = $media->getFile(); + } + $media = yield $this->get_download_info_async($media); + if (!isset($media['size'], $media['mime'])) { + throw new Exception('Wrong file provided!'); + } + $size = $media['size']; + $mime = $media['mime']; + + $bridge = new class + { + private $done = []; + private $pending = []; + public function write(string $data, int $offset) + { + if (isset($this->pending[$offset])) { + $promise = $this->pending[$offset]; + unset($this->pending[$offset]); + $promise->resolve($data); + } else { + $this->done[$offset] = $data; + } + } + public function read(int $offset, int $size) + { + if (isset($this->done[$offset])) { + if (strlen($this->done[$offset]) > $size) { + throw new Exception('Wrong size!'); + } + $result = $this->done[$offset]; + unset($this->done[$offset]); + return $result; + } + $this->pending[$offset] = new Deferred; + return $this->pending[$offset]->promise(); + } + }; + $reader = [$bridge, 'read']; + $writer = [$bridge, 'write']; + yield $this->all([ + $this->download_to_callable_async($media, $writer, $cb), + $this->upload_from_callable_async($reader, $size, $mime, '', $cb, false, $encrypted), + ]); + } + + public function gen_all_file_async($media) { $res = [$this->constructors->find_by_predicate($media['_'])['type'] => $media]; switch ($media['_']) { @@ -260,7 +420,7 @@ trait Files return $res; } - public function get_file_info_async($constructor, $regenerate = false) + public function get_file_info_async($constructor) { if (is_string($constructor)) { $constructor = $this->unpack_file_id($constructor)['MessageMedia']; @@ -276,7 +436,7 @@ trait Files $constructor = $constructor['media']; } - return yield $this->gen_all_file_async($constructor, $regenerate); + return yield $this->gen_all_file_async($constructor); } public function get_propic_info_async($data) { @@ -644,8 +804,8 @@ trait Files $file = $file->getFile(); } $file = \danog\MadelineProto\Absolute::absolute(preg_replace('|/+|', '/', $file)); - if (!file_exists($file)) { - touch($file); + if (!yield exists($file)) { + yield touch($file); } $file = realpath($file); $message_media = yield $this->get_download_info_async($message_media); @@ -677,12 +837,6 @@ trait Files $stream = $stream->getFile(); } - if ($cb === null) { - $cb = function ($percent) { - $this->logger->logger('Download status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); - }; - } - /** @var $stream \Amp\ByteStream\OutputStream */ if (!is_object($stream)) { $stream = new ResourceOutputStream($stream); @@ -721,13 +875,18 @@ trait Files if (!is_callable($callable)) { throw new Exception('Wrong callable provided'); } + if ($cb === null) { + $cb = function ($percent) { + $this->logger->logger('Download status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); + }; + } if ($end === -1 && isset($message_media['size'])) { $end = $message_media['size']; } $part_size = $this->settings['download']['part_size']; - $parallel_chunks = $this->settings['download']['parallel_chunks']; + $parallel_chunks = $this->settings['download']['parallel_chunks'] ? $this->settings['download']['parallel_chunks'] : 3000; $datacenter = isset($message_media['InputFileLocation']['dc_id']) ? $message_media['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc']; if (isset($this->datacenter->sockets[$datacenter.'_media'])) { diff --git a/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php b/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php index 129380e9..a25a983b 100644 --- a/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php +++ b/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php @@ -24,13 +24,16 @@ use danog\MadelineProto\Exception; use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\ConnectionContext; use function Amp\Socket\connect; +use danog\MadelineProto\Stream\BufferedStreamInterface; +use danog\MadelineProto\Stream\BufferInterface; +use danog\MadelineProto\Stream\RawStreamInterface; /** * Buffered raw stream. * * @author Daniil Gentili */ -class BufferedRawStream implements \danog\MadelineProto\Stream\BufferedStreamInterface, \danog\MadelineProto\Stream\BufferInterface, \danog\MadelineProto\Stream\RawStreamInterface +class BufferedRawStream implements BufferedStreamInterface, BufferInterface, RawStreamInterface { use RawStream; diff --git a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php index 9b4a3fdb..a01fe5b8 100644 --- a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php +++ b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php @@ -47,7 +47,7 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise { - return $this->enableCrypto($tlsContext); + return $this->stream->enableCrypto($tlsContext); } public function getStream()