From 558afb9183184fd75c6fdea1b0d9604cfe61c78e Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 30 May 2019 13:28:50 +0200 Subject: [PATCH] Finish writing feeder logic --- .../MadelineProto/Loop/Update/FeedLoop.php | 105 +++++++++++++++++- .../MadelineProto/Loop/Update/SeqLoop.php | 26 ++--- .../MadelineProto/Loop/Update/UpdateLoop.php | 15 ++- src/danog/MadelineProto/MTProto.php | 39 +++---- .../MTProtoTools/ResponseHandler.php | 30 ++--- .../MTProtoTools/UpdateHandler.php | 100 +---------------- 6 files changed, 159 insertions(+), 156 deletions(-) diff --git a/src/danog/MadelineProto/Loop/Update/FeedLoop.php b/src/danog/MadelineProto/Loop/Update/FeedLoop.php index e39c65c9..ed1e4284 100644 --- a/src/danog/MadelineProto/Loop/Update/FeedLoop.php +++ b/src/danog/MadelineProto/Loop/Update/FeedLoop.php @@ -18,10 +18,10 @@ namespace danog\MadelineProto\Loop\Update; +use Amp\Loop; use Amp\Success; use danog\MadelineProto\Logger; use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; -use Amp\Loop; /** * update feed loop. @@ -96,7 +96,10 @@ class FeedLoop extends ResumableSignalLoop } $this->parsedUpdates = []; } - if ($API->update_deferred) Loop::defer([$API->update_deferred, 'resolve']); + if ($API->update_deferred) { + Loop::defer([$API->update_deferred, 'resolve']); + } + } } public function parse($updates) @@ -152,20 +155,110 @@ class FeedLoop extends ResumableSignalLoop } public function feed($updates) { - $this->incomingUpdates = array_merge($this->incomingUpdates, $updates); + $result = []; + foreach ($updates as $update) { + $res = $this->feedSingle($update); + if ($res instanceof \Generator) { + $res = yield $res; + } + $result[$res] = true; + } + return $result; } public function feedSingle($update) { - $this->incomingUpdates []= $update; + if (!$this->channelId) { + $channelId = false; + switch ($update['_']) { + case 'updateChannelWebPage': + case 'updateNewChannelMessage': + case 'updateEditChannelMessage': + $channelId = $update['message']['to_id']['channel_id']; + break; + case 'updateDeleteChannelMessages': + $channelId = $update['channel_id']; + break; + case 'updateChannelTooLong': + $channelId = $update['channel_id']; + if (!isset($update['pts'])) { + $update['pts'] = 1; + } + break; + } + + if ($channelId && !$this->API->getChannelStates()->has($channelId)) { + $this->API->loadChannelState($channelId, $update); + if (!isset($this->API->feeders[$channelId])) { + $this->API->feeders[$channelId] = new FeedLoop($this, $channelId); + } + if (!isset($this->API->updaters[$channelId])) { + $this->API->updaters[$channelId] = new UpdateLoop($this, $channelId); + } + $this->API->feeders[$channelId]->start(); + $this->API->updaters[$channelId]->start(); + } + + switch ($update['_']) { + case 'updateNewMessage': + case 'updateEditMessage': + case 'updateNewChannelMessage': + case 'updateEditChannelMessage': + $to = false; + $from = false; + $via_bot = false; + $entities = false; + if (($from = isset($update['message']['from_id']) && !yield $this->peer_isset_async($update['message']['from_id'])) || + ($to = !yield $this->peer_isset_async($update['message']['to_id'])) || + ($via_bot = isset($update['message']['via_bot_id']) && !yield $this->peer_isset_async($update['message']['via_bot_id'])) || + ($entities = isset($update['message']['entities']) && !yield $this->entities_peer_isset_async($update['message']['entities'])) // || + //isset($update['message']['fwd_from']) && !yield $this->fwd_peer_isset_async($update['message']['fwd_from']) + ) { + $log = ''; + if ($from) { + $log .= "from_id {$update['message']['from_id']}, "; + } + + if ($to) { + $log .= "to_id ".json_encode($update['message']['to_id']).", "; + } + + if ($via_bot) { + $log .= "via_bot {$update['message']['via_bot_id']}, "; + } + + if ($entities) { + $log .= "entities ".json_encode($update['message']['entities']).", "; + } + + $this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE); + $update = ['_' => 'updateChannelTooLong']; + } + break; + default: + if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) { + $this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR); + + return; + } + break; + } + if ($channelId) { + return $this->feeders[$channelId]->feedSingle($update); + } + } + + $this->logger->logger('Was fed an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE); + $this->incomingUpdates[] = $update; + return $this->channelId; } public function save($update) { - $this->parsedUpdates []= $update; + $this->parsedUpdates[] = $update; } public function saveMessages($messages) { foreach ($messages as $message) { - $this->parsedUpdates []= ['_' => $this->channelId === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1]; + $this->parsedUpdates[] = ['_' => $this->channelId === false ? 'updateNewMessage' : 'updateNewChannelMessage', 'message' => $message, 'pts' => -1, 'pts_count' => -1]; } } diff --git a/src/danog/MadelineProto/Loop/Update/SeqLoop.php b/src/danog/MadelineProto/Loop/Update/SeqLoop.php index 02b7f8d6..4f6feca1 100644 --- a/src/danog/MadelineProto/Loop/Update/SeqLoop.php +++ b/src/danog/MadelineProto/Loop/Update/SeqLoop.php @@ -31,8 +31,8 @@ class SeqLoop extends ResumableSignalLoop { use \danog\MadelineProto\Tools; private $incomingUpdates = []; - private $channelId; private $feeder; + private $pendingWakeups = []; public function __construct($API) { @@ -81,21 +81,22 @@ class SeqLoop extends ResumableSignalLoop $this->exitedLoop(); return; } - $result = []; while ($this->incomingUpdates) { $updates = $this->incomingUpdates; $this->incomingUpdates = []; - $result = array_merge(yield $this->parse($updates), $result); + yield $this->parse($updates); $updates = null; } - foreach ($result as $channelId => $boh) { - $this->API->feeders[$channelId]->resumeDefer(); + while ($this->pendingWakeups) { + reset($this->pendingWakeups); + $channelId = key($this->pendingWakeups); + unset($this->pendingWakeups[$channelId]); + $this->API->feeders[$channelId]->resume(); } } } public function parse($updates) { - $fresult = []; reset($updates); while ($updates) { $options = []; @@ -128,9 +129,8 @@ class SeqLoop extends ResumableSignalLoop $this->state->date($options['date']); } - $fresult = array_merge(yield $this->save($update), $fresult); + yield $this->save($update); } - return $fresult; } public function feed($updates) { @@ -138,11 +138,11 @@ class SeqLoop extends ResumableSignalLoop } public function save($updates) { - $result = []; - foreach ($updates['updates'] as $update) { - $result[yield $this->API->feedSingle($update)] = true; - } - return $result; + $this->pendingWakeups = array_merge($this->pendingWakeups, yield $this->feeder->feed($updates['updates'])); + } + public function addPendingWakeups($wakeups) + { + $this->pendingWakeups = array_merge($wakeups, $this->pendingWakeups); } public function has_all_auth() { diff --git a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php index 8428a8ae..cdbc540b 100644 --- a/src/danog/MadelineProto/Loop/Update/UpdateLoop.php +++ b/src/danog/MadelineProto/Loop/Update/UpdateLoop.php @@ -68,6 +68,7 @@ class UpdateLoop extends ResumableSignalLoop return; } } + $result = []; $toPts = $this->toPts; $this->toPts = null; while (true) { @@ -98,7 +99,7 @@ class UpdateLoop extends ResumableSignalLoop $difference['pts'] = $state->pts() + 1; } $state->update($difference); - $feeder->feed($difference['other_updates']); + $result = array_merge($result, yield $feeder->feed($difference['other_updates'])); $feeder->saveMessages($difference['new_messages']); if (!$difference['final']) { @@ -135,8 +136,8 @@ class UpdateLoop extends ResumableSignalLoop foreach ($difference['new_encrypted_messages'] as &$encrypted) { $encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted]; } - $feeder->feed($difference['other_updates']); - $feeder->feed($difference['new_encrypted_messages']); + $result = array_merge($result, yield $feeder->feed($difference['other_updates'])); + $result = array_merge($result, yield $feeder->feed($difference['new_encrypted_messages'])); $feeder->saveMessages($difference['new_messages']); $state->update($difference['state']); unset($difference); @@ -145,8 +146,8 @@ class UpdateLoop extends ResumableSignalLoop foreach ($difference['new_encrypted_messages'] as &$encrypted) { $encrypted = ['_' => 'updateNewEncryptedMessage', 'message' => $encrypted]; } - $feeder->feed($difference['other_updates']); - $feeder->feed($difference['new_encrypted_messages']); + $result = array_merge($result, yield $feeder->feed($difference['other_updates'])); + $result = array_merge($result, yield $feeder->feed($difference['new_encrypted_messages'])); $feeder->saveMessages($difference['new_messages']); $state->update($difference['intermediate_state']); if ($difference['intermediate_state']['pts'] >= $toPts) { @@ -160,7 +161,9 @@ class UpdateLoop extends ResumableSignalLoop } } } - $feeder->resumeDefer(); + foreach ($result as $channelId) { + $this->API->feeders[$channelId]->resumeDefer(); + } if (yield $this->waitSignal($this->pause($timeout))) { $API->logger->logger("Exiting update loop in channel {$this->channelId}"); $this->exitedLoop(); diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index f990efad..08ac783a 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -21,15 +21,15 @@ namespace danog\MadelineProto; use Amp\Loop; use danog\MadelineProto\Async\AsyncConstruct; +use danog\MadelineProto\Loop\Update\FeedLoop; +use danog\MadelineProto\Loop\Update\SeqLoop; +use danog\MadelineProto\Loop\Update\UpdateLoop; use danog\MadelineProto\MTProtoTools\CombinedUpdatesState; use danog\MadelineProto\MTProtoTools\ReferenceDatabase; use danog\MadelineProto\MTProtoTools\UpdatesState; use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream; use danog\MadelineProto\Stream\MTProtoTransport\HttpStream; use danog\MadelineProto\TL\TLCallback; -use danog\MadelineProto\Loop\Update\UpdateLoop; -use danog\MadelineProto\Loop\Update\FeedLoop; -use danog\MadelineProto\Loop\Update\SeqLoop; /** * Manages all of the mtproto stuff. @@ -223,7 +223,7 @@ class MTProto extends AsyncConstruct implements TLCallback public function __sleep() { - return ['supportUser', 'referenceDatabase', 'channel_participants', 'event_handler', 'event_handler_instance', 'loop_callback', 'web_template', 'encrypted_layer', 'settings', 'config', 'authorization', 'authorized', 'rsa_keys', 'dh_config', 'chats', 'last_stored', 'qres', 'pending_updates', 'updates_state', 'got_state', 'channels_state', 'updates', 'updates_key', 'full_chats', 'msg_ids', 'dialog_params', 'datacenter', 'v', 'constructors', 'td_constructors', 'methods', 'td_methods', 'td_descriptions', 'tl_callbacks', 'temp_requested_secret_chats', 'temp_rekeyed_secret_chats', 'secret_chats', 'hook_url', 'storage', 'authorized_dc', 'tos']; + return ['supportUser', 'referenceDatabase', 'channel_participants', 'event_handler', 'event_handler_instance', 'loop_callback', 'web_template', 'encrypted_layer', 'settings', 'config', 'authorization', 'authorized', 'rsa_keys', 'dh_config', 'chats', 'last_stored', 'qres', 'updates_state', 'got_state', 'channels_state', 'updates', 'updates_key', 'full_chats', 'msg_ids', 'dialog_params', 'datacenter', 'v', 'constructors', 'td_constructors', 'methods', 'td_methods', 'td_descriptions', 'tl_callbacks', 'temp_requested_secret_chats', 'temp_rekeyed_secret_chats', 'secret_chats', 'hook_url', 'storage', 'authorized_dc', 'tos']; } public function isAltervista() @@ -840,6 +840,22 @@ class MTProto extends AsyncConstruct implements TLCallback // Connects to all datacenters and if necessary creates authorization keys, binds them and writes client info public function connect_to_all_dcs_async(): \Generator { + foreach ($this->channels_state->get() as $state) { + $channelId = $state->getChannel(); + if (!isset($this->feeders[$channelId])) { + $this->feeders[$channelId] = new FeedLoop($this, $channelId); + } + if (!isset($this->updaters[$channelId])) { + $this->updaters[$channelId] = new UpdateLoop($this, $channelId); + } + $this->feeders[$channelId]->start(); + $this->updaters[$channelId]->start(); + } + if (!isset($this->seqUpdater)) { + $this->seqUpdater = new SeqLoop($this); + } + $this->seqUpdater->start(); + $this->datacenter->__construct($this, $this->settings['connection'], $this->settings['connection_settings']); $dcs = []; foreach ($this->datacenter->get_dcs() as $new_dc) { @@ -859,21 +875,6 @@ class MTProto extends AsyncConstruct implements TLCallback yield $this->get_phone_config_async(); - foreach ($this->channels_state->get() as $state) { - $channelId = $state->getChannel(); - if (!isset($this->feeders[$channelId])) { - $this->feeders[$channelId] = new FeedLoop($this, $channelId); - } - if (!isset($this->updaters[$channelId])) { - $this->updaters[$channelId] = new UpdateLoop($this, $channelId); - } - $this->feeders[$channelId]->start(); - $this->updaters[$channelId]->start(); - } - if (!isset($this->seqUpdater)) { - $this->seqUpdater = new SeqLoop($this); - } - $this->seqUpdater->start(); } public function get_phone_config_async($watcherId = null) diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php index ee586d79..3d0b219e 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php @@ -341,7 +341,7 @@ trait ResponseHandler unset($request['serialized_body']); } - $this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); + $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; } @@ -349,7 +349,7 @@ trait ResponseHandler case 500: if ($response['error_message'] === 'MSG_WAIT_FAILED') { $this->datacenter->sockets[$datacenter]->call_queue[$request['queue']] = []; - $this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); + $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; } $this->got_response_for_outgoing_message_id($request_id, $datacenter); @@ -370,7 +370,7 @@ trait ResponseHandler $this->settings['connection_settings']['default_dc'] = $this->authorized_dc = $this->datacenter->curdc; } Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]); - //$this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]); + //$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]); return; case 401: @@ -511,7 +511,7 @@ trait ResponseHandler switch ($response['error_code']) { case 48: $this->datacenter->sockets[$datacenter]->temp_auth_key['server_salt'] = $response['new_server_salt']; - $this->method_recall(['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); + $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; case 16: @@ -570,8 +570,7 @@ trait ResponseHandler $updates = $actual_updates; } - $this->logger->logger('Parsing updates received via the socket...', \danog\MadelineProto\Logger::VERBOSE); - $result = []; + $this->logger->logger('Parsing updates ('.$updates['_'].') received via the socket...', \danog\MadelineProto\Logger::VERBOSE); switch ($updates['_']) { case 'updates': case 'updatesCombined': @@ -580,11 +579,11 @@ trait ResponseHandler $update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' || $update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' || $update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') { - $result[yield $this->feedSingle($update)] = true; + $result[yield $this->feeder[false]->feedSingle($update)] = true; unset($updates['updates'][$key]); } } - + $this->seqUpdater->addPendingWakeups($result); if ($updates['updates']) { if ($updates['_'] === 'updatesCombined') { $updates['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; @@ -592,19 +591,18 @@ trait ResponseHandler $updates['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; } $this->seqUpdater->feed($updates); - $this->seqUpdater->resumeDefer(); } + $this->seqUpdater->resume(); break; case 'updateShort': - $updates['update']['options'] = ['date' => $updates['date']]; - $this->feedSingle($updates['update']); + $this->feeders[yield $this->feeder[false]->feedSingle($update)]->resume(); break; case 'updateShortMessage': case 'updateShortChatMessage': $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); $to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']); if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) { - yield $this->updaters[false]->resumeDefer(); + yield $this->updaters[false]->resume(); return; // TOFIX } @@ -624,21 +622,17 @@ trait ResponseHandler break; } $update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']]; - $updates['update']['options'] = ['date' => $updates['date']]; - $result[yield $this->feedSingle($update)] = true; + $this->feeders[yield $this->feeders[false]->feedSingle($update)]->resume(); break; case 'updateShortSentMessage': //yield $this->set_update_state_async(['date' => $updates['date']]); break; case 'updatesTooLong': - $this->updaters[false]->resumeDefer(); + $this->updaters[false]->resume(); break; default: throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true)); break; } - foreach ($result as $channelId => $Boh) { - $this->feeders[$channelId]->resumeDefer(); - } } } diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index ffec960c..b8ba9d49 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -31,13 +31,11 @@ use danog\MadelineProto\Loop\Update\UpdateLoop; */ trait UpdateHandler { - private $pending_updates = []; private $updates_state; private $got_state = false; private $channels_state; public $updates = []; public $updates_key = 0; - public $last_getdifference = 0; public function pwr_update_handler($update) { @@ -132,9 +130,13 @@ trait UpdateHandler return $this->updates_state; } - public function loadChannelState($channelId = null) + public function loadChannelState($channelId = null, $init = []) { - return $this->channels_state->get($channelId); + return $this->channels_state->get($channelId, $init); + } + public function getChannelStates() + { + return $this->channels_state; } public function get_updates_state_async() @@ -145,96 +147,6 @@ trait UpdateHandler return $data; } - public function feedSingle($update) - { - if (!$this->settings['updates']['handle_updates']) { - return; - } - $this->logger->logger('Handling an update of type '.$update['_'].'...', \danog\MadelineProto\Logger::VERBOSE); - $channelId = false; - switch ($update['_']) { - case 'updateChannelWebPage': - case 'updateNewChannelMessage': - case 'updateEditChannelMessage': - $channelId = $update['message']['to_id']['channel_id']; - break; - case 'updateDeleteChannelMessages': - $channelId = $update['channel_id']; - break; - case 'updateChannelTooLong': - $channelId = $update['channel_id']; - if (!$this->channels_state->has($channelId) && !isset($update['pts'])) { - $update['pts'] = 1; - } - break; - } - - if ($channelId && !$this->channels_state->has($channelId)) { - $this->channels_state->get($channelId, $update); - if (!isset($this->feeders[$channelId])) { - $this->feeders[$channelId] = new FeedLoop($this, $channelId); - } - if (!isset($this->updaters[$channelId])) { - $this->updaters[$channelId] = new UpdateLoop($this, $channelId); - } - $this->feeders[$channelId]->start(); - $this->updaters[$channelId]->start(); - } - - switch ($update['_']) { - case 'updateNewMessage': - case 'updateEditMessage': - case 'updateNewChannelMessage': - case 'updateEditChannelMessage': - $to = false; - $from = false; - $via_bot = false; - $entities = false; - if (($from = isset($update['message']['from_id']) && !yield $this->peer_isset_async($update['message']['from_id'])) || - ($to = !yield $this->peer_isset_async($update['message']['to_id'])) || - ($via_bot = isset($update['message']['via_bot_id']) && !yield $this->peer_isset_async($update['message']['via_bot_id'])) || - ($entities = isset($update['message']['entities']) && !yield $this->entities_peer_isset_async($update['message']['entities'])) // || - //isset($update['message']['fwd_from']) && !yield $this->fwd_peer_isset_async($update['message']['fwd_from']) - ) { - $log = ''; - if ($from) { - $log .= "from_id {$update['message']['from_id']}, "; - } - - if ($to) { - $log .= "to_id ".json_encode($update['message']['to_id']).", "; - } - - if ($via_bot) { - $log .= "via_bot {$update['message']['via_bot_id']}, "; - } - - if ($entities) { - $log .= "entities ".json_encode($update['message']['entities']).", "; - } - - $this->logger->logger("Not enough data: for message update $log, getting difference...", \danog\MadelineProto\Logger::VERBOSE); - if ($channelId !== false && yield $this->peer_isset_async($this->to_supergroup($channelId))) { - $this->updaters[$channelId]->resumeDefer(); - } else { - $this->updaters[false]->resumeDefer(); - } - - return; - } - break; - default: - if ($channelId !== false && !yield $this->peer_isset_async($this->to_supergroup($channelId))) { - $this->logger->logger('Skipping update, I do not have the channel id '.$channelId, \danog\MadelineProto\Logger::ERROR); - - return; - } - break; - } - $this->feeders[$channelId]->feedSingle($update); - return $channelId; - } - public function save_update_async($update) { if ($update['_'] === 'updateConfig') {