Refactoring

This commit is contained in:
Daniil Gentili 2019-08-31 22:43:58 +02:00
parent d2010cc259
commit 3fe86e0162
20 changed files with 722 additions and 294 deletions

@ -19,7 +19,7 @@
namespace phpseclib\Math;
if (PHP_MAJOR_VERSION < 7 && !(class_exists('\\Phar') && \Phar::running())) {
if (PHP_MAJOR_VERSION < 7 && !(class_exists(\Phar::class) && \Phar::running())) {
throw new \Exception('MadelineProto requires php 7 to run natively, use phar.madelineproto.xyz to run on PHP 5.6');
}
if (defined('HHVM_VERSION')) {

@ -24,10 +24,8 @@ use danog\MadelineProto\Loop\Connection\CheckLoop;
use danog\MadelineProto\Loop\Connection\HttpWaitLoop;
use danog\MadelineProto\Loop\Connection\ReadLoop;
use danog\MadelineProto\Loop\Connection\WriteLoop;
use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\MTProtoTools\MsgIdHandler;
use danog\MadelineProto\Stream\MTProtoTools\SeqNoHandler;
use danog\MadelineProto\Stream\MTProtoTools\Session;
/**
* Connection class.
@ -36,44 +34,176 @@ use danog\MadelineProto\Stream\MTProtoTools\SeqNoHandler;
*
* @author Daniil Gentili <daniil@daniil.it>
*/
class Connection
class Connection extends Session
{
use Crypt;
use MsgIdHandler;
use SeqNoHandler;
use \danog\Serializable;
use Tools;
const API_ENDPOINT = 0;
const VOIP_UDP_REFLECTOR_ENDPOINT = 1;
const VOIP_TCP_REFLECTOR_ENDPOINT = 2;
const VOIP_UDP_P2P_ENDPOINT = 3;
const VOIP_UDP_LAN_ENDPOINT = 4;
const PENDING_MAX = 2000000000;
public $stream;
/**
* The actual socket
*
* @var Stream
*/
private $stream;
/**
* Connection context
*
* @var Connection context
*/
private $ctx;
public $type = 0;
public $peer_tag;
/**
* HTTP request count
*
* @var integer
*/
private $httpReqCount = 0;
/**
* HTTP response count
*
* @var integer
*/
private $httpResCount = 0;
public $temp_auth_key;
public $auth_key;
/**
* Date of last chunk received
*
* @var integer
*/
private $lastChunk = 0;
/**
* Logger instance.
*
* @var Logger
*/
protected $logger;
/**
* Main instance.
*
* @var MTProto
*/
protected $API;
/**
* Whether the socket is reading data.
*
* @var boolean
*/
private $reading = false;
/**
* Whether the socket is writing data.
*
* @var boolean
*/
private $writing = false;
/**
* Check if the socket is writing stuff.
*
* @return boolean
*/
public function isWriting(): bool
{
return $this->writing;
}
/**
* Check if the socket is reading stuff.
*
* @return boolean
*/
public function isReading(): bool
{
return $this->reading;
}
/**
* Set writing boolean
*
* @param boolean $writing
*
* @return void
*/
public function writing(bool $writing)
{
$this->writing = $writing;
}
/**
* Set reading boolean
*
* @param boolean $reading
*
* @return void
*/
public function reading(bool $reading)
{
$this->reading = $reading;
}
/**
* Tell the class that we have read a chunk of data from the socket
*
* @return void
*/
public function haveRead()
{
$this->lastChunk = \microtime(true);
}
/**
* Get the receive date of the latest chunk of data from the socket.
*
* @return void
*/
public function getLastChunk()
{
return $this->lastChunk;
}
/**
* Indicate a received HTTP response
*
* @return void
*/
public function httpReceived()
{
$this->httpResCount++;
}
/**
* Count received HTTP responses
*
* @return integer
*/
public function countHttpReceived(): int
{
return $this->httpResCount;
}
/**
* Indicate a sent HTTP request
*
* @return void
*/
public function httpSent()
{
$this->httpReqCount++;
}
/**
* Count sent HTTP requests
*
* @return integer
*/
public function countHttpSent(): int
{
return $this->httpReqCount;
}
public $pending_outgoing = [];
public $pending_outgoing_key = 0;
public $authorized = false;
public $datacenter;
public $API;
public $ctx;
public function getCtx()
/**
* Get connection context
*
* @return ConnectionContext
*/
public function getCtx(): ConnectionContext
{
return $this->ctx;
}
@ -93,6 +223,8 @@ class Connection
{
$this->API->logger->logger("Trying connection via $ctx", \danog\MadelineProto\Logger::WARNING);
$ctx->setReadCallback([$this, 'haveRead']);
$this->ctx = $ctx->getCtx();
$this->datacenter = $ctx->getDc();
$this->stream = yield $ctx->getStream();
@ -101,16 +233,16 @@ class Connection
}
if (!isset($this->writer)) {
$this->writer = new WriteLoop($this->API, $this->datacenter);
$this->writer = new WriteLoop($this);
}
if (!isset($this->reader)) {
$this->reader = new ReadLoop($this->API, $this->datacenter);
$this->reader = new ReadLoop($this);
}
if (!isset($this->checker)) {
$this->checker = new CheckLoop($this->API, $this->datacenter);
$this->checker = new CheckLoop($this);
}
if (!isset($this->waiter)) {
$this->waiter = new HttpWaitLoop($this->API, $this->datacenter);
$this->waiter = new HttpWaitLoop($this);
}
foreach ($this->new_outgoing as $message_id) {
if ($this->outgoing_messages[$message_id]['unencrypted']) {
@ -118,12 +250,11 @@ class Connection
\Amp\Loop::defer(function () use ($promise) {
$promise->fail(new Exception('Restart because we were reconnected'));
});
unset($this->new_outgoing[$message_id]);
unset($this->outgoing_messages[$message_id]);
unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]);
}
}
$this->http_req_count = 0;
$this->http_res_count = 0;
$this->httpReqCount = 0;
$this->httpResCount = 0;
$this->writer->start();
$this->reader->start();
@ -138,7 +269,7 @@ class Connection
$deferred = new Deferred();
if (!isset($message['serialized_body'])) {
$body = is_object($message['body']) ? yield $message['body'] : $message['body'];
$body = \is_object($message['body']) ? yield $message['body'] : $message['body'];
$refresh_next = isset($message['refresh_next']) && $message['refresh_next'];
//$refresh_next = true;
@ -169,11 +300,28 @@ class Connection
return $deferred->promise();
}
public function setExtra($extra)
/**
* Connect main instance.
*
* @param MTProto $extra
*
* @return void
*/
public function setExtra(MTProto $extra)
{
$this->API = $extra;
$this->logger = $extra->logger;
}
/**
* Get main instance
*
* @return MTProto
*/
public function getExtra(): MTProto
{
return $this->API;
}
public function disconnect()
{
$this->API->logger->logger("Disconnecting from DC {$this->datacenter}");
@ -220,7 +368,7 @@ class Connection
foreach ($this->new_outgoing as $message_id) {
if (isset($this->outgoing_messages[$message_id]['sent'])
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < time()
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
&& ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted']
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
) {
@ -247,7 +395,7 @@ class Connection
$result = [];
foreach ($this->new_outgoing as $message_id) {
if (isset($this->outgoing_messages[$message_id]['sent'])
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < time()
&& $this->outgoing_messages[$message_id]['sent'] + $timeout < \time()
&& ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted']
&& $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req'
) {

@ -80,6 +80,21 @@ class DataCenter
return ['sockets', 'curdc', 'dclist', 'settings'];
}
public function __wakeup()
{
foreach ($this->sockets as &$socket) {
if ($socket instanceof Connection) {
$new = new DataCenterConnection;
if ($socket->temp_auth_key) {
$new->setAuthKey($socket->temp_auth_key, true);
}
if ($socket->auth_key) {
$new->setAuthKey($socket->auth_key, false);
}
$new->authorized($socket->authorized);
}
}
}
public function __magic_construct($API, $dclist, $settings, CookieJar $jar = null)
{
$this->API = $API;
@ -328,8 +343,7 @@ class DataCenter
continue; // Could not connect to host, try next host in the list.
}
if ($dc = $ctx->getDc()) {
$callback = [$this->sockets[$dc], 'haveRead'];
if ($ctx->hasReadCallback()) {
$socket = new class($socket) extends ClientSocket
{
private $callback;
@ -350,7 +364,7 @@ class DataCenter
return $promise;
}
};
$socket->setReadCallback($callback);
$socket->setReadCallback($ctx->getReadCallback());
} else {
$socket = new ClientSocket($socket);
}

@ -0,0 +1,202 @@
<?php
/**
* Connection module handling all connections to a datacenter.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2019 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto;
use danog\MadelineProto\Stream\ConnectionContext;
class DataCenterConnection
{
/**
* Temporary auth key.
*
* @var array
*/
private $tempAuthKey;
/**
* Permanent auth key.
*
* @var array
*/
private $authKey;
/**
* Whether this auth key is authorized (as in logged in).
*
* @var boolean
*/
private $authorized = false;
/**
* Connections open to a certain DC
*
* @var array
*/
private $connections = [];
/**
* Main API instance
*
* @var \danog\MadelineProto\MTProto
*/
private $API;
/**
* Connection context
*
* @var ConnectionContext
*/
private $ctx;
/**
* DC ID
*
* @var string
*/
private $datacenter;
/**
* Get auth key.
*
* @param boolean $temp Whether to fetch the temporary auth key
*
* @return array
*/
public function getAuthKey(bool $temp = true): array
{
return $this->{$temp ? 'tempAuthKey' : 'authKey'};
}
/**
* Check if auth key is present.
*
* @param boolean $temp Whether to fetch the temporary auth key
*
* @return bool
*/
public function hasAuthKey(bool $temp = true): bool
{
return $this->{$temp ? 'tempAuthKey' : 'authKey'} !== null;
}
/**
* Set auth key.
*
* @param boolean $temp Whether to fetch the temporary auth key
*
* @return void
*/
public function setAuthKey(array $key, bool $temp = true)
{
$this->{$temp ? 'tempAuthKey' : 'authKey'} = $key;
}
/**
* Check if we are logged in.
*
* @return boolean
*/
public function isAuthorized(): bool
{
return $this->authorized;
}
/**
* Set the authorized boolean.
*
* @param boolean $authorized Whether we are authorized
*
* @return void
*/
public function authorized(bool $authorized)
{
$this->authorized = $authorized;
}
/**
* Get connection context
*
* @return ConnectionContext
*/
public function getCtx(): ConnectionContext
{
return $this->ctx;
}
/**
* Connect function.
*
* @param ConnectionContext $ctx Connection context
*
* @return \Generator
*/
public function connect(ConnectionContext $ctx): \Generator
{
$this->API->logger->logger("Trying connection via $ctx", \danog\MadelineProto\Logger::WARNING);
$this->ctx = $ctx->getCtx();
$this->datacenter = $ctx->getDc();
$media = $ctx->isMedia();
$count = $media ? $this->API->settings['connection_settings']['media_socket_count'] : 1;
$this->connections = [];
for ($x = 0; $x < $count; $x++) {
$this->connections[$x] = yield $ctx->getStream();
$ctx = $this->ctx->getCtx();
}
}
public function sendMessage($message, $flush = true)
{
}
public function setExtra(API $API)
{
$this->API = $API;
}
public function disconnect()
{
$this->API->logger->logger("Disconnecting from DC {$this->datacenter}");
foreach ($this->connections as $connection) {
$connection->disconnect();
}
$this->connections = [];
}
public function reconnect(): \Generator
{
$this->API->logger->logger("Reconnecting DC {$this->datacenter}");
foreach ($this->connections as $connection) {
yield $connection->reconnect();
}
$this->disconnect();
yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc());
}
/**
* Sleep function.
*
* @internal
*
* @return array
*/
public function __sleep()
{
return ['authKey', 'tempAuthKey', 'authorized'];
}
}

@ -20,6 +20,7 @@ namespace danog\MadelineProto\Loop\Connection;
use Amp\Deferred;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
use danog\MadelineProto\MTProto;
/**
* RPC call status check loop.
@ -28,14 +29,25 @@ use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
*/
class CheckLoop extends ResumableSignalLoop
{
/**
* Connection instance
*
* @var \danog\Madelineproto\Connection
*/
protected $connection;
/**
* DC ID
*
* @var string
*/
protected $datacenter;
public function __construct($API, $datacenter)
public function __construct(Connection $connection)
{
$this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
$this->connection = $connection;
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $ctx->getDc();
}
public function loop()
@ -90,7 +102,7 @@ class CheckLoop extends ResumableSignalLoop
case 2:
case 3:
if ($connection->outgoing_messages[$message_id]['_'] === 'msgs_state_req') {
$API->got_response_for_outgoing_message_id($message_id, $datacenter);
$connection->got_response_for_outgoing_message_id($message_id);
break;
}
$API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' not received by server, resending...', \danog\MadelineProto\Logger::ERROR);

@ -18,6 +18,7 @@
namespace danog\MadelineProto\Loop\Connection;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Loop\Impl\ResumableSignalLoop;
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
@ -29,14 +30,25 @@ use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
*/
class HttpWaitLoop extends ResumableSignalLoop
{
/**
* Connection instance
*
* @var \danog\Madelineproto\Connection
*/
protected $connection;
/**
* DC ID
*
* @var string
*/
protected $datacenter;
public function __construct($API, $datacenter)
public function __construct(Connection $connection)
{
$this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
$this->connection = $connection;
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $ctx->getDc();
}
public function loop()
@ -62,11 +74,11 @@ class HttpWaitLoop extends ResumableSignalLoop
return;
}
}
$API->logger->logger("DC $datacenter: request {$connection->http_req_count}, response {$connection->http_res_count}");
if ($connection->http_req_count === $connection->http_res_count && (!empty($connection->pending_outgoing) || (!empty($connection->new_outgoing) && !$connection->hasPendingCalls()))) {
$API->logger->logger("DC $datacenter: request {$connection->countHttpSent()}, response {$connection->countHttpReceived()}");
if ($connection->countHttpSent() === $connection->countHttpReceived() && (!empty($connection->pending_outgoing) || (!empty($connection->new_outgoing) && !$connection->hasPendingCalls()))) {
yield $connection->sendMessage(['_' => 'http_wait', 'body' => ['max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'content_related' => true, 'unencrypted' => false, 'method' => false]);
}
$API->logger->logger("DC $datacenter: request {$connection->http_req_count}, response {$connection->http_res_count}");
$API->logger->logger("DC $datacenter: request {$connection->countHttpSent()}, response {$connection->countHttpReceived()}");
}
}

@ -38,14 +38,25 @@ class ReadLoop extends SignalLoop
use Tools;
use Crypt;
/**
* Connection instance
*
* @var \danog\Madelineproto\Connection
*/
protected $connection;
/**
* DC ID
*
* @var string
*/
protected $datacenter;
public function __construct($API, $datacenter)
public function __construct(Connection $connection)
{
$this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
$this->connection = $connection;
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $ctx->getDc();
}
public function loop()
@ -99,7 +110,7 @@ class ReadLoop extends SignalLoop
return;
}
$connection->http_res_count++;
$connection->httpReceived();
Loop::defer([$API, 'handle_messages'], $datacenter);
@ -214,7 +225,7 @@ class ReadLoop extends SignalLoop
$connection->incoming_messages[$message_id]['content'] = $deserialized;
$connection->incoming_messages[$message_id]['response'] = -1;
$connection->new_incoming[$message_id] = $message_id;
$connection->last_http_wait = 0;
//$connection->last_http_wait = 0;
$API->logger->logger('Received payload from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);

@ -36,14 +36,25 @@ class WriteLoop extends ResumableSignalLoop
use Crypt;
use Tools;
/**
* Connection instance
*
* @var \danog\Madelineproto\Connection
*/
protected $connection;
/**
* DC ID
*
* @var string
*/
protected $datacenter;
public function __construct($API, $datacenter)
public function __construct(Connection $connection)
{
$this->API = $API;
$this->datacenter = $datacenter;
$this->connection = $API->datacenter->sockets[$datacenter];
$this->connection = $connection;
$this->API = $connection->getExtra();
$ctx = $connection->getCtx();
$this->datacenter = $ctx->getDc();
}
public function loop(): \Generator
@ -110,7 +121,7 @@ class WriteLoop extends ResumableSignalLoop
yield $buffer->bufferWrite("\0\0\0\0\0\0\0\0".$message_id.$this->pack_unsigned_int($length).$message['serialized_body'].$pad);
//var_dump("plain ".bin2hex($message_id));
$connection->http_req_count++;
$connection->httpSent();
$connection->outgoing_messages[$message_id] = $message;
$connection->outgoing_messages[$message_id]['sent'] = time();
$connection->outgoing_messages[$message_id]['tries'] = 0;
@ -303,7 +314,7 @@ class WriteLoop extends ResumableSignalLoop
$t = microtime(true);
yield $buffer->bufferWrite($message);
$connection->http_req_count++;
$connection->httpSent();
$API->logger->logger("Sent encrypted payload to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE);
@ -313,11 +324,11 @@ class WriteLoop extends ResumableSignalLoop
$connection->ack_queue = [];
}
if ($has_http_wait) {
/*if ($has_http_wait) {
$connection->last_http_wait = $sent;
} elseif (Magic::$altervista) {
$connection->last_http_wait = PHP_INT_MAX;
}
}*/
foreach ($keys as $key => $message_id) {
$connection->outgoing_messages[$message_id] = &$connection->pending_outgoing[$key];

@ -35,6 +35,11 @@ abstract class Loop implements LoopInterface
private $count = 0;
/**
* MTProto instance
*
* @var \danog\MadelineProto\MTProto
*/
public $API;
public function __construct($API)

@ -72,7 +72,7 @@ class MTProto extends AsyncConstruct implements TLCallback
/*
const V = 71;
*/
const V = 128;
const V = 129;
const RELEASE = '4.0';
const NOT_LOGGED_IN = 0;
const WAITING_CODE = 1;
@ -704,6 +704,7 @@ class MTProto extends AsyncConstruct implements TLCallback
'transport' => 'tcp',
'pfs' => extension_loaded('gmp'),
],
'media_socket_count' => 5,
'default_dc' => 2,
], 'app_info' => [
// obtained in https://my.telegram.org
@ -1098,6 +1099,12 @@ class MTProto extends AsyncConstruct implements TLCallback
yield $this->connect_to_all_dcs_async();
$this->datacenter->curdc = $curdc;
}
public function content_related($method)
{
$method = is_array($method) && isset($method['_']) ? $method['_'] : $method;
return is_string($method) ? !in_array($method, MTProto::NOT_CONTENT_RELATED) : true;
}
public function get_self_async()
{

@ -17,64 +17,64 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\MTProtoTools;
namespace danog\MadelineProto\Stream\MTProtoTools;
/**
* Manages acknowledgement of messages.
*/
trait AckHandler
{
public function ack_outgoing_message_id($message_id, $datacenter)
public function ack_outgoing_message_id($message_id)
{
// The server acknowledges that it received my message
if (!isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id])) {
if (!isset($this->outgoing_messages[$message_id])) {
$this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING);
return false;
}
//$this->logger->logger("Ack-ed ".$this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['_']." with message ID $message_id on DC $datacenter");
//$this->logger->logger("Ack-ed ".$this->outgoing_messages[$message_id]['_']." with message ID $message_id on DC $datacenter");
/*
if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['body'])) {
unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['body']);
if (isset($this->outgoing_messages[$message_id]['body'])) {
unset($this->outgoing_messages[$message_id]['body']);
}
if (isset($this->datacenter->sockets[$datacenter]->new_outgoing[$message_id])) {
unset($this->datacenter->sockets[$datacenter]->new_outgoing[$message_id]);
if (isset($this->new_outgoing[$message_id])) {
unset($this->new_outgoing[$message_id]);
}*/
return true;
}
public function got_response_for_outgoing_message_id($message_id, $datacenter)
public function got_response_for_outgoing_message_id($message_id)
{
// The server acknowledges that it received my message
if (!isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id])) {
if (!isset($this->outgoing_messages[$message_id])) {
$this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING);
return false;
}
if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['body'])) {
unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['body']);
if (isset($this->outgoing_messages[$message_id]['body'])) {
unset($this->outgoing_messages[$message_id]['body']);
}
if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['serialized_body'])) {
unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['serialized_body']);
if (isset($this->outgoing_messages[$message_id]['serialized_body'])) {
unset($this->outgoing_messages[$message_id]['serialized_body']);
}
if (isset($this->datacenter->sockets[$datacenter]->new_outgoing[$message_id])) {
unset($this->datacenter->sockets[$datacenter]->new_outgoing[$message_id]);
if (isset($this->new_outgoing[$message_id])) {
unset($this->new_outgoing[$message_id]);
}
return true;
}
public function ack_incoming_message_id($message_id, $datacenter)
public function ack_incoming_message_id($message_id)
{
// I let the server know that I received its message
if (!isset($this->datacenter->sockets[$datacenter]->incoming_messages[$message_id])) {
if (!isset($this->incoming_messages[$message_id])) {
$this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of incoming messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING);
}
/*if ($this->datacenter->sockets[$datacenter]->temp_auth_key['id'] === null || $this->datacenter->sockets[$datacenter]->temp_auth_key['id'] === "\0\0\0\0\0\0\0\0") {
// || (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack']) && $this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack'])) {
/*if ($this->temp_auth_key['id'] === null || $this->temp_auth_key['id'] === "\0\0\0\0\0\0\0\0") {
// || (isset($this->incoming_messages[$message_id]['ack']) && $this->incoming_messages[$message_id]['ack'])) {
return;
}*/
$this->datacenter->sockets[$datacenter]->ack_queue[$message_id] = $message_id;
$this->ack_queue[$message_id] = $message_id;
return true;
}

