From 4de96f2e459d9079ec03751e108418de19dcdc73 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sun, 29 Dec 2019 18:28:47 +0100 Subject: [PATCH] Truly parallelized upload for renaming --- .../MadelineProto/Async/AsyncConstruct.php | 2 +- .../MadelineProto/Async/AsyncParameters.php | 43 ++-- src/danog/MadelineProto/Async/Parameters.php | 67 ------ .../MTProtoSession/CallHandler.php | 3 +- .../MTProtoTools/AuthKeyHandler.php | 3 +- .../MTProtoTools/CallHandler.php | 33 ++- .../MadelineProto/MTProtoTools/Files.php | 208 ++++++++++-------- 7 files changed, 172 insertions(+), 187 deletions(-) delete mode 100644 src/danog/MadelineProto/Async/Parameters.php diff --git a/src/danog/MadelineProto/Async/AsyncConstruct.php b/src/danog/MadelineProto/Async/AsyncConstruct.php index 7354db4b..3004628c 100644 --- a/src/danog/MadelineProto/Async/AsyncConstruct.php +++ b/src/danog/MadelineProto/Async/AsyncConstruct.php @@ -63,7 +63,7 @@ class AsyncConstruct /** * Set init promise. * - * @param Promise $promise + * @param Promise $promise Promise * * @internal * diff --git a/src/danog/MadelineProto/Async/AsyncParameters.php b/src/danog/MadelineProto/Async/AsyncParameters.php index 99f0474c..fdb737d7 100644 --- a/src/danog/MadelineProto/Async/AsyncParameters.php +++ b/src/danog/MadelineProto/Async/AsyncParameters.php @@ -25,36 +25,45 @@ namespace danog\MadelineProto\Async; * * @author Daniil Gentili */ -class AsyncParameters extends Parameters +class AsyncParameters { + /** + * Async callable + * + * @var callable + */ private $callable; - private $refetchable = true; - public function __construct(callable $callable, bool $refetchable = true) - { - $this->callable = $callable; - $this->refetchable = $refetchable; - } - - public function setRefetchable(bool $refetchable) - { - $this->refetchable = $refetchable; - } - - public function setCallable(callable $callable) + /** + * Create async parameters + * + * @param callable $callable Async callable that will return parameters + */ + public function __construct(callable $callable) { $this->callable = $callable; } - public function isRefetchable(): bool + + /** + * Create async parameters + * + * @param callable $callable Async callable that will return parameters + */ + public function setCallable(callable $callable): void { - return $this->refetchable; + $this->callable = $callable; } + /** + * Get parameters asynchronously + * + * @return \Generator|\Amp\Promise + */ public function getParameters() { $callable = $this->callable; - return yield $callable(); + return $callable(); } } diff --git a/src/danog/MadelineProto/Async/Parameters.php b/src/danog/MadelineProto/Async/Parameters.php deleted file mode 100644 index 6eb36b82..00000000 --- a/src/danog/MadelineProto/Async/Parameters.php +++ /dev/null @@ -1,67 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2019 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\Async; - -use Amp\Promise; - -/** - * Parameters module. - * - * Manages asynchronous generation of method parameters - * - * @author Daniil Gentili - */ -abstract class Parameters -{ - private $params = []; - - /** - * Fetch parameters asynchronously. - * - * @return Promise - */ - public function fetchParameters() - { - $refetchable = $this->isRefetchable(); - if ($this->params && !$refetchable) { - return $this->params; - } - $params = yield $this->getParameters(); - - if (!$refetchable) { - $this->params = $params; - } - - return $params; - } - - /** - * Check if the parameters can be fetched more than once. - * - * @return bool - */ - abstract public function isRefetchable(): bool; - - /** - * Gets the parameters asynchronously. - * - * @return \Generator - */ - abstract public function getParameters(); -} diff --git a/src/danog/MadelineProto/MTProtoSession/CallHandler.php b/src/danog/MadelineProto/MTProtoSession/CallHandler.php index 0b101589..d9ae23f0 100644 --- a/src/danog/MadelineProto/MTProtoSession/CallHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/CallHandler.php @@ -22,6 +22,7 @@ namespace danog\MadelineProto\MTProtoSession; use Amp\Deferred; use Amp\Promise; use Amp\Success; +use danog\MadelineProto\Async\AsyncParameters; use danog\MadelineProto\Async\Parameters; use danog\MadelineProto\Tools; @@ -203,7 +204,7 @@ trait CallHandler ] ); - if (\is_object($args) && $args instanceof Parameters) { + if (\is_object($args) && $args instanceof AsyncParameters) { $message['body'] = yield $args->fetchParameters(); } else { $message['body'] = $args; diff --git a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php index d6ca3bb9..35807691 100644 --- a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php @@ -670,7 +670,8 @@ trait AuthKeyHandler } continue; } - if (isset($this->init_auth_dcs[$id]) || !$socket->hasConnection(0)) { + yield $socket->waitGetConnection(); + if (isset($this->init_auth_dcs[$id])) { $this->pending_auth = true; continue; } diff --git a/src/danog/MadelineProto/MTProtoTools/CallHandler.php b/src/danog/MadelineProto/MTProtoTools/CallHandler.php index 48ebe820..3ac73f81 100644 --- a/src/danog/MadelineProto/MTProtoTools/CallHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/CallHandler.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\MTProtoTools; +use Amp\Deferred; use Amp\Promise; /** @@ -49,11 +50,21 @@ trait CallHandler * @param array $args Arguments * @param array $aargs Additional arguments * - * @return \Generator + * @return Promise */ - public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator + public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise { - return (yield $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc))->methodCallAsyncRead($method, $args, $aargs); + $deferred = new Deferred; + $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc)->onResolve( + static function ($e, $res) use (&$method, &$args, &$aargs, &$deferred) { + if ($e) { + throw $e; + } + $deferred->resolve($res->methodCallAsyncRead($method, $args, $aargs)); + } + ); + + return $deferred->promise(); } /** * Call method and make sure it is asynchronously sent. @@ -62,10 +73,20 @@ trait CallHandler * @param array $args Arguments * @param array $aargs Additional arguments * - * @return \Generator + * @return Promise */ - public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator + public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise { - return (yield $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc))->methodCallAsyncWrite($method, $args, $aargs); + $deferred = new Deferred; + $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc)->onResolve( + static function ($e, $res) use (&$method, &$args, &$aargs, &$deferred) { + if ($e) { + throw $e; + } + $deferred->resolve($res->methodCallAsyncWrite($method, $args, $aargs)); + } + ); + + return $deferred->promise(); } } diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 50c74a99..be8bfd70 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -28,12 +28,10 @@ use Amp\File\BlockingFile; use Amp\File\Handle; use Amp\File\StatCache; use Amp\Http\Client\Request; +use Amp\Promise; use Amp\Success; -use danog\MadelineProto\Async\AsyncParameters; use danog\MadelineProto\Exception; use danog\MadelineProto\FileCallbackInterface; -use danog\MadelineProto\Logger; -use danog\MadelineProto\RPCErrorException; use danog\MadelineProto\Stream\Common\BufferedRawStream; use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream; use danog\MadelineProto\Stream\ConnectionContext; @@ -286,47 +284,48 @@ trait Files \danog\MadelineProto\Tools::callFork($cb($cur * 100 / $part_total_num, $speed, $time)); }; + $callable = static function (int $part_num) use ($file_id, $part_total_num, $part_size, $callable, $ctx, $ige) { + $bytes = yield $callable($part_num * $part_size, $part_size); + + if ($ige) { + $bytes = $ige->encrypt(\str_pad($bytes, $part_size, \chr(0))); + } + \hash_update($ctx, $bytes); + + return ['file_id' => $file_id, 'file_part' => $part_num, 'file_total_parts' => $part_total_num, 'bytes' => $bytes]; + }; + + $resPromises = []; + $start = \microtime(true); while ($part_num < $part_total_num) { - $read_deferred = yield $this->methodCallAsyncWrite( + $writePromise = $this->methodCallAsyncWrite( $method, - new AsyncParameters( - static function () use ($file_id, $part_num, $part_total_num, $part_size, $callable, $ctx, $ige) { - static $fetched = false; - $already_fetched = $fetched; - $fetched = true; - - $bytes = yield $callable($part_num * $part_size, $part_size); - - if ($ige) { - $bytes = $ige->encrypt(\str_pad($bytes, $part_size, \chr(0))); - } - if (!$already_fetched) { - \hash_update($ctx, $bytes); - } - - return ['file_id' => $file_id, 'file_part' => $part_num, 'file_total_parts' => $part_total_num, 'bytes' => $bytes]; - }, - $seekable - ), + $callable($part_num), ['heavy' => true, 'file' => true, 'datacenter' => &$datacenter] ); - $read_deferred->promise()->onResolve(static function ($e, $res) use ($cb) { - if ($res) { + if (!$seekable) { + yield $writePromise; + } + $writePromise->onResolve( + static function ($e, $readDeferred) use ($cb, $part_num, &$resPromises) { + if ($e) { + throw $e; + } + $resPromises []= $readDeferred->promise(); + // Wrote chunk! + if (!yield Tools::call($readDeferred->promise())) { + throw new \danog\MadelineProto\Exception('Upload of part '.$part_num.' failed'); + } + // Got OK from server for chunk! $cb(); } - }); + ); + $promises []= $writePromise; + ++$part_num; - $part_num++; - $promises[] = $read_deferred->promise(); - - if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this every second) - $result = yield \danog\MadelineProto\Tools::all($promises); - foreach ($result as $kkey => $result) { - if (!$result) { - throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed'); - } - } + if (!($part_num % $parallel_chunks)) { // By default, 10 mb at a time, for a typical bandwidth of 1gbps (run the code in this every second) + yield Tools::all($promises); $promises = []; $time = \microtime(true) - $start; @@ -336,12 +335,9 @@ trait Files } } - $result = yield all($promises); - foreach ($result as $kkey => $result) { - if (!$result) { - throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed'); - } - } + yield all($promises); + yield all($resPromises); + $time = \microtime(true) - $start; $speed = (int) (($size * 8) / $time) / 1000000; $this->logger->logger("Total upload time: $time"); @@ -396,55 +392,92 @@ trait Files $chunk_size = $this->settings['upload']['part_size']; - $bridge = new class { - private $done = []; - private $pending = []; - public $nextRead; - public $size; - public $part_size; + $bridge = new class($size, $chunk_size) { + /** + * Read promises. + * + * @var Deferred[] + */ + private $read = []; + /** + * Write promises. + * + * @var Deferred[] + */ + private $write = []; + /** + * Part size. + * + * @var int + */ + private $partSize; + /** + * Offset for callback + * + * @var int + */ + private $offset = 0; - public function read(int $offset, int $size) + /** + * Constructor. + * + * @param integer $size Total file size + * @param integer $partSize Part size + */ + public function __construct(int $size, int $partSize) { - $nextRead = $this->nextRead; - $this->nextRead = new Deferred; - - if ($nextRead) { - $nextRead->resolve(true); + for ($x = 0; $x < $size; $x += $partSize) { + $this->read []= new Deferred; + $this->write []= new Deferred; } - - if (isset($this->done[$offset])) { - if (\strlen($this->done[$offset]) > $size) { - throw new Exception('Wrong size!'); - } - $result = $this->done[$offset]; - unset($this->done[$offset]); - return $result; - } - $this->pending[$offset] = new Deferred; - return $this->pending[$offset]->promise(); + $this->partSize = $partSize; } - public function write(string $data, int $offset) + /** + * Read chunk. + * + * @param integer $offset Offset + * @param integer $size Chunk size + * + * @return Promise + */ + public function read(int $offset, int $size): Promise { - if (isset($this->pending[$offset])) { - $promise = $this->pending[$offset]; - unset($this->pending[$offset]); - $promise->resolve($data); - } else { - $this->done[$offset] = $data; - } - $length = \strlen($data); - if ($offset + $length === $this->size || $length < $this->part_size) { - return; - } - return $this->nextRead->promise(); + $offset /= $this->partSize; + return $this->write[$offset]->promise(); + } + /** + * Write chunk. + * + * @param string $data Data + * @param integer $offset Offset + * + * @return Promise + */ + public function write(string $data, int $offset): Promise + { + $offset /= $this->partSize; + $this->write[$offset]->resolve($data); + return $this->read[$offset]->promise(); + } + /** + * Read callback, called when the chunk is read and fully resent + * + * @return void + */ + public function callback(): void + { + $this->read[$this->offset++]->resolve(); } }; - $bridge->size = $size; - $bridge->part_size = $chunk_size; $reader = [$bridge, 'read']; $writer = [$bridge, 'write']; - $read = $this->uploadFromCallable($reader, $size, $mime, '', $cb, false, $encrypted); + $cb = static function (...$params) use ($cb, $bridge): \Generator { + $bridge->callback(); + yield $cb(...$params); + }; + + $read = $this->uploadFromCallable($reader, $size, $mime, '', $cb, true, $encrypted); $write = $this->downloadToCallable($media, $writer, null, true, 0, -1, $chunk_size); list($res) = yield \danog\MadelineProto\Tools::all([$read, $write]); @@ -1264,19 +1297,6 @@ trait Files break; } catch (\danog\MadelineProto\RPCErrorException $e) { if (\strpos($e->rpc, 'FLOOD_WAIT_') === 0) { - /*if ($x++ === 5) { - if (isset($message_media['MessageMedia']) && !$this->authorization['user']['bot'] && $this->settings['download']['report_broken_media']) { - try { - yield $this->methodCallAsyncRead('messages.sendMedia', ['peer' => 'support', 'media' => $message_media['MessageMedia'], 'message' => "I can't download this file, could you please help?"], ['datacenter' => $this->datacenter->curdc]); - } catch (RPCErrorException $e) { - $this->logger->logger('An error occurred while reporting the broken file: '.$e->rpc, Logger::FATAL_ERROR); - } catch (Exception $e) { - $this->logger->logger('An error occurred while reporting the broken file: '.$e->getMessage(), Logger::FATAL_ERROR); - } - } - - throw new \danog\MadelineProto\Exception('The media server where this file is hosted is offline/overloaded, please try again later. Send the media to the telegram devs or to @danogentili to fix this.'); - }*/ yield Tools::sleep(1); continue; }