This commit is contained in:
Daniil Gentili 2020-01-16 21:14:38 +01:00
parent 489ce873d0
commit 48368be109
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
2 changed files with 28 additions and 11 deletions

View File

@ -410,6 +410,12 @@ trait Files
* @var Deferred[] * @var Deferred[]
*/ */
private $read = []; private $read = [];
/**
* Read promises (write lenth).
*
* @var int[]
*/
private $wrote = [];
/** /**
* Write promises. * Write promises.
* *
@ -476,6 +482,7 @@ trait Files
{ {
$offset /= $this->partSize; $offset /= $this->partSize;
$this->write[$offset]->resolve($data); $this->write[$offset]->resolve($data);
$this->wrote[$offset] = strlen($data);
return $this->read[$offset]->promise(); return $this->read[$offset]->promise();
} }
/** /**
@ -487,7 +494,8 @@ trait Files
*/ */
public function callback(...$params): void public function callback(...$params): void
{ {
$this->read[$this->offset++]->resolve(); $offset = $this->offset++;
$this->read[$offset]->resolve($this->wrote[$offset]);
if ($this->cb) { if ($this->cb) {
Tools::callFork(($this->cb)(...$params)); Tools::callFork(($this->cb)(...$params));
} }
@ -1079,6 +1087,7 @@ trait Files
$this->logger->logger('Waiting for lock of file to download...'); $this->logger->logger('Waiting for lock of file to download...');
$unlock = yield \danog\MadelineProto\Tools::flock($file, LOCK_EX); $unlock = yield \danog\MadelineProto\Tools::flock($file, LOCK_EX);
$this->logger->logger('Got lock of file to download');
try { try {
yield $this->downloadToStream($message_media, $stream, $cb, $size, -1); yield $this->downloadToStream($message_media, $stream, $cb, $size, -1);
@ -1262,16 +1271,17 @@ trait Files
if ($res) { if ($res) {
$size += $res; $size += $res;
} }
return (bool) $res;
}); });
$promises[] = $previous_promise; $promises[] = $previous_promise;
if (!($key % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps if (!($key % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps
$res = array_sum(yield \danog\MadelineProto\Tools::all($promises)); $res = yield \danog\MadelineProto\Tools::all($promises);
if ($res !== count($promises)) { $promises = [];
$promises = []; foreach ($res as $r) {
break; if (!$r) {
break 2;
}
} }
$time = \microtime(true) - $start; $time = \microtime(true) - $start;
@ -1393,6 +1403,8 @@ trait Files
} }
continue; continue;
} }
$res['bytes'] = (string) $res['bytes'];
if ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') { if ($cdn === false && $res['type']['_'] === 'storage.fileUnknown' && $res['bytes'] === '') {
$datacenter = 0; $datacenter = 0;
} }
@ -1423,7 +1435,7 @@ trait Files
if (!$seekable) { if (!$seekable) {
yield $offset['previous_promise']; yield $offset['previous_promise'];
} }
$res = yield $callable((string) $res['bytes'], $offset['offset'] + $offset['part_start_at']); $res = yield $callable($res['bytes'], $offset['offset'] + $offset['part_start_at']);
$cb(); $cb();
return $res; return $res;
} while (true); } while (true);

View File

@ -101,11 +101,11 @@ $MadelineProto->loop(function () use ($MadelineProto) {
if (!\getenv('TRAVIS_COMMIT') && \stripos(yield $MadelineProto->readline('Do you want to make a call? (y/n): '), 'y') !== false) { if (!\getenv('TRAVIS_COMMIT') && \stripos(yield $MadelineProto->readline('Do you want to make a call? (y/n): '), 'y') !== false) {
$controller = yield $MadelineProto->requestCall(\getenv('TEST_SECRET_CHAT'))->play('input.raw')->then('input.raw')->playOnHold(['input.raw'])->setOutputFile('output.raw'); $controller = yield $MadelineProto->requestCall(\getenv('TEST_SECRET_CHAT'))->play('input.raw')->then('input.raw')->playOnHold(['input.raw'])->setOutputFile('output.raw');
while ($controller->getCallState() < \danog\MadelineProto\VoIP::CALL_STATE_READY) { while ($controller->getCallState() < \danog\MadelineProto\VoIP::CALL_STATE_READY) {
yield $MadelineProto->getUpdates(); yield $MadelineProto->sleep(1);
} }
$MadelineProto->logger($controller->configuration); $MadelineProto->logger($controller->configuration);
while ($controller->getCallState() < \danog\MadelineProto\VoIP::CALL_STATE_ENDED) { while ($controller->getCallState() < \danog\MadelineProto\VoIP::CALL_STATE_ENDED) {
yield $MadelineProto->getUpdates(); yield $MadelineProto->sleep(1);
} }
} }
@ -273,10 +273,15 @@ var_dump(time()-$t);
foreach ($media as $type => $inputMedia) { foreach ($media as $type => $inputMedia) {
$MadelineProto->logger("Sending $type"); $MadelineProto->logger("Sending $type");
$type = yield $MadelineProto->messages->sendMedia(['peer' => $peer, 'media' => $inputMedia, 'message' => '['.$message.'](mention:'.$mention.')', 'parse_mode' => 'markdown']); yield $MadelineProto->messages->sendMedia(['peer' => $peer, 'media' => $inputMedia, 'message' => '['.$message.'](mention:'.$mention.')', 'parse_mode' => 'markdown']);
yield $MadelineProto->downloadToDir($media = yield $MadelineProto->messages->uploadMedia(['peer' => '@me', 'media' => $inputMedia]), '/tmp'); $MadelineProto->logger("Uploading $type");
$media = yield $MadelineProto->messages->uploadMedia(['peer' => '@me', 'media' => $inputMedia]);
$MadelineProto->logger("Downloading $type");
yield $MadelineProto->downloadToDir($media, '/tmp');
$MadelineProto->logger("Re-sending $type");
$inputMedia['file'] = $media; $inputMedia['file'] = $media;
yield $MadelineProto->messages->uploadMedia(['peer' => '@me', 'media' => $inputMedia]); yield $MadelineProto->messages->uploadMedia(['peer' => '@me', 'media' => $inputMedia]);
$MadelineProto->logger("Done $type");
} }
} }