Parallelize downloads

This commit is contained in:
Daniil Gentili 2019-06-23 17:46:51 +02:00
parent 8bfeb13176
commit 698d0cb59f
7 changed files with 253 additions and 138 deletions

2
docs

@ -1 +1 @@
Subproject commit 7a17668c568e4a17e6b08f02347bd3f139d8554f
Subproject commit 02967b3019032399364a9fc134f7bb0e674c099a

View File

@ -61,10 +61,10 @@ function ___install_madeline()
$release_branch = '';
}
$release_fallback_branch = '';
if (isset($_SERVER['SERVER_ADMIN']) && strpos($_SERVER['SERVER_ADMIN'], '000webhost.io') && $custom_branch === null) {
/*if (isset($_SERVER['SERVER_ADMIN']) && strpos($_SERVER['SERVER_ADMIN'], '000webhost.io') && $custom_branch === null) {
$release_branch = '-deprecated';
$release_fallback_branch = '-deprecated';
}
}*/
if (PHP_MAJOR_VERSION <= 5) {
$release_branch = '5'.$release_branch;

View File

@ -20,6 +20,10 @@
namespace danog\MadelineProto;
use Amp\Deferred;
use function Amp\File\put;
use function Amp\File\rename;
use function Amp\File\get;
use function Amp\File\exists;
class API extends APIFactory
{
@ -62,21 +66,15 @@ class API extends APIFactory
$realpaths = Serialization::realpaths($params);
$this->session = $realpaths['file'];
if (file_exists($realpaths['file'])) {
if (!file_exists($realpaths['lockfile'])) {
touch($realpaths['lockfile']);
clearstatcache();
}
$realpaths['lockfile'] = fopen($realpaths['lockfile'], 'r');
\danog\MadelineProto\Logger::log('Waiting for shared lock of serialization lockfile...');
flock($realpaths['lockfile'], LOCK_SH);
\danog\MadelineProto\Logger::log('Shared lock acquired, deserializing...');
if (yield exists($realpaths['file'])) {
Logger::log('Waiting for shared lock of serialization lockfile...');
$unlock = yield Tools::flock($realpaths['lockfile'], LOCK_SH);
Logger::log('Shared lock acquired, deserializing...');
try {
$tounserialize = file_get_contents($realpaths['file']);
$tounserialize = yield get($realpaths['file']);
} finally {
flock($realpaths['lockfile'], LOCK_UN);
fclose($realpaths['lockfile']);
$unlock();
}
\danog\MadelineProto\Magic::class_exists();
@ -137,11 +135,11 @@ class API extends APIFactory
$deferred->resolve();
yield $this->API->initAsync();
$this->APIFactory();
\danog\MadelineProto\Logger::log('Ping...', Logger::ULTRA_VERBOSE);
Logger::log('Ping...', Logger::ULTRA_VERBOSE);
$this->asyncInitPromise = null;
$pong = yield $this->ping(['ping_id' => 3], ['async' => true]);
\danog\MadelineProto\Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE);
\danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE);
Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
return;
}
@ -158,14 +156,14 @@ class API extends APIFactory
$this->API = new MTProto($params);
$this->APIFactory();
$deferred->resolve();
\danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['apifactory_start'], Logger::VERBOSE);
Logger::log(\danog\MadelineProto\Lang::$current_lang['apifactory_start'], Logger::VERBOSE);
yield $this->API->initAsync();
$this->APIFactory();
$this->asyncInitPromise = null;
\danog\MadelineProto\Logger::log('Ping...', Logger::ULTRA_VERBOSE);
Logger::log('Ping...', Logger::ULTRA_VERBOSE);
$pong = yield $this->ping(['ping_id' => 3], ['async' => true]);
\danog\MadelineProto\Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE);
\danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE);
Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE);
}
public function async($async)
@ -312,14 +310,11 @@ class API extends APIFactory
}
$this->serialized = time();
$realpaths = Serialization::realpaths($filename);
if (!file_exists($realpaths['lockfile'])) {
touch($realpaths['lockfile']);
clearstatcache();
}
$realpaths['lockfile'] = fopen($realpaths['lockfile'], 'w');
\danog\MadelineProto\Logger::log('Waiting for exclusive lock of serialization lockfile...');
flock($realpaths['lockfile'], LOCK_EX);
\danog\MadelineProto\Logger::log('Lock acquired, serializing');
Logger::log('Waiting for exclusive lock of serialization lockfile...');
$unlock = yield Tools::flock($realpaths['lockfile'], LOCK_EX);
Logger::log('Lock acquired, serializing');
try {
if (!$this->getting_api_id) {
@ -332,17 +327,16 @@ class API extends APIFactory
$this->API->settings['logger']['logger_param'] = [$this->API, 'noop'];
}
}
$wrote = file_put_contents($realpaths['tempfile'], serialize($this));
rename($realpaths['tempfile'], $realpaths['file']);
$wrote = yield put($realpaths['tempfile'], serialize($this));
yield rename($realpaths['tempfile'], $realpaths['file']);
} finally {
if (!$this->getting_api_id) {
$this->API->settings['updates']['callback'] = $update_closure;
$this->API->settings['logger']['logger_param'] = $logger_closure;
}
flock($realpaths['lockfile'], LOCK_UN);
fclose($realpaths['lockfile']);
$unlock();
}
\danog\MadelineProto\Logger::log('Done serializing');
Logger::log('Done serializing');
return $wrote;
})());

