From 698d0cb59f6d9c35347c8ffb75749dabfc581bbf Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sun, 23 Jun 2019 17:46:51 +0200 Subject: [PATCH 01/17] Parallelize downloads --- docs | 2 +- phar.php | 4 +- src/danog/MadelineProto/API.php | 58 ++-- .../Loop/Connection/ReadLoop.php | 7 +- src/danog/MadelineProto/MTProto.php | 4 +- .../MadelineProto/MTProtoTools/Files.php | 278 ++++++++++++------ src/danog/MadelineProto/Tools.php | 38 +++ 7 files changed, 253 insertions(+), 138 deletions(-) diff --git a/docs b/docs index 7a17668c..02967b30 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit 7a17668c568e4a17e6b08f02347bd3f139d8554f +Subproject commit 02967b3019032399364a9fc134f7bb0e674c099a diff --git a/phar.php b/phar.php index 84dd3132..e483d944 100644 --- a/phar.php +++ b/phar.php @@ -61,10 +61,10 @@ function ___install_madeline() $release_branch = ''; } $release_fallback_branch = ''; - if (isset($_SERVER['SERVER_ADMIN']) && strpos($_SERVER['SERVER_ADMIN'], '000webhost.io') && $custom_branch === null) { + /*if (isset($_SERVER['SERVER_ADMIN']) && strpos($_SERVER['SERVER_ADMIN'], '000webhost.io') && $custom_branch === null) { $release_branch = '-deprecated'; $release_fallback_branch = '-deprecated'; - } + }*/ if (PHP_MAJOR_VERSION <= 5) { $release_branch = '5'.$release_branch; diff --git a/src/danog/MadelineProto/API.php b/src/danog/MadelineProto/API.php index 3bd7b931..fb76a4c0 100644 --- a/src/danog/MadelineProto/API.php +++ b/src/danog/MadelineProto/API.php @@ -20,6 +20,10 @@ namespace danog\MadelineProto; use Amp\Deferred; +use function Amp\File\put; +use function Amp\File\rename; +use function Amp\File\get; +use function Amp\File\exists; class API extends APIFactory { @@ -62,21 +66,15 @@ class API extends APIFactory $realpaths = Serialization::realpaths($params); $this->session = $realpaths['file']; - if (file_exists($realpaths['file'])) { - if (!file_exists($realpaths['lockfile'])) { - touch($realpaths['lockfile']); - clearstatcache(); - } - $realpaths['lockfile'] = fopen($realpaths['lockfile'], 'r'); - \danog\MadelineProto\Logger::log('Waiting for shared lock of serialization lockfile...'); - flock($realpaths['lockfile'], LOCK_SH); - \danog\MadelineProto\Logger::log('Shared lock acquired, deserializing...'); + if (yield exists($realpaths['file'])) { + Logger::log('Waiting for shared lock of serialization lockfile...'); + $unlock = yield Tools::flock($realpaths['lockfile'], LOCK_SH); + Logger::log('Shared lock acquired, deserializing...'); try { - $tounserialize = file_get_contents($realpaths['file']); + $tounserialize = yield get($realpaths['file']); } finally { - flock($realpaths['lockfile'], LOCK_UN); - fclose($realpaths['lockfile']); + $unlock(); } \danog\MadelineProto\Magic::class_exists(); @@ -137,11 +135,11 @@ class API extends APIFactory $deferred->resolve(); yield $this->API->initAsync(); $this->APIFactory(); - \danog\MadelineProto\Logger::log('Ping...', Logger::ULTRA_VERBOSE); + Logger::log('Ping...', Logger::ULTRA_VERBOSE); $this->asyncInitPromise = null; $pong = yield $this->ping(['ping_id' => 3], ['async' => true]); - \danog\MadelineProto\Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE); - \danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); + Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE); + Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); return; } @@ -158,14 +156,14 @@ class API extends APIFactory $this->API = new MTProto($params); $this->APIFactory(); $deferred->resolve(); - \danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['apifactory_start'], Logger::VERBOSE); + Logger::log(\danog\MadelineProto\Lang::$current_lang['apifactory_start'], Logger::VERBOSE); yield $this->API->initAsync(); $this->APIFactory(); $this->asyncInitPromise = null; - \danog\MadelineProto\Logger::log('Ping...', Logger::ULTRA_VERBOSE); + Logger::log('Ping...', Logger::ULTRA_VERBOSE); $pong = yield $this->ping(['ping_id' => 3], ['async' => true]); - \danog\MadelineProto\Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE); - \danog\MadelineProto\Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); + Logger::log('Pong: '.$pong['ping_id'], Logger::ULTRA_VERBOSE); + Logger::log(\danog\MadelineProto\Lang::$current_lang['madelineproto_ready'], Logger::NOTICE); } public function async($async) @@ -312,14 +310,11 @@ class API extends APIFactory } $this->serialized = time(); $realpaths = Serialization::realpaths($filename); - if (!file_exists($realpaths['lockfile'])) { - touch($realpaths['lockfile']); - clearstatcache(); - } - $realpaths['lockfile'] = fopen($realpaths['lockfile'], 'w'); - \danog\MadelineProto\Logger::log('Waiting for exclusive lock of serialization lockfile...'); - flock($realpaths['lockfile'], LOCK_EX); - \danog\MadelineProto\Logger::log('Lock acquired, serializing'); + Logger::log('Waiting for exclusive lock of serialization lockfile...'); + + $unlock = yield Tools::flock($realpaths['lockfile'], LOCK_EX); + + Logger::log('Lock acquired, serializing'); try { if (!$this->getting_api_id) { @@ -332,17 +327,16 @@ class API extends APIFactory $this->API->settings['logger']['logger_param'] = [$this->API, 'noop']; } } - $wrote = file_put_contents($realpaths['tempfile'], serialize($this)); - rename($realpaths['tempfile'], $realpaths['file']); + $wrote = yield put($realpaths['tempfile'], serialize($this)); + yield rename($realpaths['tempfile'], $realpaths['file']); } finally { if (!$this->getting_api_id) { $this->API->settings['updates']['callback'] = $update_closure; $this->API->settings['logger']['logger_param'] = $logger_closure; } - flock($realpaths['lockfile'], LOCK_UN); - fclose($realpaths['lockfile']); + $unlock(); } - \danog\MadelineProto\Logger::log('Done serializing'); + Logger::log('Done serializing'); return $wrote; })()); diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index f124b1eb..9d8461d5 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -26,6 +26,7 @@ use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\NothingInTheSocketException; use danog\MadelineProto\Tools; use Amp\ByteStream\StreamException; +use Amp\ByteStream\PendingReadError; /** * Socket read loop. @@ -57,7 +58,7 @@ class ReadLoop extends SignalLoop while (true) { try { $error = yield $this->waitSignal($this->readMessage()); - } catch (NothingInTheSocketException|StreamException $e) { + } catch (NothingInTheSocketException|StreamException|PendingReadError $e) { if (isset($connection->old)) { return; } @@ -65,10 +66,6 @@ class ReadLoop extends SignalLoop $API->logger->logger("Got nothing in the socket in DC {$datacenter}, reconnecting...", Logger::ERROR); yield $connection->reconnect(); continue; - } catch (ClosedException $e) { - $API->logger->logger($e->getMessage(), Logger::FATAL_ERROR); - - throw $e; } if (is_int($error)) { diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 010b5262..431a1c7d 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -73,7 +73,7 @@ class MTProto extends AsyncConstruct implements TLCallback /* const V = 71; */ - const V = 124; + const V = 125; const RELEASE = '4.0'; const NOT_LOGGED_IN = 0; const WAITING_CODE = 1; @@ -801,9 +801,11 @@ class MTProto extends AsyncConstruct implements TLCallback ], 'upload' => [ 'allow_automatic_upload' => true, 'part_size' => 512 * 1024, + 'parallel_chunks' => 20 ], 'download' => [ 'report_broken_media' => true, 'part_size' => 1024 * 1024, + 'parallel_chunks' => 20 ], 'pwr' => [ 'pwr' => false, // Need info ? diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 325a6d41..258c4430 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -19,11 +19,20 @@ namespace danog\MadelineProto\MTProtoTools; +use Amp\ByteStream\OutputStream; +use Amp\ByteStream\ResourceOutputStream; +use Amp\ByteStream\StreamException; use danog\MadelineProto\Async\AsyncParameters; use danog\MadelineProto\Exception; +use danog\MadelineProto\FileCallbackInterface; use danog\MadelineProto\Logger; use danog\MadelineProto\RPCErrorException; +use function Amp\File\open; +use function Amp\File\stat; use function Amp\Promise\all; +use Amp\Deferred; +use Amp\Success; +use Amp\File\StatCache; /** * Manages upload and download of files. @@ -33,7 +42,7 @@ trait Files public function upload_async($file, $file_name = '', $cb = null, $encrypted = false): \Generator { if (is_object($file)) { - if (!isset(class_implements($file)['danog\MadelineProto\FileCallbackInterface'])) { + if (!$file instanceof FileCallbackInterface) { throw new \danog\MadelineProto\Exception('Provided object does not implement FileCallbackInterface'); } $cb = $file; @@ -511,7 +520,7 @@ trait Files public function download_to_dir_async($message_media, $dir, $cb = null) { - if (is_object($dir) && class_implements($dir)['danog\MadelineProto\FileCallbackInterface']) { + if (is_object($dir) && $dir instanceof FileCallbackInterface) { $cb = $dir; $dir = $dir->getFile(); } @@ -523,7 +532,7 @@ trait Files public function download_to_file_async($message_media, $file, $cb = null) { - if (is_object($file) && class_implements($file)['danog\MadelineProto\FileCallbackInterface']) { + if (is_object($file) && $file instanceof FileCallbackInterface) { $cb = $file; $file = $file->getFile(); } @@ -533,22 +542,21 @@ trait Files } $file = realpath($file); $message_media = yield $this->get_download_info_async($message_media); - $stream = fopen($file, 'r+b'); - $size = fstat($stream)['size']; + + StatCache::clear($file); + + $size = (yield stat($file))['size']; + $stream = yield open($file, 'cb'); + $this->logger->logger('Waiting for lock of file to download...'); - do { - $res = flock($stream, LOCK_EX|LOCK_NB); - if (!$res) { - yield $this->sleep(0.1); - } - } while (!$res); + $unlock = yield $this->flock($file, LOCK_EX); try { yield $this->download_to_stream_async($message_media, $stream, $cb, $size, -1); } finally { - flock($stream, LOCK_UN); - fclose($stream); - clearstatcache(); + $unlock(); + yield $stream->close(); + StatCache::clear($file); } return $file; @@ -556,7 +564,9 @@ trait Files public function download_to_stream_async($message_media, $stream, $cb = null, $offset = 0, $end = -1) { - if (is_object($stream) && class_implements($stream)['danog\MadelineProto\FileCallbackInterface']) { + $message_media = yield $this->get_download_info_async($message_media); + + if (is_object($stream) && $stream instanceof FileCallbackInterface) { $cb = $stream; $stream = $stream->getFile(); } @@ -567,25 +577,34 @@ trait Files }; } - $message_media = yield $this->get_download_info_async($message_media); - - try { - if (stream_get_meta_data($stream)['seekable']) { - fseek($stream, $offset); - } - } catch (\danog\MadelineProto\Exception $e) { + /** @var $stream \Amp\ByteStream\OutputStream */ + if (!is_object($stream)) { + $stream = new ResourceOutputStream($stream); } - $downloaded_size = 0; + if (!$stream instanceof OutputStream) { + throw new Exception("Invalid stream provided"); + } + $seekable = false; + if (method_exists($stream, 'seek')) { + try { + yield $stream->seek($offset); + $seekable = true; + } catch (StreamException $e) { + } + } + if ($end === -1 && isset($message_media['size'])) { $end = $message_media['size']; } - $size = $end - $offset; + $part_size = $this->settings['download']['part_size']; - $percent = 0; + $parallel_chunks = $this->settings['download']['parallel_chunks']; + $datacenter = isset($message_media['InputFileLocation']['dc_id']) ? $message_media['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc']; if (isset($this->datacenter->sockets[$datacenter.'_media'])) { $datacenter .= '_media'; } + if (isset($message_media['key'])) { $digest = hash('md5', $message_media['key'].$message_media['iv'], true); $fingerprint = $this->unpack_signed_int(substr($digest, 0, 4) ^ substr($digest, 4, 4)); @@ -597,44 +616,124 @@ trait Files $ige->setKey($message_media['key']); $ige->enableContinuousBuffer(); } - $theend = false; - $cdn = false; - while (true) { - if ($start_at = $offset % $part_size) { - $offset -= $start_at; + + if ($offset === $end) { + $cb(100); + return true; + } + $params = []; + $start_at = $offset % $part_size; + $probable_end = $end !== -1 ? $end : 512 * 1024 * 3000; + + $breakOut = false; + for ($x = $offset - $start_at; $x < $probable_end; $x += $part_size) { + $end_at = $part_size; + if ($end !== -1 && $x + $part_size >= $end) { + $end_at = $end % $part_size; + $breakOut = true; } + $params[] = [ + 'offset' => $x, + 'limit' => $part_size, + 'part_start_at' => $start_at, + 'part_end_at' => $end_at, + ]; + + $start_at = 0; + if ($breakOut) { + break; + } + } + + if (!$params) { + $cb(100); + return true; + } + $count = count($params); + $cb = function () use ($cb, $count) { + static $cur = 0; + $cur++; + $this->callFork($cb($cur*100/$count)); + }; + + $cdn = false; + + $params[0]['previous_promise'] = new Success(true); + + $start = microtime(true); + $size = yield $this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, array_shift($params), $stream, $seekable); + + if ($params) { + $previous_promise = new Success(true); + + $promises = []; + foreach ($params as $key => $param) { + $param['previous_promise'] = $previous_promise; + $previous_promise = $this->call($this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $stream, $seekable)); + $previous_promise->onResolve(static function ($e, $res) use (&$size) { + if ($res) { + $size += $res; + } + }); + $promises []= $previous_promise; + + if (!($key % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps + yield $this->all($promises); + $promises = []; + + $time = microtime(true) - $start; + $speed = (int) (($size*8)/$time)/1000000; + $this->logger->logger("Partial download time: $time"); + $this->logger->logger("Partial download speed: $speed mbps"); + } + } + if ($promises) { + yield $this->all($promises); + } + } + $time = microtime(true) - $start; + $speed = (int) (($size*8)/$time)/1000000; + $this->logger->logger("Total download time: $time"); + $this->logger->logger("Total download speed: $speed mbps"); + + if ($cdn) { + $this->clear_cdn_hashes($message_media['file_token']); + } + + return true; + } + + private function download_part(&$message_media, &$cdn, &$datacenter, &$old_dc, &$ige, $cb, $offset, $stream, $seekable, $postpone = false) + { + static $method = [ + false => 'upload.getFile', // non-cdn + true => 'upload.getCdnFile', // cdn + ]; + do { + if (!$cdn) { + $basic_param = [ + 'location' => $message_media['InputFileLocation'] + ]; + } else { + $basic_param = [ + 'file_token' => $message_media['file_token'] + ]; + } + try { - $res = $cdn ? - yield $this->method_call_async_read( - 'upload.getCdnFile', - [ - 'file_token' => $message_media['file_token'], - 'offset' => $offset, - 'limit' => $part_size - ], - [ - 'heavy' => true, - 'file' => true, - 'FloodWaitLimit' => 0, - 'datacenter' => $datacenter - ] - ) : - yield $this->method_call_async_read( - 'upload.getFile', - [ - 'location' => $message_media['InputFileLocation'], - 'offset' => $offset, - 'limit' => $part_size - ], - [ - 'heavy' => true, - 'file' => true, - 'FloodWaitLimit' => 0, - 'datacenter' => &$datacenter - ] - ); + $res = yield $this->method_call_async_read( + $method[$cdn], + $basic_param + $offset, + [ + 'heavy' => true, + 'file' => true, + 'FloodWaitLimit' => 0, + 'datacenter' => &$datacenter, + 'postpone' => $postpone + ] + ); } catch (\danog\MadelineProto\RPCErrorException $e) { if (strpos($e->rpc, 'FLOOD_WAIT_') === 0) { if (isset($message_media['MessageMedia']) && !$this->authorization['user']['bot'] && $this->settings['download']['report_broken_media']) { @@ -657,6 +756,7 @@ trait Files throw $e; } } + if ($res['_'] === 'upload.fileCdnRedirect') { $cdn = true; $message_media['file_token'] = $res['file_token']; @@ -669,9 +769,7 @@ trait Files yield $this->get_config_async([], ['datacenter' => $this->datacenter->curdc]); } $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['stored_on_cdn'], \danog\MadelineProto\Logger::NOTICE); - continue; - } - if ($res['_'] === 'upload.cdnFileReuploadNeeded') { + } else if ($res['_'] === 'upload.cdnFileReuploadNeeded') { $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['cdn_reupload'], \danog\MadelineProto\Logger::NOTICE); yield $this->get_config_async([], ['datacenter' => $this->datacenter->curdc]); @@ -690,51 +788,37 @@ trait Files continue; } if ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') { - $datacenter = 1; + $datacenter = 0; } - while ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') { - $res = yield $this->method_call_async_read('upload.getFile', ['location' => $message_media['InputFileLocation'], 'offset' => $offset, 'limit' => $part_size], ['heavy' => true, 'datacenter' => $datacenter]); - $datacenter++; - if (!isset($this->datacenter->sockets[$datacenter])) { - break; - } + while ($cdn === false && + $res['type']['_'] === 'storage.fileUnknown' && + $res['bytes'] === '' && + isset($this->datacenter->sockets[++$datacenter]) + ) { + $res = yield $this->method_call_async_read('upload.getFile', $basic_param + $offset, ['heavy' => true, 'file' => true, 'FloodWaitLimit' => 0, 'datacenter' => $datacenter]); } + if (isset($message_media['cdn_key'])) { - $ivec = substr($message_media['cdn_iv'], 0, 12).pack('N', $offset >> 4); + $ivec = substr($message_media['cdn_iv'], 0, 12).pack('N', $offset['offset'] >> 4); $res['bytes'] = $this->ctr_encrypt($res['bytes'], $message_media['cdn_key'], $ivec); - $this->check_cdn_hash($message_media['file_token'], $offset, $res['bytes'], $old_dc); + $this->check_cdn_hash($message_media['file_token'], $offset['offset'], $res['bytes'], $old_dc); } if (isset($message_media['key'])) { $res['bytes'] = $ige->decrypt($res['bytes']); } - if ($start_at) { - $res['bytes'] = substr($res['bytes'], $start_at); + if ($offset['part_start_at'] || $offset['part_end_at'] !== $offset['limit']) { + $res['bytes'] = substr($res['bytes'], $offset['part_start_at'], $offset['part_end_at']- $offset['part_start_at']); } - if ($end !== -1 && strlen($res['bytes']) + $downloaded_size >= $size) { - $res['bytes'] = substr($res['bytes'], 0, $size - $downloaded_size); - $theend = true; + + if (!$seekable) { + yield $offset['previous_promise']->promise(); + } else { + yield $stream->seek($offset['offset'] + $offset['part_start_at']); } - if ($res['bytes'] === '') { - break; - } - $offset += strlen($res['bytes']); - $downloaded_size += strlen($res['bytes']); - $this->logger->logger(fwrite($stream, $res['bytes']), \danog\MadelineProto\Logger::ULTRA_VERBOSE); - if ($theend) { - break; - } - if ($end !== -1) { - $this->callFork($cb($percent = $downloaded_size * 100 / $size)); - } - } - if ($end === -1) { - $this->callFork($cb(100)); - } - if ($cdn) { - $this->clear_cdn_hashes($message_media['file_token']); - } - - return true; + yield $stream->write($res['bytes']); + $cb(); + return strlen($res['bytes']); + } while (true); } private $cdn_hashes = []; diff --git a/src/danog/MadelineProto/Tools.php b/src/danog/MadelineProto/Tools.php index 74e85123..d4a9ca92 100644 --- a/src/danog/MadelineProto/Tools.php +++ b/src/danog/MadelineProto/Tools.php @@ -32,6 +32,9 @@ use function Amp\Promise\wait; use function Amp\ByteStream\getStdin; use function Amp\ByteStream\getStdout; use function Amp\ByteStream\getOutputBufferStream; +use function Amp\File\exists; +use function Amp\File\touch; +use Amp\File\StatCache; /** * Some tools. @@ -344,7 +347,42 @@ trait Tools return $deferred->promise(); } + /** + * Asynchronously lock a file + * Resolves with a callbable that MUST eventually be called in order to release the lock + * + * @param string $file File to lock + * @param integer $operation Locking mode (see flock) + * @param numeric $polling Polling interval for lock + * @return Promise + */ + public static function flock(string $file, int $operation, $polling = 0.1): Promise + { + return self::call(self::flockAsync($file, $operation, $polling)); + } + public static function flockAsync(string $file, int $operation, $polling) + { + if (!yield exists($file)) { + yield touch($file); + StatCache::clear($file); + } + $operation |= LOCK_NB; + $res = fopen($file, 'c'); + do { + $result = flock($res, $operation); + if (!$result) { + yield self::sleep($polling); + } + } while (!$result); + return static function () use (&$res) { + if ($res) { + flock($res, LOCK_UN); + fclose($res); + $res = null; + } + }; + } public static function sleep($time) { return new \Amp\Delayed($time * 1000); From 3886305284a5ef66d07435c5b05ac901884306c8 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 24 Jun 2019 15:14:09 +0200 Subject: [PATCH 02/17] Start rewriting upload --- .../MadelineProto/MTProtoTools/Files.php | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index c943149e..f5f21197 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -49,8 +49,6 @@ trait Files $file = $file->getFile(); } - $t = microtime(true); - $file = \danog\MadelineProto\Absolute::absolute($file); if (!file_exists($file)) { throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['file_not_exist']); @@ -71,18 +69,31 @@ trait Files $this->logger->logger('Upload status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); }; } - $part_size = $this->settings['upload']['part_size']; - $part_total_num = (int) ceil($file_size / $part_size); - $part_num = 0; - $method = $file_size > 10 * 1024 * 1024 ? 'upload.saveBigFilePart' : 'upload.saveFilePart'; - $constructor = 'input'.($encrypted === true ? 'Encrypted' : '').($file_size > 10 * 1024 * 1024 ? 'FileBig' : 'File').($encrypted === true ? 'Uploaded' : ''); - $file_id = $this->random(8); + $f = fopen($file, 'r'); $seekable = stream_get_meta_data($f)['seekable']; if ($seekable) { fseek($f, 0); } + } + public function upload_from_callable($callable, $size, $file_name = '', $cb = null, $encrypted = false) + { + if (is_object($callable) && $callable instanceof FileCallbackInterface) { + $cb = $callable; + $callable = $callable->getFile(); + } + if (!is_callable($callable)) { + throw new Exception('Invalid callable provided'); + } + + $part_size = $this->settings['upload']['part_size']; + $part_total_num = (int) ceil($size / $part_size); + $part_num = 0; + $method = $size > 10 * 1024 * 1024 ? 'upload.saveBigFilePart' : 'upload.saveFilePart'; + $constructor = 'input'.($encrypted === true ? 'Encrypted' : '').($size > 10 * 1024 * 1024 ? 'FileBig' : 'File').($encrypted === true ? 'Uploaded' : ''); + $file_id = $this->random(8); + $ige = null; if ($encrypted === true) { $key = $this->random(32); @@ -93,6 +104,7 @@ trait Files $ige->setIV($iv); $ige->setKey($key); $ige->enableContinuousBuffer(); + $parallelize = false; } $ctx = hash_init('md5'); $promises = []; @@ -706,6 +718,10 @@ trait Files $callable = $callable->getFile(); } + if (!is_callable($callable)) { + throw new Exception('Wrong callable provided'); + } + if ($end === -1 && isset($message_media['size'])) { $end = $message_media['size']; } @@ -728,6 +744,7 @@ trait Files $ige->setIV($message_media['iv']); $ige->setKey($message_media['key']); $ige->enableContinuousBuffer(); + $parallelize = false; } if ($offset === $end) { @@ -741,7 +758,7 @@ trait Files $breakOut = false; for ($x = $offset - $start_at; $x < $probable_end; $x += $part_size) { $end_at = $part_size; - + if ($end !== -1 && $x + $part_size > $end) { $end_at = ($x + $part_size) - $end; $breakOut = true; From 7dbf529bb16ec1d06f2f62c9fc43f41c676a533a Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 24 Jun 2019 18:55:37 +0200 Subject: [PATCH 03/17] Implement parallelized upload --- .../MadelineProto/MTProtoTools/Files.php | 269 ++++++++++++++---- .../Stream/Common/BufferedRawStream.php | 5 +- .../Stream/Transport/DefaultStream.php | 2 +- 3 files changed, 219 insertions(+), 57 deletions(-) diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index f5f21197..5f85ab9f 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -19,9 +19,12 @@ namespace danog\MadelineProto\MTProtoTools; +use Amp\ByteStream\InputStream; use Amp\ByteStream\OutputStream; use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\StreamException; +use Amp\Deferred; +use Amp\File\Handle; use Amp\File\StatCache; use Amp\Promise; use Amp\Success; @@ -30,8 +33,13 @@ use danog\MadelineProto\Exception; use danog\MadelineProto\FileCallbackInterface; use danog\MadelineProto\Logger; use danog\MadelineProto\RPCErrorException; +use danog\MadelineProto\Stream\Common\BufferedRawStream; +use danog\MadelineProto\Stream\ConnectionContext; +use danog\MadelineProto\Stream\Transport\PremadeStream; +use function Amp\File\exists; use function Amp\File\open; use function Amp\File\stat; +use function Amp\File\touch; use function Amp\Promise\all; /** @@ -39,45 +47,112 @@ use function Amp\Promise\all; */ trait Files { - public function upload_async($file, $file_name = '', $cb = null, $encrypted = false): \Generator + public function upload_async($file, $file_name = '', $cb = null, $encrypted = false) { - if (is_object($file)) { - if (!$file instanceof FileCallbackInterface) { - throw new \danog\MadelineProto\Exception('Provided object does not implement FileCallbackInterface'); - } + if (is_object($file) && $file instanceof FileCallbackInterface) { $cb = $file; $file = $file->getFile(); } + if (is_string($file) || (is_object($file) && method_exists($file, '__toString'))) { + if (filter_var($file, FILTER_VALIDATE_URL)) { + return yield $this->upload_from_url_async($file); + } + } else if (is_array($file)) { + return yield $this->upload_from_tgfile($file, $cb, $encrypted); + } $file = \danog\MadelineProto\Absolute::absolute($file); - if (!file_exists($file)) { + if (!yield exists($file)) { throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['file_not_exist']); } if (empty($file_name)) { $file_name = basename($file); } - $datacenter = $this->settings['connection_settings']['default_dc']; - if (isset($this->datacenter->sockets[$datacenter.'_media'])) { - $datacenter .= '_media'; - } - $file_size = filesize($file); - if ($file_size > 512 * 1024 * 3000) { + + StatCache::clear($file); + + $size = (yield stat($file))['size']; + if ($size > 512 * 1024 * 3000) { throw new \danog\MadelineProto\Exception('Given file is too big!'); } - if ($cb === null) { - $cb = function ($percent) { - $this->logger->logger('Upload status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); + + $stream = yield open($file, 'rb'); + $mime = $this->get_mime_from_file($file); + + try { + return yield $this->upload_from_stream_async($stream, $size, $stream, $mime, $cb, $encrypted); + } finally { + yield $stream->close(); + } + } + public function upload_from_url_async($url, int $size = 0, string $file_name = '', $cb = null, bool $encrypted = false) + { + if (is_object($url) && $url instanceof FileCallbackInterface) { + $cb = $url; + $url = $url->getFile(); + } + /** @var $response \Amp\Artax\Response */ + $response = yield $this->datacenter->getHTTPClient()->request($url); + if (200 !== $status = $response->getStatusCode) { + throw new Exception("Wrong status code: $status"); + } + $mime = trim(explode(';', $response->getHeader('content-type') ?? 'application/octet-stream')[0]); + $size = $response->getHeader('content-length') ?? $size; + if (!$size) { + throw new Exception('Wrong size'); + } + + $body = $response->getBody(); + + return yield $this->upload_from_stream_async($body, $size, $mime, $file_name, $cb, $encrypted); + } + public function upload_from_stream_async($stream, int $size, string $mime, string $file_name = '', $cb = null, bool $encrypted = false) + { + if (is_object($stream) && $stream instanceof FileCallbackInterface) { + $cb = $stream; + $stream = $stream->getFile(); + } + + /** @var $stream \Amp\ByteStream\OutputStream */ + if (!is_object($stream)) { + $stream = new ResourceOutputStream($stream); + } + if (!$stream instanceof InputStream) { + throw new Exception("Invalid stream provided"); + } + $seekable = false; + if (method_exists($stream, 'seek')) { + try { + yield $stream->seek(0); + $seekable = true; + } catch (StreamException $e) { + } + } + + if ($stream instanceof Handle || $stream instanceof BufferedRawStream) { + $callable = static function (int $offset, int $size) use ($stream, $seekable) { + if ($seekable) { + while ($stream->tell() !== $offset) { + yield $stream->seek($offset); + } + } + return yield $stream->read($size); + }; + } else { + $ctx = (new ConnectionContext) + ->addStream(PremadeStream::getName(), $stream) + ->addStream(BufferedRawStream); + + $stream = yield $ctx->getStream(); + + $callable = static function (int $offset, int $size) use ($stream) { + return yield $stream->read($size); }; } - $f = fopen($file, 'r'); - - $seekable = stream_get_meta_data($f)['seekable']; - if ($seekable) { - fseek($f, 0); - } + return yield $this->upload_from_callable_async($callable, $size, $mime, $file_name, $cb, $seekable, $encrypted); } - public function upload_from_callable($callable, $size, $file_name = '', $cb = null, $encrypted = false) + public function upload_from_callable_async($callable, int $size, string $mime, string $file_name = '', $cb = null, bool $refetchable = true, bool $encrypted = false) { if (is_object($callable) && $callable instanceof FileCallbackInterface) { $cb = $callable; @@ -86,8 +161,20 @@ trait Files if (!is_callable($callable)) { throw new Exception('Invalid callable provided'); } + if ($cb === null) { + $cb = function ($percent) { + $this->logger->logger('Upload status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); + }; + } + + $datacenter = $this->settings['connection_settings']['default_dc']; + if (isset($this->datacenter->sockets[$datacenter.'_media'])) { + $datacenter .= '_media'; + } $part_size = $this->settings['upload']['part_size']; + $parallel_chunks = $this->settings['upload']['parallel_chunks'] ? $this->settings['upload']['parallel_chunks'] : 3000; + $part_total_num = (int) ceil($size / $part_size); $part_num = 0; $method = $size > 10 * 1024 * 1024 ? 'upload.saveBigFilePart' : 'upload.saveFilePart'; @@ -104,40 +191,64 @@ trait Files $ige->setIV($iv); $ige->setKey($key); $ige->enableContinuousBuffer(); - $parallelize = false; + $refetchable = false; } $ctx = hash_init('md5'); $promises = []; - $cur_part_num = 0; + $cb = function () use ($cb, $part_total_num) { + static $cur = 0; + $cur++; + $this->callFork($cb($cur * 100 / $part_total_num)); + }; + + $start = microtime(true); while ($part_num < $part_total_num) { - $t = microtime(true); $read_deferred = yield $this->method_call_async_write( $method, new AsyncParameters( - static function () use ($file_id, $part_num, $part_total_num, $part_size, $f, $ctx, $ige, $seekable) { - if ($seekable) { - fseek($f, $part_num * $part_size); - } elseif (ftell($f) !== $part_num * $part_size) { - throw new \danog\MadelineProto\Exception('Wrong position!'); - } + static function () use ($file_id, $part_num, $part_total_num, $part_size, $callable, $ctx, $ige) { + static $fetched = false; + $already_fetched = $fetched; + $fetched = true; - $bytes = stream_get_contents($f, $part_size); + $bytes = yield $callable($part_num * $part_size, $part_size); + if (!$already_fetched) { + hash_update($ctx, $bytes); + } if ($ige) { $bytes = $ige->encrypt(str_pad($bytes, $part_size, chr(0))); } - hash_update($ctx, $bytes); return ['file_id' => $file_id, 'file_part' => $part_num, 'file_total_parts' => $part_total_num, 'bytes' => $bytes]; }, - $seekable + $refetchable ), - ['heavy' => true, 'file' => true, 'datacenter' => $datacenter] + ['heavy' => true, 'file' => true, 'datacenter' => &$datacenter] ); - $this->callFork($cb(ftell($f) * 100 / $file_size)); - $this->logger->logger('Speed for chunk: '.(($part_size * 8 / 1000000) / (microtime(true) - $t))); + $read_deferred->onResolve(static function ($e, $res) use ($cb) { + if ($res) { + $cb(); + } + }); + $part_num++; $promises[] = $read_deferred->promise(); + + if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this if every second) + $result = yield $this->all($promises); + foreach ($result as $kkey => $result) { + if (!$result) { + throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed'); + } + } + $promises = []; + + $time = microtime(true) - $start; + $speed = (int) (($size * 8) / $time) / 1000000; + $this->logger->logger("Partial download time: $time"); + $this->logger->logger("Partial download speed: $speed mbps"); + } } $result = yield all($promises); @@ -146,18 +257,18 @@ trait Files throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed'); } } + $time = microtime(true) - $start; + $speed = (int) (($size * 8) / $time) / 1000000; + $this->logger->logger("Total download time: $time"); + $this->logger->logger("Total download speed: $speed mbps"); - $constructor = ['_' => $constructor, 'id' => $file_id, 'parts' => $part_total_num, 'name' => $file_name, 'mime_type' => $this->get_mime_from_file($file)]; + $constructor = ['_' => $constructor, 'id' => $file_id, 'parts' => $part_total_num, 'name' => $file_name, 'mime_type' => $mime]; if ($encrypted === true) { $constructor['key_fingerprint'] = $fingerprint; $constructor['key'] = $key; $constructor['iv'] = $iv; } - - fclose($f); - clearstatcache(); - - $this->logger->logger('Speed: '.(($file_size * 8) / (microtime(true) - $t) / 1000000)); + $constructor['md5_checksum'] = hash_final($ctx); return $constructor; } @@ -167,7 +278,56 @@ trait Files return $this->upload_async($file, $file_name, $cb, true); } - public function gen_all_file_async($media, $regenerate) + public function upload_from_tgfile_async($media, $cb = null, $encrypted = false) + { + if (is_object($media) && $media instanceof FileCallbackInterface) { + $cb = $media; + $media = $media->getFile(); + } + $media = yield $this->get_download_info_async($media); + if (!isset($media['size'], $media['mime'])) { + throw new Exception('Wrong file provided!'); + } + $size = $media['size']; + $mime = $media['mime']; + + $bridge = new class + { + private $done = []; + private $pending = []; + public function write(string $data, int $offset) + { + if (isset($this->pending[$offset])) { + $promise = $this->pending[$offset]; + unset($this->pending[$offset]); + $promise->resolve($data); + } else { + $this->done[$offset] = $data; + } + } + public function read(int $offset, int $size) + { + if (isset($this->done[$offset])) { + if (strlen($this->done[$offset]) > $size) { + throw new Exception('Wrong size!'); + } + $result = $this->done[$offset]; + unset($this->done[$offset]); + return $result; + } + $this->pending[$offset] = new Deferred; + return $this->pending[$offset]->promise(); + } + }; + $reader = [$bridge, 'read']; + $writer = [$bridge, 'write']; + yield $this->all([ + $this->download_to_callable_async($media, $writer, $cb), + $this->upload_from_callable_async($reader, $size, $mime, '', $cb, false, $encrypted), + ]); + } + + public function gen_all_file_async($media) { $res = [$this->constructors->find_by_predicate($media['_'])['type'] => $media]; switch ($media['_']) { @@ -260,7 +420,7 @@ trait Files return $res; } - public function get_file_info_async($constructor, $regenerate = false) + public function get_file_info_async($constructor) { if (is_string($constructor)) { $constructor = $this->unpack_file_id($constructor)['MessageMedia']; @@ -276,7 +436,7 @@ trait Files $constructor = $constructor['media']; } - return yield $this->gen_all_file_async($constructor, $regenerate); + return yield $this->gen_all_file_async($constructor); } public function get_propic_info_async($data) { @@ -644,8 +804,8 @@ trait Files $file = $file->getFile(); } $file = \danog\MadelineProto\Absolute::absolute(preg_replace('|/+|', '/', $file)); - if (!file_exists($file)) { - touch($file); + if (!yield exists($file)) { + yield touch($file); } $file = realpath($file); $message_media = yield $this->get_download_info_async($message_media); @@ -677,12 +837,6 @@ trait Files $stream = $stream->getFile(); } - if ($cb === null) { - $cb = function ($percent) { - $this->logger->logger('Download status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); - }; - } - /** @var $stream \Amp\ByteStream\OutputStream */ if (!is_object($stream)) { $stream = new ResourceOutputStream($stream); @@ -721,13 +875,18 @@ trait Files if (!is_callable($callable)) { throw new Exception('Wrong callable provided'); } + if ($cb === null) { + $cb = function ($percent) { + $this->logger->logger('Download status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE); + }; + } if ($end === -1 && isset($message_media['size'])) { $end = $message_media['size']; } $part_size = $this->settings['download']['part_size']; - $parallel_chunks = $this->settings['download']['parallel_chunks']; + $parallel_chunks = $this->settings['download']['parallel_chunks'] ? $this->settings['download']['parallel_chunks'] : 3000; $datacenter = isset($message_media['InputFileLocation']['dc_id']) ? $message_media['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc']; if (isset($this->datacenter->sockets[$datacenter.'_media'])) { diff --git a/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php b/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php index 129380e9..a25a983b 100644 --- a/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php +++ b/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php @@ -24,13 +24,16 @@ use danog\MadelineProto\Exception; use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\ConnectionContext; use function Amp\Socket\connect; +use danog\MadelineProto\Stream\BufferedStreamInterface; +use danog\MadelineProto\Stream\BufferInterface; +use danog\MadelineProto\Stream\RawStreamInterface; /** * Buffered raw stream. * * @author Daniil Gentili */ -class BufferedRawStream implements \danog\MadelineProto\Stream\BufferedStreamInterface, \danog\MadelineProto\Stream\BufferInterface, \danog\MadelineProto\Stream\RawStreamInterface +class BufferedRawStream implements BufferedStreamInterface, BufferInterface, RawStreamInterface { use RawStream; diff --git a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php index 9b4a3fdb..a01fe5b8 100644 --- a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php +++ b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php @@ -47,7 +47,7 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise { - return $this->enableCrypto($tlsContext); + return $this->stream->enableCrypto($tlsContext); } public function getStream() From 83d9c2e1a0edbb721e68c29ac15870719b66da51 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 24 Jun 2019 20:03:15 +0200 Subject: [PATCH 04/17] Finish implementing new upload --- bot.php | 13 +++++++ .../MadelineProto/MTProtoTools/Files.php | 38 ++++++++++++------- src/danog/MadelineProto/TL/TL.php | 19 +++++++++- 3 files changed, 55 insertions(+), 15 deletions(-) diff --git a/bot.php b/bot.php index 33cbc70b..195eba78 100755 --- a/bot.php +++ b/bot.php @@ -46,6 +46,19 @@ class EventHandler extends \danog\MadelineProto\EventHandler yield $this->messages->sendMessage(['peer' => $update, 'message' => "$res", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']); //'entities' => [['_' => 'messageEntityPre', 'offset' => 0, 'length' => strlen($res), 'language' => 'json']]]); if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') { yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]); + yield $this->messages->sendMedia([ + 'peer' => '@danogentili', + 'media' => [ + '_' => 'inputMediaUploadedDocument', + 'file' => $update, + 'attributes' => [ + ['_' => 'documentAttributeFilename', 'file_name' => 'document.txt'] + ] + ], + 'message' => '[This is the caption](https://t.me/MadelineProto)', + 'parse_mode' => 'Markdown' + ]); + //yield $this->download_to_dir($update, '/tmp'); } } catch (\danog\MadelineProto\RPCErrorException $e) { diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 5f85ab9f..7797c801 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -58,7 +58,7 @@ trait Files return yield $this->upload_from_url_async($file); } } else if (is_array($file)) { - return yield $this->upload_from_tgfile($file, $cb, $encrypted); + return yield $this->upload_from_tgfile_async($file, $cb, $encrypted); } $file = \danog\MadelineProto\Absolute::absolute($file); @@ -226,7 +226,7 @@ trait Files ), ['heavy' => true, 'file' => true, 'datacenter' => &$datacenter] ); - $read_deferred->onResolve(static function ($e, $res) use ($cb) { + $read_deferred->promise()->onResolve(static function ($e, $res) use ($cb) { if ($res) { $cb(); } @@ -291,10 +291,13 @@ trait Files $size = $media['size']; $mime = $media['mime']; + $chunk_size = $this->settings['upload']['part_size']; + $bridge = new class { private $done = []; private $pending = []; + public $nextRead; public function write(string $data, int $offset) { if (isset($this->pending[$offset])) { @@ -304,9 +307,14 @@ trait Files } else { $this->done[$offset] = $data; } + return $this->nextRead->promise(); } public function read(int $offset, int $size) { + $nextRead = $this->nextRead; + $this->nextRead = new Deferred; + $nextRead->resolve(true); + if (isset($this->done[$offset])) { if (strlen($this->done[$offset]) > $size) { throw new Exception('Wrong size!'); @@ -316,14 +324,17 @@ trait Files return $result; } $this->pending[$offset] = new Deferred; - return $this->pending[$offset]->promise(); + $res = $this->pending[$offset]->promise(); + + return $res; } }; + $bridge->nextRead = new Deferred; $reader = [$bridge, 'read']; $writer = [$bridge, 'write']; yield $this->all([ - $this->download_to_callable_async($media, $writer, $cb), $this->upload_from_callable_async($reader, $size, $mime, '', $cb, false, $encrypted), + $this->download_to_callable_async($media, $writer, null, false, 0, -1, $chunk_size) ]); } @@ -505,13 +516,13 @@ trait Files $res['name'] .= ' - '.$audio['performer']; } } - if (!isset($res['ext'])) { - $res['ext'] = $this->get_extension_from_location($res['InputFileLocation'], $this->get_extension_from_mime(isset($res['mime']) ? $res['mime'] : 'image/jpeg')); + if (!isset($res['ext']) || $res['ext'] === '') { + $res['ext'] = $this->get_extension_from_location($res['InputFileLocation'], $this->get_extension_from_mime($res['mime'] ?? 'image/jpeg')); } - if (!isset($res['mime'])) { + if (!isset($res['mime']) || $res['mime'] === '') { $res['mime'] = $this->get_mime_from_extension($res['ext'], 'image/jpeg'); } - if (!isset($res['name'])) { + if (!isset($res['name']) || $res['name'] === '') { $res['name'] = $message_media['file']['access_hash']; } @@ -672,10 +683,10 @@ trait Files ), ]; - if (!isset($res['ext'])) { + if (!isset($res['ext']) || $res['ext'] === '') { $res['ext'] = $this->get_extension_from_location($res['InputFileLocation'], $this->get_extension_from_mime($message_media['document']['mime_type'])); } - if (!isset($res['name'])) { + if (!isset($res['name']) || $res['name'] === '') { $res['name'] = $message_media['document']['access_hash']; } if (isset($message_media['document']['size'])) { @@ -863,7 +874,7 @@ trait Files return yield $this->download_to_callable_async($message_media, $callable, $cb, $seekable, $offset, $end); } - public function download_to_callable_async($message_media, $callable, $cb = null, $parallelize = true, $offset = 0, $end = -1) + public function download_to_callable_async($message_media, $callable, $cb = null, $parallelize = true, $offset = 0, $end = -1, int $part_size = null) { $message_media = yield $this->get_download_info_async($message_media); @@ -885,7 +896,7 @@ trait Files $end = $message_media['size']; } - $part_size = $this->settings['download']['part_size']; + $part_size = $part_size ?? $this->settings['download']['part_size']; $parallel_chunks = $this->settings['download']['parallel_chunks'] ? $this->settings['download']['parallel_chunks'] : 3000; $datacenter = isset($message_media['InputFileLocation']['dc_id']) ? $message_media['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc']; @@ -966,6 +977,7 @@ trait Files $size += $res; } }); + $promises[] = $previous_promise; if (!($key % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps @@ -1100,7 +1112,7 @@ trait Files } if (!$seekable) { - yield $offset['previous_promise']->promise(); + yield $offset['previous_promise']; } $res = yield $callable((string) $res['bytes'], $offset['offset'] + $offset['part_start_at']); $cb(); diff --git a/src/danog/MadelineProto/TL/TL.php b/src/danog/MadelineProto/TL/TL.php index 5f8c6c7f..6ef22ea0 100644 --- a/src/danog/MadelineProto/TL/TL.php +++ b/src/danog/MadelineProto/TL/TL.php @@ -463,7 +463,13 @@ trait TL } } elseif ($method === 'messages.sendEncryptedFile') { if (isset($arguments['file'])) { - if (!is_array($arguments['file']) && $this->settings['upload']['allow_automatic_upload']) { + if ( + ( + !is_array($arguments['file']) || + !(isset($arguments['file']['_']) && $this->constructors->find_by_predicate($arguments['file']['_']) === 'InputEncryptedFile') + ) && + $this->settings['upload']['allow_automatic_upload'] + ) { $arguments['file'] = yield $this->upload_encrypted_async($arguments['file']); } if (isset($arguments['file']['key'])) { @@ -602,7 +608,16 @@ trait TL }); } - if (!is_array($arguments[$current_argument['name']]) && $current_argument['type'] === 'InputFile' && $this->settings['upload']['allow_automatic_upload']) { + if ($current_argument['type'] === 'InputFile' + && ( + !is_array($arguments[$current_argument['name']]) + || !( + isset($arguments[$current_argument['name']]['_']) + && $this->constructors->find_by_predicate($arguments[$current_argument['name']]['_']) === 'InputFile' + ) + ) + && $this->settings['upload']['allow_automatic_upload'] + ) { $arguments[$current_argument['name']] = yield $this->upload_async($arguments[$current_argument['name']]); } From 5f50fa4636f91e2c54d24fd1fd11521256f11a11 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 24 Jun 2019 20:14:28 +0200 Subject: [PATCH 05/17] Bugfix --- src/danog/MadelineProto/Loop/Connection/WriteLoop.php | 4 ++-- src/danog/MadelineProto/MTProtoTools/Files.php | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index 08b56a09..b3951765 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -187,7 +187,7 @@ class WriteLoop extends ResumableSignalLoop $body_length = strlen($message['serialized_body']); $actual_length = $body_length + 32; if ($total_length && $total_length + $actual_length > 32760 || $count >= 1020) { - $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE); + $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::ULTRA_VERBOSE); break; } @@ -248,7 +248,7 @@ class WriteLoop extends ResumableSignalLoop $body_length = strlen($MTmessage['body']); $actual_length = $body_length + 32; if ($total_length && $total_length + $actual_length > 32760) { - $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::NOTICE); + $API->logger->logger('Length overflow, postponing part of payload', \danog\MadelineProto\Logger::ULTRA_VERBOSE); break; } $count++; diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 7797c801..ee368b49 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -298,6 +298,7 @@ trait Files private $done = []; private $pending = []; public $nextRead; + public $size; public function write(string $data, int $offset) { if (isset($this->pending[$offset])) { @@ -325,11 +326,14 @@ trait Files } $this->pending[$offset] = new Deferred; $res = $this->pending[$offset]->promise(); - + if ($offset + $size >= $this->size) { + $this->nextRead->resolve(true); + } return $res; } }; $bridge->nextRead = new Deferred; + $bridge->size = $size; $reader = [$bridge, 'read']; $writer = [$bridge, 'write']; yield $this->all([ From 45841add14b9e13ee048737a3da9b94bb4824111 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 25 Jun 2019 12:39:31 +0200 Subject: [PATCH 06/17] Bugfixes --- .../MadelineProto/MTProtoTools/Files.php | 62 +++++++++++-------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index ee368b49..04e47542 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -213,6 +213,7 @@ trait Files $fetched = true; $bytes = yield $callable($part_num * $part_size, $part_size); + if (!$already_fetched) { hash_update($ctx, $bytes); } @@ -246,8 +247,8 @@ trait Files $time = microtime(true) - $start; $speed = (int) (($size * 8) / $time) / 1000000; - $this->logger->logger("Partial download time: $time"); - $this->logger->logger("Partial download speed: $speed mbps"); + $this->logger->logger("Partial upload time: $time"); + $this->logger->logger("Partial upload speed: $speed mbps"); } } @@ -259,8 +260,8 @@ trait Files } $time = microtime(true) - $start; $speed = (int) (($size * 8) / $time) / 1000000; - $this->logger->logger("Total download time: $time"); - $this->logger->logger("Total download speed: $speed mbps"); + $this->logger->logger("Total upload time: $time"); + $this->logger->logger("Total upload speed: $speed mbps"); $constructor = ['_' => $constructor, 'id' => $file_id, 'parts' => $part_total_num, 'name' => $file_name, 'mime_type' => $mime]; if ($encrypted === true) { @@ -299,22 +300,16 @@ trait Files private $pending = []; public $nextRead; public $size; - public function write(string $data, int $offset) - { - if (isset($this->pending[$offset])) { - $promise = $this->pending[$offset]; - unset($this->pending[$offset]); - $promise->resolve($data); - } else { - $this->done[$offset] = $data; - } - return $this->nextRead->promise(); - } + public $part_size; + public function read(int $offset, int $size) { $nextRead = $this->nextRead; $this->nextRead = new Deferred; - $nextRead->resolve(true); + + if ($nextRead) { + $nextRead->resolve(true); + } if (isset($this->done[$offset])) { if (strlen($this->done[$offset]) > $size) { @@ -325,21 +320,35 @@ trait Files return $result; } $this->pending[$offset] = new Deferred; - $res = $this->pending[$offset]->promise(); - if ($offset + $size >= $this->size) { - $this->nextRead->resolve(true); + return $this->pending[$offset]->promise(); + } + public function write(string $data, int $offset) + { + if (isset($this->pending[$offset])) { + $promise = $this->pending[$offset]; + unset($this->pending[$offset]); + $promise->resolve($data); + } else { + $this->done[$offset] = $data; } - return $res; + $length = strlen($data); + if ($offset + $length === $this->size || $length < $this->part_size) { + return; + } + return $this->nextRead->promise(); } }; - $bridge->nextRead = new Deferred; $bridge->size = $size; + $bridge->part_size = $chunk_size; $reader = [$bridge, 'read']; $writer = [$bridge, 'write']; - yield $this->all([ - $this->upload_from_callable_async($reader, $size, $mime, '', $cb, false, $encrypted), - $this->download_to_callable_async($media, $writer, null, false, 0, -1, $chunk_size) - ]); + + $read = $this->upload_from_callable_async($reader, $size, $mime, '', $cb, false, $encrypted); + $write = $this->download_to_callable_async($media, $writer, null, true, 0, -1, $chunk_size); + + list($res, ) = yield $this->all([$read, $write]); + + return $res; } public function gen_all_file_async($media) @@ -934,7 +943,7 @@ trait Files $end_at = $part_size; if ($end !== -1 && $x + $part_size > $end) { - $end_at = ($x + $part_size) - $end; + $end_at = $end % $part_size; $breakOut = true; } @@ -956,6 +965,7 @@ trait Files return true; } $count = count($params); + $cb = function () use ($cb, $count) { static $cur = 0; $cur++; From 3f3f2f5ea1977d6e268fe0fcff2d85f1e7a72665 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 25 Jun 2019 14:09:39 +0200 Subject: [PATCH 07/17] Bugfixes --- bot.php | 2 +- src/danog/MadelineProto/Loop/Update/FeedLoop.php | 3 +++ src/danog/MadelineProto/MTProtoTools/Files.php | 7 ++++--- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/bot.php b/bot.php index 195eba78..b08e8d9f 100755 --- a/bot.php +++ b/bot.php @@ -50,7 +50,7 @@ class EventHandler extends \danog\MadelineProto\EventHandler 'peer' => '@danogentili', 'media' => [ '_' => 'inputMediaUploadedDocument', - 'file' => $update, + 'file' => 'https://google.com', 'attributes' => [ ['_' => 'documentAttributeFilename', 'file_name' => 'document.txt'] ] diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index c60c84f3..2e1b4b0d 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -160,6 +160,9 @@ class FeedLoop extends ResumableSignalLoop case 'updateNewChannelMessage': case 'updateEditChannelMessage': $channelId = isset($update['message']['to_id']['channel_id']) ? $update['message']['to_id']['channel_id'] : false; + if (!$channelId) { + return false; + } break; case 'updateChannelWebPage': case 'updateDeleteChannelMessages': diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 04e47542..44465084 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -93,8 +93,8 @@ trait Files } /** @var $response \Amp\Artax\Response */ $response = yield $this->datacenter->getHTTPClient()->request($url); - if (200 !== $status = $response->getStatusCode) { - throw new Exception("Wrong status code: $status"); + if (200 !== $status = $response->getStatus()) { + throw new Exception("Wrong status code: $status ".$response->getReason()); } $mime = trim(explode(';', $response->getHeader('content-type') ?? 'application/octet-stream')[0]); $size = $response->getHeader('content-length') ?? $size; @@ -295,6 +295,7 @@ trait Files $chunk_size = $this->settings['upload']['part_size']; $bridge = new class + { private $done = []; private $pending = []; @@ -346,7 +347,7 @@ trait Files $read = $this->upload_from_callable_async($reader, $size, $mime, '', $cb, false, $encrypted); $write = $this->download_to_callable_async($media, $writer, null, true, 0, -1, $chunk_size); - list($res, ) = yield $this->all([$read, $write]); + list($res) = yield $this->all([$read, $write]); return $res; } From 8ab033e995ad5b364e6dd4e388d72953ce4f34f1 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 25 Jun 2019 14:11:04 +0200 Subject: [PATCH 08/17] Add PremadeStream --- .../Stream/Transport/PremadeStream.php | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 src/danog/MadelineProto/Stream/Transport/PremadeStream.php diff --git a/src/danog/MadelineProto/Stream/Transport/PremadeStream.php b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php new file mode 100644 index 00000000..7fd2dc8d --- /dev/null +++ b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php @@ -0,0 +1,134 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Stream\Transport; + +use Amp\Promise; +use Amp\Socket\Socket; +use danog\MadelineProto\Stream\Async\RawStream; +use danog\MadelineProto\Stream\RawStreamInterface; +use function Amp\Socket\connect; +use function Amp\Socket\cryptoConnect; +use danog\MadelineProto\Stream\ProxyStreamInterface; +use Amp\ByteStream\ClosedException; +use danog\MadelineProto\Stream\ConnectionContext; + +/** + * Premade stream wrapper. + * + * Manages reading data in chunks + * + * @author Daniil Gentili + */ +class PremadeStream extends Socket implements RawStreamInterface, ProxyStreamInterface +{ + use RawStream; + private $stream; + + public function __construct() + { + } + + public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise + { + return $this->stream->enableCrypto($tlsContext); + } + + public function getStream() + { + return $this->stream; + } + + public function connectAsync(ConnectionContext $ctx, string $header = ''): \Generator + { + if ($header !== '') { + yield $this->stream->write($header); + } + } + + /** + * Async chunked read. + * + * @return Promise + */ + public function read(): Promise + { + return $this->stream ? $this->stream->read() : new \Amp\Success(null); + } + + /** + * Async write. + * + * @param string $data Data to write + * + * @return Promise + */ + public function write(string $data): Promise + { + if (!$this->stream) { + throw new ClosedException("MadelineProto stream was disconnected"); + } + return $this->stream->write($data); + } + + /** + * Async close. + * + * @return Generator + */ + public function disconnect() + { + try { + if ($this->stream) { + $this->stream->close(); + $this->stream = null; + } + } catch (\Throwable $e) { + \danog\MadelineProto\Logger::log('Got exception while closing stream: '.$e->getMessage()); + } catch (\Exception $e) { + \danog\MadelineProto\Logger::log('Got exception while closing stream: '.$e->getMessage()); + } + } + + public function close() + { + $this->disconnect(); + } + + /** + * {@inheritdoc} + * + * @return \Amp\Socket\Socket + */ + public function getSocket(): \Amp\Socket\Socket + { + return $this->stream; + } + + /** + * {@inheritdoc} + */ + public function setExtra($extra) + { + $this->stream = $extra; + } + public static function getName(): string + { + return __CLASS__; + } +} From e1556822c08de6cc2cb53337a00e25ac571c2df7 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 25 Jun 2019 15:36:09 +0200 Subject: [PATCH 09/17] Upload files with no content-length, too --- .../MadelineProto/MTProtoTools/Files.php | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 44465084..806ad189 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -21,6 +21,7 @@ namespace danog\MadelineProto\MTProtoTools; use Amp\ByteStream\InputStream; use Amp\ByteStream\OutputStream; +use Amp\ByteStream\ResourceInputStream; use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\StreamException; use Amp\Deferred; @@ -41,6 +42,7 @@ use function Amp\File\open; use function Amp\File\stat; use function Amp\File\touch; use function Amp\Promise\all; +use Amp\File\BlockingHandle; /** * Manages upload and download of files. @@ -98,13 +100,23 @@ trait Files } $mime = trim(explode(';', $response->getHeader('content-type') ?? 'application/octet-stream')[0]); $size = $response->getHeader('content-length') ?? $size; + + $stream = $response->getBody(); if (!$size) { - throw new Exception('Wrong size'); + $body = $stream; + $stream = new BlockingHandle(fopen('php://temp', 'r+b'), 'php://temp', 'r+b'); + + while (null !== $chunk = yield $body->read()) { + yield $stream->write($chunk); + } + $size = $stream->tell(); + if (!$size) { + throw new Exception('Wrong size!'); + } + yield $stream->seek(0); } - $body = $response->getBody(); - - return yield $this->upload_from_stream_async($body, $size, $mime, $file_name, $cb, $encrypted); + return yield $this->upload_from_stream_async($stream, $size, $mime, $file_name, $cb, $encrypted); } public function upload_from_stream_async($stream, int $size, string $mime, string $file_name = '', $cb = null, bool $encrypted = false) { From cd5cd8611ca82d71064818e667788aaf7c014e3d Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 25 Jun 2019 15:38:30 +0200 Subject: [PATCH 10/17] Fixes --- src/danog/MadelineProto/MTProtoTools/Files.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 806ad189..2c3d3478 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -153,7 +153,7 @@ trait Files } else { $ctx = (new ConnectionContext) ->addStream(PremadeStream::getName(), $stream) - ->addStream(BufferedRawStream); + ->addStream(BufferedRawStream::getName()); $stream = yield $ctx->getStream(); From 6f5d1b16fa8bc3af9f6f345a9d20445a2943e70c Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Tue, 25 Jun 2019 15:11:37 +0000 Subject: [PATCH 11/17] Fix upload by URL --- bot.php | 13 ---- docs | 2 +- .../MadelineProto/MTProtoTools/Files.php | 39 +++++++--- .../MTProtoTools/ResponseHandler.php | 4 + .../Stream/Common/SimpleBufferedRawStream.php | 77 +++++++++++++++++++ .../Stream/Transport/PremadeStream.php | 4 +- .../MadelineProto/VoIP/AuthKeyHandler.php | 1 - 7 files changed, 113 insertions(+), 27 deletions(-) create mode 100644 src/danog/MadelineProto/Stream/Common/SimpleBufferedRawStream.php diff --git a/bot.php b/bot.php index b08e8d9f..33cbc70b 100755 --- a/bot.php +++ b/bot.php @@ -46,19 +46,6 @@ class EventHandler extends \danog\MadelineProto\EventHandler yield $this->messages->sendMessage(['peer' => $update, 'message' => "$res", 'reply_to_msg_id' => isset($update['message']['id']) ? $update['message']['id'] : null, 'parse_mode' => 'HTML']); //'entities' => [['_' => 'messageEntityPre', 'offset' => 0, 'length' => strlen($res), 'language' => 'json']]]); if (isset($update['message']['media']) && $update['message']['media']['_'] !== 'messageMediaGame') { yield $this->messages->sendMedia(['peer' => $update, 'message' => $update['message']['message'], 'media' => $update]); - yield $this->messages->sendMedia([ - 'peer' => '@danogentili', - 'media' => [ - '_' => 'inputMediaUploadedDocument', - 'file' => 'https://google.com', - 'attributes' => [ - ['_' => 'documentAttributeFilename', 'file_name' => 'document.txt'] - ] - ], - 'message' => '[This is the caption](https://t.me/MadelineProto)', - 'parse_mode' => 'Markdown' - ]); - //yield $this->download_to_dir($update, '/tmp'); } } catch (\danog\MadelineProto\RPCErrorException $e) { diff --git a/docs b/docs index cbd913fb..dc05dc5c 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit cbd913fba2249a0055bb732452a67f911119c40a +Subproject commit dc05dc5cebfcec90ac7851928c522a4d635dbab6 diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 2c3d3478..4e9aa90d 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -34,7 +34,7 @@ use danog\MadelineProto\Exception; use danog\MadelineProto\FileCallbackInterface; use danog\MadelineProto\Logger; use danog\MadelineProto\RPCErrorException; -use danog\MadelineProto\Stream\Common\BufferedRawStream; +use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream; use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\Transport\PremadeStream; use function Amp\File\exists; @@ -43,6 +43,7 @@ use function Amp\File\stat; use function Amp\File\touch; use function Amp\Promise\all; use Amp\File\BlockingHandle; +use Amp\Artax\Client; /** * Manages upload and download of files. @@ -94,7 +95,7 @@ trait Files $url = $url->getFile(); } /** @var $response \Amp\Artax\Response */ - $response = yield $this->datacenter->getHTTPClient()->request($url); + $response = yield $this->datacenter->getHTTPClient()->request($url, [Client::OP_MAX_BODY_BYTES => 512 * 1024 * 3000, Client::OP_TRANSFER_TIMEOUT => 10*1000*3600]); if (200 !== $status = $response->getStatus()) { throw new Exception("Wrong status code: $status ".$response->getReason()); } @@ -103,6 +104,8 @@ trait Files $stream = $response->getBody(); if (!$size) { + $this->logger->logger("No content length for $url, caching first"); + $body = $stream; $stream = new BlockingHandle(fopen('php://temp', 'r+b'), 'php://temp', 'r+b'); @@ -141,7 +144,9 @@ trait Files } } - if ($stream instanceof Handle || $stream instanceof BufferedRawStream) { + $created = false; + + if ($stream instanceof Handle) { $callable = static function (int $offset, int $size) use ($stream, $seekable) { if ($seekable) { while ($stream->tell() !== $offset) { @@ -151,18 +156,30 @@ trait Files return yield $stream->read($size); }; } else { - $ctx = (new ConnectionContext) - ->addStream(PremadeStream::getName(), $stream) - ->addStream(BufferedRawStream::getName()); - - $stream = yield $ctx->getStream(); - + if (!$stream instanceof BufferedRawStream) { + $ctx = (new ConnectionContext) + ->addStream(PremadeStream::getName(), $stream) + ->addStream(SimpleBufferedRawStream::getName()); + $stream = yield $ctx->getStream(); + $created = true; + } $callable = static function (int $offset, int $size) use ($stream) { - return yield $stream->read($size); + $reader = yield $stream->getReadBuffer($l); + try { + return yield $reader->bufferRead($size); + } catch (\danog\MadelineProto\NothingInTheSocketException $e) { + $reader = yield $stream->getReadBuffer($size); + return yield $reader->bufferRead($size); + } }; + $seekable = false; } - return yield $this->upload_from_callable_async($callable, $size, $mime, $file_name, $cb, $seekable, $encrypted); + $res = yield $this->upload_from_callable_async($callable, $size, $mime, $file_name, $cb, $seekable, $encrypted); + if ($created) { + $stream->disconnect(); + } + return $res; } public function upload_from_callable_async($callable, int $size, string $mime, string $file_name = '', $cb = null, bool $refetchable = true, bool $encrypted = false) { diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index 0c0eeaf1..9f0b7728 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -368,6 +368,10 @@ trait ResponseHandler return; } + if (in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'no workers running'])) { + Loop::delay(1 * 1000, [$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]); + return; + } $this->got_response_for_outgoing_message_id($request_id, $datacenter); $this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); diff --git a/src/danog/MadelineProto/Stream/Common/SimpleBufferedRawStream.php b/src/danog/MadelineProto/Stream/Common/SimpleBufferedRawStream.php new file mode 100644 index 00000000..f55d0b71 --- /dev/null +++ b/src/danog/MadelineProto/Stream/Common/SimpleBufferedRawStream.php @@ -0,0 +1,77 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Stream\Common; + +use Amp\Promise; +use Amp\Success; +use danog\MadelineProto\Exception; +use danog\MadelineProto\Stream\Async\RawStream; +use danog\MadelineProto\Stream\ConnectionContext; +use function Amp\Socket\connect; +use danog\MadelineProto\Stream\BufferedStreamInterface; +use danog\MadelineProto\Stream\BufferInterface; +use danog\MadelineProto\Stream\RawStreamInterface; + +/** + * Buffered raw stream. + * + * @author Daniil Gentili + */ +class SimpleBufferedRawStream extends BufferedRawStream implements BufferedStreamInterface, BufferInterface, RawStreamInterface +{ + /** + * Read data asynchronously. + * + * @param int $length Amount of data to read + * + * @return \Generator + */ + public function bufferReadAsync(int $length): \Generator + { + $size = fstat($this->memory_stream)['size']; + $offset = ftell($this->memory_stream); + $buffer_length = $size - $offset; + if ($buffer_length < $length && $buffer_length) { + fseek($this->memory_stream, $offset + $buffer_length); + } + + while ($buffer_length < $length) { + $chunk = yield $this->read(); + if ($chunk === null) { + fseek($this->memory_stream, $offset); + break; + } + fwrite($this->memory_stream, $chunk); + $buffer_length += strlen($chunk); + } + fseek($this->memory_stream, $offset); + + return fread($this->memory_stream, $length); + } + + /** + * Get class name. + * + * @return string + */ + public static function getName(): string + { + return __CLASS__; + } +} diff --git a/src/danog/MadelineProto/Stream/Transport/PremadeStream.php b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php index 7fd2dc8d..ee2faf60 100644 --- a/src/danog/MadelineProto/Stream/Transport/PremadeStream.php +++ b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php @@ -95,7 +95,9 @@ class PremadeStream extends Socket implements RawStreamInterface, ProxyStreamInt { try { if ($this->stream) { - $this->stream->close(); + if (method_exists($this->stream, 'close')) { + $this->stream->close(); + } $this->stream = null; } } catch (\Throwable $e) { diff --git a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php index 3d25a39d..27eb379e 100644 --- a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php +++ b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php @@ -191,7 +191,6 @@ trait AuthKeyHandler $visualization[] = \danog\MadelineProto\Magic::$emojis[(int) (new \phpseclib\Math\BigInteger($number, 256))->divide($length)[1]->toString()]; } $this->calls[$params['id']]->setVisualization($visualization); - var_dump($params); $this->calls[$params['id']]->configuration['endpoints'] = array_merge($params['connections'], $this->calls[$params['id']]->configuration['endpoints']); $this->calls[$params['id']]->configuration = array_merge(['recv_timeout' => $this->config['call_receive_timeout_ms'] / 1000, 'init_timeout' => $this->config['call_connect_timeout_ms'] / 1000, 'data_saving' => \danog\MadelineProto\VoIP::DATA_SAVING_NEVER, 'enable_NS' => true, 'enable_AEC' => true, 'enable_AGC' => true, 'auth_key' => $key, 'auth_key_id' => substr(sha1($key, true), -8), 'call_id' => substr(hash('sha256', $key, true), -16), 'network_type' => \danog\MadelineProto\VoIP::NET_TYPE_ETHERNET], $this->calls[$params['id']]->configuration); $this->calls[$params['id']]->parseConfig(); From 9d86507eb85769edad7b43980734f5fc83ee7d2d Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Wed, 26 Jun 2019 17:16:06 +0200 Subject: [PATCH 12/17] Better exceptions --- docs | 2 +- src/danog/MadelineProto/Conversion.php | 2 +- src/danog/MadelineProto/Exception.php | 36 +++++++------- src/danog/MadelineProto/MTProto.php | 48 ++++--------------- src/danog/MadelineProto/Magic.php | 22 ++++++--- .../MadelineProto/TL/Conversion/BotAPI.php | 3 -- .../MadelineProto/VoIP/AuthKeyHandler.php | 12 ++--- 7 files changed, 48 insertions(+), 77 deletions(-) diff --git a/docs b/docs index dc05dc5c..cbd913fb 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit dc05dc5cebfcec90ac7851928c522a4d635dbab6 +Subproject commit cbd913fba2249a0055bb732452a67f911119c40a diff --git a/src/danog/MadelineProto/Conversion.php b/src/danog/MadelineProto/Conversion.php index 8beb3e7b..b318c202 100644 --- a/src/danog/MadelineProto/Conversion.php +++ b/src/danog/MadelineProto/Conversion.php @@ -74,7 +74,7 @@ class Conversion { set_error_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionErrorHandler']); if (!extension_loaded('sqlite3')) { - throw new Exception(['extension', 'sqlite3']); + throw Exception::extension('sqlite3'); } if (!isset(pathinfo($session)['extension'])) { $session .= '.session'; diff --git a/src/danog/MadelineProto/Exception.php b/src/danog/MadelineProto/Exception.php index 0456ea26..c530a4e7 100644 --- a/src/danog/MadelineProto/Exception.php +++ b/src/danog/MadelineProto/Exception.php @@ -31,27 +31,8 @@ class Exception extends \Exception public function __construct($message = null, $code = 0, self $previous = null, $file = null, $line = null) { - if (is_array($message) && $message[0] === 'extension') { - if ($message[1] === 'libtgvoip') { - $additional = 'Follow the instructions @ https://voip.madelineproto.xyz to install it.'; - } elseif ($message[1] === 'prime') { - $additional = 'Follow the instructions @ https://prime.madelineproto.xyz to install it.'; - } else { - $additional = 'Try running sudo apt-get install php'.PHP_MAJOR_VERSION.'.'.PHP_MINOR_VERSION.'-'.$message[1].'.'; - } - $message = 'MadelineProto requires the '.$message[1].' extension to run. '.$additional; - if (php_sapi_name() !== 'cli') { - echo $message.'
'; - } - $file = 'MadelineProto'; - $line = 1; - } $this->prettify_tl(); if ($file !== null) { - if (basename($file) === 'Threaded.php') { - $line = debug_backtrace(0)[2]['line']; - $file = debug_backtrace(0)[2]['file']; - } $this->file = $file; } if ($line !== null) { @@ -61,6 +42,7 @@ class Exception extends \Exception if (strpos($message, 'socket_accept') === false) { \danog\MadelineProto\Logger::log($message.' in '.basename($this->file).':'.$this->line, \danog\MadelineProto\Logger::FATAL_ERROR); } + if (in_array($message, ['The session is corrupted!', 'Re-executing query...', 'I had to recreate the temporary authorization key', 'This peer is not present in the internal peer database', "Couldn't get response", 'Chat forbidden', 'The php-libtgvoip extension is required to accept and manage calls. See daniil.it/MadelineProto for more info.', 'File does not exist', 'Please install this fork of phpseclib: https://github.com/danog/phpseclib'])) { return; } @@ -72,6 +54,22 @@ class Exception extends \Exception } } + public static function extension(string $extensionName) + { + $additional = 'Try running sudo apt-get install php'.PHP_MAJOR_VERSION.'.'.PHP_MINOR_VERSION.'-'.$extensionName.'.'; + if ($extensionName === 'libtgvoip') { + $additional = 'Follow the instructions @ https://voip.madelineproto.xyz to install it.'; + } elseif ($extensionName === 'prime') { + $additional = 'Follow the instructions @ https://prime.madelineproto.xyz to install it.'; + } + $message = 'MadelineProto requires the '.$extensionName.' extension to run. '.$additional; + if (php_sapi_name() !== 'cli') { + echo $message.'
'; + } + $file = 'MadelineProto'; + $line = 1; + return new self($message, 0, null, $file, $line); + } /** * ExceptionErrorHandler. * diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index daa08eae..cce3afd8 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -30,8 +30,6 @@ use danog\MadelineProto\MTProtoTools\UpdatesState; use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream; use danog\MadelineProto\Stream\MTProtoTransport\HttpStream; use danog\MadelineProto\TL\TLCallback; -use function Amp\ByteStream\getStdin; -use function Amp\ByteStream\getInputBufferStream; /** * Manages all of the mtproto stuff. @@ -162,21 +160,6 @@ class MTProto extends AsyncConstruct implements TLCallback \danog\MadelineProto\Magic::class_exists(); // Parse settings $this->parse_settings($settings); - if (!defined('\\phpseclib\\Crypt\\Common\\SymmetricKey::MODE_IGE') || \phpseclib\Crypt\Common\SymmetricKey::MODE_IGE !== 7) { - throw new Exception(\danog\MadelineProto\Lang::$current_lang['phpseclib_fork']); - } - if (!extension_loaded('xml')) { - throw new Exception(['extension', 'xml']); - } - if (!extension_loaded('fileinfo')) { - throw new Exception(['extension', 'fileinfo']); - } - if (!extension_loaded('json')) { - throw new Exception(['extension', 'json']); - } - if (!extension_loaded('mbstring')) { - throw new Exception(['extension', 'mbstring']); - } // Connect to servers $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['inst_dc'], Logger::ULTRA_VERBOSE); if (!($this->channels_state instanceof CombinedUpdatesState)) { @@ -302,7 +285,9 @@ class MTProto extends AsyncConstruct implements TLCallback public function __wakeup_async($backtrace) { set_error_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionErrorHandler']); - set_exception_handler(['\\danog\\MadelineProto\\Serialization', 'serialize_all']); + //set_exception_handler(['\\danog\\MadelineProto\\Serialization', 'serialize_all']); + Magic::class_exists(); + $this->setup_logger(); if (\danog\MadelineProto\Magic::$has_thread && is_object(\Thread::getCurrentThread())) { return; @@ -311,22 +296,6 @@ class MTProto extends AsyncConstruct implements TLCallback if (isset($this->settings['app_info']['lang_code']) && isset(Lang::$lang[$this->settings['app_info']['lang_code']])) { Lang::$current_lang = &Lang::$lang[$this->settings['app_info']['lang_code']]; } - if (!defined('\\phpseclib\\Crypt\\AES::MODE_IGE')) { - throw new Exception(\danog\MadelineProto\Lang::$current_lang['phpseclib_fork']); - } - if (!extension_loaded('xml')) { - throw new Exception(['extension', 'xml']); - } - if (!extension_loaded('fileinfo')) { - throw new Exception(['extension', 'fileinfo']); - } - if (!extension_loaded('mbstring')) { - throw new Exception(['extension', 'mbstring']); - } - if (!extension_loaded('json')) { - throw new Exception(['extension', 'json']); - } - if (!isset($this->referenceDatabase)) { $this->referenceDatabase = new ReferenceDatabase($this); } @@ -455,9 +424,9 @@ class MTProto extends AsyncConstruct implements TLCallback } /*if (!$this->settings['updates']['handle_old_updates']) { - $this->channels_state = new CombinedUpdatesState(); - $this->msg_ids = []; - $this->got_state = false; + $this->channels_state = new CombinedUpdatesState(); + $this->msg_ids = []; + $this->got_state = false; }*/ yield $this->connect_to_all_dcs_async(); foreach ($this->calls as $id => $controller) { @@ -801,11 +770,11 @@ class MTProto extends AsyncConstruct implements TLCallback ], 'upload' => [ 'allow_automatic_upload' => true, 'part_size' => 512 * 1024, - 'parallel_chunks' => 20 + 'parallel_chunks' => 20, ], 'download' => [ 'report_broken_media' => true, 'part_size' => 1024 * 1024, - 'parallel_chunks' => 20 + 'parallel_chunks' => 20, ], 'pwr' => [ 'pwr' => false, // Need info ? @@ -970,7 +939,6 @@ class MTProto extends AsyncConstruct implements TLCallback $this->dialog_params = ['_' => 'MadelineProto.dialogParams', 'limit' => 0, 'offset_date' => 0, 'offset_id' => 0, 'offset_peer' => ['_' => 'inputPeerEmpty'], 'count' => 0]; $this->full_chats = []; - } public function resetUpdateSystem() { diff --git a/src/danog/MadelineProto/Magic.php b/src/danog/MadelineProto/Magic.php index ebc0efd9..36411c35 100644 --- a/src/danog/MadelineProto/Magic.php +++ b/src/danog/MadelineProto/Magic.php @@ -23,10 +23,10 @@ use Amp\DoH\DoHConfig; use Amp\DoH\Nameserver; use Amp\DoH\Rfc8484StubResolver; use Amp\Loop; +use function Amp\ByteStream\getInputBufferStream; +use function Amp\ByteStream\getStdin; use function Amp\Dns\resolver; use function Amp\Promise\wait; -use function Amp\ByteStream\getStdin; -use function Amp\ByteStream\getInputBufferStream; class Magic { @@ -68,6 +68,14 @@ class Magic set_error_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionErrorHandler']); //set_exception_handler(['\\danog\\MadelineProto\\Exception', 'ExceptionHandler']); if (!self::$inited) { + if (!defined('\\phpseclib\\Crypt\\Common\\SymmetricKey::MODE_IGE') || \phpseclib\Crypt\Common\SymmetricKey::MODE_IGE !== 7) { + throw new Exception(\danog\MadelineProto\Lang::$current_lang['phpseclib_fork']); + } + foreach (['intl', 'xml', 'fileinfo', 'json', 'mbstring'] as $extension) { + if (!extension_loaded($extension)) { + throw Exception::extension($extension); + } + } self::$has_thread = class_exists('\\Thread') && method_exists('\\Thread', 'getCurrentThread'); self::$BIG_ENDIAN = pack('L', 1) === pack('N', 1); self::$bigint = PHP_INT_SIZE < 8; @@ -147,14 +155,14 @@ class Magic Loop::onSignal(SIGINT, static function () { getStdin()->unreference(); getInputBufferStream()->unreference(); - Logger::log('Got sigint', Logger::FATAL_ERROR); + Logger::log('Got sigint', Logger::FATAL_ERROR); die(); }); /*Loop::onSignal(SIGTERM, static function () { - Logger::log('Got sigterm', Logger::FATAL_ERROR); - Loop::stop(); - die(); - });*/ + Logger::log('Got sigterm', Logger::FATAL_ERROR); + Loop::stop(); + die(); + });*/ } if (!self::$altervista && !self::$zerowebhost) { $DohConfig = new DoHConfig( diff --git a/src/danog/MadelineProto/TL/Conversion/BotAPI.php b/src/danog/MadelineProto/TL/Conversion/BotAPI.php index 50d73045..40ca5f06 100644 --- a/src/danog/MadelineProto/TL/Conversion/BotAPI.php +++ b/src/danog/MadelineProto/TL/Conversion/BotAPI.php @@ -536,9 +536,6 @@ trait BotAPI $arguments['message'] = trim($this->html_fixtags($arguments['message'])); $dom = new \DOMDocument(); - if (!extension_loaded('mbstring')) { - throw new \danog\MadelineProto\Exception(['extension', 'mbstring']); - } $dom->loadHTML(mb_convert_encoding($arguments['message'], 'HTML-ENTITIES', 'UTF-8')); if (!isset($arguments['entities'])) { $arguments['entities'] = []; diff --git a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php index 27eb379e..26d4c344 100644 --- a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php +++ b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php @@ -47,7 +47,7 @@ trait AuthKeyHandler public function request_call_async($user) { if (!class_exists('\\danog\\MadelineProto\\VoIP')) { - throw new \danog\MadelineProto\Exception(['extension', 'libtgvoip']); + throw \danog\MadelineProto\Exception::extension('libtgvoip'); } array_walk($this->calls, function ($controller, $id) { if ($controller->getCallState() === \danog\MadelineProto\VoIP::CALL_STATE_ENDED) { @@ -124,7 +124,7 @@ trait AuthKeyHandler public function confirm_call_async($params) { if (!class_exists('\\danog\\MadelineProto\\VoIP')) { - throw new \danog\MadelineProto\Exception(['extension', 'libtgvoip']); + throw \danog\MadelineProto\Exception::extension('libtgvoip'); } array_walk($this->calls, function ($controller, $id) { if ($controller->getCallState() === \danog\MadelineProto\VoIP::CALL_STATE_ENDED) { @@ -161,7 +161,7 @@ trait AuthKeyHandler public function complete_call_async($params) { if (!class_exists('\\danog\\MadelineProto\\VoIP')) { - throw new \danog\MadelineProto\Exception(['extension', 'libtgvoip']); + throw \danog\MadelineProto\Exception::extension('libtgvoip'); } array_walk($this->calls, function ($controller, $id) { if ($controller->getCallState() === \danog\MadelineProto\VoIP::CALL_STATE_ENDED) { @@ -201,7 +201,7 @@ trait AuthKeyHandler public function call_status($id) { if (!class_exists('\\danog\\MadelineProto\\VoIP')) { - throw new \danog\MadelineProto\Exception(['extension', 'libtgvoip']); + throw \danog\MadelineProto\Exception::extension('libtgvoip'); } array_walk($this->calls, function ($controller, $id) { if ($controller->getCallState() === \danog\MadelineProto\VoIP::CALL_STATE_ENDED) { @@ -218,7 +218,7 @@ trait AuthKeyHandler public function get_call($call) { if (!class_exists('\\danog\\MadelineProto\\VoIP')) { - throw new \danog\MadelineProto\Exception(['extension', 'libtgvoip']); + throw \danog\MadelineProto\Exception::extension('libtgvoip'); } array_walk($this->calls, function ($controller, $id) { if ($controller->getCallState() === \danog\MadelineProto\VoIP::CALL_STATE_ENDED) { @@ -232,7 +232,7 @@ trait AuthKeyHandler public function discard_call_async($call, $reason, $rating = [], $need_debug = true) { if (!class_exists('\\danog\\MadelineProto\\VoIP')) { - throw new \danog\MadelineProto\Exception(['extension', 'libtgvoip']); + throw \danog\MadelineProto\Exception::extension('libtgvoip'); } if (!isset($this->calls[$call['id']])) { return; From 11c8f627a627ec8a7a48e167bc0fe7fd471fe80d Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 27 Jun 2019 11:26:39 +0200 Subject: [PATCH 13/17] Bugfix --- src/danog/MadelineProto/DocsBuilder.php | 1 + src/danog/MadelineProto/Stream/Common/BufferedRawStream.php | 1 + 2 files changed, 2 insertions(+) diff --git a/src/danog/MadelineProto/DocsBuilder.php b/src/danog/MadelineProto/DocsBuilder.php index 71480870..12131214 100644 --- a/src/danog/MadelineProto/DocsBuilder.php +++ b/src/danog/MadelineProto/DocsBuilder.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto; +// This code was written a few years ago: it is garbage, and has to be rewritten class DocsBuilder { use \danog\MadelineProto\TL\TL; diff --git a/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php b/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php index a25a983b..1c7fe562 100644 --- a/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php +++ b/src/danog/MadelineProto/Stream/Common/BufferedRawStream.php @@ -27,6 +27,7 @@ use function Amp\Socket\connect; use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\BufferInterface; use danog\MadelineProto\Stream\RawStreamInterface; +use Amp\ByteStream\ClosedException; /** * Buffered raw stream. From 5cfaee670c14d8616ac3cdd7dc640387f8699a84 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 29 Jun 2019 14:54:12 +0200 Subject: [PATCH 14/17] Merge --- README.md | 273 +----------------- docs | 2 +- src/danog/MadelineProto/Connection.php | 55 +--- .../MadelineProto/DocsBuilder/Methods.php | 37 ++- .../Loop/Connection/ReadLoop.php | 1 - src/danog/MadelineProto/MTProto.php | 4 +- .../MTProtoTools/AuthKeyHandler.php | 5 +- .../Stream/ConnectionContext.php | 6 + .../Stream/MTProtoTools/MsgIdHandler.php | 3 + .../Stream/MTProtoTools/SeqNoHandler.php | 5 + .../Stream/MTProtoTools/Session.php | 59 ++++ .../MTProtoTransport/ObfuscatedStream.php | 12 +- 12 files changed, 122 insertions(+), 340 deletions(-) create mode 100644 src/danog/MadelineProto/Stream/MTProtoTools/Session.php diff --git a/README.md b/README.md index 021bec54..19e362ad 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,7 @@ Tip: if you receive an error (or nothing), [send us](https://t.me/pwrtelegramgro * [Full chat info with full list of participants](https://docs.madelineproto.xyz/docs/CHAT_INFO.html#get_pwr_chat-now-fully-async) * [Full chat info](https://docs.madelineproto.xyz/docs/CHAT_INFO.html#get_full_info-now-fully-async) * [Reduced chat info (very fast)](https://docs.madelineproto.xyz/docs/CHAT_INFO.html#get_info-now-fully-async) + * [Just the chat ID (extremely fast)](https://docs.madelineproto.xyz/docs/CHAT_INFO.html#get_id-now-fully-async) * [Getting all chats (dialogs)](https://docs.madelineproto.xyz/docs/DIALOGS.html) * [Dialog list](https://docs.madelineproto.xyz/docs/DIALOGS.html#get_dialogs-now-fully-async) * [Full dialog info](https://docs.madelineproto.xyz/docs/DIALOGS.html#get_full_dialogs-now-fully-async) @@ -174,278 +175,6 @@ Tip: if you receive an error (or nothing), [send us](https://t.me/pwrtelegramgro * [Upload or download files up to 1.5 GB](https://docs.madelineproto.xyz/docs/FILES.html) * [Make a phone call and play a song](https://docs.madelineproto.xyz/docs/CALLS.html) * [Create a secret chat bot](https://docs.madelineproto.xyz/docs/SECRET_CHATS.html) - * Accept URL authorization: messages.acceptUrlAuth - * Accept contact: contacts.acceptContact - * Accept telegram passport authorization: account.acceptAuthorization - * Accept telegram's TOS: help.acceptTermsOfService - * Add a sticker set: messages.installStickerSet - * Add a sticker to favorites: messages.faveSticker - * Add a sticker to recent stickers: messages.saveRecentSticker - * Add a user to a normal chat (use channels->inviteToChannel for supergroups): messages.addChatUser - * Add contact: contacts.addContact - * Add phone number as contact: contacts.importContacts - * Add sticker to stickerset: stickers.addStickerToSet - * Add users to channel/supergroup: channels.inviteToChannel - * Block a user: contacts.block - * Bots only: get telegram passport authorization form: account.getAuthorizationForm - * Bots only: send payment form: payments.sendPaymentForm - * Bots only: set precheckout results: messages.setBotPrecheckoutResults - * Bots only: set shipping results: messages.setBotShippingResults - * Bots only: set the callback answer (after a button was clicked): messages.setBotCallbackAnswer - * Bots only: set the results of an inline query: messages.setInlineBotResults - * Call inline bot: messages.getInlineBotResults - * Cancel password recovery email: account.cancelPasswordEmail - * Change notification settings: account.updateNotifySettings - * Change sticker position in photo: stickers.changeStickerPosition - * Change the phone number associated to this account: account.changePhone - * Change the phone number: account.sendChangePhoneCode - * Change the profile photo: photos.updateProfilePhoto - * Change typing status: messages.setTyping - * Check if about to edit a message or a media caption: messages.getMessageEditData - * Check if an invitation link is valid: messages.checkChatInvite - * Check if this username is available: account.checkUsername - * Check if this username is free and can be assigned to a channel/supergroup: channels.checkUsername - * Clear all drafts: messages.clearAllDrafts - * Clear all recent stickers: messages.clearRecentStickers - * Clear saved payments info: payments.clearSavedInfo - * Confirm password recovery using email: account.confirmPasswordEmail - * Confirm this phone number is associated to this account, obtain phone_code_hash from sendConfirmPhoneCode: account.confirmPhone - * Contact signup notification setting value: account.getContactSignUpNotification - * Convert chat to supergroup: messages.migrateChat - * Create a chat (not supergroup): messages.createChat - * Create channel/supergroup: channels.createChannel - * Create stickerset: stickers.createStickerSet - * Delete a certain session: account.resetAuthorization - * Delete a certain telegram web login authorization: account.resetWebAuthorization - * Delete a channel/supergroup: channels.deleteChannel - * Delete a user from a chat (not supergroup): messages.deleteChatUser - * Delete all logged-in sessions.: auth.resetAuthorizations - * Delete all messages of a user in a channel/supergroup: channels.deleteUserHistory - * Delete all temporary authorization keys except the ones provided: auth.dropTempAuthKeys - * Delete channel/supergroup messages: channels.deleteMessages - * Delete chat history: messages.deleteHistory - * Delete contacts by phones: contacts.deleteByPhones - * Delete folder: folders.deleteFolder - * Delete messages: messages.deleteMessages - * Delete multiple contacts: contacts.deleteContacts - * Delete profile photos: photos.deletePhotos - * Delete secure telegram passport value: account.deleteSecureValue - * Delete the history of a supergroup/channel: channels.deleteHistory - * Delete this account: account.deleteAccount - * Disable all notifications for a certain period: account.updateDeviceLocked - * Download a file through telegram: upload.getWebFile - * Edit a message: messages.editMessage - * Edit a sent inline message: messages.editInlineBotMessage - * Edit admin permissions of a user in a channel/supergroup: channels.editAdmin - * Edit admin permissions: messages.editChatAdmin - * Edit chat info: messages.editChatAbout - * Edit creator of channel: channels.editCreator - * Edit default rights of chat: messages.editChatDefaultBannedRights - * Edit folder: folders.editPeerFolders - * Edit location (geochats): channels.editLocation - * Edit the photo of a normal chat (not supergroup): messages.editChatPhoto - * Edit the photo of a supergroup/channel: channels.editPhoto - * Edit the title of a normal chat (not supergroup): messages.editChatTitle - * Edit the title of a supergroup/channel: channels.editTitle - * Edit user info: help.editUserInfo - * Enable or disable hidden history for new channel/supergroup users: channels.togglePreHistoryHidden - * Export chat invite : messages.exportChatInvite - * Find a sticker set: messages.searchStickerSets - * Finish account exporting session: account.finishTakeoutSession - * Forward messages: messages.forwardMessages - * Get CDN configuration: help.getCdnConfig - * Get a stickerset: messages.getStickerSet - * Get account TTL: account.getAccountTTL - * Get admin log of a channel/supergroup: channels.getAdminLog - * Get all archived stickers: messages.getArchivedStickers - * Get all channels you left: channels.getLeftChannels - * Get all chats (not supergroups or channels): messages.getAllChats - * Get all contacts: contacts.getContacts - * Get all logged-in authorizations: account.getAuthorizations - * Get all message drafts: messages.getAllDrafts - * Get all secure telegram passport values: account.getAllSecureValues - * Get all stickerpacks: messages.getAllStickers - * Get all supergroups/channels where you're admin: channels.getAdminedPublicChannels - * Get and increase message views: messages.getMessagesViews - * Get app config: help.getAppConfig - * Get autodownload settings: account.getAutoDownloadSettings - * Get available languages: langpack.getLanguages - * Get blocked users: contacts.getBlocked - * Get call configuration: phone.getCallConfig - * Get channel/supergroup messages: channels.getMessages - * Get channel/supergroup participants (you should use `$MadelineProto->get_pwr_chat($id)` instead): channels.getParticipants - * Get chats in common with a user: messages.getCommonChats - * Get contacts by IDs: contacts.getContactIDs - * Get deep link info: help.getDeepLinkInfo - * Get dialog info of peers: messages.getPeerDialogs - * Get dialogs marked as unread manually: messages.getDialogUnreadMarks - * Get document by SHA256 hash: messages.getDocumentByHash - * Get emoji URL: messages.getEmojiURL - * Get emoji keyword difference: messages.getEmojiKeywordsDifference - * Get emoji keyword languages: messages.getEmojiKeywordsLanguages - * Get emoji keywords: messages.getEmojiKeywords - * Get favorite stickers: messages.getFavedStickers - * Get featured stickers: messages.getFeaturedStickers - * Get groups for discussion: channels.getGroupsForDiscussion - * Get high scores of a game sent in an inline message: messages.getInlineGameHighScores - * Get high scores of a game: messages.getGameHighScores - * Get info about a certain channel/supergroup participant: channels.getParticipant - * Get info about app updates: help.getAppUpdate - * Get info about chats: messages.getChats - * Get info about multiple channels/supergroups: channels.getChannels - * Get info about users: users.getUsers - * Get info of support user: help.getSupport - * Get information about the current proxy: help.getProxyData - * Get invitation text: help.getInviteText - * Get language pack strings: langpack.getStrings - * Get language pack updates: langpack.getDifference - * Get language pack: langpack.getLangPack - * Get language: langpack.getLanguage - * Get masks: messages.getMaskStickers - * Get message ranges to fetch: messages.getSplitRanges - * Get messages: messages.getMessages - * Get most used chats: contacts.getTopPeers - * Get nearest datacenter: help.getNearestDc - * Get notification exceptions: account.getNotifyExceptions - * Get notification settings: account.getNotifySettings - * Get online status of all users: contacts.getStatuses - * Get online users: messages.getOnlines - * Get passport config: help.getPassportConfig - * Get payment form: payments.getPaymentForm - * Get payment receipt: payments.getPaymentReceipt - * Get people nearby (geochats): contacts.getLocated - * Get pinned dialogs: messages.getPinnedDialogs - * Get poll results: messages.getPollResults - * Get previous messages of a group: messages.getHistory - * Get privacy settings: account.getPrivacy - * Get recent locations: messages.getRecentLocations - * Get recent stickers: messages.getRecentStickers - * Get recent t.me URLs: help.getRecentMeUrls - * Get saved contacts: contacts.getSaved - * Get saved gifs: messages.getSavedGifs - * Get saved payments info: payments.getSavedInfo - * Get search counter: messages.getSearchCounters - * Get secure value for telegram passport: account.getSecureValue - * Get server configuration: help.getConfig - * Get stats URL: messages.getStatsURL - * Get stickers attachable to images: messages.getAttachedStickers - * Get stickers: messages.getStickers - * Get support name: help.getSupportName - * Get telegram web login authorizations: account.getWebAuthorizations - * Get temporary password for buying products through bots: account.getTmpPassword - * Get the callback answer of a bot (after clicking a button): messages.getBotCallbackAnswer - * Get the changelog of this app: help.getAppChangelog - * Get the current password: account.getPassword - * Get the link of a message in a channel: channels.exportMessageLink - * Get the profile photos of a user: photos.getUserPhotos - * Get the settings of apeer: messages.getPeerSettings - * Get unread mentions: messages.getUnreadMentions - * Get updated TOS: help.getTermsOfServiceUpdate - * Get user info: help.getUserInfo - * Get wallpaper info: account.getWallPaper - * Get webpage preview: messages.getWebPage - * Get webpage preview: messages.getWebPagePreview - * Gets list of chats: you should use $MadelineProto->get_dialogs() instead: https://docs.madelineproto.xyz/docs/DIALOGS.html: messages.getDialogs - * Global message search: messages.searchGlobal - * Hide peer settings bar: messages.hidePeerSettingsBar - * Import chat invite: messages.importChatInvite - * Initializes connection and save information on the user's device and application.: initConnection - * Install wallpaper: account.installWallPaper - * Invalidate sent phone code: auth.cancelCode - * Invoke method from takeout session: invokeWithTakeout - * Invoke this method with layer X: invokeWithLayer - * Invoke with messages range: invokeWithMessagesRange - * Invoke with method without returning updates in the socket: invokeWithoutUpdates - * Invokes a query after successfull completion of one of the previous queries.: invokeAfterMsg - * Join a channel/supergroup: channels.joinChannel - * Kick or ban a user from a channel/supergroup: channels.editBanned - * Leave a channel/supergroup: channels.leaveChannel - * Log data for developer of this app: help.saveAppLog - * Mark channel/supergroup history as read: channels.readHistory - * Mark channel/supergroup messages as read: channels.readMessageContents - * Mark dialog as unread : messages.markDialogUnread - * Mark mentions as read: messages.readMentions - * Mark message as read: messages.readMessageContents - * Mark messages as read in secret chats: messages.readEncryptedHistory - * Mark messages as read: messages.readHistory - * Mark messages as read: messages.receivedMessages - * Mark new featured stickers as read: messages.readFeaturedStickers - * Notify server that you received a call (server will refuse all incoming calls until the current call is over): phone.receivedCall - * Pin or unpin dialog: messages.toggleDialogPin - * Register device for push notifications: account.registerDevice - * Remove a sticker set: messages.uninstallStickerSet - * Remove sticker from stickerset: stickers.removeStickerFromSet - * Reorder pinned dialogs: messages.reorderPinnedDialogs - * Reorder sticker sets: messages.reorderStickerSets - * Report a message in a supergroup/channel for spam: channels.reportSpam - * Report a message: messages.report - * Report a peer for spam: messages.reportSpam - * Report for spam a secret chat: messages.reportEncryptedSpam - * Report for spam: account.reportPeer - * Request URL authorization: messages.requestUrlAuth - * Resend password recovery email: account.resendPasswordEmail - * Resend the SMS verification code: auth.resendCode - * Reset all notification settings: account.resetNotifySettings - * Reset all telegram web login authorizations: account.resetWebAuthorizations - * Reset saved contacts: contacts.resetSaved - * Reset top peer rating for a certain category/peer: contacts.resetTopPeerRating - * Reset wallpapers: account.resetWallPapers - * Result type returned by a current query.: invokeAfterMsgs - * Returns a list of available wallpapers.: account.getWallPapers - * Save a GIF: messages.saveGif - * Save a message draft: messages.saveDraft - * Save autodownload settings: account.saveAutoDownloadSettings - * Save call debugging info: phone.saveCallDebug - * Save telegram passport secure value: account.saveSecureValue - * Save wallpaper: account.saveWallPaper - * Search contacts: contacts.search - * Search gifs: messages.searchGifs - * Search peers or messages: messages.search - * Send a custom request to the bot API: bots.sendCustomRequest - * Send a file to a secret chat: messages.sendEncryptedFile - * Send a media: messages.sendMedia - * Send a message: messages.sendMessage - * Send a service message to a secret chat: messages.sendEncryptedService - * Send an album: messages.sendMultiMedia - * Send an email to recover the 2FA password: auth.requestPasswordRecovery - * Send confirmation phone code: account.sendConfirmPhoneCode - * Send email verification code: account.sendVerifyEmailCode - * Send inline bot result obtained with messages.getInlineBotResults to the chat: messages.sendInlineBotResult - * Send message to secret chat: messages.sendEncrypted - * Send phone verification code: account.sendVerifyPhoneCode - * Send screenshot notification: messages.sendScreenshotNotification - * Send typing notification to secret chat: messages.setEncryptedTyping - * Send vote: messages.sendVote - * Send webhook request via bot API: bots.answerWebhookJSONQuery - * Set account TTL: account.setAccountTTL - * Set contact sign up notification: account.setContactSignUpNotification - * Set discussion group of channel: channels.setDiscussionGroup - * Set phone call rating: phone.setCallRating - * Set privacy settings: account.setPrivacy - * Set secure value error for telegram passport: users.setSecureValueErrors - * Set the game score of an inline message: messages.setInlineGameScore - * Set the game score: messages.setGameScore - * Set the supergroup/channel stickerpack: channels.setStickers - * Set the update status of webhook: help.setBotUpdatesStatus - * Start a bot: messages.startBot - * Start account exporting session: account.initTakeoutSession - * Stop sending PUSH notifications to app: account.unregisterDevice - * Toggle channel signatures: channels.toggleSignatures - * Toggle top peers: contacts.toggleTopPeers - * Unblock a user: contacts.unblock - * Update online status: account.updateStatus - * Update pinned message: messages.updatePinnedMessage - * Update profile info: account.updateProfile - * Update the username of a supergroup/channel: channels.updateUsername - * Update this user's username: account.updateUsername - * Upload a file without sending it to anyone: messages.uploadMedia - * Upload a secret chat file without sending it to anyone: messages.uploadEncryptedFile - * Upload profile photo: photos.uploadProfilePhoto - * Upload wallpaper: account.uploadWallPaper - * Use the code that was emailed to you after running $MadelineProto->auth->requestPasswordRecovery to login to your account: auth.recoverPassword - * Validate requested payment info: payments.validateRequestedInfo - * Verify email address: account.verifyEmail - * Verify phone number: account.verifyPhone * [Peers](https://docs.madelineproto.xyz/docs/USING_METHODS.html#peers) * [Files](https://docs.madelineproto.xyz/docs/FILES.html) * [Secret chats](https://docs.madelineproto.xyz/docs/USING_METHODS.html#secret-chats) diff --git a/docs b/docs index cbd913fb..7414ae3e 160000 --- a/docs +++ b/docs @@ -1 +1 @@ -Subproject commit cbd913fba2249a0055bb732452a67f911119c40a +Subproject commit 7414ae3e537b26a15b75d2a00ef6f93e702d2cd8 diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index 3d5a6149..9caa7706 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -20,7 +20,6 @@ namespace danog\MadelineProto; use Amp\ByteStream\ClosedException; use Amp\Deferred; -use Amp\Promise; use danog\MadelineProto\Loop\Connection\CheckLoop; use danog\MadelineProto\Loop\Connection\HttpWaitLoop; use danog\MadelineProto\Loop\Connection\ReadLoop; @@ -42,6 +41,7 @@ class Connection use Crypt; use MsgIdHandler; use SeqNoHandler; + use \danog\Serializable; use Tools; @@ -55,40 +55,23 @@ class Connection public $stream; - public $time_delta = 0; public $type = 0; public $peer_tag; + public $temp_auth_key; public $auth_key; - public $session_id; - public $session_out_seq_no = 0; - public $session_in_seq_no = 0; - public $incoming_messages = []; - public $outgoing_messages = []; - public $new_incoming = []; - public $new_outgoing = []; + + public $pending_outgoing = []; public $pending_outgoing_key = 0; - public $pending_outgoing_unencrypted = []; - public $pending_outgoing_unencrypted_key = 0; - public $max_incoming_id; - public $max_outgoing_id; + public $authorized = false; - public $call_queue = []; - public $ack_queue = []; - public $i = []; - public $last_recv = 0; - private $last_chunk = 0; - public $last_http_wait = 0; public $datacenter; public $API; - public $resumeWriterDeferred; - public $ctx; - public $pendingCheckWatcherId; - public $http_req_count = 0; - public $http_res_count = 0; + public $ctx; + public function getCtx() { @@ -277,19 +260,7 @@ class Connection { return __CLASS__; } - public function haveRead() - { - $this->last_chunk = microtime(true); - } - /** - * Get the receive date of the latest chunk of data from the socket - * - * @return void - */ - public function getLastChunk() - { - return $this->last_chunk; - } + /** * Sleep function. * @@ -301,14 +272,4 @@ class Connection { return ['peer_tag', 'temp_auth_key', 'auth_key', 'session_id', 'session_out_seq_no', 'session_in_seq_no', 'max_incoming_id', 'max_outgoing_id', 'authorized', 'ack_queue']; } - - public function __wakeup() - { - $this->time_delta = 0; - $this->pending_outgoing = []; - $this->new_outgoing = []; - $this->new_incoming = []; - $this->outgoing_messages = []; - $this->incoming_messages = []; - } } diff --git a/src/danog/MadelineProto/DocsBuilder/Methods.php b/src/danog/MadelineProto/DocsBuilder/Methods.php index 9ff06eaa..d9ae773c 100644 --- a/src/danog/MadelineProto/DocsBuilder/Methods.php +++ b/src/danog/MadelineProto/DocsBuilder/Methods.php @@ -23,9 +23,21 @@ trait Methods { public function mk_methods() { - $bots = json_decode(file_get_contents('https://rpc.pwrtelegram.xyz/?bot'), true)['result']; - $errors = json_decode(file_get_contents('https://rpc.pwrtelegram.xyz/?all'), true); - $errors['result'] = array_merge_recursive(...$errors['result']); + static $bots; + if (!$bots) $bots = json_decode(file_get_contents('https://rpc.pwrtelegram.xyz/?bot'), true)['result']; + static $errors; + if (!$errors) $errors = json_decode(file_get_contents('https://rpc.pwrtelegram.xyz/?all'), true); + $new = ['result' => []]; + foreach ($errors['result'] as $code => $suberrors) { + foreach ($suberrors as $method => $suberrors) { + if (!isset($new[$method])) { + $new[$method] = []; + } + foreach ($suberrors as $error) { + $new['result'][$method][] = [$error, $code]; + } + } + } foreach (glob('methods/'.$this->any) as $unlink) { unlink($unlink); } @@ -76,11 +88,12 @@ trait Methods $this->docs_methods[$method] = '$MadelineProto->'.$md_method.'(\\['.$params.'\\]) === [$'.str_replace('_', '\\_', $type).'](../types/'.$php_type.'.md) '; +/* if (!isset(\danog\MadelineProto\MTProto::DISALLOWED_METHODS[$data['method']]) && isset($this->td_descriptions['methods'][$data['method']])) { $this->human_docs_methods[$this->td_descriptions['methods'][$data['method']]['description'].': '.$data['method']] = '* '.$this->td_descriptions['methods'][$data['method']]['description'].': '.$data['method'].' '; - } + }*/ $params = ''; $lua_params = ''; $pwr_params = ''; @@ -192,11 +205,12 @@ image: https://docs.madelineproto.xyz/favicons/android-chrome-256x256.png '; +/* if (isset(\danog\MadelineProto\MTProto::DISALLOWED_METHODS[$data['method']])) { $header .= '**'.\danog\MadelineProto\MTProto::DISALLOWED_METHODS[$data['method']]."**\n\n\n\n\n"; file_put_contents('methods/'.$method.'.md', $header); continue; - } + }*/ if ($this->td) { $header .= 'YOU CANNOT USE THIS METHOD IN MADELINEPROTO @@ -293,14 +307,15 @@ You can also use normal markdown, note that to create mentions you must use the MadelineProto supports all html entities supported by [html_entity_decode](http://php.net/manual/en/function.html-entity-decode.php). '; } - if (isset($errors['result'][$data['method']])) { - $example .= '### Errors this method can return: + if (isset($new['result'][$data['method']])) { + $example .= '### Errors -| Error | Description | -|----------|---------------| +| Code | Type | Description | +|------|----------|---------------| '; - foreach ($errors['result'][$data['method']] as $error) { - $example .= '|'.$error.'|'.$errors['human_result'][$error][0].'|'."\n"; + foreach ($new['result'][$data['method']] as $error) { + [$error, $code] = $error; + $example .= "|$code|$error|".$errors['human_result'][$error][0].'|'."\n"; } $example .= "\n\n"; } diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index 9d8461d5..21160737 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -214,7 +214,6 @@ class ReadLoop extends SignalLoop $connection->incoming_messages[$message_id]['content'] = $deserialized; $connection->incoming_messages[$message_id]['response'] = -1; $connection->new_incoming[$message_id] = $message_id; - $connection->last_recv = time(); $connection->last_http_wait = 0; $API->logger->logger('Received payload from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE); diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index cce3afd8..8010e342 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -133,6 +133,7 @@ class MTProto extends AsyncConstruct implements TLCallback public $authorized = 0; public $authorized_dc = -1; private $rsa_keys = []; + private $cdn_rsa_keys = []; private $dh_config = ['version' => 0]; public $chats = []; public $channel_participants = []; @@ -180,6 +181,7 @@ class MTProto extends AsyncConstruct implements TLCallback } // Load rsa keys $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['load_rsa'], Logger::ULTRA_VERBOSE); + $this->rsa_keys = []; foreach ($this->settings['authorization']['rsa_keys'] as $key) { $key = yield (new RSA())->load($key); $this->rsa_keys[$key->fp] = $key; @@ -1016,7 +1018,7 @@ class MTProto extends AsyncConstruct implements TLCallback try { foreach ((yield $this->method_call_async_read('help.getCdnConfig', [], ['datacenter' => $datacenter]))['public_keys'] as $curkey) { $tempkey = new \danog\MadelineProto\RSA($curkey['public_key']); - $this->rsa_keys[$tempkey->fp] = $tempkey; + $this->cdn_rsa_keys[$tempkey->fp] = $tempkey; } } catch (\danog\MadelineProto\TL\Exception $e) { $this->logger->logger($e->getMessage(), \danog\MadelineProto\Logger::FATAL_ERROR); diff --git a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php index 76959f2d..23147109 100644 --- a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php @@ -35,7 +35,8 @@ trait AuthKeyHandler public function create_auth_key_async($expires_in, $datacenter): \Generator { - $req_pq = strpos($datacenter, 'cdn') ? 'req_pq' : 'req_pq_multi'; + $cdn = strpos($datacenter, 'cdn'); + $req_pq = $cdn ? 'req_pq' : 'req_pq_multi'; for ($retry_id_total = 1; $retry_id_total <= $this->settings['max_tries']['authorization']; $retry_id_total++) { try { $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['req_pq'], \danog\MadelineProto\Logger::VERBOSE); @@ -69,7 +70,7 @@ trait AuthKeyHandler * *********************************************************************** * Find our key in the server_public_key_fingerprints vector */ - foreach ($this->rsa_keys as $curkey) { + foreach ($cdn ? array_merge($this->cdn_rsa_keys, $this->rsa_keys) : $this->rsa_keys as $curkey) { if (in_array($curkey->fp, $ResPQ['server_public_key_fingerprints'])) { $key = $curkey; } diff --git a/src/danog/MadelineProto/Stream/ConnectionContext.php b/src/danog/MadelineProto/Stream/ConnectionContext.php index 2ee330ec..17bc7def 100644 --- a/src/danog/MadelineProto/Stream/ConnectionContext.php +++ b/src/danog/MadelineProto/Stream/ConnectionContext.php @@ -23,6 +23,7 @@ use Amp\Socket\ClientConnectContext; use Amp\Uri\Uri; use danog\MadelineProto\Stream\Transport\DefaultStream; use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream; +use danog\MadelineProto\Exception; /** * Connection context class. @@ -218,6 +219,7 @@ class ConnectionContext { return $this->isDns; } + /** * Whether this connection context will only be used by the DNS client * @@ -262,6 +264,10 @@ class ConnectionContext */ public function setDc($dc): self { + $int = intval($dc); + if (!(1 <= $int && $int <= 1000)) { + throw new Exception("Invalid DC id provided: $dc"); + } $this->dc = $dc; return $this; diff --git a/src/danog/MadelineProto/Stream/MTProtoTools/MsgIdHandler.php b/src/danog/MadelineProto/Stream/MTProtoTools/MsgIdHandler.php index ffc3e4eb..543663ab 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTools/MsgIdHandler.php +++ b/src/danog/MadelineProto/Stream/MTProtoTools/MsgIdHandler.php @@ -24,6 +24,9 @@ namespace danog\MadelineProto\Stream\MTProtoTools; */ trait MsgIdHandler { + public $max_incoming_id; + public $max_outgoing_id; + public function check_message_id($new_message_id, $aargs) { if (!is_object($new_message_id)) { diff --git a/src/danog/MadelineProto/Stream/MTProtoTools/SeqNoHandler.php b/src/danog/MadelineProto/Stream/MTProtoTools/SeqNoHandler.php index 11206963..c0128b82 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTools/SeqNoHandler.php +++ b/src/danog/MadelineProto/Stream/MTProtoTools/SeqNoHandler.php @@ -26,6 +26,11 @@ trait SeqNoHandler { use \danog\MadelineProto\MTProtoTools\SeqNoHandler; + public $session_out_seq_no = 0; + public $session_in_seq_no = 0; + + public $session_id; + public function generate_out_seq_no($content_related) { $in = $content_related ? 1 : 0; diff --git a/src/danog/MadelineProto/Stream/MTProtoTools/Session.php b/src/danog/MadelineProto/Stream/MTProtoTools/Session.php new file mode 100644 index 00000000..09308aae --- /dev/null +++ b/src/danog/MadelineProto/Stream/MTProtoTools/Session.php @@ -0,0 +1,59 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Stream\MTProtoTools; + +/** + * Manages MTProto session-specific data + */ +class Session +{ + use MsgIdHandler; + use SaltHandler; + use SeqNoHandler; + public $incoming_messages = []; + public $outgoing_messages = []; + public $new_incoming = []; + public $new_outgoing = []; + + public $http_req_count = 0; + public $http_res_count = 0; + + public $last_http_wait = 0; + private $last_chunk = 0; + + public $time_delta = 0; + + public $call_queue = []; + public $ack_queue = []; + + + public function haveRead() + { + $this->last_chunk = microtime(true); + } + /** + * Get the receive date of the latest chunk of data from the socket + * + * @return void + */ + public function getLastChunk() + { + return $this->last_chunk; + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php index 2aa2759d..63dbfd4a 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php @@ -186,11 +186,13 @@ class ObfuscatedStream implements BufferedProxyStreamInterface */ public function setExtra($extra) { - if (isset($extra['secret']) && strlen($extra['secret']) > 17) { - $extra['secret'] = hex2bin($extra['secret']); - } - if (isset($extra['secret']) && strlen($extra['secret']) == 17) { - $extra['secret'] = substr($extra['secret'], 0, 16); + if (isset($extra['secret'])) { + if (strlen($extra['secret']) > 17) { + $extra['secret'] = hex2bin($extra['secret']); + } + if (strlen($extra['secret']) == 17) { + $extra['secret'] = substr($extra['secret'], 1, 16); + } } $this->extra = $extra; } From f33c3c9ee65a7f07af844099953df76ddc348b7b Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 6 Jul 2019 17:57:14 +0200 Subject: [PATCH 15/17] Typo fix --- src/danog/MadelineProto/Wrappers/Loop.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/danog/MadelineProto/Wrappers/Loop.php b/src/danog/MadelineProto/Wrappers/Loop.php index 7f83c44c..5dd11649 100644 --- a/src/danog/MadelineProto/Wrappers/Loop.php +++ b/src/danog/MadelineProto/Wrappers/Loop.php @@ -182,7 +182,7 @@ trait Loop header('Connection: close'); ignore_user_abort(true); ob_start(); - echo '

