Truly parallelized upload for renaming

This commit is contained in:
Daniil Gentili 2019-12-29 18:28:47 +01:00
parent acd66a8bc4
commit 4de96f2e45
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
7 changed files with 172 additions and 187 deletions

View File

@ -63,7 +63,7 @@ class AsyncConstruct
/**
* Set init promise.
*
* @param Promise $promise
* @param Promise $promise Promise
*
* @internal
*

View File

@ -25,36 +25,45 @@ namespace danog\MadelineProto\Async;
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class AsyncParameters extends Parameters
class AsyncParameters
{
/**
* Async callable
*
* @var callable
*/
private $callable;
private $refetchable = true;
public function __construct(callable $callable, bool $refetchable = true)
{
$this->callable = $callable;
$this->refetchable = $refetchable;
}
public function setRefetchable(bool $refetchable)
{
$this->refetchable = $refetchable;
}
public function setCallable(callable $callable)
/**
* Create async parameters
*
* @param callable $callable Async callable that will return parameters
*/
public function __construct(callable $callable)
{
$this->callable = $callable;
}
public function isRefetchable(): bool
/**
* Create async parameters
*
* @param callable $callable Async callable that will return parameters
*/
public function setCallable(callable $callable): void
{
return $this->refetchable;
$this->callable = $callable;
}
/**
* Get parameters asynchronously
*
* @return \Generator<array>|\Amp\Promise<array>
*/
public function getParameters()
{
$callable = $this->callable;
return yield $callable();
return $callable();
}
}

View File

@ -1,67 +0,0 @@
<?php
/**
* Parameters module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2019 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Async;
use Amp\Promise;
/**
* Parameters module.
*
* Manages asynchronous generation of method parameters
*
* @author Daniil Gentili <daniil@daniil.it>
*/
abstract class Parameters
{
private $params = [];
/**
* Fetch parameters asynchronously.
*
* @return Promise
*/
public function fetchParameters()
{
$refetchable = $this->isRefetchable();
if ($this->params && !$refetchable) {
return $this->params;
}
$params = yield $this->getParameters();
if (!$refetchable) {
$this->params = $params;
}
return $params;
}
/**
* Check if the parameters can be fetched more than once.
*
* @return bool
*/
abstract public function isRefetchable(): bool;
/**
* Gets the parameters asynchronously.
*
* @return \Generator
*/
abstract public function getParameters();
}

View File

@ -22,6 +22,7 @@ namespace danog\MadelineProto\MTProtoSession;
use Amp\Deferred;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\Async\AsyncParameters;
use danog\MadelineProto\Async\Parameters;
use danog\MadelineProto\Tools;
@ -203,7 +204,7 @@ trait CallHandler
]
);
if (\is_object($args) && $args instanceof Parameters) {
if (\is_object($args) && $args instanceof AsyncParameters) {
$message['body'] = yield $args->fetchParameters();
} else {
$message['body'] = $args;

View File

@ -670,7 +670,8 @@ trait AuthKeyHandler
}
continue;
}
if (isset($this->init_auth_dcs[$id]) || !$socket->hasConnection(0)) {
yield $socket->waitGetConnection();
if (isset($this->init_auth_dcs[$id])) {
$this->pending_auth = true;
continue;
}

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\MTProtoTools;
use Amp\Deferred;
use Amp\Promise;
/**
@ -49,11 +50,21 @@ trait CallHandler
* @param array $args Arguments
* @param array $aargs Additional arguments
*
* @return \Generator<Promise>
* @return Promise
*/
public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
public function methodCallAsyncRead(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise
{
return (yield $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc))->methodCallAsyncRead($method, $args, $aargs);
$deferred = new Deferred;
$this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc)->onResolve(
static function ($e, $res) use (&$method, &$args, &$aargs, &$deferred) {
if ($e) {
throw $e;
}
$deferred->resolve($res->methodCallAsyncRead($method, $args, $aargs));
}
);
return $deferred->promise();
}
/**
* Call method and make sure it is asynchronously sent.
@ -62,10 +73,20 @@ trait CallHandler
* @param array $args Arguments
* @param array $aargs Additional arguments
*
* @return \Generator<Promise>
* @return Promise
*/
public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): \Generator
public function methodCallAsyncWrite(string $method, $args = [], array $aargs = ['msg_id' => null]): Promise
{
return (yield $this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc))->methodCallAsyncWrite($method, $args, $aargs);
$deferred = new Deferred;
$this->datacenter->waitGetConnection($aargs['datacenter'] ?? $this->datacenter->curdc)->onResolve(
static function ($e, $res) use (&$method, &$args, &$aargs, &$deferred) {
if ($e) {
throw $e;
}
$deferred->resolve($res->methodCallAsyncWrite($method, $args, $aargs));
}
);
return $deferred->promise();
}
}

