diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index 20ef8729..3d5a6149 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -78,6 +78,7 @@ class Connection public $ack_queue = []; public $i = []; public $last_recv = 0; + private $last_chunk = 0; public $last_http_wait = 0; public $datacenter; @@ -276,7 +277,19 @@ class Connection { return __CLASS__; } - + public function haveRead() + { + $this->last_chunk = microtime(true); + } + /** + * Get the receive date of the latest chunk of data from the socket + * + * @return void + */ + public function getLastChunk() + { + return $this->last_chunk; + } /** * Sleep function. * diff --git a/src/danog/MadelineProto/DataCenter.php b/src/danog/MadelineProto/DataCenter.php index c6dc9369..dc6e7e4d 100644 --- a/src/danog/MadelineProto/DataCenter.php +++ b/src/danog/MadelineProto/DataCenter.php @@ -21,6 +21,7 @@ namespace danog\MadelineProto; use Amp\Artax\Client; use Amp\Artax\Cookie\ArrayCookieJar; +use Amp\Artax\Cookie\CookieJar; use Amp\Artax\DefaultClient; use Amp\Artax\HttpSocketPool; use Amp\CancellationToken; @@ -38,6 +39,7 @@ use Amp\Socket\ClientConnectContext; use Amp\Socket\ClientSocket; use Amp\Socket\ClientTlsContext; use Amp\Socket\ConnectException; +use Amp\Socket\Socket; use Amp\TimeoutException; use danog\MadelineProto\Stream\Common\BufferedRawStream; use danog\MadelineProto\Stream\ConnectionContext; @@ -55,7 +57,6 @@ use danog\MadelineProto\Stream\Transport\WssStream; use danog\MadelineProto\Stream\Transport\WsStream; use function Amp\call; use function Amp\Socket\Internal\parseUri; -use Amp\Artax\Cookie\CookieJar; /** * Manages datacenters. @@ -98,7 +99,7 @@ class DataCenter $this->HTTPClient = new DefaultClient($this->CookieJar, new HttpSocketPool(new ProxySocketPool([$this, 'rawConnectAsync']))); $DoHHTTPClient = new DefaultClient( - $this->CookieJar, + $this->CookieJar, new HttpSocketPool( new ProxySocketPool( function (string $uri, CancellationToken $token = null, ClientConnectContext $ctx = null) { @@ -327,8 +328,34 @@ class DataCenter continue; // Could not connect to host, try next host in the list. } + if ($dc = $ctx->getDc()) { + $callback = [$this->sockets[$dc], 'haveRead']; + $socket = new class($socket) extends ClientSocket + { + private $callback; + public function setReadCallback($callback) + { + $this->callback = $callback; + } - return new ClientSocket($socket); + /** @inheritdoc */ + public function read(): Promise + { + $promise = parent::read(); + $promise->onResolve(function ($e, $res) { + if ($res) { + ($this->callback)(); + } + }); + return $promise; + } + }; + $socket->setReadCallback($callback); + } else { + $socket = new ClientSocket($socket); + } + + return $socket; } // This is reached if either all URIs failed or the maximum number of attempts is reached. diff --git a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php index 3af1bec1..de04b42f 100644 --- a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php @@ -55,7 +55,9 @@ class CheckLoop extends ResumableSignalLoop } if ($connection->hasPendingCalls()) { - $last_recv = $connection->get_max_id(true); + $last_msgid = $connection->get_max_id(true); + $last_chunk = $connection->getLastChunk(); + if ($connection->temp_auth_key !== null) { $full_message_ids = $connection->getPendingCalls(); //array_values($connection->new_outgoing); foreach (array_chunk($full_message_ids, 8192) as $message_ids) { @@ -139,7 +141,7 @@ class CheckLoop extends ResumableSignalLoop return; } - if ($connection->get_max_id(true) === $last_recv) { + if ($connection->get_max_id(true) === $last_msgid && $connection->getLastChunk() === $last_chunk) { $API->logger->logger("We did not receive a response for $timeout seconds: reconnecting and exiting check loop on DC $datacenter"); $this->exitedLoop(); yield $connection->reconnect(); diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index 0fcd7d9f..08b56a09 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -215,6 +215,7 @@ class WriteLoop extends ResumableSignalLoop 'system_lang_code' => $API->settings['app_info']['lang_code'], 'lang_code' => $API->settings['app_info']['lang_code'], 'lang_pack' => $API->settings['app_info']['lang_pack'], + 'proxy' => $connection->getCtx()->getInputClientProxy(), 'query' => $MTmessage['body'], ] ), diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 258c4430..c943149e 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -22,6 +22,9 @@ namespace danog\MadelineProto\MTProtoTools; use Amp\ByteStream\OutputStream; use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\StreamException; +use Amp\File\StatCache; +use Amp\Promise; +use Amp\Success; use danog\MadelineProto\Async\AsyncParameters; use danog\MadelineProto\Exception; use danog\MadelineProto\FileCallbackInterface; @@ -30,9 +33,6 @@ use danog\MadelineProto\RPCErrorException; use function Amp\File\open; use function Amp\File\stat; use function Amp\Promise\all; -use Amp\Deferred; -use Amp\Success; -use Amp\File\StatCache; /** * Manages upload and download of files. @@ -517,7 +517,102 @@ trait Files throw new \danog\MadelineProto\Exception('Invalid constructor provided: '.$message_media['_']); } } + /* + public function download_to_browser_single_async($message_media, $cb = null) + { + if (php_sapi_name() === 'cli') { + throw new Exception('Cannot download file to browser from command line: start this script from a browser'); + } + if (headers_sent()) { + throw new Exception('Headers already sent, cannot stream file to browser!'); + } + if (is_object($message_media) && $message_media instanceof FileCallbackInterface) { + $cb = $message_media; + $message_media = $message_media->getFile(); + } + + $message_media = yield $this->get_download_info_async($message_media); + + $servefile = $_SERVER['REQUEST_METHOD'] !== 'HEAD'; + + if (isset($_SERVER['HTTP_RANGE'])) { + $range = explode('=', $_SERVER['HTTP_RANGE'], 2); + if (count($range) == 1) { + $range[1] = ''; + } + list($size_unit, $range_orig) = $range; + if ($size_unit == 'bytes') { + //multiple ranges could be specified at the same time, but for simplicity only serve the first range + //http://tools.ietf.org/id/draft-ietf-http-range-retrieval-00.txt + $list = explode(',', $range_orig, 2); + if (count($list) == 1) { + $list[1] = ''; + } + list($range, $extra_ranges) = $list; + } else { + $range = ''; + return Tools::noCache(416, '

