Implement download_to_callback

This commit is contained in:
Daniil Gentili 2019-06-24 14:42:14 +02:00
commit 2b1f44a31f
8 changed files with 219 additions and 38 deletions

View File

@ -78,6 +78,7 @@ class Connection
public $ack_queue = [];
public $i = [];
public $last_recv = 0;
private $last_chunk = 0;
public $last_http_wait = 0;
public $datacenter;
@ -276,7 +277,19 @@ class Connection
{
return __CLASS__;
}
public function haveRead()
{
$this->last_chunk = microtime(true);
}
/**
* Get the receive date of the latest chunk of data from the socket
*
* @return void
*/
public function getLastChunk()
{
return $this->last_chunk;
}
/**
* Sleep function.
*

View File

@ -21,6 +21,7 @@ namespace danog\MadelineProto;
use Amp\Artax\Client;
use Amp\Artax\Cookie\ArrayCookieJar;
use Amp\Artax\Cookie\CookieJar;
use Amp\Artax\DefaultClient;
use Amp\Artax\HttpSocketPool;
use Amp\CancellationToken;
@ -38,6 +39,7 @@ use Amp\Socket\ClientConnectContext;
use Amp\Socket\ClientSocket;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectException;
use Amp\Socket\Socket;
use Amp\TimeoutException;
use danog\MadelineProto\Stream\Common\BufferedRawStream;
use danog\MadelineProto\Stream\ConnectionContext;
@ -55,7 +57,6 @@ use danog\MadelineProto\Stream\Transport\WssStream;
use danog\MadelineProto\Stream\Transport\WsStream;
use function Amp\call;
use function Amp\Socket\Internal\parseUri;
use Amp\Artax\Cookie\CookieJar;
/**
* Manages datacenters.
@ -98,7 +99,7 @@ class DataCenter
$this->HTTPClient = new DefaultClient($this->CookieJar, new HttpSocketPool(new ProxySocketPool([$this, 'rawConnectAsync'])));
$DoHHTTPClient = new DefaultClient(
$this->CookieJar,
$this->CookieJar,
new HttpSocketPool(
new ProxySocketPool(
function (string $uri, CancellationToken $token = null, ClientConnectContext $ctx = null) {
@ -327,8 +328,34 @@ class DataCenter
continue; // Could not connect to host, try next host in the list.
}
if ($dc = $ctx->getDc()) {
$callback = [$this->sockets[$dc], 'haveRead'];
$socket = new class($socket) extends ClientSocket
{
private $callback;
public function setReadCallback($callback)
{
$this->callback = $callback;
}
return new ClientSocket($socket);
/** @inheritdoc */
public function read(): Promise
{
$promise = parent::read();
$promise->onResolve(function ($e, $res) {
if ($res) {
($this->callback)();
}
});
return $promise;
}
};
$socket->setReadCallback($callback);
} else {
$socket = new ClientSocket($socket);
}
return $socket;
}
// This is reached if either all URIs failed or the maximum number of attempts is reached.

View File

