More rewriting

This commit is contained in:
Daniil Gentili 2019-05-29 22:00:51 +02:00
parent 60b20f98f7
commit 28928e339f
13 changed files with 43 additions and 37 deletions

View File

@ -247,7 +247,7 @@ class CombinedAPI
}
if (!$instance->API->settings['updates']['handle_updates']) {
$instance->API->settings['updates']['handle_updates'] = true;
$instance->API->datacenter->sockets[$instance->API->settings['connection_settings']['default_dc']]->updater->start();
$instance->API->updaters[false]->start();
}
$instance->setCallback(function ($update) use ($path) {
return $this->event_update_handler($update, $path);

View File

@ -20,6 +20,7 @@ namespace danog\MadelineProto\Loop\Impl;
use Amp\Promise;
use danog\MadelineProto\Loop\LoopInterface;
use danog\MadelineProto\Logger;
/**
* Loop helper trait.

View File

@ -78,14 +78,14 @@ class FeedLoop extends ResumableSignalLoop
return;
}
if (!$this->settings['updates']['handle_updates']) {
if (!$this->API->settings['updates']['handle_updates']) {
$API->logger->logger("Exiting update feed loop channel {$this->channelId}");
$this->exitedLoop();
return;
}
while ($this->incomingUpdates) {
$updates = $this->incomingUpdates;
$this->incomingUpdates = null;
$this->incomingUpdates = [];
yield $this->parse($updates);
$updates = null;
}
@ -101,7 +101,6 @@ class FeedLoop extends ResumableSignalLoop
{
reset($updates);
while ($updates) {
$options = [];
$key = key($updates);
$update = $updates[$key];
unset($updates[$key]);
@ -138,10 +137,8 @@ class FeedLoop extends ResumableSignalLoop
$logger("PTS OK");
$this->state->pts($update['pts']);
if ($this->channelId === false && isset($options['date'])) {
$this->state->date($options['date']);
}
}
$this->save($update);
}
}
@ -157,6 +154,13 @@ class FeedLoop extends ResumableSignalLoop
{
$this->parsedUpdates []= $update;
}
public function saveMessages($messages)
{
foreach ($messages as $message) {
$this->parsedUpdates []= ['_' => $this->channelId === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1];
}
}
public function has_all_auth()
{
if ($this->API->isInitingAuthorization()) {

View File

@ -83,7 +83,7 @@ class SeqLoop extends ResumableSignalLoop
}
while ($this->incomingUpdates) {
$updates = $this->incomingUpdates;
$this->incomingUpdates = null;
$this->incomingUpdates = [];
yield $this->parse($updates);
$updates = null;
}
@ -100,25 +100,28 @@ class SeqLoop extends ResumableSignalLoop
unset($updates[$key]);
$options = $update['options'];
$updates = $update['updates'];
unset($update);
$seq_start = $options['seq_start'];
$seq_end = $options['seq_end'];
$result = $this->state->checkSeq($seq_start);
if ($result > 0) {
$this->logger->logger('Seq hole of $result. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR);
yield $this->updaters[false]->resume();
$this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR);
yield $this->pause(1.0);
if (!$this->incomingUpdates) {
yield $this->updaters[false]->resume();
}
$this->incomingUpdates = array_merge($this->incomingUpdates, [$update], $updates);
continue;
}
if ($result < 0) {
$this->logger->logger('Seq too old. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR);
continue;
}
if ($this->state->seq() !== $seq) {
$this->state->seq($seq);
if (isset($options['date'])) {
$this->state->date($options['date']);
}
$this->state->seq($seq_end);
if (isset($options['date'])) {
$this->state->date($options['date']);
}
$this->save($updates);

View File

@ -100,7 +100,7 @@ class UpdateLoop extends ResumableSignalLoop
$state->update($difference);
$feeder->feed($difference['other_updates']);
yield $this->handle_update_messages_async($difference['new_messages'], $channel);
$feeder->saveMessages($difference['new_messages']);
if (!$difference['final']) {
if ($difference['pts'] >= $toPts) {
unset($difference);
@ -114,7 +114,7 @@ class UpdateLoop extends ResumableSignalLoop
case 'updates.channelDifferenceTooLong':
$this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE);
$state->update($difference);
yield $this->handle_update_messages_async($difference['messages'], $channel);
$feeder->saveMessages($difference['messages']);
unset($difference);
break;
default:
@ -137,7 +137,7 @@ class UpdateLoop extends ResumableSignalLoop
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
yield $this->handle_update_messages_async($difference['new_messages']);
$feeder->saveMessages($difference['new_messages']);
$state->update($difference['state']);
unset($difference);
break 2;
@ -147,7 +147,7 @@ class UpdateLoop extends ResumableSignalLoop
}
$feeder->feed($difference['other_updates']);
$feeder->feed($difference['new_encrypted_messages']);
yield $this->handle_update_messages_async($difference['new_messages']);
$feeder->saveMessages($difference['new_messages']);
$state->update($difference['intermediate_state']);
if ($difference['intermediate_state']['pts'] >= $toPts) {
unset($difference);

View File

@ -29,6 +29,7 @@ use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
use danog\MadelineProto\TL\TLCallback;
use danog\MadelineProto\Loop\Update\UpdateLoop;
use danog\MadelineProto\Loop\Update\FeedLoop;
use danog\MadelineProto\Loop\Update\SeqLoop;
/**
* Manages all of the mtproto stuff.
@ -422,7 +423,7 @@ class MTProto extends AsyncConstruct implements TLCallback
$this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getupdates_deserialization'], Logger::NOTICE);
yield $this->updaters[false]->resume();
}
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start();
$this->updaters[false]->start();
}
public function __destruct()
@ -869,6 +870,10 @@ class MTProto extends AsyncConstruct implements TLCallback
$this->feeders[$channelId]->start();
$this->updaters[$channelId]->start();
}
if (!isset($this->seqUpdater)) {
$this->seqUpdater = new SeqLoop($this);
}
$this->seqUpdater->start();
}
public function get_phone_config_async($watcherId = null)

View File

@ -571,7 +571,6 @@ trait ResponseHandler
}
$this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE);
$result = [];
switch ($updates['_']) {
case 'updates':
@ -585,6 +584,7 @@ trait ResponseHandler
unset($updates['updates'][$key]);
}
}
if ($updates['updates']) {
if ($updates['_'] === 'updatesCombined') {
$updates['updates'][0]['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
@ -592,7 +592,7 @@ trait ResponseHandler
$updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
}
foreach ($updates as $update) {
$result[yield $this->feedSingle($update)] = true;
$result[yield $this->seqUpdater->feed($update)] = true;
}
}
break;

View File

@ -62,7 +62,7 @@ trait UpdateHandler
{
if (!$this->settings['updates']['handle_updates']) {
$this->settings['updates']['handle_updates'] = true;
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start();
$this->updaters[false]->start();
}
if (!$this->settings['updates']['run_callback']) {
$this->settings['updates']['run_callback'] = true;
@ -238,13 +238,6 @@ trait UpdateHandler
return $channelId;
}
public function handle_update_messages_async($messages, $channel = false)
{
foreach ($messages as $message) {
yield $this->save_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => ($channel === false ? (yield $this->load_update_state_async()) : $this->channels_state->get($channel))->pts(), 'pts_count' => 0]);
}
}
public function save_update_async($update)
{
if ($update['_'] === 'updateConfig') {

View File

@ -30,6 +30,6 @@ trait Callback
$this->settings['updates']['run_callback'] = true;
$this->settings['updates']['handle_updates'] = true;
return $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start();
return $this->updaters[false];
}
}

View File

@ -63,7 +63,7 @@ trait Events
$this->settings['updates']['run_callback'] = true;
if (isset($this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater) && !$this->asyncInitPromise) {
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start();
$this->updaters[false]->start();
}
}

View File

@ -99,7 +99,7 @@ trait Loop
if (!$this->settings['updates']['run_callback']) {
$this->settings['updates']['run_callback'] = true;
}
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start();
$this->updaters[false]->start();
$this->logger->logger('Started update loop', \danog\MadelineProto\Logger::NOTICE);

View File

@ -26,7 +26,7 @@ trait Noop
$this->settings['updates']['callback'] = [$this, 'noop'];
$this->settings['updates']['run_callback'] = false;
$this->settings['updates']['handle_updates'] = true;
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start();
$this->updaters[false]->start();
}
public function noop()

View File

@ -31,6 +31,6 @@ trait Webhook
$this->settings['updates']['callback'] = [$this, 'pwr_webhook'];
$this->settings['updates']['run_callback'] = true;
$this->settings['updates']['handle_updates'] = true;
$this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start();
$this->updaters[false]->start();
}
}