View File

@ -28,12 +28,10 @@ use Amp\File\BlockingFile;
use Amp\File\Handle;
use Amp\File\StatCache;
use Amp\Http\Client\Request;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\Async\AsyncParameters;
use danog\MadelineProto\Exception;
use danog\MadelineProto\FileCallbackInterface;
use danog\MadelineProto\Logger;
use danog\MadelineProto\RPCErrorException;
use danog\MadelineProto\Stream\Common\BufferedRawStream;
use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream;
use danog\MadelineProto\Stream\ConnectionContext;
@ -286,47 +284,48 @@ trait Files
\danog\MadelineProto\Tools::callFork($cb($cur * 100 / $part_total_num, $speed, $time));
};
$callable = static function (int $part_num) use ($file_id, $part_total_num, $part_size, $callable, $ctx, $ige) {
$bytes = yield $callable($part_num * $part_size, $part_size);
if ($ige) {
$bytes = $ige->encrypt(\str_pad($bytes, $part_size, \chr(0)));
}
\hash_update($ctx, $bytes);
return ['file_id' => $file_id, 'file_part' => $part_num, 'file_total_parts' => $part_total_num, 'bytes' => $bytes];
};
$resPromises = [];
$start = \microtime(true);
while ($part_num < $part_total_num) {
$read_deferred = yield $this->methodCallAsyncWrite(
$writePromise = $this->methodCallAsyncWrite(
$method,
new AsyncParameters(
static function () use ($file_id, $part_num, $part_total_num, $part_size, $callable, $ctx, $ige) {
static $fetched = false;
$already_fetched = $fetched;
$fetched = true;
$bytes = yield $callable($part_num * $part_size, $part_size);
if ($ige) {
$bytes = $ige->encrypt(\str_pad($bytes, $part_size, \chr(0)));
}
if (!$already_fetched) {
\hash_update($ctx, $bytes);
}
return ['file_id' => $file_id, 'file_part' => $part_num, 'file_total_parts' => $part_total_num, 'bytes' => $bytes];
},
$seekable
),
$callable($part_num),
['heavy' => true, 'file' => true, 'datacenter' => &$datacenter]
);
$read_deferred->promise()->onResolve(static function ($e, $res) use ($cb) {
if ($res) {
if (!$seekable) {
yield $writePromise;
}
$writePromise->onResolve(
static function ($e, $readDeferred) use ($cb, $part_num, &$resPromises) {
if ($e) {
throw $e;
}
$resPromises []= $readDeferred->promise();
// Wrote chunk!
if (!yield Tools::call($readDeferred->promise())) {
throw new \danog\MadelineProto\Exception('Upload of part '.$part_num.' failed');
}
// Got OK from server for chunk!
$cb();
}
});
);
$promises []= $writePromise;
++$part_num;
$part_num++;
$promises[] = $read_deferred->promise();
if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this every second)
$result = yield \danog\MadelineProto\Tools::all($promises);
foreach ($result as $kkey => $result) {
if (!$result) {
throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed');
}
}
if (!($part_num % $parallel_chunks)) { // By default, 10 mb at a time, for a typical bandwidth of 1gbps (run the code in this every second)
yield Tools::all($promises);
$promises = [];
$time = \microtime(true) - $start;
@ -336,12 +335,9 @@ trait Files
}
}
$result = yield all($promises);
foreach ($result as $kkey => $result) {
if (!$result) {
throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed');
}
}
yield all($promises);
yield all($resPromises);
$time = \microtime(true) - $start;
$speed = (int) (($size * 8) / $time) / 1000000;
$this->logger->logger("Total upload time: $time");
@ -396,55 +392,92 @@ trait Files
$chunk_size = $this->settings['upload']['part_size'];
$bridge = new class {
private $done = [];
private $pending = [];
public $nextRead;
public $size;
public $part_size;
$bridge = new class($size, $chunk_size) {
/**
* Read promises.
*
* @var Deferred[]
*/
private $read = [];
/**
* Write promises.
*
* @var Deferred[]
*/
private $write = [];
/**
* Part size.
*
* @var int
*/
private $partSize;
/**
* Offset for callback
*
* @var int
*/
private $offset = 0;
public function read(int $offset, int $size)
/**
* Constructor.
*
* @param integer $size Total file size
* @param integer $partSize Part size
*/
public function __construct(int $size, int $partSize)
{
$nextRead = $this->nextRead;
$this->nextRead = new Deferred;
if ($nextRead) {
$nextRead->resolve(true);
for ($x = 0; $x < $size; $x += $partSize) {
$this->read []= new Deferred;
$this->write []= new Deferred;
}
if (isset($this->done[$offset])) {
if (\strlen($this->done[$offset]) > $size) {
throw new Exception('Wrong size!');
}
$result = $this->done[$offset];
unset($this->done[$offset]);
return $result;
}
$this->pending[$offset] = new Deferred;
return $this->pending[$offset]->promise();
$this->partSize = $partSize;
}
public function write(string $data, int $offset)
/**
* Read chunk.
*
* @param integer $offset Offset
* @param integer $size Chunk size
*
* @return Promise
*/
public function read(int $offset, int $size): Promise
{
if (isset($this->pending[$offset])) {
$promise = $this->pending[$offset];
unset($this->pending[$offset]);
$promise->resolve($data);
} else {
$this->done[$offset] = $data;
}
$length = \strlen($data);
if ($offset + $length === $this->size || $length < $this->part_size) {
return;
}
return $this->nextRead->promise();
$offset /= $this->partSize;
return $this->write[$offset]->promise();
}
/**
* Write chunk.
*
* @param string $data Data
* @param integer $offset Offset
*
* @return Promise
*/
public function write(string $data, int $offset): Promise
{
$offset /= $this->partSize;
$this->write[$offset]->resolve($data);
return $this->read[$offset]->promise();
}
/**
* Read callback, called when the chunk is read and fully resent
*
* @return void
*/
public function callback(): void
{
$this->read[$this->offset++]->resolve();
}
};
$bridge->size = $size;
$bridge->part_size = $chunk_size;
$reader = [$bridge, 'read'];
$writer = [$bridge, 'write'];
$read = $this->uploadFromCallable($reader, $size, $mime, '', $cb, false, $encrypted);
$cb = static function (...$params) use ($cb, $bridge): \Generator {
$bridge->callback();
yield $cb(...$params);
};
$read = $this->uploadFromCallable($reader, $size, $mime, '', $cb, true, $encrypted);
$write = $this->downloadToCallable($media, $writer, null, true, 0, -1, $chunk_size);
list($res) = yield \danog\MadelineProto\Tools::all([$read, $write]);
@ -1264,19 +1297,6 @@ trait Files
break;
} catch (\danog\MadelineProto\RPCErrorException $e) {
if (\strpos($e->rpc, 'FLOOD_WAIT_') === 0) {
/*if ($x++ === 5) {
if (isset($message_media['MessageMedia']) && !$this->authorization['user']['bot'] && $this->settings['download']['report_broken_media']) {
try {
yield $this->methodCallAsyncRead('messages.sendMedia', ['peer' => 'support', 'media' => $message_media['MessageMedia'], 'message' => "I can't download this file, could you please help?"], ['datacenter' => $this->datacenter->curdc]);
} catch (RPCErrorException $e) {
$this->logger->logger('An error occurred while reporting the broken file: '.$e->rpc, Logger::FATAL_ERROR);
} catch (Exception $e) {
$this->logger->logger('An error occurred while reporting the broken file: '.$e->getMessage(), Logger::FATAL_ERROR);
}
}
throw new \danog\MadelineProto\Exception('The media server where this file is hosted is offline/overloaded, please try again later. Send the media to the telegram devs or to @danogentili to fix this.');
}*/
yield Tools::sleep(1);
continue;
}