This commit is contained in:
Daniil Gentili 2019-08-31 23:07:20 +02:00
parent 3fe86e0162
commit a1d139d526
9 changed files with 176 additions and 174 deletions

View File

@ -87,6 +87,13 @@ class Connection extends Session
*/
protected $API;
/**
* DC ID
*
* @var string
*/
protected $datacenter;
/**
* Whether the socket is reading data.
*

View File

@ -155,7 +155,8 @@ class DataCenterConnection
$this->connections = [];
for ($x = 0; $x < $count; $x++) {
$this->connections[$x] = yield $ctx->getStream();
$this->connections[$x] = new Connection();
yield $this->connections[$x]->connect(yield $ctx->getStream());
$ctx = $this->ctx->getCtx();
}
}

View File

@ -17,7 +17,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Stream\MTProtoTools;
namespace danog\MadelineProto\MTProtoSession;
/**
* Manages acknowledgement of messages.

View File

@ -17,7 +17,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Stream\MTProtoTools;
namespace danog\MadelineProto\MTProtoSession;
/**
* Manages message ids.

View File

@ -17,7 +17,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Stream\MTProtoTools;
namespace danog\MadelineProto\MTProtoSession;
use Amp\Loop;
@ -50,16 +50,13 @@ trait ResponseHandler
}
$info .= \chr($cur_info);
}
$this->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter, 'postpone' => true])]['response'] = $req_msg_id;
$this->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['postpone' => true])]['response'] = $req_msg_id;
}
public $n = 0;
public function handle_messages($datacenter, $actual_datacenter = null)
public function handle_messages()
{
if ($actual_datacenter) {
$datacenter = $actual_datacenter;
}
$only_updates = true;
while ($this->new_incoming) {
\reset($this->new_incoming);
@ -68,7 +65,7 @@ trait ResponseHandler
unset($this->new_incoming[$current_msg_id]);
continue;
}
$this->logger->logger((isset($this->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->incoming_messages[$current_msg_id]['content']['_'].' from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->logger->logger((isset($this->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->incoming_messages[$current_msg_id]['content']['_'].' from DC '.$this->datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
switch ($this->incoming_messages[$current_msg_id]['content']['_']) {
case 'msgs_ack':
@ -76,7 +73,7 @@ trait ResponseHandler
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
$this->ack_outgoing_message_id($msg_id, $datacenter);
$this->ack_outgoing_message_id($msg_id);
// Acknowledge that the server received my message
}
@ -84,14 +81,14 @@ trait ResponseHandler
break;
case 'rpc_result':
unset($this->new_incoming[$current_msg_id]);
$this->ack_incoming_message_id($current_msg_id, $datacenter);
$this->ack_incoming_message_id($current_msg_id);
$only_updates = false;
// Acknowledge that the server received my request
$req_msg_id = $this->incoming_messages[$current_msg_id]['content']['req_msg_id'];
$this->incoming_messages[$current_msg_id]['content'] = $this->incoming_messages[$current_msg_id]['content']['result'];
$this->check_in_seq_no($current_msg_id);
$this->handle_response($req_msg_id, $current_msg_id, $datacenter);
$this->handle_response($req_msg_id, $current_msg_id);
break;
case 'future_salts':
@ -108,7 +105,7 @@ trait ResponseHandler
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
$this->handle_response($this->incoming_messages[$current_msg_id]['content'][$msg_id_type], $current_msg_id, $datacenter);
$this->handle_response($this->incoming_messages[$current_msg_id]['content'][$msg_id_type], $current_msg_id);
unset($msg_id_type);
break;
@ -118,10 +115,10 @@ trait ResponseHandler
$only_updates = false;
$this->temp_auth_key['server_salt'] = $this->incoming_messages[$current_msg_id]['content']['server_salt'];
$this->ack_incoming_message_id($current_msg_id, $datacenter);
$this->ack_incoming_message_id($current_msg_id);
// Acknowledge that I received the server's response
if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->datacenter->sockets[$this->datacenter->curdc]->temp_auth_key !== null && isset($this->updaters[false])) {
if ($this->authorized === self::LOGGED_IN && !$this->initing_authorization && $this->API->datacenter->sockets[$this->API->datacenter->curdc]->temp_auth_key !== null && isset($this->updaters[false])) {
$this->updaters[false]->resumeDefer();
}
@ -137,7 +134,7 @@ trait ResponseHandler
$this->new_incoming[$message['msg_id']] = $message['msg_id'];
}
\ksort($this->new_incoming);
//$this->handle_messages($datacenter);
//$this->handle_messages();
//$this->check_in_seq_no($current_msg_id);
unset($this->incoming_messages[$current_msg_id]['content']);
@ -147,10 +144,10 @@ trait ResponseHandler
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
$this->ack_incoming_message_id($current_msg_id, $datacenter);
$this->ack_incoming_message_id($current_msg_id);
// Acknowledge that I received the server's response
if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']])) {
$this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id'], $datacenter);
$this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']);
// Acknowledge that I received the server's response
} else {
$message = $this->incoming_messages[$current_msg_id]['content'];
@ -177,7 +174,7 @@ trait ResponseHandler
$only_updates = false;
unset($this->new_incoming[$current_msg_id]);
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids']));
unset($this->incoming_messages[$current_msg_id]['content']);
break;
case 'msgs_all_info':
@ -190,7 +187,7 @@ trait ResponseHandler
$msg_id = new \phpseclib\Math\BigInteger(\strrev($msg_id), 256);
$status = 'Status for message id '.$msg_id.': ';
/*if ($info & 4) {
*$this->got_response_for_outgoing_message_id($msg_id, $datacenter);
*$this->got_response_for_outgoing_message_id($msg_id);
*}
*/
foreach (self::MSGS_INFO_FLAGS as $flag => $description) {
@ -208,9 +205,9 @@ trait ResponseHandler
$only_updates = false;
if (isset($this->outgoing_messages[$this->incoming_messages[$current_msg_id]['content']['msg_id']])) {
if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->handle_response($this->incoming_messages[$current_msg_id]['content']['msg_id'], $this->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
$this->handle_response($this->incoming_messages[$current_msg_id]['content']['msg_id'], $this->incoming_messages[$current_msg_id]['content']['answer_msg_id']);
} else {
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true]));
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['postpone' => true]));
}
}
break;
@ -220,9 +217,9 @@ trait ResponseHandler
unset($this->new_incoming[$current_msg_id]);
if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
$this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['answer_msg_id']);
} else {
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true]));
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['postpone' => true]));
}
break;
case 'msg_resend_req':
@ -238,10 +235,10 @@ trait ResponseHandler
}
if ($ok) {
foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
$this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter, 'postpone' => true]);
$this->method_recall('', ['message_id' => $msg_id, 'postpone' => true]);
}
} else {
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids']));
}
break;
case 'msg_resend_ans_req':
@ -249,16 +246,16 @@ trait ResponseHandler
$only_updates = false;
unset($this->new_incoming[$current_msg_id]);
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids']));
foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
if (isset($this->incoming_messages[$msg_id]['response']) && isset($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']])) {
$this->callFork($this->object_call_async($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['_'], $this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter, 'postpone' => true]));
$this->callFork($this->object_call_async($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['_'], $this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['body'], ['postpone' => true]));
}
}
break;
default:
$this->check_in_seq_no($current_msg_id);
$this->ack_incoming_message_id($current_msg_id, $datacenter);
$this->ack_incoming_message_id($current_msg_id);
// Acknowledge that I received the server's response
$response_type = $this->constructors->find_by_predicate($this->incoming_messages[$current_msg_id]['content']['_'])['type'];
@ -266,8 +263,8 @@ trait ResponseHandler
case 'Updates':
unset($this->new_incoming[$current_msg_id]);
if (\strpos($datacenter, 'cdn') === false) {
$this->callForkDefer($this->handle_updates_async($this->incoming_messages[$current_msg_id]['content']));
if (\strpos($this->datacenter, 'cdn') === false) {
$this->callForkDefer($this->API->handle_updates_async($this->incoming_messages[$current_msg_id]['content']));
}
unset($this->incoming_messages[$current_msg_id]['content']);
@ -287,7 +284,7 @@ trait ResponseHandler
if ($response_type === $expecting['type']) {
$this->logger->logger('Yes', \danog\MadelineProto\Logger::VERBOSE);
unset($this->new_incoming[$current_msg_id]);
$this->handle_response($expecting_msg_id, $current_msg_id, $datacenter);
$this->handle_response($expecting_msg_id, $current_msg_id);
break 2;
}
$this->logger->logger('No', \danog\MadelineProto\Logger::VERBOSE);
@ -308,7 +305,7 @@ trait ResponseHandler
return $only_updates;
}
public function handle_reject($datacenter, &$request, $data)
public function handle_reject(&$request, $data)
{
if (isset($request['promise']) && \is_object($request['promise'])) {
Loop::defer(function () use (&$request, $data) {
@ -330,7 +327,7 @@ trait ResponseHandler
});
} elseif (isset($request['container'])) {
foreach ($request['container'] as $message_id) {
$this->handle_reject($datacenter, $this->outgoing_messages[$message_id], $data);
$this->handle_reject($this->outgoing_messages[$message_id], $data);
}
} else {
$this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-'));
@ -352,8 +349,8 @@ trait ResponseHandler
}
if (\in_array($response['error_message'], ['PERSISTENT_TIMESTAMP_EMPTY', 'PERSISTENT_TIMESTAMP_OUTDATED', 'PERSISTENT_TIMESTAMP_INVALID'])) {
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\PTSException($response['error_message']));
$this->got_response_for_outgoing_message_id($request_id);
$this->handle_reject($request, new \danog\MadelineProto\PTSException($response['error_message']));
return;
}
@ -365,7 +362,7 @@ trait ResponseHandler
unset($request['serialized_body']);
}
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
$this->method_recall('', ['message_id' => $request_id, 'postpone' => true]);
return;
}
@ -374,33 +371,33 @@ trait ResponseHandler
case -500:
if ($response['error_message'] === 'MSG_WAIT_FAILED') {
$this->call_queue[$request['queue']] = [];
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
$this->method_recall('', ['message_id' => $request_id, 'postpone' => true]);
return;
}
if (\in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'no workers running'])) {
Loop::delay(1 * 1000, [$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
Loop::delay(1 * 1000, [$this, 'method_recall'], ['message_id' => $request_id, ]);
return;
}
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->got_response_for_outgoing_message_id($request_id);
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
$this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
return;
case 303:
$old_datacenter = $datacenter;
$this->datacenter->curdc = $datacenter = (int) \preg_replace('/[^0-9]+/', '', $response['error_message']);
$this->API->datacenter->curdc = $datacenter = (int) \preg_replace('/[^0-9]+/', '', $response['error_message']);
if (isset($request['file']) && $request['file'] && isset($this->datacenter->sockets[$datacenter.'_media'])) {
if (isset($request['file']) && $request['file'] && isset($this->API->datacenter->sockets[$datacenter.'_media'])) {
\danog\MadelineProto\Logger::log('Using media DC');
$datacenter .= '_media';
}
if (isset($request['user_related']) && $request['user_related']) {
$this->settings['connection_settings']['default_dc'] = $this->authorized_dc = $this->datacenter->curdc;
$this->settings['connection_settings']['default_dc'] = $this->API->authorized_dc = $this->API->datacenter->curdc;
}
Loop::defer([$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]);
//$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]);
Loop::defer([$this->API, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter]);
//$this->API->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'old_datacenter' => $old_datacenter, 'postpone' => true]);
return;
case 401:
@ -408,10 +405,10 @@ trait ResponseHandler
case 'USER_DEACTIVATED':
case 'SESSION_REVOKED':
case 'SESSION_EXPIRED':
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->got_response_for_outgoing_message_id($request_id);
$this->logger->logger($response['error_message'], \danog\MadelineProto\Logger::FATAL_ERROR);
foreach ($this->datacenter->sockets as $socket) {
foreach ($this->API->datacenter->sockets as $socket) {
$socket->temp_auth_key = null;
$socket->session_id = null;
$socket->auth_key = null;
@ -428,24 +425,24 @@ trait ResponseHandler
$this->logger->logger('If you intentionally deleted this account, ignore this message.', \danog\MadelineProto\Logger::FATAL_ERROR);
}
$this->resetSession();
$this->API->resetSession();
$this->callFork((function () use ($datacenter, &$request, &$response) {
yield $this->init_authorization_async();
$this->callFork((function () use (&$request, &$response) {
yield $this->API->init_authorization_async();
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
$this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
})());
return;
case 'AUTH_KEY_UNREGISTERED':
case 'AUTH_KEY_INVALID':
if ($this->authorized !== self::LOGGED_IN) {
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->got_response_for_outgoing_message_id($request_id);
$this->callFork((function () use ($datacenter, &$request, &$response) {
yield $this->init_authorization_async();
$this->callFork((function () use (&$request, &$response) {
yield $this->API->init_authorization_async();
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
$this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
})());
return;
@ -457,11 +454,11 @@ trait ResponseHandler
$this->logger->logger('Auth key not registered, resetting temporary and permanent auth keys...', \danog\MadelineProto\Logger::ERROR);
if ($this->authorized_dc === $datacenter && $this->authorized === self::LOGGED_IN) {
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
if ($this->API->authorized_dc === $this->datacenter && $this->authorized === self::LOGGED_IN) {
$this->got_response_for_outgoing_message_id($request_id);
$this->logger->logger('Permanent auth key was main authorized key, logging out...', \danog\MadelineProto\Logger::FATAL_ERROR);
foreach ($this->datacenter->sockets as $socket) {
foreach ($this->API->datacenter->sockets as $socket) {
$socket->temp_auth_key = null;
$socket->auth_key = null;
$socket->authorized = false;
@ -474,20 +471,20 @@ trait ResponseHandler
$this->logger->logger('Then login again.', \danog\MadelineProto\Logger::FATAL_ERROR);
$this->logger->logger('If you intentionally deleted this account, ignore this message.', \danog\MadelineProto\Logger::FATAL_ERROR);
$this->resetSession();
$this->API->resetSession();
$this->callFork((function () use ($datacenter, &$request, &$response) {
yield $this->init_authorization_async();
$this->callFork((function () use (&$request, &$response) {
yield $this->API->init_authorization_async();
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
$this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
})());
return;
}
$this->callFork((function () use ($request_id, $datacenter) {
yield $this->init_authorization_async();
$this->callFork((function () use ($request_id) {
yield $this->API->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]);
$this->method_recall('', ['message_id' => $request_id, ]);
})());
return;
@ -495,35 +492,35 @@ trait ResponseHandler
$this->logger->logger('Temporary auth key not bound, resetting temporary auth key...', \danog\MadelineProto\Logger::ERROR);
$this->temp_auth_key = null;
$this->callFork((function () use ($request_id, $datacenter) {
yield $this->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]);
$this->callFork((function () use ($request_id) {
yield $this->API->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, ]);
})());
return;
}
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->got_response_for_outgoing_message_id($request_id);
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
$this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
return;
case 420:
$seconds = \preg_replace('/[^0-9]+/', '', $response['error_message']);
$limit = isset($request['FloodWaitLimit']) ? $request['FloodWaitLimit'] : $this->settings['flood_timeout']['wait_if_lt'];
if (\is_numeric($seconds) && $seconds < $limit) {
//$this->got_response_for_outgoing_message_id($request_id, $datacenter);
//$this->got_response_for_outgoing_message_id($request_id);
$this->logger->logger('Flood, waiting '.$seconds.' seconds before repeating async call...', \danog\MadelineProto\Logger::NOTICE);
$request['sent'] += $seconds;
Loop::delay($seconds * 1000, [$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
Loop::delay($seconds * 1000, [$this, 'method_recall'], ['message_id' => $request_id, ]);
return;
}
// no break
default:
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->got_response_for_outgoing_message_id($request_id);
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
$this->handle_reject($request, new \danog\MadelineProto\RPCErrorException($response['error_message'], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
return;
}
@ -539,7 +536,7 @@ trait ResponseHandler
switch ($response['error_code']) {
case 48:
$this->temp_auth_key['server_salt'] = $response['new_server_salt'];
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
$this->method_recall('', ['message_id' => $request_id, 'postpone' => true]);
return;
case 16:
@ -548,15 +545,15 @@ trait ResponseHandler
$this->logger->logger('Set time delta to '.$this->time_delta, \danog\MadelineProto\Logger::WARNING);
$this->reset_session();
$this->temp_auth_key = null;
$this->callFork((function () use ($datacenter, $request_id) {
yield $this->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]);
$this->callFork((function () use ($request_id) {
yield $this->API->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, ]);
})());
return;
}
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\RPCErrorException('Received bad_msg_notification: '.self::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
$this->got_response_for_outgoing_message_id($request_id);
$this->handle_reject($request, new \danog\MadelineProto\RPCErrorException('Received bad_msg_notification: '.self::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], isset($request['_']) ? $request['_'] : ''));
return;
}
@ -572,16 +569,16 @@ trait ResponseHandler
return;
}
$botAPI = isset($request['botAPI']) && $request['botAPI'];
if (isset($response['_']) && \strpos($datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') {
if (isset($response['_']) && \strpos($this->datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') {
$response['request'] = $request;
$this->callForkDefer($this->handle_updates_async($response));
$this->callForkDefer($this->API->handle_updates_async($response));
}
unset($request);
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->got_response_for_outgoing_message_id($request_id);
$r = isset($response['_']) ? $response['_'] : \json_encode($response);
$this->logger->logger("Defer sending $r to deferred");
$this->callFork((
function () use ($request_id, $response, $datacenter, $botAPI) {
function () use ($request_id, $response, $botAPI) {
$r = isset($response['_']) ? $response['_'] : \json_encode($response);
$this->logger->logger("Deferred: sent $r to deferred");
if ($botAPI) {
@ -602,84 +599,4 @@ trait ResponseHandler
}
)());
}
public function handle_updates_async($updates, $actual_updates = null)
{
if (!$this->settings['updates']['handle_updates']) {
return;
}
if ($actual_updates) {
$updates = $actual_updates;
}
$this->logger->logger('Parsing updates ('.$updates['_'].') received via the socket...', \danog\MadelineProto\Logger::VERBOSE);
switch ($updates['_']) {
case 'updates':
case 'updatesCombined':
$result = [];
foreach ($updates['updates'] as $key => $update) {
if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' ||
$update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' ||
$update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' ||
$update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') {
$result[yield $this->feeders[false]->feedSingle($update)] = true;
unset($updates['updates'][$key]);
}
}
$this->seqUpdater->addPendingWakeups($result);
if ($updates['updates']) {
if ($updates['_'] === 'updatesCombined') {
$updates['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
} else {
$updates['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
}
$this->seqUpdater->feed($updates);
}
$this->seqUpdater->resume();
break;
case 'updateShort':
$this->feeders[yield $this->feeders[false]->feedSingle($updates['update'])]->resume();
break;
case 'updateShortSentMessage':
if (!isset($updates['request']['body'])) {
break;
}
$updates['user_id'] = (yield $this->get_info_async($updates['request']['body']['peer']))['bot_api_id'];
$updates['message'] = $updates['request']['body']['message'];
unset($updates['request']);
// no break
case 'updateShortMessage':
case 'updateShortChatMessage':
$from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']);
$to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']);
if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) {
yield $this->updaters[false]->resume();
return;
}
$message = $updates;
$message['_'] = 'message';
$message['from_id'] = $from_id;
try {
$message['to_id'] = (yield $this->get_info_async($to_id))['Peer'];
} catch (\danog\MadelineProto\Exception $e) {
$this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR);
//$this->pending_updates[] = $updates;
break;
} catch (\danog\MadelineProto\RPCErrorException $e) {
$this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR);
//$this->pending_updates[] = $updates;
break;
}
$update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']];
$this->feeders[yield $this->feeders[false]->feedSingle($update)]->resume();
break;
case 'updatesTooLong':
$this->updaters[false]->resume();
break;
default:
throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.\var_export($updates, true));
break;
}
}
}

View File

@ -17,7 +17,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Stream\MTProtoTools;
namespace danog\MadelineProto\MTProtoSession;
/**
* Manages message ids.

View File

@ -17,7 +17,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Stream\MTProtoTools;
namespace danog\MadelineProto\MTProtoSession;
/**
* Manages sequence number.

View File

@ -16,12 +16,10 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Stream\MTProtoTools;
use danog\MadelineProto\Logger;
namespace danog\MadelineProto\MTProtoSession;
/**
* Manages MTProto session-specific data
* Manages MTProto session-specific data.
*/
abstract class Session
{
@ -43,5 +41,4 @@ abstract class Session
public $call_queue = [];
public $ack_queue = [];
}

View File

@ -170,6 +170,86 @@ trait UpdateHandler
return $data;
}
public function handle_updates_async($updates, $actual_updates = null)
{
if (!$this->settings['updates']['handle_updates']) {
return;
}
if ($actual_updates) {
$updates = $actual_updates;
}
$this->logger->logger('Parsing updates ('.$updates['_'].') received via the socket...', \danog\MadelineProto\Logger::VERBOSE);
switch ($updates['_']) {
case 'updates':
case 'updatesCombined':
$result = [];
foreach ($updates['updates'] as $key => $update) {
if ($update['_'] === 'updateNewMessage' || $update['_'] === 'updateReadMessagesContents' ||
$update['_'] === 'updateEditMessage' || $update['_'] === 'updateDeleteMessages' ||
$update['_'] === 'updateReadHistoryInbox' || $update['_'] === 'updateReadHistoryOutbox' ||
$update['_'] === 'updateWebPage' || $update['_'] === 'updateMessageID') {
$result[yield $this->feeders[false]->feedSingle($update)] = true;
unset($updates['updates'][$key]);
}
}
$this->seqUpdater->addPendingWakeups($result);
if ($updates['updates']) {
if ($updates['_'] === 'updatesCombined') {
$updates['options'] = ['seq_start' => $updates['seq_start'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
} else {
$updates['options'] = ['seq_start' => $updates['seq'], 'seq_end' => $updates['seq'], 'date' => $updates['date']];
}
$this->seqUpdater->feed($updates);
}
$this->seqUpdater->resume();
break;
case 'updateShort':
$this->feeders[yield $this->feeders[false]->feedSingle($updates['update'])]->resume();
break;
case 'updateShortSentMessage':
if (!isset($updates['request']['body'])) {
break;
}
$updates['user_id'] = (yield $this->get_info_async($updates['request']['body']['peer']))['bot_api_id'];
$updates['message'] = $updates['request']['body']['message'];
unset($updates['request']);
// no break
case 'updateShortMessage':
case 'updateShortChatMessage':
$from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']);
$to_id = isset($updates['chat_id']) ? -$updates['chat_id'] : ($updates['out'] ? $updates['user_id'] : $this->authorization['user']['id']);
if (!yield $this->peer_isset_async($from_id) || !yield $this->peer_isset_async($to_id) || isset($updates['via_bot_id']) && !yield $this->peer_isset_async($updates['via_bot_id']) || isset($updates['entities']) && !yield $this->entities_peer_isset_async($updates['entities']) || isset($updates['fwd_from']) && !yield $this->fwd_peer_isset_async($updates['fwd_from'])) {
yield $this->updaters[false]->resume();
return;
}
$message = $updates;
$message['_'] = 'message';
$message['from_id'] = $from_id;
try {
$message['to_id'] = (yield $this->get_info_async($to_id))['Peer'];
} catch (\danog\MadelineProto\Exception $e) {
$this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR);
//$this->pending_updates[] = $updates;
break;
} catch (\danog\MadelineProto\RPCErrorException $e) {
$this->logger->logger('Still did not get user in database, postponing update', \danog\MadelineProto\Logger::ERROR);
//$this->pending_updates[] = $updates;
break;
}
$update = ['_' => 'updateNewMessage', 'message' => $message, 'pts' => $updates['pts'], 'pts_count' => $updates['pts_count']];
$this->feeders[yield $this->feeders[false]->feedSingle($update)]->resume();
break;
case 'updatesTooLong':
$this->updaters[false]->resume();
break;
default:
throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.\var_export($updates, true));
break;
}
}
public function save_update_async($update)
{
if ($update['_'] === 'updateConfig') {