416 Requested Range Not Satisfiable.


Could not use selected range.

'); + } + } else { + $range = ''; + } + $listseek = explode('-', $range, 2); + if (count($listseek) == 1) { + $listseek[1] = ''; + } + list($seek_start, $seek_end) = $listseek; + + $seek_end = empty($seek_end) ? ($message_media['size'] - 1) : min(abs(intval($seek_end)), $message_media['size'] - 1); + + if (!empty($seek_start) && $seek_end < abs(intval($seek_start))) { + return Tools::noCache(416, '

416 Requested Range Not Satisfiable.


Could not use selected range.

'); + } + $seek_start = empty($seek_start) ? 0 : abs(intval($seek_start)); + if ($servefile) { + if ($seek_start > 0 || $seek_end < $select['file_size'] - 1) { + header('HTTP/1.1 206 Partial Content'); + header('Content-Range: bytes '.$seek_start.'-'.$seek_end.'/'.$select['file_size']); + header('Content-Length: '.($seek_end - $seek_start + 1)); + } else { + header('Content-Length: '.$select['file_size']); + } + header('Content-Type: '.$select['mime']); + header('Cache-Control: max-age=31556926;'); + header('Content-Transfer-Encoding: Binary'); + header('Accept-Ranges: bytes'); + //header('Content-disposition: attachment: filename="'.basename($select['file_path']).'"'); + $MadelineProto->download_to_stream($select['file_id'], fopen('php://output', 'w'), function ($percent) { + flush(); + ob_flush(); + \danog\MadelineProto\Logger::log('Download status: '.$percent.'%'); + }, $seek_start, $seek_end + 1); + //analytics(true, $file_path, $MadelineProto->get_self()['id'], $dbuser, $dbpassword); + $MadelineProto->API->getting_state = false; + $MadelineProto->API->store_db([], true); + $MadelineProto->API->reset_session(); + } else { + if ($seek_start > 0 || $seek_end < $select['file_size'] - 1) { + header('HTTP/1.1 206 Partial Content'); + header('Content-Range: bytes '.$seek_start.'-'.$seek_end.'/'.$select['file_size']); + header('Content-Length: '.($seek_end - $seek_start + 1)); + } else { + header('Content-Length: '.$select['file_size']); + } + header('Content-Type: '.$select['mime']); + header('Cache-Control: max-age=31556926;'); + header('Content-Transfer-Encoding: Binary'); + header('Accept-Ranges: bytes'); + analytics(true, $file_path, null, $dbuser, $dbpassword); + //header('Content-disposition: attachment: filename="'.basename($select['file_path']).'"'); + } + + header('Content-Length: '.$info['size']); + header('Content-Type: '.$info['mime']); + }*/ + public function extract_photosize($photo) + { + } public function download_to_dir_async($message_media, $dir, $cb = null) { if (is_object($dir) && $dir instanceof FileCallbackInterface) { @@ -561,7 +656,6 @@ trait Files return $file; } - public function download_to_stream_async($message_media, $stream, $cb = null, $offset = 0, $end = -1) { $message_media = yield $this->get_download_info_async($message_media); @@ -592,6 +686,25 @@ trait Files } catch (StreamException $e) { } } + $callable = static function (string $payload, int $offset) use ($stream, $seekable) { + if ($seekable) { + while ($stream->tell() !== $offset) { + yield $stream->seek($offset); + } + } + return yield $stream->write($payload); + }; + + return yield $this->download_to_callable_async($message_media, $callable, $cb, $seekable, $offset, $end); + } + public function download_to_callable_async($message_media, $callable, $cb = null, $parallelize = true, $offset = 0, $end = -1) + { + $message_media = yield $this->get_download_info_async($message_media); + + if (is_object($callable) && $callable instanceof FileCallbackInterface) { + $cb = $callable; + $callable = $callable->getFile(); + } if ($end === -1 && isset($message_media['size'])) { $end = $message_media['size']; @@ -617,7 +730,6 @@ trait Files $ige->enableContinuousBuffer(); } - if ($offset === $end) { $cb(100); return true; @@ -629,8 +741,9 @@ trait Files $breakOut = false; for ($x = $offset - $start_at; $x < $probable_end; $x += $part_size) { $end_at = $part_size; - if ($end !== -1 && $x + $part_size >= $end) { - $end_at = $end % $part_size; + + if ($end !== -1 && $x + $part_size > $end) { + $end_at = ($x + $part_size) - $end; $breakOut = true; } @@ -655,15 +768,15 @@ trait Files $cb = function () use ($cb, $count) { static $cur = 0; $cur++; - $this->callFork($cb($cur*100/$count)); + $this->callFork($cb($cur * 100 / $count)); }; - + $cdn = false; $params[0]['previous_promise'] = new Success(true); $start = microtime(true); - $size = yield $this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, array_shift($params), $stream, $seekable); + $size = yield $this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, array_shift($params), $callable, $parallelize); if ($params) { $previous_promise = new Success(true); @@ -671,20 +784,20 @@ trait Files $promises = []; foreach ($params as $key => $param) { $param['previous_promise'] = $previous_promise; - $previous_promise = $this->call($this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $stream, $seekable)); + $previous_promise = $this->call($this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $callable, $parallelize)); $previous_promise->onResolve(static function ($e, $res) use (&$size) { if ($res) { $size += $res; } }); - $promises []= $previous_promise; + $promises[] = $previous_promise; if (!($key % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps yield $this->all($promises); $promises = []; $time = microtime(true) - $start; - $speed = (int) (($size*8)/$time)/1000000; + $speed = (int) (($size * 8) / $time) / 1000000; $this->logger->logger("Partial download time: $time"); $this->logger->logger("Partial download speed: $speed mbps"); } @@ -694,7 +807,7 @@ trait Files } } $time = microtime(true) - $start; - $speed = (int) (($size*8)/$time)/1000000; + $speed = (int) (($size * 8) / $time) / 1000000; $this->logger->logger("Total download time: $time"); $this->logger->logger("Total download speed: $speed mbps"); @@ -705,7 +818,7 @@ trait Files return true; } - private function download_part(&$message_media, &$cdn, &$datacenter, &$old_dc, &$ige, $cb, $offset, $stream, $seekable, $postpone = false) + private function download_part(&$message_media, &$cdn, &$datacenter, &$old_dc, &$ige, $cb, $offset, $callable, $seekable, $postpone = false) { static $method = [ false => 'upload.getFile', // non-cdn @@ -714,14 +827,14 @@ trait Files do { if (!$cdn) { $basic_param = [ - 'location' => $message_media['InputFileLocation'] + 'location' => $message_media['InputFileLocation'], ]; } else { $basic_param = [ - 'file_token' => $message_media['file_token'] + 'file_token' => $message_media['file_token'], ]; } - + try { $res = yield $this->method_call_async_read( $method[$cdn], @@ -731,7 +844,7 @@ trait Files 'file' => true, 'FloodWaitLimit' => 0, 'datacenter' => &$datacenter, - 'postpone' => $postpone + 'postpone' => $postpone, ] ); } catch (\danog\MadelineProto\RPCErrorException $e) { @@ -790,11 +903,11 @@ trait Files if ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') { $datacenter = 0; } - while ($cdn === false && - $res['type']['_'] === 'storage.fileUnknown' && - $res['bytes'] === '' && - isset($this->datacenter->sockets[++$datacenter]) - ) { + while ($cdn === false && + $res['type']['_'] === 'storage.fileUnknown' && + $res['bytes'] === '' && + isset($this->datacenter->sockets[++$datacenter]) + ) { $res = yield $this->method_call_async_read('upload.getFile', $basic_param + $offset, ['heavy' => true, 'file' => true, 'FloodWaitLimit' => 0, 'datacenter' => $datacenter]); } @@ -807,17 +920,15 @@ trait Files $res['bytes'] = $ige->decrypt($res['bytes']); } if ($offset['part_start_at'] || $offset['part_end_at'] !== $offset['limit']) { - $res['bytes'] = substr($res['bytes'], $offset['part_start_at'], $offset['part_end_at']- $offset['part_start_at']); + $res['bytes'] = substr($res['bytes'], $offset['part_start_at'], $offset['part_end_at'] - $offset['part_start_at']); } - + if (!$seekable) { yield $offset['previous_promise']->promise(); - } else { - yield $stream->seek($offset['offset'] + $offset['part_start_at']); } - yield $stream->write($res['bytes']); + $res = yield $callable((string) $res['bytes'], $offset['offset'] + $offset['part_start_at']); $cb(); - return strlen($res['bytes']); + return $res; } while (true); } diff --git a/src/danog/MadelineProto/Stream/ConnectionContext.php b/src/danog/MadelineProto/Stream/ConnectionContext.php index 9dcd9fbb..2ee330ec 100644 --- a/src/danog/MadelineProto/Stream/ConnectionContext.php +++ b/src/danog/MadelineProto/Stream/ConnectionContext.php @@ -22,6 +22,7 @@ use Amp\CancellationToken; use Amp\Socket\ClientConnectContext; use Amp\Uri\Uri; use danog\MadelineProto\Stream\Transport\DefaultStream; +use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream; /** * Connection context class. @@ -363,6 +364,23 @@ class ConnectionContext return $obj; } + + /** + * Get the inputClientProxy proxy MTProto object + * + * @return array + */ + public function getInputClientProxy(): ?array + { + foreach ($this->nextStreams as $couple) { + list($streamName, $extra) = $couple; + if ($streamName === ObfuscatedStream::getName() && isset($extra['address'])) { + $extra['_'] = 'inputClientProxy'; + return $extra; + } + } + return null; + } /** * Get a description "name" of the context. * diff --git a/src/danog/MadelineProto/Tools.php b/src/danog/MadelineProto/Tools.php index d4a9ca92..43132140 100644 --- a/src/danog/MadelineProto/Tools.php +++ b/src/danog/MadelineProto/Tools.php @@ -360,6 +360,14 @@ trait Tools { return self::call(self::flockAsync($file, $operation, $polling)); } + public static function noCache(int $status, string $message) + { + header('Cache-Control: no-store, no-cache, must-revalidate, max-age=0'); + header('Cache-Control: post-check=0, pre-check=0', false); + header('Pragma: no-cache'); + http_response_code($status); + return self::echo($message); + } public static function flockAsync(string $file, int $operation, $polling) { if (!yield exists($file)) { diff --git a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php index 84e7fcb0..3d25a39d 100644 --- a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php +++ b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php @@ -150,7 +150,7 @@ trait AuthKeyHandler } $this->calls[$params['id']]->setVisualization($visualization); - $this->calls[$params['id']]->configuration['endpoints'] = array_merge([$res['connection']], $res['alternative_connections'], $this->calls[$params['id']]->configuration['endpoints']); + $this->calls[$params['id']]->configuration['endpoints'] = array_merge($res['connections'], $this->calls[$params['id']]->configuration['endpoints']); $this->calls[$params['id']]->configuration = array_merge(['recv_timeout' => $this->config['call_receive_timeout_ms'] / 1000, 'init_timeout' => $this->config['call_connect_timeout_ms'] / 1000, 'data_saving' => \danog\MadelineProto\VoIP::DATA_SAVING_NEVER, 'enable_NS' => true, 'enable_AEC' => true, 'enable_AGC' => true, 'auth_key' => $key, 'auth_key_id' => substr(sha1($key, true), -8), 'call_id' => substr(hash('sha256', $key, true), -16), 'network_type' => \danog\MadelineProto\VoIP::NET_TYPE_ETHERNET], $this->calls[$params['id']]->configuration); $this->calls[$params['id']]->parseConfig(); $res = $this->calls[$params['id']]->startTheMagic(); @@ -191,7 +191,8 @@ trait AuthKeyHandler $visualization[] = \danog\MadelineProto\Magic::$emojis[(int) (new \phpseclib\Math\BigInteger($number, 256))->divide($length)[1]->toString()]; } $this->calls[$params['id']]->setVisualization($visualization); - $this->calls[$params['id']]->configuration['endpoints'] = array_merge([$params['connection']], $params['alternative_connections'], $this->calls[$params['id']]->configuration['endpoints']); + var_dump($params); + $this->calls[$params['id']]->configuration['endpoints'] = array_merge($params['connections'], $this->calls[$params['id']]->configuration['endpoints']); $this->calls[$params['id']]->configuration = array_merge(['recv_timeout' => $this->config['call_receive_timeout_ms'] / 1000, 'init_timeout' => $this->config['call_connect_timeout_ms'] / 1000, 'data_saving' => \danog\MadelineProto\VoIP::DATA_SAVING_NEVER, 'enable_NS' => true, 'enable_AEC' => true, 'enable_AGC' => true, 'auth_key' => $key, 'auth_key_id' => substr(sha1($key, true), -8), 'call_id' => substr(hash('sha256', $key, true), -16), 'network_type' => \danog\MadelineProto\VoIP::NET_TYPE_ETHERNET], $this->calls[$params['id']]->configuration); $this->calls[$params['id']]->parseConfig();