Implement parallelized upload
This commit is contained in:
parent
58d71a390a
commit
7dbf529bb1
@ -19,9 +19,12 @@
|
||||
|
||||
namespace danog\MadelineProto\MTProtoTools;
|
||||
|
||||
use Amp\ByteStream\InputStream;
|
||||
use Amp\ByteStream\OutputStream;
|
||||
use Amp\ByteStream\ResourceOutputStream;
|
||||
use Amp\ByteStream\StreamException;
|
||||
use Amp\Deferred;
|
||||
use Amp\File\Handle;
|
||||
use Amp\File\StatCache;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
@ -30,8 +33,13 @@ use danog\MadelineProto\Exception;
|
||||
use danog\MadelineProto\FileCallbackInterface;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\RPCErrorException;
|
||||
use danog\MadelineProto\Stream\Common\BufferedRawStream;
|
||||
use danog\MadelineProto\Stream\ConnectionContext;
|
||||
use danog\MadelineProto\Stream\Transport\PremadeStream;
|
||||
use function Amp\File\exists;
|
||||
use function Amp\File\open;
|
||||
use function Amp\File\stat;
|
||||
use function Amp\File\touch;
|
||||
use function Amp\Promise\all;
|
||||
|
||||
/**
|
||||
@ -39,45 +47,112 @@ use function Amp\Promise\all;
|
||||
*/
|
||||
trait Files
|
||||
{
|
||||
public function upload_async($file, $file_name = '', $cb = null, $encrypted = false): \Generator
|
||||
public function upload_async($file, $file_name = '', $cb = null, $encrypted = false)
|
||||
{
|
||||
if (is_object($file)) {
|
||||
if (!$file instanceof FileCallbackInterface) {
|
||||
throw new \danog\MadelineProto\Exception('Provided object does not implement FileCallbackInterface');
|
||||
}
|
||||
if (is_object($file) && $file instanceof FileCallbackInterface) {
|
||||
$cb = $file;
|
||||
$file = $file->getFile();
|
||||
}
|
||||
if (is_string($file) || (is_object($file) && method_exists($file, '__toString'))) {
|
||||
if (filter_var($file, FILTER_VALIDATE_URL)) {
|
||||
return yield $this->upload_from_url_async($file);
|
||||
}
|
||||
} else if (is_array($file)) {
|
||||
return yield $this->upload_from_tgfile($file, $cb, $encrypted);
|
||||
}
|
||||
|
||||
$file = \danog\MadelineProto\Absolute::absolute($file);
|
||||
if (!file_exists($file)) {
|
||||
if (!yield exists($file)) {
|
||||
throw new \danog\MadelineProto\Exception(\danog\MadelineProto\Lang::$current_lang['file_not_exist']);
|
||||
}
|
||||
if (empty($file_name)) {
|
||||
$file_name = basename($file);
|
||||
}
|
||||
$datacenter = $this->settings['connection_settings']['default_dc'];
|
||||
if (isset($this->datacenter->sockets[$datacenter.'_media'])) {
|
||||
$datacenter .= '_media';
|
||||
}
|
||||
$file_size = filesize($file);
|
||||
if ($file_size > 512 * 1024 * 3000) {
|
||||
|
||||
StatCache::clear($file);
|
||||
|
||||
$size = (yield stat($file))['size'];
|
||||
if ($size > 512 * 1024 * 3000) {
|
||||
throw new \danog\MadelineProto\Exception('Given file is too big!');
|
||||
}
|
||||
if ($cb === null) {
|
||||
$cb = function ($percent) {
|
||||
$this->logger->logger('Upload status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE);
|
||||
|
||||
$stream = yield open($file, 'rb');
|
||||
$mime = $this->get_mime_from_file($file);
|
||||
|
||||
try {
|
||||
return yield $this->upload_from_stream_async($stream, $size, $stream, $mime, $cb, $encrypted);
|
||||
} finally {
|
||||
yield $stream->close();
|
||||
}
|
||||
}
|
||||
public function upload_from_url_async($url, int $size = 0, string $file_name = '', $cb = null, bool $encrypted = false)
|
||||
{
|
||||
if (is_object($url) && $url instanceof FileCallbackInterface) {
|
||||
$cb = $url;
|
||||
$url = $url->getFile();
|
||||
}
|
||||
/** @var $response \Amp\Artax\Response */
|
||||
$response = yield $this->datacenter->getHTTPClient()->request($url);
|
||||
if (200 !== $status = $response->getStatusCode) {
|
||||
throw new Exception("Wrong status code: $status");
|
||||
}
|
||||
$mime = trim(explode(';', $response->getHeader('content-type') ?? 'application/octet-stream')[0]);
|
||||
$size = $response->getHeader('content-length') ?? $size;
|
||||
if (!$size) {
|
||||
throw new Exception('Wrong size');
|
||||
}
|
||||
|
||||
$body = $response->getBody();
|
||||
|
||||
return yield $this->upload_from_stream_async($body, $size, $mime, $file_name, $cb, $encrypted);
|
||||
}
|
||||
public function upload_from_stream_async($stream, int $size, string $mime, string $file_name = '', $cb = null, bool $encrypted = false)
|
||||
{
|
||||
if (is_object($stream) && $stream instanceof FileCallbackInterface) {
|
||||
$cb = $stream;
|
||||
$stream = $stream->getFile();
|
||||
}
|
||||
|
||||
/** @var $stream \Amp\ByteStream\OutputStream */
|
||||
if (!is_object($stream)) {
|
||||
$stream = new ResourceOutputStream($stream);
|
||||
}
|
||||
if (!$stream instanceof InputStream) {
|
||||
throw new Exception("Invalid stream provided");
|
||||
}
|
||||
$seekable = false;
|
||||
if (method_exists($stream, 'seek')) {
|
||||
try {
|
||||
yield $stream->seek(0);
|
||||
$seekable = true;
|
||||
} catch (StreamException $e) {
|
||||
}
|
||||
}
|
||||
|
||||
if ($stream instanceof Handle || $stream instanceof BufferedRawStream) {
|
||||
$callable = static function (int $offset, int $size) use ($stream, $seekable) {
|
||||
if ($seekable) {
|
||||
while ($stream->tell() !== $offset) {
|
||||
yield $stream->seek($offset);
|
||||
}
|
||||
}
|
||||
return yield $stream->read($size);
|
||||
};
|
||||
} else {
|
||||
$ctx = (new ConnectionContext)
|
||||
->addStream(PremadeStream::getName(), $stream)
|
||||
->addStream(BufferedRawStream);
|
||||
|
||||
$stream = yield $ctx->getStream();
|
||||
|
||||
$callable = static function (int $offset, int $size) use ($stream) {
|
||||
return yield $stream->read($size);
|
||||
};
|
||||
}
|
||||
|
||||
$f = fopen($file, 'r');
|
||||
|
||||
$seekable = stream_get_meta_data($f)['seekable'];
|
||||
if ($seekable) {
|
||||
fseek($f, 0);
|
||||
}
|
||||
return yield $this->upload_from_callable_async($callable, $size, $mime, $file_name, $cb, $seekable, $encrypted);
|
||||
}
|
||||
public function upload_from_callable($callable, $size, $file_name = '', $cb = null, $encrypted = false)
|
||||
public function upload_from_callable_async($callable, int $size, string $mime, string $file_name = '', $cb = null, bool $refetchable = true, bool $encrypted = false)
|
||||
{
|
||||
if (is_object($callable) && $callable instanceof FileCallbackInterface) {
|
||||
$cb = $callable;
|
||||
@ -86,8 +161,20 @@ trait Files
|
||||
if (!is_callable($callable)) {
|
||||
throw new Exception('Invalid callable provided');
|
||||
}
|
||||
if ($cb === null) {
|
||||
$cb = function ($percent) {
|
||||
$this->logger->logger('Upload status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE);
|
||||
};
|
||||
}
|
||||
|
||||
$datacenter = $this->settings['connection_settings']['default_dc'];
|
||||
if (isset($this->datacenter->sockets[$datacenter.'_media'])) {
|
||||
$datacenter .= '_media';
|
||||
}
|
||||
|
||||
$part_size = $this->settings['upload']['part_size'];
|
||||
$parallel_chunks = $this->settings['upload']['parallel_chunks'] ? $this->settings['upload']['parallel_chunks'] : 3000;
|
||||
|
||||
$part_total_num = (int) ceil($size / $part_size);
|
||||
$part_num = 0;
|
||||
$method = $size > 10 * 1024 * 1024 ? 'upload.saveBigFilePart' : 'upload.saveFilePart';
|
||||
@ -104,40 +191,64 @@ trait Files
|
||||
$ige->setIV($iv);
|
||||
$ige->setKey($key);
|
||||
$ige->enableContinuousBuffer();
|
||||
$parallelize = false;
|
||||
$refetchable = false;
|
||||
}
|
||||
$ctx = hash_init('md5');
|
||||
$promises = [];
|
||||
$cur_part_num = 0;
|
||||
|
||||
$cb = function () use ($cb, $part_total_num) {
|
||||
static $cur = 0;
|
||||
$cur++;
|
||||
$this->callFork($cb($cur * 100 / $part_total_num));
|
||||
};
|
||||
|
||||
$start = microtime(true);
|
||||
while ($part_num < $part_total_num) {
|
||||
$t = microtime(true);
|
||||
$read_deferred = yield $this->method_call_async_write(
|
||||
$method,
|
||||
new AsyncParameters(
|
||||
static function () use ($file_id, $part_num, $part_total_num, $part_size, $f, $ctx, $ige, $seekable) {
|
||||
if ($seekable) {
|
||||
fseek($f, $part_num * $part_size);
|
||||
} elseif (ftell($f) !== $part_num * $part_size) {
|
||||
throw new \danog\MadelineProto\Exception('Wrong position!');
|
||||
}
|
||||
static function () use ($file_id, $part_num, $part_total_num, $part_size, $callable, $ctx, $ige) {
|
||||
static $fetched = false;
|
||||
$already_fetched = $fetched;
|
||||
$fetched = true;
|
||||
|
||||
$bytes = stream_get_contents($f, $part_size);
|
||||
$bytes = yield $callable($part_num * $part_size, $part_size);
|
||||
if (!$already_fetched) {
|
||||
hash_update($ctx, $bytes);
|
||||
}
|
||||
if ($ige) {
|
||||
$bytes = $ige->encrypt(str_pad($bytes, $part_size, chr(0)));
|
||||
}
|
||||
hash_update($ctx, $bytes);
|
||||
|
||||
return ['file_id' => $file_id, 'file_part' => $part_num, 'file_total_parts' => $part_total_num, 'bytes' => $bytes];
|
||||
},
|
||||
$seekable
|
||||
$refetchable
|
||||
),
|
||||
['heavy' => true, 'file' => true, 'datacenter' => $datacenter]
|
||||
['heavy' => true, 'file' => true, 'datacenter' => &$datacenter]
|
||||
);
|
||||
$this->callFork($cb(ftell($f) * 100 / $file_size));
|
||||
$this->logger->logger('Speed for chunk: '.(($part_size * 8 / 1000000) / (microtime(true) - $t)));
|
||||
$read_deferred->onResolve(static function ($e, $res) use ($cb) {
|
||||
if ($res) {
|
||||
$cb();
|
||||
}
|
||||
});
|
||||
|
||||
$part_num++;
|
||||
$promises[] = $read_deferred->promise();
|
||||
|
||||
if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this if every second)
|
||||
$result = yield $this->all($promises);
|
||||
foreach ($result as $kkey => $result) {
|
||||
if (!$result) {
|
||||
throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed');
|
||||
}
|
||||
}
|
||||
$promises = [];
|
||||
|
||||
$time = microtime(true) - $start;
|
||||
$speed = (int) (($size * 8) / $time) / 1000000;
|
||||
$this->logger->logger("Partial download time: $time");
|
||||
$this->logger->logger("Partial download speed: $speed mbps");
|
||||
}
|
||||
}
|
||||
|
||||
$result = yield all($promises);
|
||||
@ -146,18 +257,18 @@ trait Files
|
||||
throw new \danog\MadelineProto\Exception('Upload of part '.$kkey.' failed');
|
||||
}
|
||||
}
|
||||
$time = microtime(true) - $start;
|
||||
$speed = (int) (($size * 8) / $time) / 1000000;
|
||||
$this->logger->logger("Total download time: $time");
|
||||
$this->logger->logger("Total download speed: $speed mbps");
|
||||
|
||||
$constructor = ['_' => $constructor, 'id' => $file_id, 'parts' => $part_total_num, 'name' => $file_name, 'mime_type' => $this->get_mime_from_file($file)];
|
||||
$constructor = ['_' => $constructor, 'id' => $file_id, 'parts' => $part_total_num, 'name' => $file_name, 'mime_type' => $mime];
|
||||
if ($encrypted === true) {
|
||||
$constructor['key_fingerprint'] = $fingerprint;
|
||||
$constructor['key'] = $key;
|
||||
$constructor['iv'] = $iv;
|
||||
}
|
||||
|
||||
fclose($f);
|
||||
clearstatcache();
|
||||
|
||||
$this->logger->logger('Speed: '.(($file_size * 8) / (microtime(true) - $t) / 1000000));
|
||||
$constructor['md5_checksum'] = hash_final($ctx);
|
||||
|
||||
return $constructor;
|
||||
}
|
||||
@ -167,7 +278,56 @@ trait Files
|
||||
return $this->upload_async($file, $file_name, $cb, true);
|
||||
}
|
||||
|
||||
public function gen_all_file_async($media, $regenerate)
|
||||
public function upload_from_tgfile_async($media, $cb = null, $encrypted = false)
|
||||
{
|
||||
if (is_object($media) && $media instanceof FileCallbackInterface) {
|
||||
$cb = $media;
|
||||
$media = $media->getFile();
|
||||
}
|
||||
$media = yield $this->get_download_info_async($media);
|
||||
if (!isset($media['size'], $media['mime'])) {
|
||||
throw new Exception('Wrong file provided!');
|
||||
}
|
||||
$size = $media['size'];
|
||||
$mime = $media['mime'];
|
||||
|
||||
$bridge = new class
|
||||
{
|
||||
private $done = [];
|
||||
private $pending = [];
|
||||
public function write(string $data, int $offset)
|
||||
{
|
||||
if (isset($this->pending[$offset])) {
|
||||
$promise = $this->pending[$offset];
|
||||
unset($this->pending[$offset]);
|
||||
$promise->resolve($data);
|
||||
} else {
|
||||
$this->done[$offset] = $data;
|
||||
}
|
||||
}
|
||||
public function read(int $offset, int $size)
|
||||
{
|
||||
if (isset($this->done[$offset])) {
|
||||
if (strlen($this->done[$offset]) > $size) {
|
||||
throw new Exception('Wrong size!');
|
||||
}
|
||||
$result = $this->done[$offset];
|
||||
unset($this->done[$offset]);
|
||||
return $result;
|
||||
}
|
||||
$this->pending[$offset] = new Deferred;
|
||||
return $this->pending[$offset]->promise();
|
||||
}
|
||||
};
|
||||
$reader = [$bridge, 'read'];
|
||||
$writer = [$bridge, 'write'];
|
||||
yield $this->all([
|
||||
$this->download_to_callable_async($media, $writer, $cb),
|
||||
$this->upload_from_callable_async($reader, $size, $mime, '', $cb, false, $encrypted),
|
||||
]);
|
||||
}
|
||||
|
||||
public function gen_all_file_async($media)
|
||||
{
|
||||
$res = [$this->constructors->find_by_predicate($media['_'])['type'] => $media];
|
||||
switch ($media['_']) {
|
||||
@ -260,7 +420,7 @@ trait Files
|
||||
return $res;
|
||||
}
|
||||
|
||||
public function get_file_info_async($constructor, $regenerate = false)
|
||||
public function get_file_info_async($constructor)
|
||||
{
|
||||
if (is_string($constructor)) {
|
||||
$constructor = $this->unpack_file_id($constructor)['MessageMedia'];
|
||||
@ -276,7 +436,7 @@ trait Files
|
||||
$constructor = $constructor['media'];
|
||||
}
|
||||
|
||||
return yield $this->gen_all_file_async($constructor, $regenerate);
|
||||
return yield $this->gen_all_file_async($constructor);
|
||||
}
|
||||
public function get_propic_info_async($data)
|
||||
{
|
||||
@ -644,8 +804,8 @@ trait Files
|
||||
$file = $file->getFile();
|
||||
}
|
||||
$file = \danog\MadelineProto\Absolute::absolute(preg_replace('|/+|', '/', $file));
|
||||
if (!file_exists($file)) {
|
||||
touch($file);
|
||||
if (!yield exists($file)) {
|
||||
yield touch($file);
|
||||
}
|
||||
$file = realpath($file);
|
||||
$message_media = yield $this->get_download_info_async($message_media);
|
||||
@ -677,12 +837,6 @@ trait Files
|
||||
$stream = $stream->getFile();
|
||||
}
|
||||
|
||||
if ($cb === null) {
|
||||
$cb = function ($percent) {
|
||||
$this->logger->logger('Download status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE);
|
||||
};
|
||||
}
|
||||
|
||||
/** @var $stream \Amp\ByteStream\OutputStream */
|
||||
if (!is_object($stream)) {
|
||||
$stream = new ResourceOutputStream($stream);
|
||||
@ -721,13 +875,18 @@ trait Files
|
||||
if (!is_callable($callable)) {
|
||||
throw new Exception('Wrong callable provided');
|
||||
}
|
||||
if ($cb === null) {
|
||||
$cb = function ($percent) {
|
||||
$this->logger->logger('Download status: '.$percent.'%', \danog\MadelineProto\Logger::NOTICE);
|
||||
};
|
||||
}
|
||||
|
||||
if ($end === -1 && isset($message_media['size'])) {
|
||||
$end = $message_media['size'];
|
||||
}
|
||||
|
||||
$part_size = $this->settings['download']['part_size'];
|
||||
$parallel_chunks = $this->settings['download']['parallel_chunks'];
|
||||
$parallel_chunks = $this->settings['download']['parallel_chunks'] ? $this->settings['download']['parallel_chunks'] : 3000;
|
||||
|
||||
$datacenter = isset($message_media['InputFileLocation']['dc_id']) ? $message_media['InputFileLocation']['dc_id'] : $this->settings['connection_settings']['default_dc'];
|
||||
if (isset($this->datacenter->sockets[$datacenter.'_media'])) {
|
||||
|
@ -24,13 +24,16 @@ use danog\MadelineProto\Exception;
|
||||
use danog\MadelineProto\Stream\Async\RawStream;
|
||||
use danog\MadelineProto\Stream\ConnectionContext;
|
||||
use function Amp\Socket\connect;
|
||||
use danog\MadelineProto\Stream\BufferedStreamInterface;
|
||||
use danog\MadelineProto\Stream\BufferInterface;
|
||||
use danog\MadelineProto\Stream\RawStreamInterface;
|
||||
|
||||
/**
|
||||
* Buffered raw stream.
|
||||
*
|
||||
* @author Daniil Gentili <daniil@daniil.it>
|
||||
*/
|
||||
class BufferedRawStream implements \danog\MadelineProto\Stream\BufferedStreamInterface, \danog\MadelineProto\Stream\BufferInterface, \danog\MadelineProto\Stream\RawStreamInterface
|
||||
class BufferedRawStream implements BufferedStreamInterface, BufferInterface, RawStreamInterface
|
||||
{
|
||||
use RawStream;
|
||||
|
||||
|
@ -47,7 +47,7 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt
|
||||
|
||||
public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise
|
||||
{
|
||||
return $this->enableCrypto($tlsContext);
|
||||
return $this->stream->enableCrypto($tlsContext);
|
||||
}
|
||||
|
||||
public function getStream()
|
||||
|
Loading…
Reference in New Issue
Block a user