View File

@ -26,6 +26,7 @@ use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\NothingInTheSocketException;
use danog\MadelineProto\Tools;
use Amp\ByteStream\StreamException;
use Amp\ByteStream\PendingReadError;
/**
* Socket read loop.
@ -57,7 +58,7 @@ class ReadLoop extends SignalLoop
while (true) {
try {
$error = yield $this->waitSignal($this->readMessage());
} catch (NothingInTheSocketException|StreamException $e) {
} catch (NothingInTheSocketException|StreamException|PendingReadError $e) {
if (isset($connection->old)) {
return;
}
@ -65,10 +66,6 @@ class ReadLoop extends SignalLoop
$API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR);
yield $connection->reconnect();
continue;
} catch (ClosedException $e) {
$API->logger->logger($e->getMessage(), Logger::FATAL_ERROR);
throw $e;
}
if (is_int($error)) {

View File

@ -73,7 +73,7 @@ class MTProto extends AsyncConstruct implements TLCallback
/*
const V = 71;
*/
const V = 124;
const V = 125;
const RELEASE = '4.0';
const NOT_LOGGED_IN = 0;
const WAITING_CODE = 1;
@ -801,9 +801,11 @@ class MTProto extends AsyncConstruct implements TLCallback
], 'upload' => [
'allow_automatic_upload' => true,
'part_size' => 512 * 1024,
'parallel_chunks' => 20
], 'download' => [
'report_broken_media' => true,
'part_size' => 1024 * 1024,
'parallel_chunks' => 20
], 'pwr' => [
'pwr' => false,
// Need info ?

View File

@ -19,11 +19,20 @@
namespace danog\MadelineProto\MTProtoTools;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
use danog\MadelineProto\Async\AsyncParameters;
use danog\MadelineProto\Exception;
use danog\MadelineProto\FileCallbackInterface;
use danog\MadelineProto\Logger;
use danog\MadelineProto\RPCErrorException;
use function Amp\File\open;
use function Amp\File\stat;
use function Amp\Promise\all;
use Amp\Deferred;
use Amp\Success;
use Amp\File\StatCache;
/**
* Manages upload and download of files.
@ -33,7 +42,7 @@ trait Files
public function upload_async($file, $file_name = '', $cb = null, $encrypted = false): \Generator
{
if (is_object($file)) {
if (!isset(class_implements($file)['danog\MadelineProto\FileCallbackInterface'])) {
if (!$file instanceof FileCallbackInterface) {
throw new \danog\MadelineProto\Exception('Provided object does not implement FileCallbackInterface');
}
$cb = $file;
@ -511,7 +520,7 @@ trait Files
public function download_to_dir_async($message_media, $dir, $cb = null)
{
if (is_object($dir) && class_implements($dir)['danog\MadelineProto\FileCallbackInterface']) {
if (is_object($dir) && $dir instanceof FileCallbackInterface) {
$cb = $dir;
$dir = $dir->getFile();
}
@ -523,7 +532,7 @@ trait Files
public function download_to_file_async($message_media, $file, $cb = null)
{
if (is_object($file) && class_implements($file)['danog\MadelineProto\FileCallbackInterface']) {
if (is_object($file) && $file instanceof FileCallbackInterface) {
$cb = $file;
$file = $file->getFile();
}
@ -533,22 +542,21 @@ trait Files
}
$file = realpath($file);
$message_media = yield $this->get_download_info_async($message_media);
$stream = fopen($file, 'r+b');
$size = fstat($stream)['size'];
StatCache::clear($file);
$size = (yield stat($file))['size'];
$stream = yield open($file, 'cb');
$this->logger->logger('Waiting for lock of file to download...');
do {
$res = flock($stream, LOCK_EX|LOCK_NB);
if (!$res) {
yield $this->sleep(0.1);
}
} while (!$res);
$unlock = yield $this->flock($file, LOCK_EX);
try {
yield $this->download_to_stream_async($message_media, $stream, $cb, $size, -1);
} finally {
flock($stream, LOCK_UN);
fclose($stream);
clearstatcache();
$unlock();
yield $stream->close();
StatCache::clear($file);
}
return $file;
@ -556,7 +564,9 @@ trait Files
public function download_to_stream_async($message_media, $stream, $cb = null, $offset = 0, $end = -1)
{
if (is_object($stream) && class_implements($stream)['danog\MadelineProto\FileCallbackInterface']) {
$message_media = yield $this->get_download_info_async($message_media);
if (is_object($stream) && $stream instanceof FileCallbackInterface) {
$cb = $stream;
$stream = $stream->getFile();
}
@ -567,25 +577,34 @@ trait Files
};
}
$message_media = yield $this->get_download_info_async($message_media);
/** @var $stream \Amp\ByteStream\OutputStream */
if (!is_object($stream)) {
$stream = new ResourceOutputStream($stream);
}
if (!$stream instanceof OutputStream) {
throw new Exception("Invalid stream provided");
}
$seekable = false;
if (method_exists($stream, 'seek')) {
try {
if (stream_get_meta_data($stream)['seekable']) {
fseek($stream, $offset);
yield $stream->seek($offset);
$seekable = true;
} catch (StreamException $e) {
}
} catch (\danog\MadelineProto\Exception $e) {
}
$downloaded_size = 0;
if ($end === -1 && isset($message_media['size'])) {
$end = $message_media['size'];
}
$size = $end - $offset;
$part_size = $this->settings['download']['part_size'];
$percent = 0;
$parallel_chunks = $this->settings['download']['parallel_chunks'];
$datacenter = isset($message_media['InputFileLocation']['dc_id']) ? $message_media['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc'];
if (isset($this->datacenter->sockets[$datacenter.'_media'])) {
$datacenter .= '_media';
}
if (isset($message_media['key'])) {
$digest = hash('md5', $message_media['key'].$message_media['iv'], true);
$fingerprint = $this->unpack_signed_int(substr($digest, 0, 4) ^ substr($digest, 4, 4));
@ -597,42 +616,122 @@ trait Files
$ige->setKey($message_media['key']);
$ige->enableContinuousBuffer();
}
$theend = false;
if ($offset === $end) {
$cb(100);
return true;
}
$params = [];
$start_at = $offset % $part_size;
$probable_end = $end !== -1 ? $end : 512 * 1024 * 3000;
$breakOut = false;
for ($x = $offset - $start_at; $x < $probable_end; $x += $part_size) {
$end_at = $part_size;
if ($end !== -1 && $x + $part_size >= $end) {
$end_at = $end % $part_size;
$breakOut = true;
}
$params[] = [
'offset' => $x,
'limit' => $part_size,
'part_start_at' => $start_at,
'part_end_at' => $end_at,
];
$start_at = 0;
if ($breakOut) {
break;
}
}
if (!$params) {
$cb(100);
return true;
}
$count = count($params);
$cb = function () use ($cb, $count) {
static $cur = 0;
$cur++;
$this->callFork($cb($cur*100/$count));
};
$cdn = false;
while (true) {
if ($start_at = $offset % $part_size) {
$offset -= $start_at;
$params[0]['previous_promise'] = new Success(true);
$start = microtime(true);
$size = yield $this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, array_shift($params), $stream, $seekable);
if ($params) {
$previous_promise = new Success(true);
$promises = [];
foreach ($params as $key => $param) {
$param['previous_promise'] = $previous_promise;
$previous_promise = $this->call($this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $stream, $seekable));
$previous_promise->onResolve(static function ($e, $res) use (&$size) {
if ($res) {
$size += $res;
}
});
$promises []= $previous_promise;
if (!($key % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps
yield $this->all($promises);
$promises = [];
$time = microtime(true) - $start;
$speed = (int) (($size*8)/$time)/1000000;
$this->logger->logger("Partial download time: $time");
$this->logger->logger("Partial download speed: $speed mbps");
}
}
if ($promises) {
yield $this->all($promises);
}
}
$time = microtime(true) - $start;
$speed = (int) (($size*8)/$time)/1000000;
$this->logger->logger("Total download time: $time");
$this->logger->logger("Total download speed: $speed mbps");
if ($cdn) {
$this->clear_cdn_hashes($message_media['file_token']);
}
return true;
}
private function download_part(&$message_media, &$cdn, &$datacenter, &$old_dc, &$ige, $cb, $offset, $stream, $seekable, $postpone = false)
{
static $method = [
false => 'upload.getFile', // non-cdn
true => 'upload.getCdnFile', // cdn
];
do {
if (!$cdn) {
$basic_param = [
'location' => $message_media['InputFileLocation']
];
} else {
$basic_param = [
'file_token' => $message_media['file_token']
];
}
try {
$res = $cdn ?
yield $this->method_call_async_read(
'upload.getCdnFile',
[
'file_token' => $message_media['file_token'],
'offset' => $offset,
'limit' => $part_size
],
$res = yield $this->method_call_async_read(
$method[$cdn],
$basic_param + $offset,
[
'heavy' => true,
'file' => true,
'FloodWaitLimit' => 0,
'datacenter' => $datacenter
]
) :
yield $this->method_call_async_read(
'upload.getFile',
[
'location' => $message_media['InputFileLocation'],
'offset' => $offset,
'limit' => $part_size
],
[
'heavy' => true,
'file' => true,
'FloodWaitLimit' => 0,
'datacenter' => &$datacenter
'datacenter' => &$datacenter,
'postpone' => $postpone
]
);
} catch (\danog\MadelineProto\RPCErrorException $e) {
@ -657,6 +756,7 @@ trait Files
throw $e;
}
}
if ($res['_'] === 'upload.fileCdnRedirect') {
$cdn = true;
$message_media['file_token'] = $res['file_token'];
@ -669,9 +769,7 @@ trait Files
yield $this->get_config_async([], ['datacenter' => $this->datacenter->curdc]);
}
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['stored_on_cdn'], \danog\MadelineProto\Logger::NOTICE);
continue;
}
if ($res['_'] === 'upload.cdnFileReuploadNeeded') {
} else if ($res['_'] === 'upload.cdnFileReuploadNeeded') {
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['cdn_reupload'], \danog\MadelineProto\Logger::NOTICE);
yield $this->get_config_async([], ['datacenter' => $this->datacenter->curdc]);
@ -690,51 +788,37 @@ trait Files
continue;
}
if ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') {
$datacenter = 1;
}
while ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') {
$res = yield $this->method_call_async_read('upload.getFile', ['location' => $message_media['InputFileLocation'], 'offset' => $offset, 'limit' => $part_size], ['heavy' => true, 'datacenter' => $datacenter]);
$datacenter++;
if (!isset($this->datacenter->sockets[$datacenter])) {
break;
$datacenter = 0;
}
while ($cdn === false &&
$res['type']['_'] === 'storage.fileUnknown' &&
$res['bytes'] === '' &&
isset($this->datacenter->sockets[++$datacenter])
) {
$res = yield $this->method_call_async_read('upload.getFile', $basic_param + $offset, ['heavy' => true, 'file' => true, 'FloodWaitLimit' => 0, 'datacenter' => $datacenter]);
}
if (isset($message_media['cdn_key'])) {
$ivec = substr($message_media['cdn_iv'], 0, 12).pack('N', $offset >> 4);
$ivec = substr($message_media['cdn_iv'], 0, 12).pack('N', $offset['offset'] >> 4);
$res['bytes'] = $this->ctr_encrypt($res['bytes'], $message_media['cdn_key'], $ivec);
$this->check_cdn_hash($message_media['file_token'], $offset, $res['bytes'], $old_dc);
$this->check_cdn_hash($message_media['file_token'], $offset['offset'], $res['bytes'], $old_dc);
}
if (isset($message_media['key'])) {
$res['bytes'] = $ige->decrypt($res['bytes']);
}
if ($start_at) {
$res['bytes'] = substr($res['bytes'], $start_at);
}
if ($end !== -1 && strlen($res['bytes']) + $downloaded_size >= $size) {
$res['bytes'] = substr($res['bytes'], 0, $size - $downloaded_size);
$theend = true;
}
if ($res['bytes'] === '') {
break;
}
$offset += strlen($res['bytes']);
$downloaded_size += strlen($res['bytes']);
$this->logger->logger(fwrite($stream, $res['bytes']), \danog\MadelineProto\Logger::ULTRA_VERBOSE);
if ($theend) {
break;
}
if ($end !== -1) {
$this->callFork($cb($percent = $downloaded_size * 100 / $size));
}
}
if ($end === -1) {
$this->callFork($cb(100));
}
if ($cdn) {
$this->clear_cdn_hashes($message_media['file_token']);
if ($offset['part_start_at'] || $offset['part_end_at'] !== $offset['limit']) {
$res['bytes'] = substr($res['bytes'], $offset['part_start_at'], $offset['part_end_at']- $offset['part_start_at']);
}
return true;
if (!$seekable) {
yield $offset['previous_promise']->promise();
} else {
yield $stream->seek($offset['offset'] + $offset['part_start_at']);
}
yield $stream->write($res['bytes']);
$cb();
return strlen($res['bytes']);
} while (true);
}
private $cdn_hashes = [];

