From 698d0cb59f6d9c35347c8ffb75749dabfc581bbf Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sun, 23 Jun 2019 17:46:51 +0200 Subject: [PATCH] Parallelize downloads --- docs | 2 +- phar.php | 4 +- src/danog/MadelineProto/API.php | 58 ++-- .../Loop/Connection/ReadLoop.php | 7 +- src/danog/MadelineProto/MTProto.php | 4 +- .../MadelineProto/MTProtoTools/Files.php | 278 ++++++++++++------ src/danog/MadelineProto/Tools.php | 38 +++ 7 files changed, 253 insertions(+), 138 deletions(-) diff --git a/docs b/docs index 7a17668c..02967b30 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit 7a17668c568e4a17e6b08f02347bd3f139d8554f +Subproject commit 02967b3019032399364a9fc134f7bb0e674c099a diff --git a/phar.php b/phar.php index 84dd3132..e483d944 100644 --- a/phar.php +++ b/phar.php @@ -61,10 +61,10 @@ function ___install_madeline() $release_branch = ''; } $release_fallback_branch = ''; - if (isset($_SERVER['SERVER_ADMIN']) && strpos($_SERVER['SERVER_ADMIN'], '000webhost.io') && $custom_branch === null) { + /*if (isset($_SERVER['SERVER_ADMIN']) && strpos($_SERVER['SERVER_ADMIN'], '000webhost.io') && $custom_branch === null) { $release_branch = '-deprecated'; $release_fallback_branch = '-deprecated'; - } + }*/ if (PHP_MAJOR_VERSION <= 5) { $release_branch = '5'.$release_branch; diff --git a/src/danog/MadelineProto/API.php b/src/danog/MadelineProto/API.php index 3bd7b931..fb76a4c0 100644 --- a/src/danog/MadelineProto/API.php +++ b/src/danog/MadelineProto/API.php @@ -20,6 +20,10 @@ namespace danog\MadelineProto; use Amp\Deferred; +use function Amp\File\put; +use function Amp\File\rename; +use function Amp\File\get; +use function Amp\File\exists; class API extends APIFactory { @@ -62,21 +66,15 @@ class API extends APIFactory $realpaths = Serialization::realpaths($params); $this->session = $realpaths['file']; - if (file_exists($realpaths['file'])) { - if (!file_exists($realpaths['lockfile'])) { - touch($realpaths['lockfile']); - clearstatcache(); - } - 
$realpaths['lockfile'] = fopen($realpaths['lockfile'], 'r'); - \danog\MadelineProto\Logger::log('Waiting for shared lock of serialization lockfile...'); - flock($realpaths['lockfile'], LOCK_SH); - \danog\MadelineProto\Logger::log('Shared lock acquired, deserializing...'); + if (yield exists($realpaths['file'])) { + Logger::log('Waiting for shared lock of serialization lockfile...'); + $unlock = yield Tools::flock($realpaths['lockfile'], LOCK_SH); + Logger::log('Shared lock acquired, deserializing...'); try { - $tounserialize = file_get_contents($realpaths['file']); + $tounserialize = yield get($realpaths['file']); } finally { - flock($realpaths['lockfile'], LOCK_UN); - fclose($realpaths['lockfile']); + $unlock(); } \danog\MadelineProto\Magic::class_exists(); @@ -137,11 +135,11 @@ class API extends APIFactory $deferred->resolve(); yield $this->API->initAsync(); $this->APIFactory(); - \danog\MadelineProto\Logger::log('Ping...', Logger::ULTRA_VERBOSE); + Logger::log('Ping...', Logger::ULTRA_VERBOSE); $this->asyncInitPromise = null; $pong = yield $this->ping(['ping_id' => 3], ['async' => true]); - \danog\MadelineProto\Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE); - \danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); + Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE); + Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); return; } @@ -158,14 +156,14 @@ class API extends APIFactory $this->API = new MTProto($params); $this->APIFactory(); $deferred->resolve(); - \danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['apifactory_start'], Logger::VERBOSE); + Logger::log(\danog\MadelineProto\Lang::$current_lang['apifactory_start'], Logger::VERBOSE); yield $this->API->initAsync(); $this->APIFactory(); $this->asyncInitPromise = null; - \danog\MadelineProto\Logger::log('Ping...', Logger::ULTRA_VERBOSE); + Logger::log('Ping...', 
Logger::ULTRA_VERBOSE); $pong = yield $this->ping(['ping_id' => 3], ['async' => true]); - \danog\MadelineProto\Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE); - \danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); + Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE); + Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); } public function async($async) @@ -312,14 +310,11 @@ class API extends APIFactory } $this->serialized = time(); $realpaths = Serialization::realpaths($filename); - if (!file_exists($realpaths['lockfile'])) { - touch($realpaths['lockfile']); - clearstatcache(); - } - $realpaths['lockfile'] = fopen($realpaths['lockfile'], 'w'); - \danog\MadelineProto\Logger::log('Waiting for exclusive lock of serialization lockfile...'); - flock($realpaths['lockfile'], LOCK_EX); - \danog\MadelineProto\Logger::log('Lock acquired, serializing'); + Logger::log('Waiting for exclusive lock of serialization lockfile...'); + + $unlock = yield Tools::flock($realpaths['lockfile'], LOCK_EX); + + Logger::log('Lock acquired, serializing'); try { if (!$this->getting_api_id) { @@ -332,17 +327,16 @@ class API extends APIFactory $this->API->settings['logger']['logger_param'] = [$this->API, 'noop']; } } - $wrote = file_put_contents($realpaths['tempfile'], serialize($this)); - rename($realpaths['tempfile'], $realpaths['file']); + $wrote = yield put($realpaths['tempfile'], serialize($this)); + yield rename($realpaths['tempfile'], $realpaths['file']); } finally { if (!$this->getting_api_id) { $this->API->settings['updates']['callback'] = $update_closure; $this->API->settings['logger']['logger_param'] = $logger_closure; } - flock($realpaths['lockfile'], LOCK_UN); - fclose($realpaths['lockfile']); + $unlock(); } - \danog\MadelineProto\Logger::log('Done serializing'); + Logger::log('Done serializing'); return $wrote; })()); diff --git 
a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index f124b1eb..9d8461d5 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -26,6 +26,7 @@ use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\NothingInTheSocketException; use danog\MadelineProto\Tools; use Amp\ByteStream\StreamException; +use Amp\ByteStream\PendingReadError; /** * Socket read loop. @@ -57,7 +58,7 @@ class ReadLoop extends SignalLoop while (true) { try { $error = yield $this->waitSignal($this->readMessage()); - } catch (NothingInTheSocketException|StreamException $e) { + } catch (NothingInTheSocketException|StreamException|PendingReadError $e) { if (isset($connection->old)) { return; } @@ -65,10 +66,6 @@ class ReadLoop extends SignalLoop $API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR); yield $connection->reconnect(); continue; - } catch (ClosedException $e) { - $API->logger->logger($e->getMessage(), Logger::FATAL_ERROR); - - throw $e; } if (is_int($error)) { diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 010b5262..431a1c7d 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -73,7 +73,7 @@ class MTProto extends AsyncConstruct implements TLCallback /* const V = 71; */ - const V = 124; + const V = 125; const RELEASE = '4.0'; const NOT_LOGGED_IN = 0; const WAITING_CODE = 1; @@ -801,9 +801,11 @@ class MTProto extends AsyncConstruct implements TLCallback ], 'upload' => [ 'allow_automatic_upload' => true, 'part_size' => 512 * 1024, + 'parallel_chunks' => 20 ], 'download' => [ 'report_broken_media' => true, 'part_size' => 1024 * 1024, + 'parallel_chunks' => 20 ], 'pwr' => [ 'pwr' => false, // Need info ? 
diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 325a6d41..258c4430 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -19,11 +19,20 @@ namespace danog\MadelineProto\MTProtoTools; +use Amp\ByteStream\OutputStream; +use Amp\ByteStream\ResourceOutputStream; +use Amp\ByteStream\StreamException; use danog\MadelineProto\Async\AsyncParameters; use danog\MadelineProto\Exception; +use danog\MadelineProto\FileCallbackInterface; use danog\MadelineProto\Logger; use danog\MadelineProto\RPCErrorException; +use function Amp\File\open; +use function Amp\File\stat; use function Amp\Promise\all; +use Amp\Deferred; +use Amp\Success; +use Amp\File\StatCache; /** * Manages upload and download of files. @@ -33,7 +42,7 @@ trait Files public function upload_async($file, $file_name = '', $cb = null, $encrypted = false): \Generator { if (is_object($file)) { - if (!isset(class_implements($file)['danog\MadelineProto\FileCallbackInterface'])) { + if (!$file instanceof FileCallbackInterface) { throw new \danog\MadelineProto\Exception('Provided object does not implement FileCallbackInterface'); } $cb = $file; @@ -511,7 +520,7 @@ trait Files public function download_to_dir_async($message_media, $dir, $cb = null) { - if (is_object($dir) && class_implements($dir)['danog\MadelineProto\FileCallbackInterface']) { + if (is_object($dir) && $dir instanceof FileCallbackInterface) { $cb = $dir; $dir = $dir->getFile(); } @@ -523,7 +532,7 @@ trait Files public function download_to_file_async($message_media, $file, $cb = null) { - if (is_object($file) && class_implements($file)['danog\MadelineProto\FileCallbackInterface']) { + if (is_object($file) && $file instanceof FileCallbackInterface) { $cb = $file; $file = $file->getFile(); } @@ -533,22 +542,21 @@ trait Files } $file = realpath($file); $message_media = yield $this->get_download_info_async($message_media); - $stream = 
fopen($file, 'r+b'); - $size = fstat($stream)['size']; + + StatCache::clear($file); + + $size = (yield stat($file))['size']; + $stream = yield open($file, 'cb'); + $this->logger->logger('Waiting for lock of file to download...'); - do { - $res = flock($stream, LOCK_EX|LOCK_NB); - if (!$res) { - yield $this->sleep(0.1); - } - } while (!$res); + $unlock = yield $this->flock($file, LOCK_EX); try { yield $this->download_to_stream_async($message_media, $stream, $cb, $size, -1); } finally { - flock($stream, LOCK_UN); - fclose($stream); - clearstatcache(); + $unlock(); + yield $stream->close(); + StatCache::clear($file); } return $file; @@ -556,7 +564,9 @@ trait Files public function download_to_stream_async($message_media, $stream, $cb = null, $offset = 0, $end = -1) { - if (is_object($stream) && class_implements($stream)['danog\MadelineProto\FileCallbackInterface']) { + $message_media = yield $this->get_download_info_async($message_media); + + if (is_object($stream) && $stream instanceof FileCallbackInterface) { $cb = $stream; $stream = $stream->getFile(); } @@ -567,25 +577,34 @@ trait Files }; } - $message_media = yield $this->get_download_info_async($message_media); - - try { - if (stream_get_meta_data($stream)['seekable']) { - fseek($stream, $offset); - } - } catch (\danog\MadelineProto\Exception $e) { + /** @var $stream \Amp\ByteStream\OutputStream */ + if (!is_object($stream)) { + $stream = new ResourceOutputStream($stream); } - $downloaded_size = 0; + if (!$stream instanceof OutputStream) { + throw new Exception("Invalid stream provided"); + } + $seekable = false; + if (method_exists($stream, 'seek')) { + try { + yield $stream->seek($offset); + $seekable = true; + } catch (StreamException $e) { + } + } + if ($end === -1 && isset($message_media['size'])) { $end = $message_media['size']; } - $size = $end - $offset; + $part_size = $this->settings['download']['part_size']; - $percent = 0; + $parallel_chunks = $this->settings['download']['parallel_chunks']; + 
$datacenter = isset($message_media['InputFileLocation']['dc_id']) ? $message_media['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc']; if (isset($this->datacenter->sockets[$datacenter.'_media'])) { $datacenter .= '_media'; } + if (isset($message_media['key'])) { $digest = hash('md5', $message_media['key'].$message_media['iv'], true); $fingerprint = $this->unpack_signed_int(substr($digest, 0, 4) ^ substr($digest, 4, 4)); @@ -597,44 +616,124 @@ trait Files $ige->setKey($message_media['key']); $ige->enableContinuousBuffer(); } - $theend = false; - $cdn = false; - while (true) { - if ($start_at = $offset % $part_size) { - $offset -= $start_at; + + if ($offset === $end) { + $cb(100); + return true; + } + $params = []; + $start_at = $offset % $part_size; + $probable_end = $end !== -1 ? $end : 512 * 1024 * 3000; + + $breakOut = false; + for ($x = $offset - $start_at; $x < $probable_end; $x += $part_size) { + $end_at = $part_size; + if ($end !== -1 && $x + $part_size >= $end) { + $end_at = $end % $part_size; + $breakOut = true; } + $params[] = [ + 'offset' => $x, + 'limit' => $part_size, + 'part_start_at' => $start_at, + 'part_end_at' => $end_at, + ]; + + $start_at = 0; + if ($breakOut) { + break; + } + } + + if (!$params) { + $cb(100); + return true; + } + $count = count($params); + $cb = function () use ($cb, $count) { + static $cur = 0; + $cur++; + $this->callFork($cb($cur*100/$count)); + }; + + $cdn = false; + + $params[0]['previous_promise'] = new Success(true); + + $start = microtime(true); + $size = yield $this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, array_shift($params), $stream, $seekable); + + if ($params) { + $previous_promise = new Success(true); + + $promises = []; + foreach ($params as $key => $param) { + $param['previous_promise'] = $previous_promise; + $previous_promise = $this->call($this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $stream, $seekable)); + 
$previous_promise->onResolve(static function ($e, $res) use (&$size) { + if ($res) { + $size += $res; + } + }); + $promises []= $previous_promise; + + if (!($key % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps + yield $this->all($promises); + $promises = []; + + $time = microtime(true) - $start; + $speed = (int) (($size*8)/$time)/1000000; + $this->logger->logger("Partial download time: $time"); + $this->logger->logger("Partial download speed: $speed mbps"); + } + } + if ($promises) { + yield $this->all($promises); + } + } + $time = microtime(true) - $start; + $speed = (int) (($size*8)/$time)/1000000; + $this->logger->logger("Total download time: $time"); + $this->logger->logger("Total download speed: $speed mbps"); + + if ($cdn) { + $this->clear_cdn_hashes($message_media['file_token']); + } + + return true; + } + + private function download_part(&$message_media, &$cdn, &$datacenter, &$old_dc, &$ige, $cb, $offset, $stream, $seekable, $postpone = false) + { + static $method = [ + false => 'upload.getFile', // non-cdn + true => 'upload.getCdnFile', // cdn + ]; + do { + if (!$cdn) { + $basic_param = [ + 'location' => $message_media['InputFileLocation'] + ]; + } else { + $basic_param = [ + 'file_token' => $message_media['file_token'] + ]; + } + try { - $res = $cdn ? 
- yield $this->method_call_async_read( - 'upload.getCdnFile', - [ - 'file_token' => $message_media['file_token'], - 'offset' => $offset, - 'limit' => $part_size - ], - [ - 'heavy' => true, - 'file' => true, - 'FloodWaitLimit' => 0, - 'datacenter' => $datacenter - ] - ) : - yield $this->method_call_async_read( - 'upload.getFile', - [ - 'location' => $message_media['InputFileLocation'], - 'offset' => $offset, - 'limit' => $part_size - ], - [ - 'heavy' => true, - 'file' => true, - 'FloodWaitLimit' => 0, - 'datacenter' => &$datacenter - ] - ); + $res = yield $this->method_call_async_read( + $method[$cdn], + $basic_param + $offset, + [ + 'heavy' => true, + 'file' => true, + 'FloodWaitLimit' => 0, + 'datacenter' => &$datacenter, + 'postpone' => $postpone + ] + ); } catch (\danog\MadelineProto\RPCErrorException $e) { if (strpos($e->rpc, 'FLOOD_WAIT_') === 0) { if (isset($message_media['MessageMedia']) && !$this->authorization['user']['bot'] && $this->settings['download']['report_broken_media']) { @@ -657,6 +756,7 @@ trait Files throw $e; } } + if ($res['_'] === 'upload.fileCdnRedirect') { $cdn = true; $message_media['file_token'] = $res['file_token']; @@ -669,9 +769,7 @@ trait Files yield $this->get_config_async([], ['datacenter' => $this->datacenter->curdc]); } $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['stored_on_cdn'], \danog\MadelineProto\Logger::NOTICE); - continue; - } - if ($res['_'] === 'upload.cdnFileReuploadNeeded') { + } else if ($res['_'] === 'upload.cdnFileReuploadNeeded') { $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['cdn_reupload'], \danog\MadelineProto\Logger::NOTICE); yield $this->get_config_async([], ['datacenter' => $this->datacenter->curdc]); @@ -690,51 +788,37 @@ trait Files continue; } if ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') { - $datacenter = 1; + $datacenter = 0; } - while ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') { 
- $res = yield $this->method_call_async_read('upload.getFile', ['location' => $message_media['InputFileLocation'], 'offset' => $offset, 'limit' => $part_size], ['heavy' => true, 'datacenter' => $datacenter]); - $datacenter++; - if (!isset($this->datacenter->sockets[$datacenter])) { - break; - } + while ($cdn === false && + $res['type']['_'] === 'storage.fileUnknown' && + $res['bytes'] === '' && + isset($this->datacenter->sockets[++$datacenter]) + ) { + $res = yield $this->method_call_async_read('upload.getFile', $basic_param + $offset, ['heavy' => true, 'file' => true, 'FloodWaitLimit' => 0, 'datacenter' => $datacenter]); } + if (isset($message_media['cdn_key'])) { - $ivec = substr($message_media['cdn_iv'], 0, 12).pack('N', $offset >> 4); + $ivec = substr($message_media['cdn_iv'], 0, 12).pack('N', $offset['offset'] >> 4); $res['bytes'] = $this->ctr_encrypt($res['bytes'], $message_media['cdn_key'], $ivec); - $this->check_cdn_hash($message_media['file_token'], $offset, $res['bytes'], $old_dc); + $this->check_cdn_hash($message_media['file_token'], $offset['offset'], $res['bytes'], $old_dc); } if (isset($message_media['key'])) { $res['bytes'] = $ige->decrypt($res['bytes']); } - if ($start_at) { - $res['bytes'] = substr($res['bytes'], $start_at); + if ($offset['part_start_at'] || $offset['part_end_at'] !== $offset['limit']) { + $res['bytes'] = substr($res['bytes'], $offset['part_start_at'], $offset['part_end_at']- $offset['part_start_at']); } - if ($end !== -1 && strlen($res['bytes']) + $downloaded_size >= $size) { - $res['bytes'] = substr($res['bytes'], 0, $size - $downloaded_size); - $theend = true; + + if (!$seekable) { + yield $offset['previous_promise']->promise(); + } else { + yield $stream->seek($offset['offset'] + $offset['part_start_at']); } - if ($res['bytes'] === '') { - break; - } - $offset += strlen($res['bytes']); - $downloaded_size += strlen($res['bytes']); - $this->logger->logger(fwrite($stream, $res['bytes']), 
\danog\MadelineProto\Logger::ULTRA_VERBOSE); - if ($theend) { - break; - } - if ($end !== -1) { - $this->callFork($cb($percent = $downloaded_size * 100 / $size)); - } - } - if ($end === -1) { - $this->callFork($cb(100)); - } - if ($cdn) { - $this->clear_cdn_hashes($message_media['file_token']); - } - - return true; + yield $stream->write($res['bytes']); + $cb(); + return strlen($res['bytes']); + } while (true); } private $cdn_hashes = []; diff --git a/src/danog/MadelineProto/Tools.php b/src/danog/MadelineProto/Tools.php index 74e85123..d4a9ca92 100644 --- a/src/danog/MadelineProto/Tools.php +++ b/src/danog/MadelineProto/Tools.php @@ -32,6 +32,9 @@ use function Amp\Promise\wait; use function Amp\ByteStream\getStdin; use function Amp\ByteStream\getStdout; use function Amp\ByteStream\getOutputBufferStream; +use function Amp\File\exists; +use function Amp\File\touch; +use Amp\File\StatCache; /** * Some tools. @@ -344,7 +347,42 @@ trait Tools return $deferred->promise(); } + /** + * Asynchronously lock a file + * Resolves with a callable that MUST eventually be called in order to release the lock + * + * @param string $file File to lock + * @param integer $operation Locking mode (see flock) + * @param numeric $polling Polling interval for lock + * @return Promise + */ + public static function flock(string $file, int $operation, $polling = 0.1): Promise + { + return self::call(self::flockAsync($file, $operation, $polling)); + } + public static function flockAsync(string $file, int $operation, $polling) + { + if (!yield exists($file)) { + yield touch($file); + StatCache::clear($file); + } + $operation |= LOCK_NB; + $res = fopen($file, 'c'); + do { + $result = flock($res, $operation); + if (!$result) { + yield self::sleep($polling); + } + } while (!$result); + return static function () use (&$res) { + if ($res) { + flock($res, LOCK_UN); + fclose($res); + $res = null; + } + }; + } public static function sleep($time) { return new \Amp\Delayed($time * 1000);