diff --git a/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php b/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php index 7e008787..34dce555 100644 --- a/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php +++ b/src/danog/MadelineProto/Loop/Impl/ResumableSignalLoop.php @@ -55,7 +55,7 @@ abstract class ResumableSignalLoop extends SignalLoop implements ResumableLoopIn $pause = $this->pause; $this->pause = new Deferred; - Loop::defer([$pause, 'resolve']); + if ($pause) Loop::defer([$pause, 'resolve']); return $this->resume->promise(); } diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index d441801f..0acebff0 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -16,7 +16,7 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\Loop\Connection; +namespace danog\MadelineProto\Loop\Update; use Amp\Success; use danog\MadelineProto\Logger; @@ -43,7 +43,7 @@ class FeedLoop extends ResumableSignalLoop public function loop() { $API = $this->API; - $updater = $this->updater = $API->updater[$this->channelId]; + $updater = $this->updater = $API->updaters[$this->channelId]; if (!$this->API->settings['updates']['handle_updates']) { yield new Success(0); @@ -145,7 +145,7 @@ class FeedLoop extends ResumableSignalLoop $seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq']; if ($seq_start != $this->state->seq() + 1 && $seq_start > $this->state->seq()) { $this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$this->state->seq().' + 1', \danog\MadelineProto\Logger::ERROR); - yield $this->get_updates_difference_async(); + yield $this->updaters[false]->resume(); return false; } @@ -164,10 +164,9 @@ class FeedLoop extends ResumableSignalLoop { $this->incomingUpdates = array_merge($this->incomingUpdates, $updates); } - public function fetchSlice($to_pts) + public function feedSingle($update) { - $difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => $this->state->pts(), 'pts_total_limit' => $to_pts, 'date' => $this->state->date(), 'qts' => $this->state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); - var_dumP($difference); + $this->incomingUpdates []= $update; } public function save($update) { diff --git a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php index d2b6e60d..3fc39e10 100644 --- a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php +++ b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php @@ -42,7 +42,6 @@ class UpdateLoop extends ResumableSignalLoop public function loop() { $API = $this->API; - $datacenter = $this->datacenter; $feeder = $this->feeder = $API->feeder[$this->channelId]; while (!$this->API->settings['updates']['handle_updates'] || !$this->has_all_auth()) { @@ -69,101 +68,99 @@ class UpdateLoop extends ResumableSignalLoop return; } } - if (time() - $API->last_getdifference > $timeout) { - $toPts = $this->toPts; - $this->toPts = null; - while (true) { - if ($this->channelId) { - $this->API->logger->logger('Fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); - if ($state->pts() <= 1) { - $limit = 10; - } else if ($API->authorization['user']['bot']) { - $limit = 100000; - } else { - $limit = 100; - } - $difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $state->pts(), 'limit' => $limit, 'force' => true], ['datacenter' => $this->datacenter->curdc]); - if (isset($difference['timeout'])) { - $timeout = $difference['timeout']; - } - - switch ($difference['_']) { - case 'updates.channelDifferenceEmpty': - $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); - $state->update($difference); - unset($difference); - break 2; - case 'updates.channelDifference': - $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); - if ($state->pts() >= $difference['pts'] && $state->pts() > 1) { - $this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().", using ".($state->pts()+1), \danog\MadelineProto\Logger::VERBOSE); - $difference['pts'] = $state->pts() + 1; - } - $state->update($difference); - $feeder->feed($difference['other_updates']); - - yield $this->handle_update_messages_async($difference['new_messages'], $channel); - if (!$difference['final']) { - if ($difference['pts'] >= $toPts) { - unset($difference); - break 2; - } - unset($difference); - break; - } - unset($difference); - break 2; - case 'updates.channelDifferenceTooLong': - $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); - $state->update($difference); - yield $this->handle_update_messages_async($difference['messages'], $channel); - unset($difference); - break; - default: - throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); - } + $toPts = $this->toPts; + $this->toPts = null; + while (true) { + if ($this->channelId) { + $this->API->logger->logger('Fetching '.$this->channelId.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); + if ($state->pts() <= 1) { + $limit = 10; + } else if ($API->authorization['user']['bot']) { + $limit = 100000; } else { - $this->API->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); + $limit = 100; + } + $difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => 'channel#'.$this->channelId, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $state->pts(), 'limit' => $limit, 'force' => true], ['datacenter' => $this->datacenter->curdc]); + if (isset($difference['timeout'])) { + $timeout = $difference['timeout']; + } - $difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); - $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); + switch ($difference['_']) { + case 'updates.channelDifferenceEmpty': + $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); + $state->update($difference); + unset($difference); + break 2; + case 'updates.channelDifference': + $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); + if ($state->pts() >= $difference['pts'] && $state->pts() > 1) { + $this->API->logger->logger("The PTS ({$difference['pts']}) I got with getDifference is smaller than the PTS I requested ".$state->pts().", using ".($state->pts() + 1), \danog\MadelineProto\Logger::VERBOSE); + $difference['pts'] = $state->pts() + 1; + } + $state->update($difference); + $feeder->feed($difference['other_updates']); - switch ($difference['_']) { - case 'updates.differenceEmpty': - $state->update($difference); - unset($difference); - break 2; - case 'updates.difference': - foreach ($difference['new_encrypted_messages'] as &$encrypted) { - $encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted]; - } - $feeder->feed($difference['other_updates']); - $feeder->feed($difference['new_encrypted_messages']); - yield $this->handle_update_messages_async($difference['new_messages']); - $state->update($difference['state']); - unset($difference); - break 2; - case 'updates.differenceSlice': - foreach ($difference['new_encrypted_messages'] as &$encrypted) { - $encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted]; - } - $feeder->feed($difference['other_updates']); - $feeder->feed($difference['new_encrypted_messages']); - yield $this->handle_update_messages_async($difference['new_messages']); - $state->update($difference['intermediate_state']); - if ($difference['intermediate_state']['pts'] >= $toPts) { + yield $this->handle_update_messages_async($difference['new_messages'], $channel); + if (!$difference['final']) { + if ($difference['pts'] >= $toPts) { unset($difference); break 2; } unset($difference); break; - default: - throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); - } + } + unset($difference); + break 2; + case 'updates.channelDifferenceTooLong': + $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); + $state->update($difference); + yield $this->handle_update_messages_async($difference['messages'], $channel); + unset($difference); + break; + default: + throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); + } + } else { + $this->API->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); + + $difference = yield $this->API->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->API->settings['connection_settings']['default_dc']]); + $this->API->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); + + switch ($difference['_']) { + case 'updates.differenceEmpty': + $state->update($difference); + unset($difference); + break 2; + case 'updates.difference': + foreach ($difference['new_encrypted_messages'] as &$encrypted) { + $encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted]; + } + $feeder->feed($difference['other_updates']); + $feeder->feed($difference['new_encrypted_messages']); + yield $this->handle_update_messages_async($difference['new_messages']); + $state->update($difference['state']); + unset($difference); + break 2; + case 'updates.differenceSlice': + foreach ($difference['new_encrypted_messages'] as &$encrypted) { + $encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted]; + } + $feeder->feed($difference['other_updates']); + $feeder->feed($difference['new_encrypted_messages']); + yield $this->handle_update_messages_async($difference['new_messages']); + $state->update($difference['intermediate_state']); + if ($difference['intermediate_state']['pts'] >= $toPts) { + unset($difference); + break 2; + } + unset($difference); + break; + default: + throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); } } } - if (yield $this->waitSignal($this->pause(($API->last_getdifference + $timeout) - time()))) { + if (yield $this->waitSignal($this->pause($timeout))) { $API->logger->logger("Exiting update loop in DC $datacenter"); $this->exitedLoop(); diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index bca3bd7e..1a2fc734 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -420,7 +420,7 @@ class MTProto extends AsyncConstruct implements TLCallback } if ($this->authorized === self::LOGGED_IN && $this->settings['updates']['handle_updates'] && !$this->updates_state->syncLoading()) { $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getupdates_deserialization'], Logger::NOTICE); - yield $this->get_updates_difference_async(); + yield $this->updaters[false]->resume(); } $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->start(); } diff --git a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php index a2a6bbca..e54dd5a5 100644 --- a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php @@ -463,13 +463,7 @@ trait AuthKeyHandler public function get_dh_config_async() { - $this->updates_state->syncLoading(true); - - try { - $dh_config = yield $this->method_call_async_read('messages.getDhConfig', ['version' => $this->dh_config['version'], 'random_length' => 0], ['datacenter' => $this->datacenter->curdc]); - } finally { - $this->updates_state->syncLoading(false); - } + $dh_config = yield $this->method_call_async_read('messages.getDhConfig', ['version' => $this->dh_config['version'], 'random_length' => 0], ['datacenter' => $this->datacenter->curdc]); if ($dh_config['_'] === 'messages.dhConfigNotModified') { $this->logger->logger(\danog\MadelineProto\Logger::VERBOSE, ['DH configuration not modified']); @@ -563,12 +557,7 @@ trait AuthKeyHandler return false; } - // Creates authorization keys public function init_authorization_async() - { - return $this->ainit_authorization_async(); - } - public function ainit_authorization_async() { if ($this->pending_auth) { return; @@ -576,8 +565,6 @@ trait AuthKeyHandler $initing = $this->initing_authorization; $this->initing_authorization = true; - $this->updates_state->syncLoading(true); - $this->postpone_updates = true; try { $dcs = []; @@ -618,10 +605,7 @@ trait AuthKeyHandler } } finally { $this->pending_auth = false; - $this->postpone_updates = false; $this->initing_authorization = $initing; - $this->updates_state->syncLoading(false); - yield $this->handle_pending_updates_async(); } } diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index 4cbe2cb6..f566a73e 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -115,8 +115,7 @@ trait ResponseHandler // Acknowledge that I received the server's response if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->datacenter->sockets[$this->datacenter->curdc]->temp_auth_key !== null) { - - $this->callFork($this->get_updates_difference_async()); + $this->updaters[false]->resumeDefer(); } unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); @@ -292,7 +291,7 @@ trait ResponseHandler } } $this->datacenter->sockets[$datacenter]->writer->resume(); - + //$this->n--; return $only_updates; @@ -562,23 +561,6 @@ trait ResponseHandler )()); } - public function handle_pending_updates_async() - { - if ($this->postpone_updates) { - return false; - } - if (count($this->pending_updates)) { - $this->logger->logger('Parsing pending updates...'); - foreach (array_keys($this->pending_updates) as $key) { - if (isset($this->pending_updates[$key])) { - $updates = $this->pending_updates[$key]; - unset($this->pending_updates[$key]); - yield $this->handle_updates_async($updates); - } - } - } - } - public function handle_updates_async($updates, $actual_updates = null) { if (!$this->settings['updates']['handle_updates']) { @@ -588,78 +570,71 @@ trait ResponseHandler $updates = $actual_updates; } - if ($this->postpone_updates) { - $this->logger->logger('Postpone update handling', \danog\MadelineProto\Logger::VERBOSE); - $this->pending_updates[] = $updates; - - return false; - } - yield $this->handle_pending_updates_async(); $this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE); - try { - $this->postpone_updates = true; - - $opts = []; - foreach (['date', 'seq', 'seq_start'] as $key) { - if (isset($updates[$key])) { - $opts[$key] = $updates[$key]; + $opts = []; + switch ($updates['_']) { + case 'updates': + case 'updatesCombined': + $handle_updates = []; + foreach ($updates['updates'] as $key => $update) { + if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' || + $update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' || + $update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' || + $update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') { + $handle_updates[] = $update; + unset($updates['updates'][$key]); + } } - } - switch ($updates['_']) { - case 'updates': - case 'updatesCombined': - foreach ($updates['updates'] as $update) { - yield $this->handle_update_async($update, $opts); + $this->feeders[false]->feed($handle_updates); + if ($updates['updates']) { + if ($updates['_'] === 'updatesCombined') { + $updates['updates'][0]['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; + } else { + $updates['updates'][0]['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; } - break; - case 'updateShort': - yield $this->handle_update_async($updates['update'], $opts); - break; - case 'updateShortMessage': - case 'updateShortChatMessage': - $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); - $to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']); - if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) { - $this->logger->logger('getDifference: good - getting user for updateShortMessage', \danog\MadelineProto\Logger::VERBOSE); - yield $this->get_updates_difference_async(); - } - $message = $updates; - $message['_'] = 'message'; - $message['from_id'] = $from_id; + $this->feeders[false]->feed($updates); + } + break; + case 'updateShort': + $updates['update']['options'] = ['date' => $updates['date']]; + $this->feeders[false]->feed([$updates['update']]); + break; + case 'updateShortMessage': + case 'updateShortChatMessage': + $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); + $to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']); + if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) { + yield $this->updaters[false]->resume(); + // TOFIX + } + $message = $updates; + $message['_'] = 'message'; + $message['from_id'] = $from_id; - try { - $message['to_id'] = (yield $this->get_info_async($to_id))['Peer']; - } catch (\danog\MadelineProto\Exception $e) { - $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); - //$this->pending_updates[] = $updates; - break; - } catch (\danog\MadelineProto\RPCErrorException $e) { - $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); - //$this->pending_updates[] = $updates; - break; - } - $update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']]; - yield $this->handle_update_async($update, $opts); + try { + $message['to_id'] = (yield $this->get_info_async($to_id))['Peer']; + } catch (\danog\MadelineProto\Exception $e) { + $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); + //$this->pending_updates[] = $updates; break; - case 'updateShortSentMessage': - //yield $this->set_update_state_async(['date' => $updates['date']]); + } catch (\danog\MadelineProto\RPCErrorException $e) { + $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); + //$this->pending_updates[] = $updates; break; - case 'updatesTooLong': - yield $this->get_updates_difference_async(); - break; - default: - throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true)); - break; - } - } finally { - $this->postpone_updates = false; - } - if ($this->updates && $this->update_deferred) { - $d = $this->update_deferred; - $this->update_deferred = null; - - Loop::defer([$d, 'resolve']); + } + $update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']]; + yield $this->handle_update_async($update, $opts); + break; + case 'updateShortSentMessage': + //yield $this->set_update_state_async(['date' => $updates['date']]); + break; + case 'updatesTooLong': + $this->updaters[false]->resumeDefer(); + break; + default: + throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true)); + break; } } } diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index f88220cf..cb3acfd2 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -122,99 +122,6 @@ trait UpdateHandler return false; } - public function get_channel_difference_async($channel) - { - if (!$this->settings['updates']['handle_updates']) { - return; - } - if ($this->channels_state->syncLoading($channel)) { - $this->logger->logger('Not fetching '.$channel.' difference, I am already fetching it'); - - return; - } - $this->channels_state->syncLoading($channel, true); - $this->postpone_updates = true; - - try { - $input = yield $this->get_info_async('channel#'.$channel); - if (!isset($input['InputChannel'])) { - throw new \danog\MadelineProto\Exception('This peer is not present in the internal peer database'); - } - $input = $input['InputChannel']; - } catch (\danog\MadelineProto\Exception $e) { - return false; - } catch (\danog\MadelineProto\RPCErrorException $e) { - return false; - } finally { - $this->postpone_updates = false; - $this->channels_state->syncLoading($channel, false); - } - $this->logger->logger('Fetching '.$channel.' difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); - $this->channels_state->syncLoading($channel, true); - $this->postpone_updates = true; - - try { - $difference = yield $this->method_call_async_read('updates.getChannelDifference', ['channel' => $input, 'filter' => ['_' => 'channelMessagesFilterEmpty'], 'pts' => $this->channels_state->get($channel)->pts(), 'limit' => 30], ['datacenter' => $this->datacenter->curdc]); - } catch (\danog\MadelineProto\RPCErrorException $e) { - if ($e->getMessage() === "You haven't joined this channel/supergroup") { - return false; - } - - throw $e; - } catch (\danog\MadelineProto\PTSException $e) { - $this->logger->logger($e->getMessage()); - $this->channels_state->remove($channel); - - return false; //yield $this->get_channel_difference_async($channel); - } finally { - $this->postpone_updates = false; - $this->channels_state->syncLoading($channel, false); - } - unset($input); - - switch ($difference['_']) { - case 'updates.channelDifferenceEmpty': - $this->channels_state->get($channel, $difference); - break; - case 'updates.channelDifference': - $this->channels_state->syncLoading($channel, true); - $this->postpone_updates = true; - - try { - $this->channels_state->get($channel, $difference); - yield $this->handle_update_messages_async($difference['new_messages'], $channel); - yield $this->handle_multiple_update_async($difference['other_updates'], [], $channel); - } finally { - $this->postpone_updates = false; - $this->channels_state->syncLoading($channel, false); - } - if (!$difference['final']) { - unset($difference); - yield $this->get_channel_difference_async($channel); - } - break; - case 'updates.channelDifferenceTooLong': - $this->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::VERBOSE); - $this->channels_state->syncLoading($channel, true); - $this->postpone_updates = true; - - try { - $this->channels_state->get($channel, $difference); - yield $this->handle_update_messages_async($difference['messages'], $channel); - unset($difference); - } finally { - $this->postpone_updates = false; - $this->channels_state->syncLoading($channel, false); - } - yield $this->get_channel_difference_async($channel); - break; - default: - throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); - break; - } - yield $this->handle_pending_updates_async(); - } - public function load_update_state_async() { if (!$this->got_state) { @@ -229,91 +136,10 @@ trait UpdateHandler return $this->channels_state->get($channelId); } - public function get_updates_difference_async($w = null) - { - if (!$this->settings['updates']['handle_updates']) { - return; - } - if ($this->updates_state->syncLoading()) { - $this->logger->logger('Not fetching normal difference, I am already fetching it'); - - return false; - } - $this->updates_state->syncLoading(true); - $this->postpone_updates = true; - $this->logger->logger('Fetching normal difference...', \danog\MadelineProto\Logger::ULTRA_VERBOSE); - while (!isset($difference)) { - try { - $state = yield $this->load_update_state_async(); - $difference = yield $this->method_call_async_read('updates.getDifference', ['pts' => $state->pts(), 'date' => $state->date(), 'qts' => $state->qts()], ['datacenter' => $this->settings['connection_settings']['default_dc']]); - } catch (\danog\MadelineProto\PTSException $e) { - $this->updates_state->syncLoading(false); - $this->got_state = false; - } finally { - $this->postpone_updates = false; - $this->updates_state->syncLoading(false); - } - } - $this->logger->logger('Got '.$difference['_'], \danog\MadelineProto\Logger::ULTRA_VERBOSE); - $this->postpone_updates = true; - $this->updates_state->syncLoading(true); - $this->last_getdifference = time(); - $this->datacenter->sockets[$this->settings['connection_settings']['default_dc']]->updater->resume(); - - try { - switch ($difference['_']) { - case 'updates.differenceEmpty': - $this->updates_state->update($difference); - break; - case 'updates.difference': - $this->updates_state->syncLoading(true); - yield $this->handle_multiple_update_async($difference['other_updates']); - foreach ($difference['new_encrypted_messages'] as $encrypted) { - yield $this->handle_encrypted_update_async(['_' => 'updateNewEncryptedMessage', 'message' => $encrypted], true); - } - yield $this->handle_update_messages_async($difference['new_messages']); - $this->updates_state->update($difference['state']); - break; - case 'updates.differenceSlice': - $this->updates_state->syncLoading(true); - yield $this->handle_multiple_update_async($difference['other_updates']); - yield $this->handle_update_messages_async($difference['new_messages']); - $this->updates_state->update($difference['intermediate_state']); - unset($difference); - $this->updates_state->syncLoading(false); - yield $this->get_updates_difference_async(); - break; - default: - throw new \danog\MadelineProto\Exception('Unrecognized update difference received: '.var_export($difference, true)); - break; - } - } finally { - $this->postpone_updates = false; - $this->updates_state->syncLoading(false); - } - yield $this->handle_pending_updates_async(); - - if ($this->updates && $this->update_deferred) { - $d = $this->update_deferred; - $this->update_deferred = null; - - Loop::defer([$d, 'resolve']); - } - - return true; - } - public function get_updates_state_async() { - $last = $this->updates_state->syncLoading(); - $this->updates_state->syncLoading(true); - - try { - $data = yield $this->method_call_async_read('updates.getState', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]); - yield $this->get_cdn_config_async($this->settings['connection_settings']['default_dc']); - } finally { - $this->updates_state->syncLoading($last); - } + $data = yield $this->method_call_async_read('updates.getState', [], ['datacenter' => $this->settings['connection_settings']['default_dc']]); + yield $this->get_cdn_config_async($this->settings['connection_settings']['default_dc']); return $data; } @@ -324,37 +150,44 @@ trait UpdateHandler return; } $this->logger->logger('Handling an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE); - $channel_id = false; + $channelId = false; switch ($update['_']) { case 'updateChannelWebPage': case 'updateNewChannelMessage': case 'updateEditChannelMessage': - $channel_id = $update['message']['to_id']['channel_id']; + $channelId = $update['message']['to_id']['channel_id']; break; case 'updateDeleteChannelMessages': - $channel_id = $update['channel_id']; + $channelId = $update['channel_id']; break; case 'updateChannelTooLong': - $channel_id = $update['channel_id']; - $this->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE); - if (!$this->channels_state->has($channel_id) && !isset($update['pts'])) { - $this->logger->logger('I do not have the channel in the states and the pts is not set.', \danog\MadelineProto\Logger::ERROR); - - return; + $channelId = $update['channel_id']; + if (!$this->channels_state->has($channelId) && !isset($update['pts'])) { + $update['pts'] = 1; } break; } - if ($channel_id === false) { + if ($channelId === false) { $cur_state = yield $this->load_update_state_async(); } else { - $cur_state = $this->channels_state->get($channel_id, $update); + if (!$this->channels_state->has($channelId)) { + if (!isset($this->feeders[$channelId])) { + $this->feeders[$channelId] = new FeedLoop($this, $channelId); + } + if (!isset($this->updaters[$channelId])) { + $this->updaters[$channelId] = new UpdateLoop($this, $channelId); + } + $this->feeders[$channelId]->start(); + $this->updaters[$channelId]->start(); + } + $cur_state = $this->channels_state->get($channelId, $update); } switch ($update['_']) { case 'updateChannelTooLong': - $this->datacenter->sockets[] - yield $this->get_channel_difference_async($channel_id); + $this->logger->logger('Got channel too long update, getting difference...', \danog\MadelineProto\Logger::VERBOSE); + $this->updaters[$channelId]->resumeDefer(); return false; case 'updateNewMessage': @@ -389,104 +222,29 @@ trait UpdateHandler } $this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE); - if ($channel_id !== false && yield $this->peer_isset_async($this->to_supergroup($channel_id))) { - yield $this->get_channel_difference_async($channel_id); + if ($channelId !== false && yield $this->peer_isset_async($this->to_supergroup($channelId))) { + $this->updaters[$channelId]->resumeDefer(); } else { - yield $this->get_updates_difference_async(); + $this->updaters[false]->resumeDefer(); } return false; } break; default: - if ($channel_id !== false && !yield $this->peer_isset_async($this->to_supergroup($channel_id))) { - $this->logger->logger('Skipping update, I do not have the channel id '.$channel_id, \danog\MadelineProto\Logger::ERROR); + if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) { + $this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR); return false; } break; } - if (isset($update['pts'])) { - $logger = function ($msg) use ($update, $cur_state, $channel_id) { - $pts_count = isset($update['pts_count']) ? $update['pts_count'] : 0; - $this->logger->logger($update); - $double = isset($update['message']['id']) ? $update['message']['id'] * 2 : '-'; - $mid = isset($update['message']['id']) ? $update['message']['id'] : '-'; - $mypts = $cur_state->pts(); - $this->logger->logger("$msg. My pts: {$mypts}, remote pts: {$update['pts']}, remote pts count: {$pts_count}, msg id: {$mid} (*2=$double), channel id: $channel_id", \danog\MadelineProto\Logger::ERROR); - }; - if ($update['pts'] < $cur_state->pts()) { - $logger("PTS duplicate"); - - return false; - } - if ($cur_state->pts() + (isset($update['pts_count']) ? $update['pts_count'] : 0) !== $update['pts']) { - $logger("PTS hole"); - if ($channel_id !== false && yield $this->peer_isset_async($this->to_supergroup($channel_id))) { - yield $this->get_channel_difference_async($channel_id); - } else { - yield $this->get_updates_difference_async(); - } - - return false; - } - if (isset($update['message']['id'], $update['message']['to_id']) && !in_array($update['_'], ['updateEditMessage', 'updateEditChannelMessage'])) { - if (!$this->check_msg_id($update['message'])) { - $logger("MSGID duplicate"); - - return false; - } - } - $logger("PTS OK"); - - //$this->logger->logger('Applying pts. my pts: '.$cur_state->pts().', remote pts: '.$update['pts'].', channel id: '.$channel_id, \danog\MadelineProto\Logger::VERBOSE); - $cur_state->pts($update['pts']); - if ($channel_id === false && isset($options['date'])) { - $cur_state->date($options['date']); - } - } - if ($channel_id === false && isset($options['seq']) || isset($options['seq_start'])) { - $seq = $options['seq']; - $seq_start = isset($options['seq_start']) ? $options['seq_start'] : $options['seq']; - if ($seq_start != $cur_state->seq() + 1 && $seq_start > $cur_state->seq()) { - $this->logger->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.$cur_state->seq().' + 1', \danog\MadelineProto\Logger::ERROR); - yield $this->get_updates_difference_async(); - - return false; - } - if ($cur_state->seq() !== $seq) { - $cur_state->seq($seq); - if (isset($options['date'])) { - $cur_state->date($options['date']); - } - } - } - yield $this->save_update_async($update); - } - - public function handle_multiple_update_async($updates, $options = [], $channel = false) - { - if (!$this->settings['updates']['handle_updates']) { - return; - } - if ($channel === false) { - foreach ($updates as $update) { - yield $this->handle_update_async($update, $options); - } - } else { - foreach ($updates as $update) { - yield $this->handle_update_async($update); - } - } } public function handle_update_messages_async($messages, $channel = false) { - if (!$this->settings['updates']['handle_updates']) { - return; - } foreach ($messages as $message) { - yield $this->handle_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => ($channel === false ? (yield $this->load_update_state_async()) : $this->channels_state->get($channel))->pts(), 'pts_count' => 0]); + yield $this->save_update_async(['_' => $channel === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => ($channel === false ? (yield $this->load_update_state_async()) : $this->channels_state->get($channel))->pts(), 'pts_count' => 0]); } } @@ -555,7 +313,7 @@ trait UpdateHandler } if ($update['qts'] > $cur_state->qts() + 1) { $this->logger->logger('Qts hole. Fetching updates manually: update qts: '.$update['qts'].' > current qts '.$cur_state->qts().'+1, chat id: '.$update['message']['chat_id'], \danog\MadelineProto\Logger::ERROR); - yield $this->get_updates_difference_async(); + $this->updaters[$channelId]->resumeDefer(); return false; } diff --git a/src/danog/MadelineProto/SecretChats/AuthKeyHandler.php b/src/danog/MadelineProto/SecretChats/AuthKeyHandler.php index 1f1a70a0..2d93b79b 100644 --- a/src/danog/MadelineProto/SecretChats/AuthKeyHandler.php +++ b/src/danog/MadelineProto/SecretChats/AuthKeyHandler.php @@ -53,7 +53,6 @@ trait AuthKeyHandler $this->check_G($g_b, $dh_config['p']); yield $this->method_call_async_read('messages.acceptEncryption', ['peer' => $params['id'], 'g_b' => $g_b->toBytes(), 'key_fingerprint' => $key['fingerprint']], ['datacenter' => $this->datacenter->curdc]); yield $this->notify_layer_async($params['id']); - yield $this->handle_pending_updates_async(); $this->logger->logger('Secret chat '.$params['id'].' accepted successfully!', \danog\MadelineProto\Logger::NOTICE); } @@ -73,8 +72,7 @@ trait AuthKeyHandler $this->check_G($g_a, $dh_config['p']); $res = yield $this->method_call_async_read('messages.requestEncryption', ['user_id' => $user, 'g_a' => $g_a->toBytes()], ['datacenter' => $this->datacenter->curdc]); $this->temp_requested_secret_chats[$res['id']] = $a; - yield $this->handle_pending_updates_async(); - yield $this->get_updates_difference_async(); + $this->updaters[false]->resume(); $this->logger->logger('Secret chat '.$res['id'].' requested successfully!', \danog\MadelineProto\Logger::NOTICE); return $res['id']; @@ -104,7 +102,6 @@ trait AuthKeyHandler $key['visualization_46'] = substr(hash('sha256', $key['auth_key'], true), 20); $this->secret_chats[$params['id']] = ['key' => $key, 'admin' => true, 'user_id' => $params['participant_id'], 'InputEncryptedChat' => ['chat_id' => $params['id'], 'access_hash' => $params['access_hash'], '_' => 'inputEncryptedChat'], 'in_seq_no_x' => 0, 'out_seq_no_x' => 1, 'in_seq_no' => 0, 'out_seq_no' => 0, 'layer' => 8, 'ttl' => 0, 'ttr' => 100, 'updated' => time(), 'incoming' => [], 'outgoing' => [], 'created' => time(), 'rekeying' => [0], 'key_x' => 'to server', 'mtproto' => 1]; yield $this->notify_layer_async($params['id']); - yield $this->handle_pending_updates_async(); $this->logger->logger('Secret chat '.$params['id'].' completed successfully!', \danog\MadelineProto\Logger::NOTICE); } @@ -131,8 +128,7 @@ trait AuthKeyHandler $this->temp_rekeyed_secret_chats[$e] = $a; $this->secret_chats[$chat]['rekeying'] = [1, $e]; yield $this->method_call_async_read('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionRequestKey', 'g_a' => $g_a->toBytes(), 'exchange_id' => $e]]], ['datacenter' => $this->datacenter->curdc]); - yield $this->handle_pending_updates_async(); - yield $this->get_updates_difference_async(); + $this->updaters[false]->resume(); return $e; } @@ -167,8 +163,7 @@ trait AuthKeyHandler $g_b = $dh_config['g']->powMod($b, $dh_config['p']); $this->check_G($g_b, $dh_config['p']); yield $this->method_call_async_read('messages.sendEncryptedService', ['peer' => $chat, 'message' => ['_' => 'decryptedMessageService', 'action' => ['_' => 'decryptedMessageActionAcceptKey', 'g_b' => $g_b->toBytes(), 'exchange_id' => $params['exchange_id'], 'key_fingerprint' => $key['fingerprint']]]], ['datacenter' => $this->datacenter->curdc]); - yield $this->handle_pending_updates_async(); - yield $this->get_updates_difference_async(); + $this->updaters[false]->resume(); } public function commit_rekey_async($chat, $params) @@ -198,8 +193,7 @@ trait AuthKeyHandler $this->secret_chats[$chat]['key'] = $key; $this->secret_chats[$chat]['ttr'] = 100; $this->secret_chats[$chat]['updated'] = time(); - yield $this->handle_pending_updates_async(); - yield $this->get_updates_difference_async(); + $this->updaters[false]->resume(); } public function complete_rekey_async($chat, $params) diff --git a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php index b97fe190..8b491f53 100644 --- a/src/danog/MadelineProto/VoIP/AuthKeyHandler.php +++ b/src/danog/MadelineProto/VoIP/AuthKeyHandler.php @@ -67,8 +67,7 @@ trait AuthKeyHandler $res = yield $this->method_call_async_read('phone.requestCall', ['user_id' => $user, 'g_a_hash' => hash('sha256', $g_a->toBytes(), true), 'protocol' => ['_' => 'phoneCallProtocol', 'udp_p2p' => true, 'udp_reflector' => true, 'min_layer' => 65, 'max_layer' => \danog\MadelineProto\VoIP::getConnectionMaxLayer()]], ['datacenter' => $this->datacenter->curdc]); $controller->setCall($res['phone_call']); $this->calls[$res['phone_call']['id']] = $controller; - yield $this->handle_pending_updates_async(); - yield $this->get_updates_difference_async(); + yield $this->updaters[false]->resume(); return $controller; } @@ -113,8 +112,7 @@ trait AuthKeyHandler throw $e; } $this->calls[$res['phone_call']['id']]->storage['b'] = $b; - yield $this->handle_pending_updates_async(); - yield $this->get_updates_difference_async(); + yield $this->updaters[false]->resume(); return true; } @@ -152,7 +150,6 @@ trait AuthKeyHandler $this->calls[$params['id']]->configuration = array_merge(['recv_timeout' => $this->config['call_receive_timeout_ms'] / 1000, 'init_timeout' => $this->config['call_connect_timeout_ms'] / 1000, 'data_saving' => \danog\MadelineProto\VoIP::DATA_SAVING_NEVER, 'enable_NS' => true, 'enable_AEC' => true, 'enable_AGC' => true, 'auth_key' => $key, 'auth_key_id' => substr(sha1($key, true), -8), 'call_id' => substr(hash('sha256', $key, true), -16), 'network_type' => \danog\MadelineProto\VoIP::NET_TYPE_ETHERNET], $this->calls[$params['id']]->configuration); $this->calls[$params['id']]->parseConfig(); $res = $this->calls[$params['id']]->startTheMagic(); - yield $this->handle_pending_updates_async(); return $res; } diff --git a/src/danog/MadelineProto/Wrappers/DialogHandler.php b/src/danog/MadelineProto/Wrappers/DialogHandler.php index b20954c8..35b41ccf 100644 --- a/src/danog/MadelineProto/Wrappers/DialogHandler.php +++ b/src/danog/MadelineProto/Wrappers/DialogHandler.php @@ -40,9 +40,7 @@ trait DialogHandler $res = ['dialogs' => [0], 'count' => 1]; $datacenter = $this->datacenter->curdc; $dialogs = []; - $this->postpone_updates = true; - try { $this->logger->logger(\danog\MadelineProto\Lang::$current_lang['getting_dialogs']); while ($this->dialog_params['count'] < $res['count']) { $res = yield $this->method_call_async_read('messages.getDialogs', $this->dialog_params, ['datacenter' => $datacenter, 'FloodWaitLimit' => 100]); @@ -82,10 +80,6 @@ trait DialogHandler break; } } - } finally { - $this->postpone_updates = false; - $this->callFork($this->handle_pending_updates_async()); - } return $dialogs; }