diff --git a/src/danog/MadelineProto/CombinedAPI.php b/src/danog/MadelineProto/CombinedAPI.php index 125b4636..2c813799 100644 --- a/src/danog/MadelineProto/CombinedAPI.php +++ b/src/danog/MadelineProto/CombinedAPI.php @@ -247,7 +247,7 @@ class CombinedAPI } if (!$instance->API->settings['updates']['handle_updates']) { $instance->API->settings['updates']['handle_updates'] = true; - $instance->API->datacenter->sockets[$instance->API->settings['connection_settings']['default_dc']]->updater->start(); + $instance->API->updaters[false]->start(); } $instance->setCallback(function ($update) use ($path) { return $this->event_update_handler($update, $path); diff --git a/src/danog/MadelineProto/Loop/Impl/Loop.php b/src/danog/MadelineProto/Loop/Impl/Loop.php index 45b6b95d..eca250f6 100644 --- a/src/danog/MadelineProto/Loop/Impl/Loop.php +++ b/src/danog/MadelineProto/Loop/Impl/Loop.php @@ -20,6 +20,7 @@ namespace danog\MadelineProto\Loop\Impl; use Amp\Promise; use danog\MadelineProto\Loop\LoopInterface; +use danog\MadelineProto\Logger; /** * Loop helper trait. diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index 89bf1138..c08ec7be 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -78,14 +78,14 @@ class FeedLoop extends ResumableSignalLoop return; } - if (!$this->settings['updates']['handle_updates']) { + if (!$this->API->settings['updates']['handle_updates']) { $API->logger->logger("Exiting update feed loop channel {$this->channelId}"); $this->exitedLoop(); return; } while ($this->incomingUpdates) { $updates = $this->incomingUpdates; - $this->incomingUpdates = null; + $this->incomingUpdates = []; yield $this->parse($updates); $updates = null; } @@ -101,7 +101,6 @@ class FeedLoop extends ResumableSignalLoop { reset($updates); while ($updates) { - $options = []; $key = key($updates); $update = $updates[$key]; unset($updates[$key]); @@ -138,10 +137,8 @@ class FeedLoop extends ResumableSignalLoop $logger("PTS OK"); $this->state->pts($update['pts']); - if ($this->channelId === false && isset($options['date'])) { - $this->state->date($options['date']); - } } + $this->save($update); } } @@ -157,6 +154,13 @@ class FeedLoop extends ResumableSignalLoop { $this->parsedUpdates []= $update; } + public function saveMessages($messages) + { + foreach ($messages as $message) { + $this->parsedUpdates []= ['_' => $this->channelId === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1]; + } + } + public function has_all_auth() { if ($this->API->isInitingAuthorization()) { diff --git a/src/danog/MadelineProto/Loop/Update/SeqLoop.php b/src/danog/MadelineProto/Loop/Update/SeqLoop.php index 22a4b934..16e47339 100644 --- a/src/danog/MadelineProto/Loop/Update/SeqLoop.php +++ b/src/danog/MadelineProto/Loop/Update/SeqLoop.php @@ -83,7 +83,7 @@ class SeqLoop extends ResumableSignalLoop } while ($this->incomingUpdates) { $updates = $this->incomingUpdates; - $this->incomingUpdates = null; + $this->incomingUpdates = []; yield $this->parse($updates); $updates = null; } @@ -100,25 +100,28 @@ class SeqLoop extends ResumableSignalLoop unset($updates[$key]); $options = $update['options']; $updates = $update['updates']; - unset($update); $seq_start = $options['seq_start']; $seq_end = $options['seq_end']; + $result = $this->state->checkSeq($seq_start); if ($result > 0) { - $this->logger->logger('Seq hole of $result. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR); - yield $this->updaters[false]->resume(); + $this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR); + yield $this->pause(1.0); + if (!$this->incomingUpdates) { + yield $this->updaters[false]->resume(); + } + $this->incomingUpdates = array_merge($this->incomingUpdates, [$update], $updates); continue; } if ($result < 0) { - + $this->logger->logger('Seq too old. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), \danog\MadelineProto\Logger::ERROR); + continue; } - if ($this->state->seq() !== $seq) { - $this->state->seq($seq); - if (isset($options['date'])) { - $this->state->date($options['date']); - } + $this->state->seq($seq_end); + if (isset($options['date'])) { + $this->state->date($options['date']); } $this->save($updates); diff --git a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php index 4e0801d2..56a447e6 100644 --- a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php +++ b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php @@ -100,7 +100,7 @@ class UpdateLoop extends ResumableSignalLoop $state->update($difference); $feeder->feed($difference['other_updates']); - yield $this->handle_update_messages_async($difference['new_messages'], $channel); + $feeder->saveMessages($difference['new_messages']); if (!$difference['final']) { if ($difference['pts'] >= $toPts) { unset($difference); @@ -114,7 +114,7 @@ class UpdateLoop extends ResumableSignalLoop case 'updates.channelDifferenceTooLong': $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); $state->update($difference); - yield $this->handle_update_messages_async($difference['messages'], $channel); + $feeder->saveMessages($difference['messages']); unset($difference); break; default: @@ -137,7 +137,7 @@ class UpdateLoop extends ResumableSignalLoop } $feeder->feed($difference['other_updates']); $feeder->feed($difference['new_encrypted_messages']); - yield $this->handle_update_messages_async($difference['new_messages']); + $feeder->saveMessages($difference['new_messages']); $state->update($difference['state']); unset($difference); break 2; @@ -147,7 +147,7 @@ class UpdateLoop extends ResumableSignalLoop } $feeder->feed($difference['other_updates']); $feeder->feed($difference['new_encrypted_messages']); - yield $this->handle_update_messages_async($difference['new_messages']); + $feeder->saveMessages($difference['new_messages']); $state->update($difference['intermediate_state']); if ($difference['intermediate_state']['pts'] >= $toPts) { unset($difference); diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 1a2fc734..f990efad 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -29,6 +29,7 @@ use danog\MadelineProto\Stream\MTProtoTransport\HttpStream; use danog\MadelineProto\TL\TLCallback; use danog\MadelineProto\Loop\Update\UpdateLoop; use danog\MadelineProto\Loop\Update\FeedLoop; +use danog\MadelineProto\Loop\Update\SeqLoop; /** * Manages all of the mtproto stuff. @@ -422,7 +423,7 @@ class MTProto extends AsyncConstruct implements TLCallback $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getupdates_deserialization'], Logger::NOTICE); yield $this->updaters[false]->resume(); } - $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); + $this->updaters[false]->start(); } public function __destruct() @@ -869,6 +870,10 @@ class MTProto extends AsyncConstruct implements TLCallback $this->feeders[$channelId]->start(); $this->updaters[$channelId]->start(); } + if (!isset($this->seqUpdater)) { + $this->seqUpdater = new SeqLoop($this); + } + $this->seqUpdater->start(); } public function get_phone_config_async($watcherId = null) diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index 20aaeb3d..38fd35a5 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -571,7 +571,6 @@ trait ResponseHandler } $this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE); - $result = []; switch ($updates['_']) { case 'updates': @@ -585,6 +584,7 @@ trait ResponseHandler unset($updates['updates'][$key]); } } + if ($updates['updates']) { if ($updates['_'] === 'updatesCombined') { $updates['updates'][0]['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; @@ -592,7 +592,7 @@ trait ResponseHandler $updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; } foreach ($updates as $update) { - $result[yield $this->feedSingle($update)] = true; + $result[yield $this->seqUpdater->feed($update)] = true; } } break; diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index aff482cb..033f2482 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -62,7 +62,7 @@ trait UpdateHandler { if (!$this->settings['updates']['handle_updates']) { $this->settings['updates']['handle_updates'] = true; - $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); + $this->updaters[false]->start(); } if (!$this->settings['updates']['run_callback']) { $this->settings['updates']['run_callback'] = true; @@ -238,13 +238,6 @@ trait UpdateHandler return $channelId; } - public function handle_update_messages_async($messages, $channel = false) - { - foreach ($messages as $message) { - yield $this->save_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => ($channel === false ? (yield $this->load_update_state_async()) : $this->channels_state->get($channel))->pts(), 'pts_count' => 0]); - } - } - public function save_update_async($update) { if ($update['_'] === 'updateConfig') { diff --git a/src/danog/MadelineProto/Wrappers/Callback.php b/src/danog/MadelineProto/Wrappers/Callback.php index 30eef04b..3cfaf7fa 100644 --- a/src/danog/MadelineProto/Wrappers/Callback.php +++ b/src/danog/MadelineProto/Wrappers/Callback.php @@ -30,6 +30,6 @@ trait Callback $this->settings['updates']['run_callback'] = true; $this->settings['updates']['handle_updates'] = true; - return $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); + return $this->updaters[false]; } } diff --git a/src/danog/MadelineProto/Wrappers/Events.php b/src/danog/MadelineProto/Wrappers/Events.php index 991acbce..e27022a0 100644 --- a/src/danog/MadelineProto/Wrappers/Events.php +++ b/src/danog/MadelineProto/Wrappers/Events.php @@ -63,7 +63,7 @@ trait Events $this->settings['updates']['run_callback'] = true; if (isset($this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater) && !$this->asyncInitPromise) { - $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); + $this->updaters[false]->start(); } } diff --git a/src/danog/MadelineProto/Wrappers/Loop.php b/src/danog/MadelineProto/Wrappers/Loop.php index 15dcba1e..e1095b28 100644 --- a/src/danog/MadelineProto/Wrappers/Loop.php +++ b/src/danog/MadelineProto/Wrappers/Loop.php @@ -99,7 +99,7 @@ trait Loop if (!$this->settings['updates']['run_callback']) { $this->settings['updates']['run_callback'] = true; } - $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); + $this->updaters[false]->start(); $this->logger->logger('Started update loop', \danog\MadelineProto\Logger::NOTICE); diff --git a/src/danog/MadelineProto/Wrappers/Noop.php b/src/danog/MadelineProto/Wrappers/Noop.php index 5ee1f22d..43403d49 100644 --- a/src/danog/MadelineProto/Wrappers/Noop.php +++ b/src/danog/MadelineProto/Wrappers/Noop.php @@ -26,7 +26,7 @@ trait Noop $this->settings['updates']['callback'] = [$this, 'noop']; $this->settings['updates']['run_callback'] = false; $this->settings['updates']['handle_updates'] = true; - $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); + $this->updaters[false]->start(); } public function noop() diff --git a/src/danog/MadelineProto/Wrappers/Webhook.php b/src/danog/MadelineProto/Wrappers/Webhook.php index 45fa833a..d795a998 100644 --- a/src/danog/MadelineProto/Wrappers/Webhook.php +++ b/src/danog/MadelineProto/Wrappers/Webhook.php @@ -31,6 +31,6 @@ trait Webhook $this->settings['updates']['callback'] = [$this, 'pwr_webhook']; $this->settings['updates']['run_callback'] = true; $this->settings['updates']['handle_updates'] = true; - $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); + $this->updaters[false]->start(); } }