From a1d139d526aa8d24938f69b37e3479ffa3372706 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 31 Aug 2019 23:07:20 +0200 Subject: [PATCH] Refactor --- src/danog/MadelineProto/Connection.php | 7 + .../MadelineProto/DataCenterConnection.php | 3 +- .../AckHandler.php | 2 +- .../MsgIdHandler.php | 2 +- .../ResponseHandler.php | 243 ++++++------------ .../SaltHandler.php | 2 +- .../SeqNoHandler.php | 2 +- .../Session.php | 9 +- .../MTProtoTools/UpdateHandler.php | 80 ++++++ 9 files changed, 176 insertions(+), 174 deletions(-) rename src/danog/MadelineProto/{MTProtoConnection => MTProtoSession}/AckHandler.php (98%) rename src/danog/MadelineProto/{MTProtoConnection => MTProtoSession}/MsgIdHandler.php (99%) rename src/danog/MadelineProto/{MTProtoConnection => MTProtoSession}/ResponseHandler.php (72%) rename src/danog/MadelineProto/{MTProtoConnection => MTProtoSession}/SaltHandler.php (96%) rename src/danog/MadelineProto/{MTProtoConnection => MTProtoSession}/SeqNoHandler.php (98%) rename src/danog/MadelineProto/{MTProtoConnection => MTProtoSession}/Session.php (91%) diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index ecfdc796..c53c88b7 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -87,6 +87,13 @@ class Connection extends Session */ protected $API; + /** + * DC ID + * + * @var string + */ + protected $datacenter; + /** * Whether the socket is reading data. * diff --git a/src/danog/MadelineProto/DataCenterConnection.php b/src/danog/MadelineProto/DataCenterConnection.php index cc11390c..701647d0 100644 --- a/src/danog/MadelineProto/DataCenterConnection.php +++ b/src/danog/MadelineProto/DataCenterConnection.php @@ -155,7 +155,8 @@ class DataCenterConnection $this->connections = []; for ($x = 0; $x < $count; $x++) { - $this->connections[$x] = yield $ctx->getStream(); + $this->connections[$x] = new Connection(); + yield $this->connections[$x]->connect(yield $ctx->getStream()); $ctx = $this->ctx->getCtx(); } } diff --git a/src/danog/MadelineProto/MTProtoConnection/AckHandler.php b/src/danog/MadelineProto/MTProtoSession/AckHandler.php similarity index 98% rename from src/danog/MadelineProto/MTProtoConnection/AckHandler.php rename to src/danog/MadelineProto/MTProtoSession/AckHandler.php index 4d2b8cf2..89739a3f 100644 --- a/src/danog/MadelineProto/MTProtoConnection/AckHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/AckHandler.php @@ -17,7 +17,7 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\Stream\MTProtoTools; +namespace danog\MadelineProto\MTProtoSession; /** * Manages acknowledgement of messages. diff --git a/src/danog/MadelineProto/MTProtoConnection/MsgIdHandler.php b/src/danog/MadelineProto/MTProtoSession/MsgIdHandler.php similarity index 99% rename from src/danog/MadelineProto/MTProtoConnection/MsgIdHandler.php rename to src/danog/MadelineProto/MTProtoSession/MsgIdHandler.php index 543663ab..61b34416 100644 --- a/src/danog/MadelineProto/MTProtoConnection/MsgIdHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/MsgIdHandler.php @@ -17,7 +17,7 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\Stream\MTProtoTools; +namespace danog\MadelineProto\MTProtoSession; /** * Manages message ids. diff --git a/src/danog/MadelineProto/MTProtoConnection/ResponseHandler.php b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php similarity index 72% rename from src/danog/MadelineProto/MTProtoConnection/ResponseHandler.php rename to src/danog/MadelineProto/MTProtoSession/ResponseHandler.php index d38c0ac6..e876f742 100644 --- a/src/danog/MadelineProto/MTProtoConnection/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/ResponseHandler.php @@ -17,7 +17,7 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\Stream\MTProtoTools; +namespace danog\MadelineProto\MTProtoSession; use Amp\Loop; @@ -50,16 +50,13 @@ trait ResponseHandler } $info .= \chr($cur_info); } - $this->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter, 'postpone' => true])]['response'] = $req_msg_id; + $this->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['postpone' => true])]['response'] = $req_msg_id; } public $n = 0; - public function handle_messages($datacenter, $actual_datacenter = null) + public function handle_messages() { - if ($actual_datacenter) { - $datacenter = $actual_datacenter; - } $only_updates = true; while ($this->new_incoming) { \reset($this->new_incoming); @@ -68,7 +65,7 @@ trait ResponseHandler unset($this->new_incoming[$current_msg_id]); continue; } - $this->logger->logger((isset($this->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->incoming_messages[$current_msg_id]['content']['_'].' from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE); + $this->logger->logger((isset($this->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->incoming_messages[$current_msg_id]['content']['_'].' from DC '.$this->datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE); switch ($this->incoming_messages[$current_msg_id]['content']['_']) { case 'msgs_ack': @@ -76,7 +73,7 @@ trait ResponseHandler $this->check_in_seq_no($current_msg_id); $only_updates = false; foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { - $this->ack_outgoing_message_id($msg_id, $datacenter); + $this->ack_outgoing_message_id($msg_id); // Acknowledge that the server received my message } @@ -84,14 +81,14 @@ trait ResponseHandler break; case 'rpc_result': unset($this->new_incoming[$current_msg_id]); - $this->ack_incoming_message_id($current_msg_id, $datacenter); + $this->ack_incoming_message_id($current_msg_id); $only_updates = false; // Acknowledge that the server received my request $req_msg_id = $this->incoming_messages[$current_msg_id]['content']['req_msg_id']; $this->incoming_messages[$current_msg_id]['content'] = $this->incoming_messages[$current_msg_id]['content']['result']; $this->check_in_seq_no($current_msg_id); - $this->handle_response($req_msg_id, $current_msg_id, $datacenter); + $this->handle_response($req_msg_id, $current_msg_id); break; case 'future_salts': @@ -108,7 +105,7 @@ trait ResponseHandler $this->check_in_seq_no($current_msg_id); $only_updates = false; - $this->handle_response($this->incoming_messages[$current_msg_id]['content'][$msg_id_type], $current_msg_id, $datacenter); + $this->handle_response($this->incoming_messages[$current_msg_id]['content'][$msg_id_type], $current_msg_id); unset($msg_id_type); break; @@ -118,10 +115,10 @@ trait ResponseHandler $only_updates = false; $this->temp_auth_key['server_salt'] = $this->incoming_messages[$current_msg_id]['content']['server_salt']; - $this->ack_incoming_message_id($current_msg_id, $datacenter); + $this->ack_incoming_message_id($current_msg_id); // Acknowledge that I received the server's response - if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->datacenter->sockets[$this->datacenter->curdc]->temp_auth_key !== null && isset($this->updaters[false])) { + if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->API->datacenter->sockets[$this->API->datacenter->curdc]->temp_auth_key !== null && isset($this->updaters[false])) { $this->updaters[false]->resumeDefer(); } @@ -137,7 +134,7 @@ trait ResponseHandler $this->new_incoming[$message['msg_id']] = $message['msg_id']; } \ksort($this->new_incoming); - //$this->handle_messages($datacenter); + //$this->handle_messages(); //$this->check_in_seq_no($current_msg_id); unset($this->incoming_messages[$current_msg_id]['content']); @@ -147,10 +144,10 @@ trait ResponseHandler $this->check_in_seq_no($current_msg_id); $only_updates = false; - $this->ack_incoming_message_id($current_msg_id, $datacenter); + $this->ack_incoming_message_id($current_msg_id); // Acknowledge that I received the server's response if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']])) { - $this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id'], $datacenter); + $this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']); // Acknowledge that I received the server's response } else { $message = $this->incoming_messages[$current_msg_id]['content']; @@ -177,7 +174,7 @@ trait ResponseHandler $only_updates = false; unset($this->new_incoming[$current_msg_id]); - $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); + $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'])); unset($this->incoming_messages[$current_msg_id]['content']); break; case 'msgs_all_info': @@ -190,7 +187,7 @@ trait ResponseHandler $msg_id = new \phpseclib\Math\BigInteger(\strrev($msg_id), 256); $status = 'Status for message id '.$msg_id.': '; /*if ($info & 4) { - *$this->got_response_for_outgoing_message_id($msg_id, $datacenter); + *$this->got_response_for_outgoing_message_id($msg_id); *} */ foreach (self::MSGS_INFO_FLAGS as $flag => $description) { @@ -208,9 +205,9 @@ trait ResponseHandler $only_updates = false; if (isset($this->outgoing_messages[$this->incoming_messages[$current_msg_id]['content']['msg_id']])) { if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) { - $this->handle_response($this->incoming_messages[$current_msg_id]['content']['msg_id'], $this->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter); + $this->handle_response($this->incoming_messages[$current_msg_id]['content']['msg_id'], $this->incoming_messages[$current_msg_id]['content']['answer_msg_id']); } else { - $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true])); + $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['postpone' => true])); } } break; @@ -220,9 +217,9 @@ trait ResponseHandler unset($this->new_incoming[$current_msg_id]); if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) { - $this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter); + $this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['answer_msg_id']); } else { - $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true])); + $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['postpone' => true])); } break; case 'msg_resend_req': @@ -238,10 +235,10 @@ trait ResponseHandler } if ($ok) { foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { - $this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter, 'postpone' => true]); + $this->method_recall('', ['message_id' => $msg_id, 'postpone' => true]); } } else { - $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); + $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'])); } break; case 'msg_resend_ans_req': @@ -249,16 +246,16 @@ trait ResponseHandler $only_updates = false; unset($this->new_incoming[$current_msg_id]); - $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); + $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'])); foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { if (isset($this->incoming_messages[$msg_id]['response']) && isset($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']])) { - $this->callFork($this->object_call_async($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['_'], $this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter, 'postpone' => true])); + $this->callFork($this->object_call_async($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['_'], $this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['body'], ['postpone' => true])); } } break; default: $this->check_in_seq_no($current_msg_id); - $this->ack_incoming_message_id($current_msg_id, $datacenter); + $this->ack_incoming_message_id($current_msg_id); // Acknowledge that I received the server's response $response_type = $this->constructors->find_by_predicate($this->incoming_messages[$current_msg_id]['content']['_'])['type']; @@ -266,8 +263,8 @@ trait ResponseHandler case 'Updates': unset($this->new_incoming[$current_msg_id]); - if (\strpos($datacenter, 'cdn') === false) { - $this->callForkDefer($this->handle_updates_async($this->incoming_messages[$current_msg_id]['content'])); + if (\strpos($this->datacenter, 'cdn') === false) { + $this->callForkDefer($this->API->handle_updates_async($this->incoming_messages[$current_msg_id]['content'])); } unset($this->incoming_messages[$current_msg_id]['content']); @@ -287,7 +284,7 @@ trait ResponseHandler if ($response_type === $expecting['type']) { $this->logger->logger('Yes', \danog\MadelineProto\Logger::VERBOSE); unset($this->new_incoming[$current_msg_id]); - $this->handle_response($expecting_msg_id, $current_msg_id, $datacenter); + $this->handle_response($expecting_msg_id, $current_msg_id); break 2; } $this->logger->logger('No', \danog\MadelineProto\Logger::VERBOSE); @@ -308,7 +305,7 @@ trait ResponseHandler return $only_updates; } - public function handle_reject($datacenter, &$request, $data) + public function handle_reject(&$request, $data) { if (isset($request['promise']) && \is_object($request['promise'])) { Loop::defer(function () use (&$request, $data) { @@ -330,7 +327,7 @@ trait ResponseHandler }); } elseif (isset($request['container'])) { foreach ($request['container'] as $message_id) { - $this->handle_reject($datacenter, $this->outgoing_messages[$message_id], $data); + $this->handle_reject($this->outgoing_messages[$message_id], $data); } } else { $this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-')); @@ -352,8 +349,8 @@ trait ResponseHandler } if (\in_array($response['error_message'], ['PERSISTENT_TIMESTAMP_EMPTY', 'PERSISTENT_TIMESTAMP_OUTDATED', 'PERSISTENT_TIMESTAMP_INVALID'])) { - $this->got_response_for_outgoing_message_id($request_id, $datacenter); - $this->handle_reject($datacenter, $request, new \danog\MadelineProto\PTSException($response['error_message'])); + $this->got_response_for_outgoing_message_id($request_id); + $this->handle_reject($request, new \danog\MadelineProto\PTSException($response['error_message'])); return; } @@ -365,7 +362,7 @@ trait ResponseHandler unset($request['serialized_body']); } - $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); + $this->method_recall('', ['message_id' => $request_id, 'postpone' => true]); return; } @@ -374,33 +371,33 @@ trait ResponseHandler case -500: if ($response['error_message'] === 'MSG_WAIT_FAILED') { $this->call_queue[$request['queue']] = []; - $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); + $this->method_recall('', ['message_id' => $request_id, 'postpone' => true]); return; } if (\in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'no workers running'])) { - Loop::delay(1 * 1000, [$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]); + Loop::delay(1 * 1000, [$this, 'method_recall'], ['message_id' => $request_id, ]); return; } - $this->got_response_for_outgoing_message_id($request_id, $datacenter); + $this->got_response_for_outgoing_message_id($request_id); - $this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); + $this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); return; case 303: $old_datacenter = $datacenter; - $this->datacenter->curdc = $datacenter = (int) \preg_replace('/[^0-9]+/', '', $response['error_message']); + $this->API->datacenter->curdc = $datacenter = (int) \preg_replace('/[^0-9]+/', '', $response['error_message']); - if (isset($request['file']) && $request['file'] && isset($this->datacenter->sockets[$datacenter.'_media'])) { + if (isset($request['file']) && $request['file'] && isset($this->API->datacenter->sockets[$datacenter.'_media'])) { \danog\MadelineProto\Logger::log('Using media DC'); $datacenter .= '_media'; } if (isset($request['user_related']) && $request['user_related']) { - $this->settings['connection_settings']['default_dc'] = $this->authorized_dc = $this->datacenter->curdc; + $this->settings['connection_settings']['default_dc'] = $this->API->authorized_dc = $this->API->datacenter->curdc; } - Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]); - //$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]); + Loop::defer([$this->API, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]); + //$this->API->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]); return; case 401: @@ -408,10 +405,10 @@ trait ResponseHandler case 'USER_DEACTIVATED': case 'SESSION_REVOKED': case 'SESSION_EXPIRED': - $this->got_response_for_outgoing_message_id($request_id, $datacenter); + $this->got_response_for_outgoing_message_id($request_id); $this->logger->logger($response['error_message'], \danog\MadelineProto\Logger::FATAL_ERROR); - foreach ($this->datacenter->sockets as $socket) { + foreach ($this->API->datacenter->sockets as $socket) { $socket->temp_auth_key = null; $socket->session_id = null; $socket->auth_key = null; @@ -428,24 +425,24 @@ trait ResponseHandler $this->logger->logger('If you intentionally deleted this account, ignore this message.', \danog\MadelineProto\Logger::FATAL_ERROR); } - $this->resetSession(); + $this->API->resetSession(); - $this->callFork((function () use ($datacenter, &$request, &$response) { - yield $this->init_authorization_async(); + $this->callFork((function () use (&$request, &$response) { + yield $this->API->init_authorization_async(); - $this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); + $this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); })()); return; case 'AUTH_KEY_UNREGISTERED': case 'AUTH_KEY_INVALID': if ($this->authorized !== self::LOGGED_IN) { - $this->got_response_for_outgoing_message_id($request_id, $datacenter); + $this->got_response_for_outgoing_message_id($request_id); - $this->callFork((function () use ($datacenter, &$request, &$response) { - yield $this->init_authorization_async(); + $this->callFork((function () use (&$request, &$response) { + yield $this->API->init_authorization_async(); - $this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); + $this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); })()); return; @@ -457,11 +454,11 @@ trait ResponseHandler $this->logger->logger('Auth key not registered, resetting temporary and permanent auth keys...', \danog\MadelineProto\Logger::ERROR); - if ($this->authorized_dc === $datacenter && $this->authorized === self::LOGGED_IN) { - $this->got_response_for_outgoing_message_id($request_id, $datacenter); + if ($this->API->authorized_dc === $this->datacenter && $this->authorized === self::LOGGED_IN) { + $this->got_response_for_outgoing_message_id($request_id); $this->logger->logger('Permanent auth key was main authorized key, logging out...', \danog\MadelineProto\Logger::FATAL_ERROR); - foreach ($this->datacenter->sockets as $socket) { + foreach ($this->API->datacenter->sockets as $socket) { $socket->temp_auth_key = null; $socket->auth_key = null; $socket->authorized = false; @@ -474,20 +471,20 @@ trait ResponseHandler $this->logger->logger('Then login again.', \danog\MadelineProto\Logger::FATAL_ERROR); $this->logger->logger('If you intentionally deleted this account, ignore this message.', \danog\MadelineProto\Logger::FATAL_ERROR); - $this->resetSession(); + $this->API->resetSession(); - $this->callFork((function () use ($datacenter, &$request, &$response) { - yield $this->init_authorization_async(); + $this->callFork((function () use (&$request, &$response) { + yield $this->API->init_authorization_async(); - $this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); + $this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); })()); return; } - $this->callFork((function () use ($request_id, $datacenter) { - yield $this->init_authorization_async(); + $this->callFork((function () use ($request_id) { + yield $this->API->init_authorization_async(); - $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]); + $this->method_recall('', ['message_id' => $request_id, ]); })()); return; @@ -495,35 +492,35 @@ trait ResponseHandler $this->logger->logger('Temporary auth key not bound, resetting temporary auth key...', \danog\MadelineProto\Logger::ERROR); $this->temp_auth_key = null; - $this->callFork((function () use ($request_id, $datacenter) { - yield $this->init_authorization_async(); - $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]); + $this->callFork((function () use ($request_id) { + yield $this->API->init_authorization_async(); + $this->method_recall('', ['message_id' => $request_id, ]); })()); return; } - $this->got_response_for_outgoing_message_id($request_id, $datacenter); + $this->got_response_for_outgoing_message_id($request_id); - $this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); + $this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); return; case 420: $seconds = \preg_replace('/[^0-9]+/', '', $response['error_message']); $limit = isset($request['FloodWaitLimit']) ? $request['FloodWaitLimit'] : $this->settings['flood_timeout']['wait_if_lt']; if (\is_numeric($seconds) && $seconds < $limit) { - //$this->got_response_for_outgoing_message_id($request_id, $datacenter); + //$this->got_response_for_outgoing_message_id($request_id); $this->logger->logger('Flood, waiting '.$seconds.' seconds before repeating async call...', \danog\MadelineProto\Logger::NOTICE); $request['sent'] += $seconds; - Loop::delay($seconds * 1000, [$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]); + Loop::delay($seconds * 1000, [$this, 'method_recall'], ['message_id' => $request_id, ]); return; } // no break default: - $this->got_response_for_outgoing_message_id($request_id, $datacenter); + $this->got_response_for_outgoing_message_id($request_id); - $this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); + $this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : '')); return; } @@ -539,7 +536,7 @@ trait ResponseHandler switch ($response['error_code']) { case 48: $this->temp_auth_key['server_salt'] = $response['new_server_salt']; - $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); + $this->method_recall('', ['message_id' => $request_id, 'postpone' => true]); return; case 16: @@ -548,15 +545,15 @@ trait ResponseHandler $this->logger->logger('Set time delta to '.$this->time_delta, \danog\MadelineProto\Logger::WARNING); $this->reset_session(); $this->temp_auth_key = null; - $this->callFork((function () use ($datacenter, $request_id) { - yield $this->init_authorization_async(); - $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]); + $this->callFork((function () use ($request_id) { + yield $this->API->init_authorization_async(); + $this->method_recall('', ['message_id' => $request_id, ]); })()); return; } - $this->got_response_for_outgoing_message_id($request_id, $datacenter); - $this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException('Received bad_msg_notification: '.self::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], isset($request['_']) ? $request['_'] : '')); + $this->got_response_for_outgoing_message_id($request_id); + $this->handle_reject($request, new \danog\MadelineProto\RPCErrorException('Received bad_msg_notification: '.self::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], isset($request['_']) ? $request['_'] : '')); return; } @@ -572,16 +569,16 @@ trait ResponseHandler return; } $botAPI = isset($request['botAPI']) && $request['botAPI']; - if (isset($response['_']) && \strpos($datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') { + if (isset($response['_']) && \strpos($this->datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') { $response['request'] = $request; - $this->callForkDefer($this->handle_updates_async($response)); + $this->callForkDefer($this->API->handle_updates_async($response)); } unset($request); - $this->got_response_for_outgoing_message_id($request_id, $datacenter); + $this->got_response_for_outgoing_message_id($request_id); $r = isset($response['_']) ? $response['_'] : \json_encode($response); $this->logger->logger("Defer sending $r to deferred"); $this->callFork(( - function () use ($request_id, $response, $datacenter, $botAPI) { + function () use ($request_id, $response, $botAPI) { $r = isset($response['_']) ? $response['_'] : \json_encode($response); $this->logger->logger("Deferred: sent $r to deferred"); if ($botAPI) { @@ -602,84 +599,4 @@ trait ResponseHandler } )()); } - - public function handle_updates_async($updates, $actual_updates = null) - { - if (!$this->settings['updates']['handle_updates']) { - return; - } - if ($actual_updates) { - $updates = $actual_updates; - } - $this->logger->logger('Parsing updates ('.$updates['_'].') received via the socket...', \danog\MadelineProto\Logger::VERBOSE); - switch ($updates['_']) { - case 'updates': - case 'updatesCombined': - $result = []; - foreach ($updates['updates'] as $key => $update) { - if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' || - $update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' || - $update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' || - $update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') { - $result[yield $this->feeders[false]->feedSingle($update)] = true; - unset($updates['updates'][$key]); - } - } - $this->seqUpdater->addPendingWakeups($result); - if ($updates['updates']) { - if ($updates['_'] === 'updatesCombined') { - $updates['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; - } else { - $updates['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; - } - $this->seqUpdater->feed($updates); - } - $this->seqUpdater->resume(); - break; - case 'updateShort': - $this->feeders[yield $this->feeders[false]->feedSingle($updates['update'])]->resume(); - break; - case 'updateShortSentMessage': - if (!isset($updates['request']['body'])) { - break; - } - $updates['user_id'] = (yield $this->get_info_async($updates['request']['body']['peer']))['bot_api_id']; - $updates['message'] = $updates['request']['body']['message']; - unset($updates['request']); - // no break - case 'updateShortMessage': - case 'updateShortChatMessage': - $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); - $to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']); - if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) { - yield $this->updaters[false]->resume(); - - return; - } - $message = $updates; - $message['_'] = 'message'; - $message['from_id'] = $from_id; - - try { - $message['to_id'] = (yield $this->get_info_async($to_id))['Peer']; - } catch (\danog\MadelineProto\Exception $e) { - $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); - //$this->pending_updates[] = $updates; - break; - } catch (\danog\MadelineProto\RPCErrorException $e) { - $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); - //$this->pending_updates[] = $updates; - break; - } - $update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']]; - $this->feeders[yield $this->feeders[false]->feedSingle($update)]->resume(); - break; - case 'updatesTooLong': - $this->updaters[false]->resume(); - break; - default: - throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.\var_export($updates, true)); - break; - } - } } diff --git a/src/danog/MadelineProto/MTProtoConnection/SaltHandler.php b/src/danog/MadelineProto/MTProtoSession/SaltHandler.php similarity index 96% rename from src/danog/MadelineProto/MTProtoConnection/SaltHandler.php rename to src/danog/MadelineProto/MTProtoSession/SaltHandler.php index 5d55886c..3afcd3e3 100644 --- a/src/danog/MadelineProto/MTProtoConnection/SaltHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/SaltHandler.php @@ -17,7 +17,7 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\Stream\MTProtoTools; +namespace danog\MadelineProto\MTProtoSession; /** * Manages message ids. diff --git a/src/danog/MadelineProto/MTProtoConnection/SeqNoHandler.php b/src/danog/MadelineProto/MTProtoSession/SeqNoHandler.php similarity index 98% rename from src/danog/MadelineProto/MTProtoConnection/SeqNoHandler.php rename to src/danog/MadelineProto/MTProtoSession/SeqNoHandler.php index c0128b82..e247d0d0 100644 --- a/src/danog/MadelineProto/MTProtoConnection/SeqNoHandler.php +++ b/src/danog/MadelineProto/MTProtoSession/SeqNoHandler.php @@ -17,7 +17,7 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\Stream\MTProtoTools; +namespace danog\MadelineProto\MTProtoSession; /** * Manages sequence number. diff --git a/src/danog/MadelineProto/MTProtoConnection/Session.php b/src/danog/MadelineProto/MTProtoSession/Session.php similarity index 91% rename from src/danog/MadelineProto/MTProtoConnection/Session.php rename to src/danog/MadelineProto/MTProtoSession/Session.php index 274349b6..e2982cbf 100644 --- a/src/danog/MadelineProto/MTProtoConnection/Session.php +++ b/src/danog/MadelineProto/MTProtoSession/Session.php @@ -16,12 +16,10 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\Stream\MTProtoTools; - -use danog\MadelineProto\Logger; +namespace danog\MadelineProto\MTProtoSession; /** - * Manages MTProto session-specific data + * Manages MTProto session-specific data. */ abstract class Session { @@ -43,5 +41,4 @@ abstract class Session public $call_queue = []; public $ack_queue = []; - -} \ No newline at end of file +} diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index d99856c3..aa0046bf 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -170,6 +170,86 @@ trait UpdateHandler return $data; } + + public function handle_updates_async($updates, $actual_updates = null) + { + if (!$this->settings['updates']['handle_updates']) { + return; + } + if ($actual_updates) { + $updates = $actual_updates; + } + $this->logger->logger('Parsing updates ('.$updates['_'].') received via the socket...', \danog\MadelineProto\Logger::VERBOSE); + switch ($updates['_']) { + case 'updates': + case 'updatesCombined': + $result = []; + foreach ($updates['updates'] as $key => $update) { + if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' || + $update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' || + $update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' || + $update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') { + $result[yield $this->feeders[false]->feedSingle($update)] = true; + unset($updates['updates'][$key]); + } + } + $this->seqUpdater->addPendingWakeups($result); + if ($updates['updates']) { + if ($updates['_'] === 'updatesCombined') { + $updates['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; + } else { + $updates['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']]; + } + $this->seqUpdater->feed($updates); + } + $this->seqUpdater->resume(); + break; + case 'updateShort': + $this->feeders[yield $this->feeders[false]->feedSingle($updates['update'])]->resume(); + break; + case 'updateShortSentMessage': + if (!isset($updates['request']['body'])) { + break; + } + $updates['user_id'] = (yield $this->get_info_async($updates['request']['body']['peer']))['bot_api_id']; + $updates['message'] = $updates['request']['body']['message']; + unset($updates['request']); + // no break + case 'updateShortMessage': + case 'updateShortChatMessage': + $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); + $to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']); + if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) { + yield $this->updaters[false]->resume(); + + return; + } + $message = $updates; + $message['_'] = 'message'; + $message['from_id'] = $from_id; + + try { + $message['to_id'] = (yield $this->get_info_async($to_id))['Peer']; + } catch (\danog\MadelineProto\Exception $e) { + $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); + //$this->pending_updates[] = $updates; + break; + } catch (\danog\MadelineProto\RPCErrorException $e) { + $this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR); + //$this->pending_updates[] = $updates; + break; + } + $update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']]; + $this->feeders[yield $this->feeders[false]->feedSingle($update)]->resume(); + break; + case 'updatesTooLong': + $this->updaters[false]->resume(); + break; + default: + throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.\var_export($updates, true)); + break; + } + } public function save_update_async($update) { if ($update['_'] === 'updateConfig') {