'.$message.'

'; + echo '

'.$message.'

'; $size = ob_get_length(); header("Content-Length: $size"); header('Content-Type: text/html'); From 60989481fff7220bb9dfc510a0ce81accbe8c73f Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 6 Jul 2019 19:01:11 +0200 Subject: [PATCH 16/17] More fixes --- src/danog/MadelineProto/Wrappers/Loop.php | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/danog/MadelineProto/Wrappers/Loop.php b/src/danog/MadelineProto/Wrappers/Loop.php index 5dd11649..defb2b62 100644 --- a/src/danog/MadelineProto/Wrappers/Loop.php +++ b/src/danog/MadelineProto/Wrappers/Loop.php @@ -178,12 +178,13 @@ trait Loop return; } $this->logger->logger($message); + $buffer = @ob_get_contents(); @ob_end_clean(); header('Connection: close'); ignore_user_abort(true); - ob_start(); - echo '

'.$message.'

'; - $size = ob_get_length(); + $buffer .= '

'.htmlentities($message).'

'; + echo $buffer; + $size = max(ob_get_length(), strlen($buffer)); header("Content-Length: $size"); header('Content-Type: text/html'); ob_end_flush(); From bdc3ff6d8b9015e484e8f624f1fe814d54673fb1 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 6 Jul 2019 19:27:13 +0200 Subject: [PATCH 17/17] bugfixes --- src/danog/MadelineProto/Loop/Connection/ReadLoop.php | 2 -- src/danog/MadelineProto/Tools.php | 3 --- tests/testing.php | 9 +++++++++ 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index a1e3cb36..a958e97d 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -27,8 +27,6 @@ use danog\MadelineProto\Loop\Impl\SignalLoop; use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\NothingInTheSocketException; use danog\MadelineProto\Tools; -use Amp\ByteStream\StreamException; -use Amp\ByteStream\PendingReadError; /** * Socket read loop. diff --git a/src/danog/MadelineProto/Tools.php b/src/danog/MadelineProto/Tools.php index 42ca113b..242db9fc 100644 --- a/src/danog/MadelineProto/Tools.php +++ b/src/danog/MadelineProto/Tools.php @@ -23,7 +23,6 @@ use Amp\Failure; use Amp\Loop; use Amp\Promise; use Amp\Success; -use function Amp\ByteStream\getOutputBufferStream; use function Amp\ByteStream\getStdin; use function Amp\ByteStream\getStdout; use function Amp\Promise\all; @@ -32,8 +31,6 @@ use function Amp\Promise\first; use function Amp\Promise\some; use function Amp\Promise\timeout; use function Amp\Promise\wait; -use function Amp\ByteStream\getStdin; -use function Amp\ByteStream\getStdout; use function Amp\ByteStream\getOutputBufferStream; use function Amp\File\exists; use function Amp\File\touch; diff --git a/tests/testing.php b/tests/testing.php index dce3c12b..44df8964 100755 --- a/tests/testing.php +++ b/tests/testing.php @@ -58,6 +58,15 @@ try { $MadelineProto->accept_tos(); } } + + $inputMediaUploadedPhoto1 = ['_' => 'inputMediaUploadedPhoto','file' => '1.jpg']; + $inputMediaUploadedPhoto2 = ['_' => 'inputMediaUploadedPhoto','file' => '2.jpg']; + $inputMediaUploadedPhoto3 = ['_' => 'inputMediaUploadedPhoto','file' => '3.jpg']; + $inputSingleMedia1 = ['_' => 'inputSingleMedia', 'media' => $inputMediaUploadedPhoto1, 'message' => 'str']; + $inputSingleMedia2 = ['_' => 'inputSingleMedia', 'media' => $inputMediaUploadedPhoto2, 'message' => 'str']; + $inputSingleMedia3 = ['_' => 'inputSingleMedia', 'media' => $inputMediaUploadedPhoto3, 'message' => 'str']; + $Updates = $this->messages->sendMultiMedia(['peer' => 'danogentili','multi_media' => [$inputSingleMedia3, $inputSingleMedia2,$inputSingleMedia1]]); + //var_dump(count($MadelineProto->get_pwr_chat('@madelineproto')['participants'])); /*