Improved response handling
This commit is contained in:
parent
071f63244d
commit
0cc2ea1cef
1
.gitignore
vendored
1
.gitignore
vendored
@ -69,3 +69,4 @@ token.php
|
||||
session.mad
|
||||
*.madeline
|
||||
enc.tar.xz
|
||||
a
|
||||
|
@ -36,6 +36,8 @@ class Connection extends Tools
|
||||
|
||||
public $incoming_messages = [];
|
||||
public $outgoing_messages = [];
|
||||
public $new_incoming = [];
|
||||
public $new_outgoing = [];
|
||||
|
||||
public function __construct($ip, $port, $protocol, $timeout)
|
||||
{
|
||||
|
@ -22,6 +22,19 @@ class MTProto extends MTProtoTools
|
||||
public $waiting_code = false;
|
||||
public $config = ['expires' => -1];
|
||||
public $ipv6 = false;
|
||||
public $bad_msg_error_codes = [
|
||||
16 => 'msg_id too low (most likely, client time is wrong; it would be worthwhile to synchronize it using msg_id notifications and re-send the original message with the “correct” msg_id or wrap it in a container with a new msg_id if the original message had waited too long on the client to be transmitted)',
|
||||
17 => 'msg_id too high (similar to the previous case, the client time has to be synchronized, and the message re-sent with the correct msg_id)',
|
||||
18 => 'incorrect two lower order msg_id bits (the server expects client message msg_id to be divisible by 4)',
|
||||
19 => 'container msg_id is the same as msg_id of a previously received message (this must never happen)',
|
||||
20 => 'message too old, and it cannot be verified whether the server has received a message with this msg_id or not',
|
||||
32 => 'msg_seqno too low (the server has already received a message with a lower msg_id but with either a higher or an equal and odd seqno)',
|
||||
33 => 'msg_seqno too high (similarly, there is a message with a higher msg_id but with either a lower or an equal and odd seqno)',
|
||||
34 => 'an even msg_seqno expected (irrelevant message), but odd received',
|
||||
35 => 'odd msg_seqno expected (relevant message), but even received',
|
||||
48 => 'incorrect server salt (in this case, the bad_server_salt response is received with the correct salt, and the message is to be re-sent with it)',
|
||||
64 => 'invalid container.',
|
||||
];
|
||||
|
||||
public function __construct($settings = [])
|
||||
{
|
||||
|
@ -17,59 +17,24 @@ namespace danog\MadelineProto\MTProtoTools;
|
||||
*/
|
||||
class CallHandler extends AuthKeyHandler
|
||||
{
|
||||
public function wait_for_response($last_sent, $optional_name, $response_type)
|
||||
public function wait_for_response()
|
||||
{
|
||||
foreach ($this->datacenter->new_outgoing as $key => $current) {
|
||||
$response = null;
|
||||
$count = 0;
|
||||
while ($response == null && $count++ < $this->settings['max_tries']['response']) {
|
||||
\danog\MadelineProto\Logger::log('Getting response (try number '.$count.' for '.$optional_name.')...');
|
||||
$last_received = $this->recv_message();
|
||||
$this->handle_message($last_sent, $last_received, $response_type);
|
||||
if (isset($this->datacenter->outgoing_messages[$last_sent]['response']) && isset($this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$last_sent]['response']]['content'])) {
|
||||
$response = $this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$last_sent]['response']]['content'];
|
||||
\danog\MadelineProto\Logger::log('Getting response (try number '.$count.' for '.$current['method'].')...');
|
||||
$this->recv_message();
|
||||
$this->handle_messages($current);
|
||||
if (isset($this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$current['msg_id']]['response']]['content'])) {
|
||||
$response = $this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$current['msg_id']]['response']]['content'];
|
||||
}
|
||||
}
|
||||
if ($response == null) {
|
||||
throw new \danog\MadelineProto\Exception("Couldn't get response");
|
||||
if ($response === null) {
|
||||
\danog\MadelineProto\Logger::log('Could not get response for '.$current['method'].'!');
|
||||
} else {
|
||||
unset($this->datacenter->new_outgoing[$key]);
|
||||
}
|
||||
switch ($response['_']) {
|
||||
case 'rpc_error':
|
||||
switch ($response['error_code']) {
|
||||
case 303:
|
||||
$dc = preg_replace('/[^0-9]+/', '', $response['error_message']);
|
||||
\danog\MadelineProto\Logger::log('Received request to switch to DC '.$dc);
|
||||
$this->switch_dc($dc);
|
||||
throw new \danog\MadelineProto\Exception('I had to switch to datacenter '.$dc);
|
||||
|
||||
break;
|
||||
case 401:
|
||||
switch ($response['error_message']) {
|
||||
case 'AUTH_KEY_UNREGISTERED':
|
||||
case 'AUTH_KEY_INVALID':
|
||||
unset($this->datacenter->temp_auth_key);
|
||||
unset($this->datacenter->auth_key);
|
||||
$this->init_authorization();
|
||||
case 'USER_DEACTIVATED':
|
||||
case 'SESSION_REVOKED':
|
||||
case 'SESSION_EXPIRED':
|
||||
$this->datacenter->authorized = false;
|
||||
$this->datacenter->authorization = null;
|
||||
throw new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code']);
|
||||
break;
|
||||
}
|
||||
case 420:
|
||||
$seconds = preg_replace('/[^0-9]+/', '', $response['error_message']);
|
||||
\danog\MadelineProto\Logger::log('Flood, waiting '.$seconds.' seconds...');
|
||||
sleep($seconds);
|
||||
throw new \danog\MadelineProto\Exception('Re-executing query...');
|
||||
default:
|
||||
throw new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code']);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
return $response;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -84,7 +49,61 @@ class CallHandler extends AuthKeyHandler
|
||||
$args = $this->tl->get_named_method_args($method, $args);
|
||||
$int_message_id = $this->send_message($this->tl->serialize_method($method, $args), $this->tl->content_related($method), $message_id);
|
||||
$this->datacenter->outgoing_messages[$int_message_id]['content'] = ['method' => $method, 'args' => $args];
|
||||
$server_answer = $this->wait_for_response($int_message_id, $method, $this->tl->methods->find_by_method($method)['type']);
|
||||
$this->datacenter->new_outgoing[$int_message_id] = ['msg_id' => $int_message_id, 'method' => $method, 'type' => $this->tl->methods->find_by_method($method)['type']];
|
||||
$this->wait_for_response();
|
||||
if (!isset($this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$int_message_id]['response']]['content'])) {
|
||||
throw new \danog\MadelineProto\Exception("Response isn't yet present!");
|
||||
}
|
||||
$server_answer = $this->datacenter->incoming_messages[$this->datacenter->outgoing_messages[$int_message_id]['response']]['content'];
|
||||
if ($server_answer == null) {
|
||||
throw new \danog\MadelineProto\Exception("Couldn't get response");
|
||||
}
|
||||
switch ($server_answer['_']) {
|
||||
case 'rpc_error':
|
||||
switch ($server_answer['error_code']) {
|
||||
case 303:
|
||||
$dc = preg_replace('/[^0-9]+/', '', $server_answer['error_message']);
|
||||
\danog\MadelineProto\Logger::log('Received request to switch to DC '.$dc);
|
||||
$this->switch_dc($dc);
|
||||
throw new \danog\MadelineProto\Exception('I had to switch to datacenter '.$dc);
|
||||
|
||||
break;
|
||||
case 401:
|
||||
switch ($server_answer['error_message']) {
|
||||
case 'AUTH_KEY_UNREGISTERED':
|
||||
case 'AUTH_KEY_INVALID':
|
||||
unset($this->datacenter->temp_auth_key);
|
||||
unset($this->datacenter->auth_key);
|
||||
$this->init_authorization();
|
||||
case 'USER_DEACTIVATED':
|
||||
case 'SESSION_REVOKED':
|
||||
case 'SESSION_EXPIRED':
|
||||
$this->datacenter->authorized = false;
|
||||
$this->datacenter->authorization = null;
|
||||
throw new \danog\MadelineProto\RPCErrorException($server_answer['error_message'], $server_answer['error_code']);
|
||||
break;
|
||||
}
|
||||
case 420:
|
||||
$seconds = preg_replace('/[^0-9]+/', '', $server_answer['error_message']);
|
||||
\danog\MadelineProto\Logger::log('Flood, waiting '.$seconds.' seconds...');
|
||||
sleep($seconds);
|
||||
throw new \danog\MadelineProto\Exception('Re-executing query...');
|
||||
default:
|
||||
throw new \danog\MadelineProto\RPCErrorException($server_answer['error_message'], $server_answer['error_code']);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
case 'bad_server_salt':
|
||||
case 'bad_msg_notification':
|
||||
switch ($server_answer['error_code']) {
|
||||
case 48:
|
||||
$this->datacenter->temp_auth_key['server_salt'] = $server_answer['new_server_salt'];
|
||||
throw new \danog\MadelineProto\Exception('New server salt stored, re-executing query');
|
||||
break;
|
||||
}
|
||||
throw new \danog\MadelineProto\RPCErrorException('Received bad_msg_notification: '.$this->bad_msg_error_codes[$server_answer['error_code']], $server_answer['error_code']);
|
||||
break;
|
||||
}
|
||||
} catch (\danog\MadelineProto\Exception $e) {
|
||||
\danog\MadelineProto\Logger::log('An error occurred while calling method '.$method.': '.$e->getMessage().' in '.basename($e->getFile(), '.php').' on line '.$e->getLine().'. Recreating connection and retrying to call method...');
|
||||
$this->datacenter->close_and_reopen();
|
||||
@ -110,7 +129,6 @@ class CallHandler extends AuthKeyHandler
|
||||
\danog\MadelineProto\Logger::log('Sending object (try number '.$count.' for '.$object.')...');
|
||||
$int_message_id = $this->send_message($this->tl->serialize_object(['type' => $object], $args), $this->tl->content_related($object));
|
||||
$this->datacenter->outgoing_messages[$int_message_id]['content'] = ['object' => $object, 'args' => $args];
|
||||
// $server_answer = $this->wait_for_response($int_message_id);
|
||||
} catch (Exception $e) {
|
||||
\danog\MadelineProto\Logger::log('An error occurred while calling object '.$object.': '.$e->getMessage().' in '.$e->getFile().':'.$e->getLine().'. Recreating connection and retrying to call object...');
|
||||
$this->datacenter->close_and_reopen();
|
||||
@ -118,11 +136,6 @@ class CallHandler extends AuthKeyHandler
|
||||
}
|
||||
|
||||
return;
|
||||
// if ($server_answer == null) {
|
||||
// throw new \danog\MadelineProto\Exception('An error occurred while calling object '.$object.'.');
|
||||
// }
|
||||
// $deserialized = $this->tl->deserialize($this->fopen_and_write('php://memory', 'rw+b', $server_answer));
|
||||
// return $deserialized;
|
||||
}
|
||||
throw new \danog\MadelineProto\Exception('An error occurred while calling object '.$object.'.');
|
||||
}
|
||||
|
@ -42,6 +42,7 @@ class MessageHandler extends Crypt
|
||||
$message = $this->datacenter->temp_auth_key['id'].$message_key.$this->ige_encrypt($encrypted_data.$padding, $aes_key, $aes_iv);
|
||||
$this->datacenter->outgoing_messages[$int_message_id]['seq_no'] = $seq_no;
|
||||
}
|
||||
$this->datacenter->outgoing_messages[$int_message_id]['response'] = -1;
|
||||
$this->datacenter->send_message($message);
|
||||
|
||||
return $int_message_id;
|
||||
@ -112,7 +113,6 @@ class MessageHandler extends Crypt
|
||||
}
|
||||
$deserialized = $this->tl->deserialize($this->fopen_and_write('php://memory', 'rw+b', $message_data));
|
||||
$this->datacenter->incoming_messages[$message_id]['content'] = $deserialized;
|
||||
|
||||
return $message_id;
|
||||
$this->datacenter->new_incoming[$message_id] = $message_id;
|
||||
}
|
||||
}
|
||||
|
@ -17,127 +17,95 @@ namespace danog\MadelineProto\MTProtoTools;
|
||||
*/
|
||||
class ResponseHandler extends MsgIdHandler
|
||||
{
|
||||
public function handle_message($last_sent, $last_received, $expected_type)
|
||||
public function handle_messages($expecting)
|
||||
{
|
||||
$response = $this->datacenter->incoming_messages[$last_received]['content'];
|
||||
foreach ($this->datacenter->new_incoming as $current_msg_id) {
|
||||
$response = $this->datacenter->incoming_messages[$current_msg_id]['content'];
|
||||
|
||||
switch ($response['_']) {
|
||||
case 'msgs_ack':
|
||||
foreach ($response['msg_ids'] as $msg_id) {
|
||||
$this->ack_outgoing_message_id($msg_id); // Acknowledge that the server received my message
|
||||
}
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
break;
|
||||
|
||||
case 'rpc_result':
|
||||
$this->ack_incoming_message_id($last_received); // Acknowledge that I received the server's response
|
||||
$this->datacenter->incoming_messages[$last_received]['content'] = $response['result'];
|
||||
$this->ack_incoming_message_id($current_msg_id); // Acknowledge that I received the server's response
|
||||
$this->datacenter->incoming_messages[$current_msg_id]['content'] = $response['result'];
|
||||
case 'future_salts':
|
||||
$this->ack_outgoing_message_id($response['req_msg_id']); // Acknowledge that the server received my request
|
||||
|
||||
$this->try_store_response($response['req_msg_id'], $last_received, $expected_type, true);
|
||||
$this->datacenter->outgoing_messages[$response['req_msg_id']]['response'] = $current_msg_id;
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
break;
|
||||
|
||||
case 'bad_server_salt':
|
||||
case 'bad_msg_notification':
|
||||
$error_codes = [
|
||||
16 => 'msg_id too low (most likely, client time is wrong; it would be worthwhile to synchronize it using msg_id notifications and re-send the original message with the “correct” msg_id or wrap it in a container with a new msg_id if the original message had waited too long on the client to be transmitted)',
|
||||
17 => 'msg_id too high (similar to the previous case, the client time has to be synchronized, and the message re-sent with the correct msg_id)',
|
||||
18 => 'incorrect two lower order msg_id bits (the server expects client message msg_id to be divisible by 4)',
|
||||
19 => 'container msg_id is the same as msg_id of a previously received message (this must never happen)',
|
||||
20 => 'message too old, and it cannot be verified whether the server has received a message with this msg_id or not',
|
||||
32 => 'msg_seqno too low (the server has already received a message with a lower msg_id but with either a higher or an equal and odd seqno)',
|
||||
33 => 'msg_seqno too high (similarly, there is a message with a higher msg_id but with either a lower or an equal and odd seqno)',
|
||||
34 => 'an even msg_seqno expected (irrelevant message), but odd received',
|
||||
35 => 'odd msg_seqno expected (relevant message), but even received',
|
||||
48 => 'incorrect server salt (in this case, the bad_server_salt response is received with the correct salt, and the message is to be re-sent with it)',
|
||||
64 => 'invalid container.',
|
||||
];
|
||||
switch ($response['error_code']) {
|
||||
case 48:
|
||||
$this->datacenter->temp_auth_key['server_salt'] = $response['new_server_salt'];
|
||||
$this->ack_outgoing_message_id($response['bad_msg_id']); // Acknowledge that the server received my request
|
||||
throw new \danog\MadelineProto\Exception('New server salt stored, re-executing query');
|
||||
break;
|
||||
}
|
||||
throw new \danog\MadelineProto\RPCErrorException('Received bad_msg_notification for '.$response['bad_msg_id'].': '.$error_codes[$response['error_code']]);
|
||||
$this->datacenter->outgoing_messages[$response['bad_msg_id']]['response'] = $current_msg_id;
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
break;
|
||||
|
||||
case 'pong':
|
||||
foreach ($this->datacenter->outgoing_messages as $msg_id => &$omessage) {
|
||||
if (isset($omessage['content']['args']['ping_id']) && $omessage['content']['args']['ping_id'] == $response['ping_id']) {
|
||||
$this->ack_outgoing_message_id($msg_id);
|
||||
$omessage['response'] = $response['msg_id'];
|
||||
$this->datacenter->incoming_messages[$response['msg_id']]['content'] = $response;
|
||||
$this->ack_outgoing_message_id($msg_id);
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 'new_session_created':
|
||||
$this->datacenter->temp_auth_key['server_salt'] = $response['server_salt'];
|
||||
$this->ack_incoming_message_id($last_received); // Acknowledge that I received the server's response
|
||||
$this->ack_incoming_message_id($current_msg_id); // Acknowledge that I received the server's response
|
||||
\danog\MadelineProto\Logger::log('new session created');
|
||||
\danog\MadelineProto\Logger::log($response);
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
break;
|
||||
case 'msg_container':
|
||||
$responses = [];
|
||||
\danog\MadelineProto\Logger::log('Received container.');
|
||||
\danog\MadelineProto\Logger::log($response['messages']);
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
foreach ($response['messages'] as $message) {
|
||||
$this->check_message_id($message['msg_id'], false, true);
|
||||
$this->datacenter->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body']];
|
||||
$responses[] = $this->handle_message($last_sent, $message['msg_id'], $expected_type);
|
||||
}
|
||||
foreach ($responses as $key => $response) {
|
||||
if ($response == null) {
|
||||
unset($responses[$key]);
|
||||
}
|
||||
}
|
||||
switch (count($responses)) {
|
||||
case 0:
|
||||
return;
|
||||
$this->datacenter->new_incoming[$message['msg_id']] = $message['msg_id'];
|
||||
|
||||
case 1:
|
||||
return end($responses);
|
||||
break;
|
||||
default:
|
||||
\danog\MadelineProto\Logger::log('Received multiple responses, returning last one');
|
||||
\danog\MadelineProto\Logger::log($responses);
|
||||
|
||||
return end($responses);
|
||||
break;
|
||||
$this->handle_messages($expecting);
|
||||
}
|
||||
break;
|
||||
case 'msg_copy':
|
||||
$this->ack_incoming_message_id($last_received); // Acknowledge that I received the server's response
|
||||
$this->ack_incoming_message_id($current_msg_id); // Acknowledge that I received the server's response
|
||||
if (isset($this->datacenter->incoming_messages[$response['orig_message']['msg_id']])) {
|
||||
$this->ack_incoming_message_id($response['orig_message']['msg_id']); // Acknowledge that I received the server's response
|
||||
} else {
|
||||
$this->check_message_id($message['orig_message']['msg_id'], false, true);
|
||||
$this->datacenter->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $response['orig_message']];
|
||||
$this->datacenter->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id'];
|
||||
|
||||
return $this->handle_message($last_sent, $message['orig_message']['msg_id'], $expected_type);
|
||||
$this->handle_messages($expecting);
|
||||
}
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
break;
|
||||
case 'http_wait':
|
||||
\danog\MadelineProto\Logger::log('Received http wait.');
|
||||
\danog\MadelineProto\Logger::log($response);
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
break;
|
||||
case 'rpc_answer_dropped_running':
|
||||
case 'rpc_answer_dropped':
|
||||
$this->ack_outgoing_message_id($response['req_msg_id']); // Acknowledge that the server received the original query (the same one, the response to which we wish to forget)
|
||||
default:
|
||||
$this->ack_incoming_message_id($last_received); // Acknowledge that I received the server's response
|
||||
$this->try_store_response($last_sent, $last_received, $expected_type);
|
||||
$this->ack_incoming_message_id($current_msg_id); // Acknowledge that I received the server's response
|
||||
if ($this->tl->constructors->find_by_predicate($this->datacenter->response['_'])['type'] == $expecting['type']) {
|
||||
$this->datacenter->outgoing_messages[$expecting['msg_id']]['response'] = $response;
|
||||
unset($this->datacenter->new_incoming[$current_msg_id]);
|
||||
} else {
|
||||
throw new \danog\MadelineProto\ResponseException('Dunno how to handle '.PHP_EOL.var_export($response, true));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public function try_store_response($request, $response, $type, $force = true)
|
||||
{
|
||||
if ($force) {
|
||||
return $this->datacenter->outgoing_messages[$request]['response'] = $response;
|
||||
}
|
||||
if ($this->tl->constructors->find_by_predicate($this->datacenter->incoming_messages[$response]['content']['_'])['type'] == $type) {
|
||||
$this->datacenter->outgoing_messages[$request]['response'] = $response;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user