diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index 0acebff0..89bf1138 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -89,6 +89,12 @@ class FeedLoop extends ResumableSignalLoop yield $this->parse($updates); $updates = null; } + if ($this->parsedUpdates) { + foreach ($this->parsedUpdates as $update) { + yield $API->save_update_async($update); + } + $this->parsedUpdates = []; + } } } public function parse($updates) @@ -99,10 +105,6 @@ class FeedLoop extends ResumableSignalLoop $key = key($updates); $update = $updates[$key]; unset($updates[$key]); - if (isset($update['options'])) { - $options = $update['options']; - unset($update['options']); - } if (isset($update['pts'])) { $logger = function ($msg) use ($update) { $pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0; @@ -140,23 +142,6 @@ class FeedLoop extends ResumableSignalLoop $this->state->date($options['date']); } } - if ($this->channelId === false && isset($options['seq']) || isset($options['seq_start'])) { - $seq = $options['seq']; - $seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq']; - if ($seq_start != $this->state->seq() + 1 && $seq_start > $this->state->seq()) { - $this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR); - yield $this->updaters[false]->resume(); - - return false; - } - if ($this->state->seq() !== $seq) { - $this->state->seq($seq); - if (isset($options['date'])) { - $this->state->date($options['date']); - } - } - } - $this->save($update); } } diff --git a/src/danog/MadelineProto/Loop/Update/SeqLoop.php b/src/danog/MadelineProto/Loop/Update/SeqLoop.php new file mode 100644 index 00000000..22a4b934 --- /dev/null +++ b/src/danog/MadelineProto/Loop/Update/SeqLoop.php @@ -0,0 +1,149 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2018 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto\Loop\Update; + +use Amp\Success; +use danog\MadelineProto\Logger; +use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; + +/** + * update feed loop. + * + * @author Daniil Gentili + */ +class SeqLoop extends ResumableSignalLoop +{ + use \danog\MadelineProto\Tools; + private $incomingUpdates = []; + private $channelId; + private $feeder; + + public function __construct($API) + { + $this->API = $API; + } + public function loop() + { + $API = $this->API; + $feeder = $this->feeder = $API->feeders[false]; + + if (!$this->API->settings['updates']['handle_updates']) { + yield new Success(0); + + return false; + } + + $this->startedLoop(); + $API->logger->logger("Entered update seq loop", Logger::ULTRA_VERBOSE); + while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { + if (yield $this->waitSignal($this->pause())) { + $API->logger->logger("Exiting update seq loop"); + $this->exitedLoop(); + + return; + } + } + $this->state = yield $API->load_update_state_async(); + + while (true) { + while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { + if (yield $this->waitSignal($this->pause())) { + $API->logger->logger("Exiting update seq loop"); + $this->exitedLoop(); + + return; + } + } + if (yield $this->waitSignal($this->pause())) { + $API->logger->logger("Exiting update seq loop"); + $this->exitedLoop(); + + return; + } + if (!$this->settings['updates']['handle_updates']) { + $API->logger->logger("Exiting update seq loop"); + $this->exitedLoop(); + return; + } + while ($this->incomingUpdates) { + $updates = $this->incomingUpdates; + $this->incomingUpdates = null; + yield $this->parse($updates); + $updates = null; + } + $feeder->resumeDefer(); + } + } + public function parse($updates) + { + reset($updates); + while ($updates) { + $options = []; + $key = key($updates); + $update = $updates[$key]; + unset($updates[$key]); + $options = $update['options']; + $updates = $update['updates']; + unset($update); + + $seq_start = $options['seq_start']; + $seq_end = $options['seq_end']; + $result = $this->state->checkSeq($seq_start); + if ($result > 0) { + $this->logger->logger('Seq hole of $result. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR); + yield $this->updaters[false]->resume(); + + continue; + } + if ($result < 0) { + + } + if ($this->state->seq() !== $seq) { + $this->state->seq($seq); + if (isset($options['date'])) { + $this->state->date($options['date']); + } + } + + $this->save($updates); + } + } + public function feed($updates) + { + $this->incomingUpdates[] = $updates; + } + public function save($updates) + { + $this->feeder->feed($updates); + } + public function has_all_auth() + { + if ($this->API->isInitingAuthorization()) { + return false; + } + + foreach ($this->API->datacenter->sockets as $dc) { + if (!$dc->authorized || $dc->temp_auth_key === null) { + return false; + } + } + + return true; + } +} diff --git a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php index 3fc39e10..4e0801d2 100644 --- a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php +++ b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php @@ -42,7 +42,7 @@ class UpdateLoop extends ResumableSignalLoop public function loop() { $API = $this->API; - $feeder = $this->feeder = $API->feeder[$this->channelId]; + $feeder = $this->feeder = $API->feeders[$this->channelId]; while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { if (yield $this->waitSignal($this->pause())) { @@ -56,13 +56,13 @@ class UpdateLoop extends ResumableSignalLoop $this->startedLoop(); - $API->logger->logger("Entered updates loop in DC {$datacenter}", Logger::ULTRA_VERBOSE); + $API->logger->logger("Entered updates loop in channel {$this->channelId}", Logger::ULTRA_VERBOSE); $timeout = $API->settings['updates']['getdifference_interval']; while (true) { while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { if (yield $this->waitSignal($this->pause())) { - $API->logger->logger("Exiting update loop in DC $datacenter"); + $API->logger->logger("Exiting update loop in channel {$this->channelId}"); $this->exitedLoop(); return; @@ -160,8 +160,9 @@ class UpdateLoop extends ResumableSignalLoop } } } + $feeder->resumeDefer(); if (yield $this->waitSignal($this->pause($timeout))) { - $API->logger->logger("Exiting update loop in DC $datacenter"); + $API->logger->logger("Exiting update loop in channel {$this->channelId}"); $this->exitedLoop(); return; diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index f566a73e..20aaeb3d 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -258,7 +258,7 @@ trait ResponseHandler unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); if (strpos($datacenter, 'cdn') === false) { - $this->callFork($this->handle_updates_async($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'])); + $this->callForkDefer($this->handle_updates_async($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'])); } unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); @@ -572,40 +572,41 @@ trait ResponseHandler $this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE); - $opts = []; + $result = []; switch ($updates['_']) { case 'updates': case 'updatesCombined': - $handle_updates = []; foreach ($updates['updates'] as $key => $update) { if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' || $update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' || $update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' || $update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') { - $handle_updates[] = $update; + $result[yield $this->feedSingle($update)] = true; unset($updates['updates'][$key]); } } - $this->feeders[false]->feed($handle_updates); if ($updates['updates']) { if ($updates['_'] === 'updatesCombined') { $updates['updates'][0]['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; } else { $updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; } - $this->feeders[false]->feed($updates); + foreach ($updates as $update) { + $result[yield $this->feedSingle($update)] = true; + } } break; case 'updateShort': $updates['update']['options'] = ['date' => $updates['date']]; - $this->feeders[false]->feed([$updates['update']]); + $this->feedSingle($updates['update']); break; case 'updateShortMessage': case 'updateShortChatMessage': $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); $to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']); if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) { - yield $this->updaters[false]->resume(); + yield $this->updaters[false]->resumeDefer(); + return; // TOFIX } $message = $updates; @@ -624,7 +625,8 @@ trait ResponseHandler break; } $update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']]; - yield $this->handle_update_async($update, $opts); + $updates['update']['options'] = ['date' => $updates['date']]; + $result[yield $this->feedSingle($update)] = true; break; case 'updateShortSentMessage': //yield $this->set_update_state_async(['date' => $updates['date']]); @@ -636,5 +638,8 @@ trait ResponseHandler throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true)); break; } + foreach ($result as $channelId => $Boh) { + $this->feeders[$channelId]->resumeDefer(); + } } } diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index cb3acfd2..aff482cb 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -22,7 +22,6 @@ namespace danog\MadelineProto\MTProtoTools; use Amp\Artax\Request; use Amp\Deferred; use Amp\Delayed; -use Amp\Loop; use function Amp\Promise\any; /** @@ -144,7 +143,7 @@ trait UpdateHandler return $data; } - public function handle_update_async($update, $options = []) + public function feedSingle($update) { if (!$this->settings['updates']['handle_updates']) { return; @@ -168,20 +167,16 @@ trait UpdateHandler break; } - if ($channelId === false) { - $cur_state = yield $this->load_update_state_async(); - } else { - if (!$this->channels_state->has($channelId)) { - if (!isset($this->feeders[$channelId])) { - $this->feeders[$channelId] = new FeedLoop($this, $channelId); - } - if (!isset($this->updaters[$channelId])) { - $this->updaters[$channelId] = new UpdateLoop($this, $channelId); - } - $this->feeders[$channelId]->start(); - $this->updaters[$channelId]->start(); + if ($channelId && !$this->channels_state->has($channelId)) { + $this->channels_state->get($channelId, $update); + if (!isset($this->feeders[$channelId])) { + $this->feeders[$channelId] = new FeedLoop($this, $channelId); } - $cur_state = $this->channels_state->get($channelId, $update); + if (!isset($this->updaters[$channelId])) { + $this->updaters[$channelId] = new UpdateLoop($this, $channelId); + } + $this->feeders[$channelId]->start(); + $this->updaters[$channelId]->start(); } switch ($update['_']) { @@ -189,7 +184,7 @@ trait UpdateHandler $this->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE); $this->updaters[$channelId]->resumeDefer(); - return false; + return; case 'updateNewMessage': case 'updateEditMessage': case 'updateNewChannelMessage': @@ -228,17 +223,19 @@ trait UpdateHandler $this->updaters[false]->resumeDefer(); } - return false; + return; } break; default: if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) { $this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR); - return false; + return; } break; } + $this->feeders[$channelId]->feedSingle($update); + return $channelId; } public function handle_update_messages_async($messages, $channel = false) @@ -313,7 +310,7 @@ trait UpdateHandler } if ($update['qts'] > $cur_state->qts() + 1) { $this->logger->logger('Qts hole. Fetching updates manually: update qts: '.$update['qts'].' > current qts '.$cur_state->qts().'+1, chat id: '.$update['message']['chat_id'], \danog\MadelineProto\Logger::ERROR); - $this->updaters[$channelId]->resumeDefer(); + $this->updaters[false]->resumeDefer(); return false; } diff --git a/src/danog/MadelineProto/MTProtoTools/UpdatesState.php b/src/danog/MadelineProto/MTProtoTools/UpdatesState.php index 8fc4907b..7c41eafd 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdatesState.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdatesState.php @@ -193,4 +193,14 @@ class UpdatesState { return ($this->pts + $update['pts_count']) - $update['pts']; } + /** + * Check validity of seq contained in update + * + * @param int $seq + * @return int -1 if it's too old, 0 if it's ok, 1 if it's too new + */ + public function checkSeq($seq) + { + return $seq - ($this->seq + 1); + } } diff --git a/src/danog/MadelineProto/Tools.php b/src/danog/MadelineProto/Tools.php index 35e8fa3f..f1252d72 100644 --- a/src/danog/MadelineProto/Tools.php +++ b/src/danog/MadelineProto/Tools.php @@ -240,8 +240,11 @@ trait Tools return $promise; } - public function callFork($promise) + public function callFork($promise, $actual = null) { + if ($actual) { + $promise = $actual; + } if ($promise instanceof \Generator) { $promise = new Coroutine($promise); } @@ -254,6 +257,10 @@ trait Tools } return $promise; } + public function callForkDefer($promise) + { + Loop::defer([$this, 'callFork'], $promise); + } public function rethrow($e) { $logger = isset($this->logger) ? $this->logger : Logger::$default;