View File

@ -32,6 +32,9 @@ use function Amp\Promise\wait;
use function Amp\ByteStream\getStdin;
use function Amp\ByteStream\getStdout;
use function Amp\ByteStream\getOutputBufferStream;
use function Amp\File\exists;
use function Amp\File\touch;
use Amp\File\StatCache;
/**
* Some tools.
@ -344,7 +347,42 @@ trait Tools
return $deferred->promise();
}
/**
* Asynchronously lock a file
* Resolves with a callbable that MUST eventually be called in order to release the lock
*
* @param string $file File to lock
* @param integer $operation Locking mode (see flock)
* @param numeric $polling Polling interval for lock
* @return Promise
*/
public static function flock(string $file, int $operation, $polling = 0.1): Promise
{
return self::call(self::flockAsync($file, $operation, $polling));
}
public static function flockAsync(string $file, int $operation, $polling)
{
if (!yield exists($file)) {
yield touch($file);
StatCache::clear($file);
}
$operation |= LOCK_NB;
$res = fopen($file, 'c');
do {
$result = flock($res, $operation);
if (!$result) {
yield self::sleep($polling);
}
} while (!$result);
return static function () use (&$res) {
if ($res) {
flock($res, LOCK_UN);
fclose($res);
$res = null;
}
};
}
public static function sleep($time)
{
return new \Amp\Delayed($time * 1000);