@ -17,7 +17,7 @@
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\MTProtoTools;
namespace danog\MadelineProto\Stream\MTProtoTools;
use Amp\Loop;
@ -26,18 +26,18 @@ use Amp\Loop;
*/
trait ResponseHandler
{
public function send_msgs_state_info_async($req_msg_id, $msg_ids, $datacenter)
public function send_msgs_state_info_async($req_msg_id, $msg_ids)
{
$this->logger->logger('Sending state info for '.count($msg_ids).' message IDs');
$this->logger->logger('Sending state info for '.\count($msg_ids).' message IDs');
$info = '';
foreach ($msg_ids as $msg_id) {
$cur_info = 0;
if (!isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id])) {
$msg_id = new \phpseclib\Math\BigInteger(strrev($msg_id), 256);
if ((new \phpseclib\Math\BigInteger(time() + $this->datacenter->sockets[$datacenter]->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) {
if (!isset($this->incoming_messages[$msg_id])) {
$msg_id = new \phpseclib\Math\BigInteger(\strrev($msg_id), 256);
if ((new \phpseclib\Math\BigInteger(\time() + $this->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) {
$this->logger->logger("Do not know anything about $msg_id and it is too small");
$cur_info |= 3;
} elseif ((new \phpseclib\Math\BigInteger(time() + $this->datacenter->sockets[$datacenter]->time_delta - 300))->bitwise_leftShift(32)->compare($msg_id) > 0) {
} elseif ((new \phpseclib\Math\BigInteger(\time() + $this->time_delta - 300))->bitwise_leftShift(32)->compare($msg_id) > 0) {
$this->logger->logger("Do not know anything about $msg_id and it is too big");
$cur_info |= 1;
} else {
@ -48,9 +48,9 @@ trait ResponseHandler
$this->logger->logger("Know about $msg_id");
$cur_info |= 4;
}
$info .= chr($cur_info);
$info .= \chr($cur_info);
}
$this->datacenter->sockets[$datacenter]->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter, 'postpone' => true])]['response'] = $req_msg_id;
$this->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter, 'postpone' => true])]['response'] = $req_msg_id;
}
public $n = 0;
@ -61,35 +61,35 @@ trait ResponseHandler
$datacenter = $actual_datacenter;
}
$only_updates = true;
while ($this->datacenter->sockets[$datacenter]->new_incoming) {
reset($this->datacenter->sockets[$datacenter]->new_incoming);
$current_msg_id = key($this->datacenter->sockets[$datacenter]->new_incoming);
if (!isset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id])) {
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
while ($this->new_incoming) {
\reset($this->new_incoming);
$current_msg_id = \key($this->new_incoming);
if (!isset($this->incoming_messages[$current_msg_id])) {
unset($this->new_incoming[$current_msg_id]);
continue;
}
$this->logger->logger((isset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['_'].' from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
$this->logger->logger((isset($this->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->incoming_messages[$current_msg_id]['content']['_'].' from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE);
switch ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['_']) {
switch ($this->incoming_messages[$current_msg_id]['content']['_']) {
case 'msgs_ack':
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
unset($this->new_incoming[$current_msg_id]);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
$this->ack_outgoing_message_id($msg_id, $datacenter);
// Acknowledge that the server received my message
}
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
unset($this->incoming_messages[$current_msg_id]['content']);
break;
case 'rpc_result':
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
$this->ack_incoming_message_id($current_msg_id, $datacenter);
$only_updates = false;
// Acknowledge that the server received my request
$req_msg_id = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id'];
$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'] = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['result'];
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
$req_msg_id = $this->incoming_messages[$current_msg_id]['content']['req_msg_id'];
$this->incoming_messages[$current_msg_id]['content'] = $this->incoming_messages[$current_msg_id]['content']['result'];
$this->check_in_seq_no($current_msg_id);
$this->handle_response($req_msg_id, $current_msg_id, $datacenter);
break;
@ -97,25 +97,27 @@ trait ResponseHandler
case 'future_salts':
case 'msgs_state_info':
$msg_id_type = 'req_msg_id';
// no break
case 'bad_server_salt':
case 'bad_msg_notification':
$msg_id_type = isset($msg_id_type) ? $msg_id_type : 'bad_msg_id';
// no break
case 'pong':
$msg_id_type = isset($msg_id_type) ? $msg_id_type : 'msg_id';
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
unset($this->new_incoming[$current_msg_id]);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
$this->handle_response($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'][$msg_id_type], $current_msg_id, $datacenter);
$this->handle_response($this->incoming_messages[$current_msg_id]['content'][$msg_id_type], $current_msg_id, $datacenter);
unset($msg_id_type);
break;
case 'new_session_created':
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
unset($this->new_incoming[$current_msg_id]);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
$this->datacenter->sockets[$datacenter]->temp_auth_key['server_salt'] = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['server_salt'];
$this->temp_auth_key['server_salt'] = $this->incoming_messages[$current_msg_id]['content']['server_salt'];
$this->ack_incoming_message_id($current_msg_id, $datacenter);
// Acknowledge that I received the server's response
@ -123,69 +125,69 @@ trait ResponseHandler
$this->updaters[false]->resumeDefer();
}
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
unset($this->incoming_messages[$current_msg_id]['content']);
break;
case 'msg_container':
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
$only_updates = false;
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['messages'] as $message) {
$this->datacenter->sockets[$datacenter]->check_message_id($message['msg_id'], ['outgoing' => false, 'container' => true]);
$this->datacenter->sockets[$datacenter]->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body'], 'from_container' => true];
$this->datacenter->sockets[$datacenter]->new_incoming[$message['msg_id']] = $message['msg_id'];
foreach ($this->incoming_messages[$current_msg_id]['content']['messages'] as $message) {
$this->check_message_id($message['msg_id'], ['outgoing' => false, 'container' => true]);
$this->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body'], 'from_container' => true];
$this->new_incoming[$message['msg_id']] = $message['msg_id'];
}
ksort($this->datacenter->sockets[$datacenter]->new_incoming);
\ksort($this->new_incoming);
//$this->handle_messages($datacenter);
//$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
//$this->check_in_seq_no($current_msg_id);
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
unset($this->incoming_messages[$current_msg_id]['content']);
break;
case 'msg_copy':
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
unset($this->new_incoming[$current_msg_id]);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
$this->ack_incoming_message_id($current_msg_id, $datacenter);
// Acknowledge that I received the server's response
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']])) {
$this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id'], $datacenter);
if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']])) {
$this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id'], $datacenter);
// Acknowledge that I received the server's response
} else {
$message = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'];
$this->datacenter->sockets[$datacenter]->check_message_id($message['orig_message']['msg_id'], ['outgoing' => false, 'container' => true]);
$this->datacenter->sockets[$datacenter]->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']];
$this->datacenter->sockets[$datacenter]->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id'];
$message = $this->incoming_messages[$current_msg_id]['content'];
$this->check_message_id($message['orig_message']['msg_id'], ['outgoing' => false, 'container' => true]);
$this->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $this->incoming_messages[$current_msg_id]['content']['orig_message']];
$this->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id'];
}
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
unset($this->incoming_messages[$current_msg_id]['content']);
break;
case 'http_wait':
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
unset($this->new_incoming[$current_msg_id]);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
$this->logger->logger($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'], \danog\MadelineProto\Logger::NOTICE);
$this->logger->logger($this->incoming_messages[$current_msg_id]['content'], \danog\MadelineProto\Logger::NOTICE);
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
unset($this->incoming_messages[$current_msg_id]['content']);
break;
case 'msgs_state_req':
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
unset($this->incoming_messages[$current_msg_id]['content']);
break;
case 'msgs_all_info':
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $key => $msg_id) {
$info = ord($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['info'][$key]);
$msg_id = new \phpseclib\Math\BigInteger(strrev($msg_id), 256);
foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $key => $msg_id) {
$info = \ord($this->incoming_messages[$current_msg_id]['content']['info'][$key]);
$msg_id = new \phpseclib\Math\BigInteger(\strrev($msg_id), 256);
$status = 'Status for message id '.$msg_id.': ';
/*if ($info & 4) {
*$this->got_response_for_outgoing_message_id($msg_id, $datacenter);
@ -200,83 +202,83 @@ trait ResponseHandler
}
break;
case 'msg_detailed_info':
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
$this->check_in_seq_no($current_msg_id);
unset($this->new_incoming[$current_msg_id]);
$only_updates = false;
if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_id']])) {
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->handle_response($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_id'], $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
if (isset($this->outgoing_messages[$this->incoming_messages[$current_msg_id]['content']['msg_id']])) {
if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->handle_response($this->incoming_messages[$current_msg_id]['content']['msg_id'], $this->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
} else {
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true]));
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true]));
}
}
break;
case 'msg_new_detailed_info':
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) {
$this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter);
} else {
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true]));
$this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true]));
}
break;
case 'msg_resend_req':
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
$ok = true;
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
if (!isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$msg_id]) || isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id])) {
foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
if (!isset($this->outgoing_messages[$msg_id]) || isset($this->incoming_messages[$msg_id])) {
$ok = false;
}
}
if ($ok) {
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
$this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter, 'postpone' => true]);
}
} else {
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
}
break;
case 'msg_resend_ans_req':
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
$this->check_in_seq_no($current_msg_id);
$only_updates = false;
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']) && isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']])) {
$this->callFork($this->object_call_async($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['_'], $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter, 'postpone' => true]));
$this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter));
foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) {
if (isset($this->incoming_messages[$msg_id]['response']) && isset($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']])) {
$this->callFork($this->object_call_async($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['_'], $this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter, 'postpone' => true]));
}
}
break;
default:
$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id);
$this->check_in_seq_no($current_msg_id);
$this->ack_incoming_message_id($current_msg_id, $datacenter);
// Acknowledge that I received the server's response
$response_type = $this->constructors->find_by_predicate($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['_'])['type'];
$response_type = $this->constructors->find_by_predicate($this->incoming_messages[$current_msg_id]['content']['_'])['type'];
switch ($response_type) {
case 'Updates':
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
if (strpos($datacenter, 'cdn') === false) {
$this->callForkDefer($this->handle_updates_async($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']));
if (\strpos($datacenter, 'cdn') === false) {
$this->callForkDefer($this->handle_updates_async($this->incoming_messages[$current_msg_id]['content']));
}
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']);
unset($this->incoming_messages[$current_msg_id]['content']);
$only_updates = true && $only_updates;
break;
default:
$only_updates = false;
$this->logger->logger('Trying to assign a response of type '.$response_type.' to its request...', \danog\MadelineProto\Logger::VERBOSE);
foreach ($this->datacenter->sockets[$datacenter]->new_outgoing as $key => $expecting_msg_id) {
$expecting = $this->datacenter->sockets[$datacenter]->outgoing_messages[$expecting_msg_id];
foreach ($this->new_outgoing as $key => $expecting_msg_id) {
$expecting = $this->outgoing_messages[$expecting_msg_id];
if (!isset($expecting['type'])) {
continue;
}
@ -284,21 +286,22 @@ trait ResponseHandler
$this->logger->logger('Does the request of return type '.$expecting['type'].' match?', \danog\MadelineProto\Logger::VERBOSE);
if ($response_type === $expecting['type']) {
$this->logger->logger('Yes', \danog\MadelineProto\Logger::VERBOSE);
unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]);
unset($this->new_incoming[$current_msg_id]);
$this->handle_response($expecting_msg_id, $current_msg_id, $datacenter);
break 2;
}
$this->logger->logger('No', \danog\MadelineProto\Logger::VERBOSE);
}
throw new \danog\MadelineProto\ResponseException('Dunno how to handle '.PHP_EOL.var_export($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'], true));
throw new \danog\MadelineProto\ResponseException('Dunno how to handle '.PHP_EOL.\var_export($this->incoming_messages[$current_msg_id]['content'], true));
break;
}
break;
}
}
if ($this->datacenter->sockets[$datacenter]->pending_outgoing)
$this->datacenter->sockets[$datacenter]->writer->resume();
if ($this->pending_outgoing) {
$this->writer->resume();
}
//$this->n--;
@ -307,7 +310,7 @@ trait ResponseHandler
public function handle_reject($datacenter, &$request, $data)
{
if (isset($request['promise']) && is_object($request['promise'])) {
if (isset($request['promise']) && \is_object($request['promise'])) {
Loop::defer(function () use (&$request, $data) {
if (isset($request['promise'])) {
$promise = $request['promise'];
@ -315,12 +318,11 @@ trait ResponseHandler
try {
$promise->fail($data);
} catch (\Error $e) {
if (strpos($e->getMessage(), "Promise has already been resolved") !== 0) {
if (\strpos($e->getMessage(), "Promise has already been resolved") !== 0) {
throw $e;
}
$this->logger->logger("Got promise already resolved error", \danog\MadelineProto\Logger::FATAL_ERROR);
}
} else {
$this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-'));
$this->logger->logger("Rejecting: $data");
@ -328,7 +330,7 @@ trait ResponseHandler
});
} elseif (isset($request['container'])) {
foreach ($request['container'] as $message_id) {
$this->handle_reject($datacenter, $this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id], $data);
$this->handle_reject($datacenter, $this->outgoing_messages[$message_id], $data);
}
} else {
$this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-'));
@ -336,26 +338,26 @@ trait ResponseHandler
}
}
public function handle_response($request_id, $response_id, $datacenter)
public function handle_response($request_id, $response_id)
{
$response = &$this->datacenter->sockets[$datacenter]->incoming_messages[$response_id]['content'];
unset($this->datacenter->sockets[$datacenter]->incoming_messages[$response_id]['content']);
$request = &$this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id];
$response = &$this->incoming_messages[$response_id]['content'];
unset($this->incoming_messages[$response_id]['content']);
$request = &$this->outgoing_messages[$request_id];
if (isset($response['_'])) {
switch ($response['_']) {
case 'rpc_error':
if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->datacenter->sockets[$datacenter]->temp_auth_key !== null && (!isset($this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited']) || $this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited'] === false)) {
$this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited'] = true;
if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->temp_auth_key !== null && (!isset($this->temp_auth_key['connection_inited']) || $this->temp_auth_key['connection_inited'] === false)) {
$this->temp_auth_key['connection_inited'] = true;
}
if (in_array($response['error_message'], ['PERSISTENT_TIMESTAMP_EMPTY', 'PERSISTENT_TIMESTAMP_OUTDATED', 'PERSISTENT_TIMESTAMP_INVALID'])) {
if (\in_array($response['error_message'], ['PERSISTENT_TIMESTAMP_EMPTY', 'PERSISTENT_TIMESTAMP_OUTDATED', 'PERSISTENT_TIMESTAMP_INVALID'])) {
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->handle_reject($datacenter, $request, new \danog\MadelineProto\PTSException($response['error_message']));
return;
}
if (strpos($response['error_message'], 'FILE_REFERENCE_') === 0) {
if (\strpos($response['error_message'], 'FILE_REFERENCE_') === 0) {
$this->logger->logger("Got {$response['error_message']}, refreshing file reference and repeating method call...");
$request['refresh_references'] = true;
@ -371,12 +373,12 @@ trait ResponseHandler
case 500:
case -500:
if ($response['error_message'] === 'MSG_WAIT_FAILED') {
$this->datacenter->sockets[$datacenter]->call_queue[$request['queue']] = [];
$this->call_queue[$request['queue']] = [];
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;
}
if (in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'no workers running'])) {
if (\in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'no workers running'])) {
Loop::delay(1 * 1000, [$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]);
return;
}
@ -387,7 +389,7 @@ trait ResponseHandler
return;
case 303:
$old_datacenter = $datacenter;
$this->datacenter->curdc = $datacenter = (int) preg_replace('/[^0-9]+/', '', $response['error_message']);
$this->datacenter->curdc = $datacenter = (int) \preg_replace('/[^0-9]+/', '', $response['error_message']);
if (isset($request['file']) && $request['file'] && isset($this->datacenter->sockets[$datacenter.'_media'])) {
\danog\MadelineProto\Logger::log('Using media DC');
@ -448,10 +450,10 @@ trait ResponseHandler
return;
}
$this->datacenter->sockets[$datacenter]->session_id = null;
$this->datacenter->sockets[$datacenter]->temp_auth_key = null;
$this->datacenter->sockets[$datacenter]->auth_key = null;
$this->datacenter->sockets[$datacenter]->authorized = false;
$this->session_id = null;
$this->temp_auth_key = null;
$this->auth_key = null;
$this->authorized = false;
$this->logger->logger('Auth key not registered, resetting temporary and permanent auth keys...', \danog\MadelineProto\Logger::ERROR);
@ -492,7 +494,7 @@ trait ResponseHandler
case 'AUTH_KEY_PERM_EMPTY':
$this->logger->logger('Temporary auth key not bound, resetting temporary auth key...', \danog\MadelineProto\Logger::ERROR);
$this->datacenter->sockets[$datacenter]->temp_auth_key = null;
$this->temp_auth_key = null;
$this->callFork((function () use ($request_id, $datacenter) {
yield $this->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]);
@ -506,9 +508,9 @@ trait ResponseHandler
return;
case 420:
$seconds = preg_replace('/[^0-9]+/', '', $response['error_message']);
$seconds = \preg_replace('/[^0-9]+/', '', $response['error_message']);
$limit = isset($request['FloodWaitLimit']) ? $request['FloodWaitLimit'] : $this->settings['flood_timeout']['wait_if_lt'];
if (is_numeric($seconds) && $seconds < $limit) {
if (\is_numeric($seconds) && $seconds < $limit) {
//$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$this->logger->logger('Flood, waiting '.$seconds.' seconds before repeating async call...', \danog\MadelineProto\Logger::NOTICE);
@ -536,16 +538,16 @@ trait ResponseHandler
$this->logger->logger('Received bad_msg_notification: '.self::BAD_MSG_ERROR_CODES[$response['error_code']], \danog\MadelineProto\Logger::WARNING);
switch ($response['error_code']) {
case 48:
$this->datacenter->sockets[$datacenter]->temp_auth_key['server_salt'] = $response['new_server_salt'];
$this->temp_auth_key['server_salt'] = $response['new_server_salt'];
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]);
return;
case 16:
case 17:
$this->datacenter->sockets[$datacenter]->time_delta = (int) (new \phpseclib\Math\BigInteger(strrev($response_id), 256))->bitwise_rightShift(32)->subtract(new \phpseclib\Math\BigInteger(time()))->toString();
$this->logger->logger('Set time delta to '.$this->datacenter->sockets[$datacenter]->time_delta, \danog\MadelineProto\Logger::WARNING);
$this->time_delta = (int) (new \phpseclib\Math\BigInteger(\strrev($response_id), 256))->bitwise_rightShift(32)->subtract(new \phpseclib\Math\BigInteger(\time()))->toString();
$this->logger->logger('Set time delta to '.$this->time_delta, \danog\MadelineProto\Logger::WARNING);
$this->reset_session();
$this->datacenter->sockets[$datacenter]->temp_auth_key = null;
$this->temp_auth_key = null;
$this->callFork((function () use ($datacenter, $request_id) {
yield $this->init_authorization_async();
$this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]);
@ -560,8 +562,8 @@ trait ResponseHandler
}
}
if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->datacenter->sockets[$datacenter]->temp_auth_key !== null && (!isset($this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited']) || $this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited'] === false)) {
$this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited'] = true;
if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->temp_auth_key !== null && (!isset($this->temp_auth_key['connection_inited']) || $this->temp_auth_key['connection_inited'] === false)) {
$this->temp_auth_key['connection_inited'] = true;
}
if (!isset($request['promise'])) {
@ -570,28 +572,28 @@ trait ResponseHandler
return;
}
$botAPI = isset($request['botAPI']) && $request['botAPI'];
if (isset($response['_']) && strpos($datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') {
if (isset($response['_']) && \strpos($datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') {
$response['request'] = $request;
$this->callForkDefer($this->handle_updates_async($response));
}
unset($request);
$this->got_response_for_outgoing_message_id($request_id, $datacenter);
$r = isset($response['_']) ? $response['_'] : json_encode($response);
$r = isset($response['_']) ? $response['_'] : \json_encode($response);
$this->logger->logger("Defer sending $r to deferred");
$this->callFork((
function () use ($request_id, $response, $datacenter, $botAPI) {
$r = isset($response['_']) ? $response['_'] : json_encode($response);
$r = isset($response['_']) ? $response['_'] : \json_encode($response);
$this->logger->logger("Deferred: sent $r to deferred");
if ($botAPI) {
$response = yield $this->MTProto_to_botAPI_async($response);
}
if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise'])) { // This should not happen but happens, should debug
$promise = $this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise'];
unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']);
if (isset($this->outgoing_messages[$request_id]['promise'])) { // This should not happen but happens, should debug
$promise = $this->outgoing_messages[$request_id]['promise'];
unset($this->outgoing_messages[$request_id]['promise']);
try {
$promise->resolve($response);
} catch (\Error $e) {
if (strpos($e->getMessage(), "Promise has already been resolved") !== 0) {
if (\strpos($e->getMessage(), "Promise has already been resolved") !== 0) {
throw $e;
}
$this->logger->logger("Got promise already resolved error", \danog\MadelineProto\Logger::FATAL_ERROR);
@ -644,6 +646,7 @@ trait ResponseHandler
$updates['user_id'] = (yield $this->get_info_async($updates['request']['body']['peer']))['bot_api_id'];
$updates['message'] = $updates['request']['body']['message'];
unset($updates['request']);
// no break
case 'updateShortMessage':
case 'updateShortChatMessage':
$from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']);
@ -675,7 +678,7 @@ trait ResponseHandler
$this->updaters[false]->resume();
break;
default:
throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true));
throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.\var_export($updates, true));
break;
}
}

@ -37,9 +37,4 @@ trait SaltHandler
$this->temp_auth_key['salts'][$salt] = ['valid_since' => $valid_since, 'valid_until' => $valid_until];
}
}
public function handle_future_salts($salt)
{
yield $this->method_call_async_read('messages.sendMessage', ['peer' => $salt, 'message' => base64_decode('UG93ZXJlZCBieSBATWFkZWxpbmVQcm90bw==')], ['datacenter' => $this->datacenter->curdc]);
}
}

@ -18,42 +18,30 @@
namespace danog\MadelineProto\Stream\MTProtoTools;
use danog\MadelineProto\Logger;
/**
* Manages MTProto session-specific data
*/
class Session
abstract class Session
{
use AckHandler;
use MsgIdHandler;
use ResponseHandler;
use SaltHandler;
use SeqNoHandler;
public $incoming_messages = [];
public $outgoing_messages = [];
public $new_incoming = [];
public $new_outgoing = [];
public $http_req_count = 0;
public $http_res_count = 0;
public $last_http_wait = 0;
private $last_chunk = 0;
public $pending_outgoing = [];
public $pending_outgoing_key = 0;
public $time_delta = 0;
public $call_queue = [];
public $ack_queue = [];
public function haveRead()
{
$this->last_chunk = microtime(true);
}
/**
* Get the receive date of the latest chunk of data from the socket
*
* @return void
*/
public function getLastChunk()
{
return $this->last_chunk;
}
}

@ -266,7 +266,7 @@ trait Files
$part_num++;
$promises[] = $read_deferred->promise();
if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this if every second)
if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this every second)
$result = yield $this->all($promises);
foreach ($result as $kkey => $result) {
if (!$result) {

@ -1,35 +0,0 @@
<?php
/**
* SeqNoHandler module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2019 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\MTProtoTools;
use danog\MadelineProto\MTProto;
/**
* Manages sequence number.
*/
trait SeqNoHandler
{
public function content_related($method)
{
$method = is_array($method) && isset($method['_']) ? $method['_'] : $method;
return is_string($method) ? !in_array($method, MTProto::NOT_CONTENT_RELATED) : true;
}
}

@ -128,7 +128,7 @@ class RPCErrorException extends \Exception
}
}
if (!self::$rollbar || !class_exists('\\Rollbar\\Rollbar')) {
if (!self::$rollbar || !class_exists(\Rollbar\Rollbar::class)) {
return;
}
if (in_array($this->rpc, ['CHANNEL_PRIVATE', -404, -429, 'USERNAME_NOT_OCCUPIED', 'ACCESS_TOKEN_INVALID', 'AUTH_KEY_UNREGISTERED', 'SESSION_PASSWORD_NEEDED', 'PHONE_NUMBER_UNOCCUPIED', 'PEER_ID_INVALID', 'CHAT_ID_INVALID', 'USERNAME_INVALID', 'CHAT_WRITE_FORBIDDEN', 'CHAT_ADMIN_REQUIRED', 'PEER_FLOOD'])) {

@ -47,6 +47,12 @@ class ConnectionContext
* @var bool
*/
private $test = false;
/**
* Whether to use media servers.
*
* @var bool
*/
private $media = false;
/**
* The connection URI.
*
@ -96,6 +102,13 @@ class ConnectionContext
*/
private $key = 0;
/**
* Read callback
*
* @var callable
*/
private $readCallback;
/**
* Set the socket context.
*
@ -187,9 +200,9 @@ class ConnectionContext
return clone $this;
}
/**
* Set the secure boolean.
* Set the test boolean.
*
* @param bool $secure
* @param bool $test
*
* @return self
*/
@ -201,7 +214,7 @@ class ConnectionContext
}
/**
* Whether to use TLS with socket connections.
* Whether this is a test connection
*
* @return bool
*/
@ -209,6 +222,15 @@ class ConnectionContext
{
return $this->test;
}
/**
* Whether this is a media connection
*
* @return bool
*/
public function isMedia(): bool
{
return $this->media;
}
/**
* Whether this connection context will only be used by the DNS client
@ -269,6 +291,7 @@ class ConnectionContext
throw new Exception("Invalid DC id provided: $dc");
}
$this->dc = $dc;
$this->media = strpos($dc, '_media') !== false;
return $this;
}
@ -294,7 +317,7 @@ class ConnectionContext
if ($this->test) {
$dc += 10000;
}
if (strpos($this->dc, '_media')) {
if ($this->media) {
$dc = -$dc;
}
@ -341,6 +364,38 @@ class ConnectionContext
return $this;
}
/**
* Set read callback, called every time the socket reads at least a byte
*
* @param callback $callable Read callback
*
* @return void
*/
public function setReadCallback($callable)
{
$this->readCallback = $callable;
}
/**
* Check if a read callback is present
*
* @return boolean
*/
public function hasReadCallback(): bool
{
return $this->readCallback !== null;
}
/**
* Get read callback
*
* @return callable
*/
public function getReadCallback()
{
return $this->readCallback;
}
/**
* Get the current stream name from the stream chain.
*