This commit is contained in:
Daniil Gentili 2019-05-13 13:07:30 +02:00
parent 12f8c095c3
commit afbb08a027
7 changed files with 41 additions and 60 deletions

View File

@ -33,7 +33,6 @@ use Amp\Failure;
use Amp\Internal;
use Amp\Promise;
use Amp\Success;
use React\Promise\PromiseInterface as ReactPromise;
/**
* Creates a promise from a generator function yielding promises.
@ -67,7 +66,7 @@ final class Coroutine implements Promise
try {
$yielded = $this->generator->current();
if (!$yielded instanceof Promise) {
while (!$yielded instanceof Promise) {
if ($yielded instanceof \YieldReturnValue) {
$this->resolve($yielded->getReturn());
$this->generator->next();
@ -82,7 +81,11 @@ final class Coroutine implements Promise
return;
}
$yielded = $this->transform($yielded);
if ($yielded instanceof \Generator) {
$yielded = new self($yielded);
} else {
$yielded = $this->generator->send($yielded);
}
}
} catch (\Throwable $exception) {
$this->fail($exception);
@ -111,12 +114,11 @@ final class Coroutine implements Promise
// Send the new value and execute to next yield statement.
$yielded = $this->generator->send($this->value);
}
if (!$yielded instanceof Promise) {
while (!$yielded instanceof Promise) {
if ($yielded instanceof \YieldReturnValue) {
$this->resolve($yielded->getReturn());
$this->onResolve = null;
$this->generator->next();
return;
}
@ -130,7 +132,11 @@ final class Coroutine implements Promise
return;
}
$yielded = $this->transform($yielded);
if ($yielded instanceof \Generator) {
$yielded = new self($yielded);
} else {
$yielded = $this->generator->send($yielded);
}
}
$this->immediate = false;
$yielded->onResolve($this->onResolve);
@ -139,7 +145,6 @@ final class Coroutine implements Promise
} catch (\Throwable $exception) {
$this->fail($exception);
$this->onResolve = null;
} finally {
$this->exception = null;
$this->value = null;
@ -147,39 +152,4 @@ final class Coroutine implements Promise
};
$yielded->onResolve($this->onResolve);
}
/**
* Attempts to transform the non-promise yielded from the generator into a promise, otherwise returns an instance
* `Amp\Failure` failed with an instance of `Amp\InvalidYieldError`.
*
* @param mixed $yielded Non-promise yielded from generator.
*
* @return \Amp\Promise
*/
private function transform($yielded): Promise
{
try {
if ($yielded instanceof \Generator) {
return new self($yielded);
}
if (\is_array($yielded)) {
foreach ($yielded as &$val) {
if ($val instanceof \Generator) {
$val = new self($val);
}
}
return Promise\all($yielded);
}
if ($yielded instanceof ReactPromise) {
return Promise\adapt($yielded);
}
// No match, continue to returning Failure below.
} catch (\Throwable $exception) {
// Conversion to promise failed, fall-through to returning Failure below.
}
return $yielded instanceof \Throwable || $yielded instanceof \Exception ? new Failure($yielded) : new Success($yielded);
}
}

View File

@ -99,9 +99,9 @@ class ReadLoop extends SignalLoop
$connection->http_res_count++;
try {
$API->logger->logger("Handling messages from DC ".$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$API->handle_messages($datacenter);
$API->logger->logger("Handled messages from DC ".$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
//$API->logger->logger("Handling messages from DC ".$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
Loop::defer([$API, 'handle_messages'], $datacenter);
//$API->logger->logger("Handled messages from DC ".$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
} finally {
$this->exitedLoop();
}

View File

@ -839,25 +839,24 @@ class MTProto implements TLCallback
foreach ($this->datacenter->get_dcs() as $new_dc) {
$dcs[] = $this->datacenter->dcConnectAsync($new_dc);
}
yield $dcs;
yield $this->all($dcs);
yield $this->init_authorization_async();
$dcs = [];
foreach ($this->datacenter->get_dcs(false) as $new_dc) {
$dcs[] = $this->datacenter->dcConnectAsync($new_dc);
}
yield $dcs;
yield $this->all($dcs);
yield $this->init_authorization_async();
if (!$this->phoneConfigWatcherId) {
$this->phoneConfigWatcherId = Loop::repeat(24 * 3600 * 1000, [$this, 'get_phone_config_async']);
}
yield $this->get_phone_config_async();
$this->logger->logger("Started phone config fetcher");
}
public function get_phone_config_async($watcherId = null)
{
if ($this->authorized === self::LOGGED_IN && class_exists('\\danog\\MadelineProto\\VoIPServerConfig') && !$this->authorization['user']['bot'] && $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->temp_auth_key !== null) {
if ($this->authorized === self::LOGGED_IN && class_exists('\\danog\\MadelineProto\\VoIPServerConfigInternal') && !$this->authorization['user']['bot'] && $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->temp_auth_key !== null) {
$this->logger->logger("Fetching phone config...");
VoIPServerConfig::updateDefault(yield $this->method_call_async_read('phone.getCallConfig', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]));
} else {

View File

@ -606,12 +606,11 @@ trait AuthKeyHandler
foreach ($dcs as $id => &$dc) {
$dc = $dc();
}
yield $dcs;
yield $this->all($dcs);
foreach ($postpone as $id => $socket) {
yield $this->init_authorization_socket_async($id, $socket);
}
//foreach ($dcs as $dc) { yield $dc; }
if ($this->pending_auth && empty($this->init_auth_dcs)) {
$this->pending_auth = false;

View File

@ -125,7 +125,7 @@ trait CallHandler
$this->datacenter->sockets[$aargs['datacenter']]->writer->resume();
}
return yield $promises;
return yield all($promises);
}
$args = yield $this->botAPI_to_MTProto_async($args);
if (isset($args['ping_id']) && is_int($args['ping_id'])) {

View File

@ -23,6 +23,7 @@ use danog\MadelineProto\Async\AsyncParameters;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Logger;
use danog\MadelineProto\RPCErrorException;
use function Amp\Promise\all;
/**
* Manages upload and download of files.
@ -118,7 +119,7 @@ trait Files
$promises[] = $read_deferred->promise();
}
$result = yield $promises;
$result = yield all($promises);
foreach ($result as $key => $result) {
if (!$result) {
throw new \danog\MadelineProto\Exception('Upload of part '.$key.' failed');

View File

@ -19,11 +19,12 @@
namespace danog\MadelineProto;
use Amp\Deferred;
use Amp\Failure;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use function Amp\Promise\all;
use function Amp\Promise\wait;
use Amp\Failure;
/**
* Some tools.
@ -210,7 +211,13 @@ trait Tools
}
} while (true);
}
public function all($promises)
{
foreach ($promises as &$promise) {
$promise = $this->call($promise);
}
return all($promises);
}
public function call($promise)
{
if ($promise instanceof \Generator) {
@ -223,11 +230,16 @@ trait Tools
}
public function callFork($promise)
{
$this->call($promise)->onResolve(function ($e, $res) {
if ($e) {
$this->rethrow($e);
}
});
if ($promise instanceof \Generator) {
$promise = new Coroutine($promise);
}
if ($promise instanceof Promise) {
$promise->onResolve(function ($e, $res) {
if ($e) {
$this->rethrow($e);
}
});
}
}
public function rethrow($e)
{