@ -55,7 +55,9 @@ class CheckLoop extends ResumableSignalLoop
}
if ($connection->hasPendingCalls()) {
$last_recv = $connection->get_max_id(true);
$last_msgid = $connection->get_max_id(true);
$last_chunk = $connection->getLastChunk();
if ($connection->temp_auth_key !== null) {
$full_message_ids = $connection->getPendingCalls(); //array_values($connection->new_outgoing);
foreach (array_chunk($full_message_ids, 8192) as $message_ids) {
@ -139,7 +141,7 @@ class CheckLoop extends ResumableSignalLoop
return;
}
if ($connection->get_max_id(true) === $last_recv) {
if ($connection->get_max_id(true) === $last_msgid && $connection->getLastChunk() === $last_chunk) {
$API->logger->logger("We did not receive a response for $timeout seconds: reconnecting and exiting check loop on DC $datacenter");
$this->exitedLoop();
yield $connection->reconnect();

View File

@ -215,6 +215,7 @@ class WriteLoop extends ResumableSignalLoop
'system_lang_code' => $API->settings['app_info']['lang_code'],
'lang_code' => $API->settings['app_info']['lang_code'],
'lang_pack' => $API->settings['app_info']['lang_pack'],
'proxy' => $connection->getCtx()->getInputClientProxy(),
'query' => $MTmessage['body'],
]
),

View File

@ -22,6 +22,9 @@ namespace danog\MadelineProto\MTProtoTools;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
use Amp\File\StatCache;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\Async\AsyncParameters;
use danog\MadelineProto\Exception;
use danog\MadelineProto\FileCallbackInterface;
@ -30,9 +33,6 @@ use danog\MadelineProto\RPCErrorException;
use function Amp\File\open;
use function Amp\File\stat;
use function Amp\Promise\all;
use Amp\Deferred;
use Amp\Success;
use Amp\File\StatCache;
/**
* Manages upload and download of files.
@ -517,7 +517,102 @@ trait Files
throw new \danog\MadelineProto\Exception('Invalid constructor provided: '.$message_media['_']);
}
}
/*
public function download_to_browser_single_async($message_media, $cb = null)
{
if (php_sapi_name() === 'cli') {
throw new Exception('Cannot download file to browser from command line: start this script from a browser');
}
if (headers_sent()) {
throw new Exception('Headers already sent, cannot stream file to browser!');
}
if (is_object($message_media) && $message_media instanceof FileCallbackInterface) {
$cb = $message_media;
$message_media = $message_media->getFile();
}
$message_media = yield $this->get_download_info_async($message_media);
$servefile = $_SERVER['REQUEST_METHOD'] !== 'HEAD';
if (isset($_SERVER['HTTP_RANGE'])) {
$range = explode('=', $_SERVER['HTTP_RANGE'], 2);
if (count($range) == 1) {
$range[1] = '';
}
list($size_unit, $range_orig) = $range;
if ($size_unit == 'bytes') {
//multiple ranges could be specified at the same time, but for simplicity only serve the first range
//http://tools.ietf.org/id/draft-ietf-http-range-retrieval-00.txt
$list = explode(',', $range_orig, 2);
if (count($list) == 1) {
$list[1] = '';
}
list($range, $extra_ranges) = $list;
} else {
$range = '';
return Tools::noCache(416, '<html><body><h1>416 Requested Range Not Satisfiable.</h1><br><p>Could not use selected range.</p></body></html>');
}
} else {
$range = '';
}
$listseek = explode('-', $range, 2);
if (count($listseek) == 1) {
$listseek[1] = '';
}
list($seek_start, $seek_end) = $listseek;
$seek_end = empty($seek_end) ? ($message_media['size'] - 1) : min(abs(intval($seek_end)), $message_media['size'] - 1);
if (!empty($seek_start) && $seek_end < abs(intval($seek_start))) {
return Tools::noCache(416, '<html><body><h1>416 Requested Range Not Satisfiable.</h1><br><p>Could not use selected range.</p></body></html>');
}
$seek_start = empty($seek_start) ? 0 : abs(intval($seek_start));
if ($servefile) {
if ($seek_start > 0 || $seek_end < $select['file_size'] - 1) {
header('HTTP/1.1 206 Partial Content');
header('Content-Range: bytes '.$seek_start.'-'.$seek_end.'/'.$select['file_size']);
header('Content-Length: '.($seek_end - $seek_start + 1));
} else {
header('Content-Length: '.$select['file_size']);
}
header('Content-Type: '.$select['mime']);
header('Cache-Control: max-age=31556926;');
header('Content-Transfer-Encoding: Binary');
header('Accept-Ranges: bytes');
//header('Content-disposition: attachment: filename="'.basename($select['file_path']).'"');
$MadelineProto->download_to_stream($select['file_id'], fopen('php://output', 'w'), function ($percent) {
flush();
ob_flush();
\danog\MadelineProto\Logger::log('Download status: '.$percent.'%');
}, $seek_start, $seek_end + 1);
//analytics(true, $file_path, $MadelineProto->get_self()['id'], $dbuser, $dbpassword);
$MadelineProto->API->getting_state = false;
$MadelineProto->API->store_db([], true);
$MadelineProto->API->reset_session();
} else {
if ($seek_start > 0 || $seek_end < $select['file_size'] - 1) {
header('HTTP/1.1 206 Partial Content');
header('Content-Range: bytes '.$seek_start.'-'.$seek_end.'/'.$select['file_size']);
header('Content-Length: '.($seek_end - $seek_start + 1));
} else {
header('Content-Length: '.$select['file_size']);
}
header('Content-Type: '.$select['mime']);
header('Cache-Control: max-age=31556926;');
header('Content-Transfer-Encoding: Binary');
header('Accept-Ranges: bytes');
analytics(true, $file_path, null, $dbuser, $dbpassword);
//header('Content-disposition: attachment: filename="'.basename($select['file_path']).'"');
}
header('Content-Length: '.$info['size']);
header('Content-Type: '.$info['mime']);
}*/
public function extract_photosize($photo)
{
}
public function download_to_dir_async($message_media, $dir, $cb = null)
{
if (is_object($dir) && $dir instanceof FileCallbackInterface) {
@ -561,7 +656,6 @@ trait Files
return $file;
}
public function download_to_stream_async($message_media, $stream, $cb = null, $offset = 0, $end = -1)
{
$message_media = yield $this->get_download_info_async($message_media);
@ -592,6 +686,25 @@ trait Files
} catch (StreamException $e) {
}
}
$callable = static function (string $payload, int $offset) use ($stream, $seekable) {
if ($seekable) {
while ($stream->tell() !== $offset) {
yield $stream->seek($offset);
}
}
return yield $stream->write($payload);
};
return yield $this->download_to_callable_async($message_media, $callable, $cb, $seekable, $offset, $end);
}
public function download_to_callable_async($message_media, $callable, $cb = null, $parallelize = true, $offset = 0, $end = -1)
{
$message_media = yield $this->get_download_info_async($message_media);
if (is_object($callable) && $callable instanceof FileCallbackInterface) {
$cb = $callable;
$callable = $callable->getFile();
}
if ($end === -1 && isset($message_media['size'])) {
$end = $message_media['size'];
@ -617,7 +730,6 @@ trait Files
$ige->enableContinuousBuffer();
}
if ($offset === $end) {
$cb(100);
return true;
@ -629,8 +741,9 @@ trait Files
$breakOut = false;
for ($x = $offset - $start_at; $x < $probable_end; $x += $part_size) {
$end_at = $part_size;
if ($end !== -1 && $x + $part_size >= $end) {
$end_at = $end % $part_size;
if ($end !== -1 && $x + $part_size > $end) {
$end_at = ($x + $part_size) - $end;
$breakOut = true;
}
@ -655,15 +768,15 @@ trait Files
$cb = function () use ($cb, $count) {
static $cur = 0;
$cur++;
$this->callFork($cb($cur*100/$count));
$this->callFork($cb($cur * 100 / $count));
};
$cdn = false;
$params[0]['previous_promise'] = new Success(true);
$start = microtime(true);
$size = yield $this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, array_shift($params), $stream, $seekable);
$size = yield $this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, array_shift($params), $callable, $parallelize);
if ($params) {
$previous_promise = new Success(true);
@ -671,20 +784,20 @@ trait Files
$promises = [];
foreach ($params as $key => $param) {
$param['previous_promise'] = $previous_promise;
$previous_promise = $this->call($this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $stream, $seekable));
$previous_promise = $this->call($this->download_part($message_media, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $callable, $parallelize));
$previous_promise->onResolve(static function ($e, $res) use (&$size) {
if ($res) {
$size += $res;
}
});
$promises []= $previous_promise;
$promises[] = $previous_promise;
if (!($key % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps
yield $this->all($promises);
$promises = [];
$time = microtime(true) - $start;
$speed = (int) (($size*8)/$time)/1000000;
$speed = (int) (($size * 8) / $time) / 1000000;
$this->logger->logger("Partial download time: $time");
$this->logger->logger("Partial download speed: $speed mbps");
}
@ -694,7 +807,7 @@ trait Files
}
}
$time = microtime(true) - $start;
$speed = (int) (($size*8)/$time)/1000000;
$speed = (int) (($size * 8) / $time) / 1000000;
$this->logger->logger("Total download time: $time");
$this->logger->logger("Total download speed: $speed mbps");
@ -705,7 +818,7 @@ trait Files
return true;
}
private function download_part(&$message_media, &$cdn, &$datacenter, &$old_dc, &$ige, $cb, $offset, $stream, $seekable, $postpone = false)
private function download_part(&$message_media, &$cdn, &$datacenter, &$old_dc, &$ige, $cb, $offset, $callable, $seekable, $postpone = false)
{
static $method = [
false => 'upload.getFile', // non-cdn
@ -714,14 +827,14 @@ trait Files
do {
if (!$cdn) {
$basic_param = [
'location' => $message_media['InputFileLocation']
'location' => $message_media['InputFileLocation'],
];
} else {
$basic_param = [
'file_token' => $message_media['file_token']
'file_token' => $message_media['file_token'],
];
}
try {
$res = yield $this->method_call_async_read(
$method[$cdn],
@ -731,7 +844,7 @@ trait Files
'file' => true,
'FloodWaitLimit' => 0,
'datacenter' => &$datacenter,
'postpone' => $postpone
'postpone' => $postpone,
]
);
} catch (\danog\MadelineProto\RPCErrorException $e) {
@ -790,11 +903,11 @@ trait Files
if ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') {
$datacenter = 0;
}
while ($cdn === false &&
$res['type']['_'] === 'storage.fileUnknown' &&
$res['bytes'] === '' &&
isset($this->datacenter->sockets[++$datacenter])
) {
while ($cdn === false &&
$res['type']['_'] === 'storage.fileUnknown' &&
$res['bytes'] === '' &&
isset($this->datacenter->sockets[++$datacenter])
) {
$res = yield $this->method_call_async_read('upload.getFile', $basic_param + $offset, ['heavy' => true, 'file' => true, 'FloodWaitLimit' => 0, 'datacenter' => $datacenter]);
}
@ -807,17 +920,15 @@ trait Files
$res['bytes'] = $ige->decrypt($res['bytes']);
}
if ($offset['part_start_at'] || $offset['part_end_at'] !== $offset['limit']) {
$res['bytes'] = substr($res['bytes'], $offset['part_start_at'], $offset['part_end_at']- $offset['part_start_at']);
$res['bytes'] = substr($res['bytes'], $offset['part_start_at'], $offset['part_end_at'] - $offset['part_start_at']);
}
if (!$seekable) {
yield $offset['previous_promise']->promise();
} else {
yield $stream->seek($offset['offset'] + $offset['part_start_at']);
}
yield $stream->write($res['bytes']);
$res = yield $callable((string) $res['bytes'], $offset['offset'] + $offset['part_start_at']);
$cb();
return strlen($res['bytes']);
return $res;
} while (true);
}

View File

@ -22,6 +22,7 @@ use Amp\CancellationToken;
use Amp\Socket\ClientConnectContext;
use Amp\Uri\Uri;
use danog\MadelineProto\Stream\Transport\DefaultStream;
use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream;
/**
* Connection context class.
@ -363,6 +364,23 @@ class ConnectionContext
return $obj;
}
/**
* Get the inputClientProxy proxy MTProto object
*
* @return array
*/
public function getInputClientProxy(): ?array
{
foreach ($this->nextStreams as $couple) {
list($streamName, $extra) = $couple;
if ($streamName === ObfuscatedStream::getName() && isset($extra['address'])) {
$extra['_'] = 'inputClientProxy';
return $extra;
}
}
return null;
}
/**
* Get a description "name" of the context.
*

View File

@ -360,6 +360,14 @@ trait Tools
{
return self::call(self::flockAsync($file, $operation, $polling));
}
public static function noCache(int $status, string $message)
{
header('Cache-Control: no-store, no-cache, must-revalidate, max-age=0');
header('Cache-Control: post-check=0, pre-check=0', false);
header('Pragma: no-cache');
http_response_code($status);
return self::echo($message);
}
public static function flockAsync(string $file, int $operation, $polling)
{
if (!yield exists($file)) {

View File

@ -150,7 +150,7 @@ trait AuthKeyHandler
}
$this->calls[$params['id']]->setVisualization($visualization);
$this->calls[$params['id']]->configuration['endpoints'] = array_merge([$res['connection']], $res['alternative_connections'], $this->calls[$params['id']]->configuration['endpoints']);
$this->calls[$params['id']]->configuration['endpoints'] = array_merge($res['connections'], $this->calls[$params['id']]->configuration['endpoints']);
$this->calls[$params['id']]->configuration = array_merge(['recv_timeout' => $this->config['call_receive_timeout_ms'] / 1000, 'init_timeout' => $this->config['call_connect_timeout_ms'] / 1000, 'data_saving' => \danog\MadelineProto\VoIP::DATA_SAVING_NEVER, 'enable_NS' => true, 'enable_AEC' => true, 'enable_AGC' => true, 'auth_key' => $key, 'auth_key_id' => substr(sha1($key, true), -8), 'call_id' => substr(hash('sha256', $key, true), -16), 'network_type' => \danog\MadelineProto\VoIP::NET_TYPE_ETHERNET], $this->calls[$params['id']]->configuration);
$this->calls[$params['id']]->parseConfig();
$res = $this->calls[$params['id']]->startTheMagic();
@ -191,7 +191,8 @@ trait AuthKeyHandler
$visualization[] = \danog\MadelineProto\Magic::$emojis[(int) (new \phpseclib\Math\BigInteger($number, 256))->divide($length)[1]->toString()];
}
$this->calls[$params['id']]->setVisualization($visualization);
$this->calls[$params['id']]->configuration['endpoints'] = array_merge([$params['connection']], $params['alternative_connections'], $this->calls[$params['id']]->configuration['endpoints']);
var_dump($params);
$this->calls[$params['id']]->configuration['endpoints'] = array_merge($params['connections'], $this->calls[$params['id']]->configuration['endpoints']);
$this->calls[$params['id']]->configuration = array_merge(['recv_timeout' => $this->config['call_receive_timeout_ms'] / 1000, 'init_timeout' => $this->config['call_connect_timeout_ms'] / 1000, 'data_saving' => \danog\MadelineProto\VoIP::DATA_SAVING_NEVER, 'enable_NS' => true, 'enable_AEC' => true, 'enable_AGC' => true, 'auth_key' => $key, 'auth_key_id' => substr(sha1($key, true), -8), 'call_id' => substr(hash('sha256', $key, true), -16), 'network_type' => \danog\MadelineProto\VoIP::NET_TYPE_ETHERNET], $this->calls[$params['id']]->configuration);
$this->calls[$params['id']]->parseConfig();