Improved update signaling

This commit is contained in:
Daniil Gentili 2019-05-30 17:22:28 +02:00
parent 9197c7013d
commit 060659dc92
5 changed files with 29 additions and 22 deletions

View File

@ -96,15 +96,7 @@ class FeedLoop extends ResumableSignalLoop
yield $API->save_update_async($update); yield $API->save_update_async($update);
} }
$this->parsedUpdates = []; $this->parsedUpdates = [];
if ($API->update_deferred) { $this->API->signalUpdate();
Loop::defer(function () use ($API) {
if ($API->update_deferred) {
$API->logger->logger("Resuming deferred in $this", Logger::VERBOSE);
$API->update_deferred->resolve();
$API->logger->logger("Done resuming deferred in $this", Logger::VERBOSE);
}
});
}
} }
} }

View File

@ -164,15 +164,7 @@ class UpdateLoop extends ResumableSignalLoop
foreach ($result as $channelId => $boh) { foreach ($result as $channelId => $boh) {
$this->API->feeders[$channelId]->resumeDefer(); $this->API->feeders[$channelId]->resumeDefer();
} }
if ($API->update_deferred) { $this->API->signalUpdate();
Loop::defer(function () use ($API) {
if ($API->update_deferred) {
$API->logger->logger("Resuming deferred in $this", Logger::VERBOSE);
$API->update_deferred->resolve();
$API->logger->logger("Done resuming deferred in $this", Logger::VERBOSE);
}
});
}
if (yield $this->waitSignal($this->pause($timeout))) { if (yield $this->waitSignal($this->pause($timeout))) {
$API->logger->logger("Exiting $this"); $API->logger->logger("Exiting $this");

View File

@ -148,7 +148,6 @@ class MTProto extends AsyncConstruct implements TLCallback
private $postpone_updates = false; private $postpone_updates = false;
private $supportUser = 0; private $supportUser = 0;
public $referenceDatabase; public $referenceDatabase;
public $update_deferred;
public $phoneConfigWatcherId; public $phoneConfigWatcherId;
public $feeders = []; public $feeders = [];
public $updaters = []; public $updaters = [];

View File

@ -25,6 +25,7 @@ use Amp\Delayed;
use function Amp\Promise\any; use function Amp\Promise\any;
use danog\MadelineProto\Loop\Update\FeedLoop; use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\UpdateLoop; use danog\MadelineProto\Loop\Update\UpdateLoop;
use Amp\Loop;
/** /**
* Manages updates. * Manages updates.
@ -80,7 +81,7 @@ trait UpdateHandler
if (!$params['timeout']) { if (!$params['timeout']) {
$params['timeout'] = 0.001; $params['timeout'] = 0.001;
} }
yield any([$this->update_deferred->promise(), new Delayed($params['timeout'] * 1000)]); yield any([$this->waitUpdate(), new Delayed($params['timeout'] * 1000)]);
} }
if (empty($this->updates)) { if (empty($this->updates)) {
@ -101,6 +102,30 @@ trait UpdateHandler
return $updates; return $updates;
} }
public $update_resolved = false;
public $update_deferred;
public function waitUpdate()
{
if (!$this->update_deferred) {
$this->update_deferred = new Deferred;
}
yield $this->update_deferred->promise();
$this->update_resolved = false;
$this->update_deferred = new Deferred;
}
public function signalUpdate()
{
if (!$this->update_deferred) {
$this->update_deferred = new Deferred;
}
Loop::defer(function () {
if (!$this->update_resolved) {
$this->update_resolved = true;
$this->update_deferred->resolve();
}
});
}
public function check_msg_id($message) public function check_msg_id($message)
{ {

View File

@ -123,8 +123,7 @@ trait Loop
$controller->discard(); $controller->discard();
} }
}); });
$this->update_deferred = new Deferred(); yield $this->waitUpdate();
yield $this->update_deferred->promise();
} }
} }
public function closeConnection($message = 'OK!') public function closeConnection($message = 'OK!')