From 3fe86e01621e5c95550bcbcce68e3d8150cb652d Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sat, 31 Aug 2019 22:43:58 +0200 Subject: [PATCH] Refactoring --- src/BigIntegor.php | 2 +- src/danog/MadelineProto/Connection.php | 236 +++++++++++++--- src/danog/MadelineProto/DataCenter.php | 20 +- .../MadelineProto/DataCenterConnection.php | 202 +++++++++++++ .../Loop/Connection/CheckLoop.php | 22 +- .../Loop/Connection/HttpWaitLoop.php | 26 +- .../Loop/Connection/ReadLoop.php | 23 +- .../Loop/Connection/WriteLoop.php | 27 +- src/danog/MadelineProto/Loop/Impl/Loop.php | 5 + src/danog/MadelineProto/MTProto.php | 9 +- .../AckHandler.php | 42 +-- .../MsgIdHandler.php | 0 .../ResponseHandler.php | 267 +++++++++--------- .../SaltHandler.php | 5 - .../SeqNoHandler.php | 0 .../Session.php | 28 +- .../MadelineProto/MTProtoTools/Files.php | 2 +- .../MTProtoTools/SeqNoHandler.php | 35 --- src/danog/MadelineProto/RPCErrorException.php | 2 +- .../Stream/ConnectionContext.php | 63 ++++- 20 files changed, 722 insertions(+), 294 deletions(-) create mode 100644 src/danog/MadelineProto/DataCenterConnection.php rename src/danog/MadelineProto/{MTProtoTools => MTProtoConnection}/AckHandler.php (50%) rename src/danog/MadelineProto/{Stream/MTProtoTools => MTProtoConnection}/MsgIdHandler.php (100%) rename src/danog/MadelineProto/{MTProtoTools => MTProtoConnection}/ResponseHandler.php (64%) rename src/danog/MadelineProto/{Stream/MTProtoTools => MTProtoConnection}/SaltHandler.php (85%) rename src/danog/MadelineProto/{Stream/MTProtoTools => MTProtoConnection}/SeqNoHandler.php (100%) rename src/danog/MadelineProto/{Stream/MTProtoTools => MTProtoConnection}/Session.php (76%) delete mode 100644 src/danog/MadelineProto/MTProtoTools/SeqNoHandler.php diff --git a/src/BigIntegor.php b/src/BigIntegor.php index 0dd042c0..8956725b 100644 --- a/src/BigIntegor.php +++ b/src/BigIntegor.php @@ -19,7 +19,7 @@ namespace phpseclib\Math; -if (PHP_MAJOR_VERSION < 7 && !(class_exists('\\Phar') && \Phar::running())) { +if (PHP_MAJOR_VERSION < 7 && !(class_exists(\Phar::class) && \Phar::running())) { throw new \Exception('MadelineProto requires php 7 to run natively, use phar.madelineproto.xyz to run on PHP 5.6'); } if (defined('HHVM_VERSION')) { diff --git a/src/danog/MadelineProto/Connection.php b/src/danog/MadelineProto/Connection.php index 1f141a17..ecfdc796 100644 --- a/src/danog/MadelineProto/Connection.php +++ b/src/danog/MadelineProto/Connection.php @@ -24,10 +24,8 @@ use danog\MadelineProto\Loop\Connection\CheckLoop; use danog\MadelineProto\Loop\Connection\HttpWaitLoop; use danog\MadelineProto\Loop\Connection\ReadLoop; use danog\MadelineProto\Loop\Connection\WriteLoop; -use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\Stream\ConnectionContext; -use danog\MadelineProto\Stream\MTProtoTools\MsgIdHandler; -use danog\MadelineProto\Stream\MTProtoTools\SeqNoHandler; +use danog\MadelineProto\Stream\MTProtoTools\Session; /** * Connection class. @@ -36,44 +34,176 @@ use danog\MadelineProto\Stream\MTProtoTools\SeqNoHandler; * * @author Daniil Gentili */ -class Connection +class Connection extends Session { - use Crypt; - use MsgIdHandler; - use SeqNoHandler; - use \danog\Serializable; use Tools; - const API_ENDPOINT = 0; - const VOIP_UDP_REFLECTOR_ENDPOINT = 1; - const VOIP_TCP_REFLECTOR_ENDPOINT = 2; - const VOIP_UDP_P2P_ENDPOINT = 3; - const VOIP_UDP_LAN_ENDPOINT = 4; - const PENDING_MAX = 2000000000; - public $stream; + /** + * The actual socket + * + * @var Stream + */ + private $stream; + /** + * Connection context + * + * @var Connection context + */ + private $ctx; - public $type = 0; - public $peer_tag; - - public $temp_auth_key; - public $auth_key; + /** + * HTTP request count + * + * @var integer + */ + private $httpReqCount = 0; + /** + * HTTP response count + * + * @var integer + */ + private $httpResCount = 0; + + /** + * Date of last chunk received + * + * @var integer + */ + private $lastChunk = 0; + + /** + * Logger instance. + * + * @var Logger + */ + protected $logger; + /** + * Main instance. + * + * @var MTProto + */ + protected $API; + + /** + * Whether the socket is reading data. + * + * @var boolean + */ + private $reading = false; + /** + * Whether the socket is writing data. + * + * @var boolean + */ + private $writing = false; + + /** + * Check if the socket is writing stuff. + * + * @return boolean + */ + public function isWriting(): bool + { + return $this->writing; + } + /** + * Check if the socket is reading stuff. + * + * @return boolean + */ + public function isReading(): bool + { + return $this->reading; + } + /** + * Set writing boolean + * + * @param boolean $writing + * + * @return void + */ + public function writing(bool $writing) + { + $this->writing = $writing; + } + /** + * Set reading boolean + * + * @param boolean $reading + * + * @return void + */ + public function reading(bool $reading) + { + $this->reading = $reading; + } + + /** + * Tell the class that we have read a chunk of data from the socket + * + * @return void + */ + public function haveRead() + { + $this->lastChunk = \microtime(true); + } + /** + * Get the receive date of the latest chunk of data from the socket. + * + * @return void + */ + public function getLastChunk() + { + return $this->lastChunk; + } + + /** + * Indicate a received HTTP response + * + * @return void + */ + public function httpReceived() + { + $this->httpResCount++; + } + /** + * Count received HTTP responses + * + * @return integer + */ + public function countHttpReceived(): int + { + return $this->httpResCount; + } + /** + * Indicate a sent HTTP request + * + * @return void + */ + public function httpSent() + { + $this->httpReqCount++; + } + /** + * Count sent HTTP requests + * + * @return integer + */ + public function countHttpSent(): int + { + return $this->httpReqCount; + } - public $pending_outgoing = []; - public $pending_outgoing_key = 0; - - public $authorized = false; - - public $datacenter; - public $API; - - public $ctx; - - - public function getCtx() + /** + * Get connection context + * + * @return ConnectionContext + */ + public function getCtx(): ConnectionContext { return $this->ctx; } @@ -93,6 +223,8 @@ class Connection { $this->API->logger->logger("Trying connection via $ctx", \danog\MadelineProto\Logger::WARNING); + $ctx->setReadCallback([$this, 'haveRead']); + $this->ctx = $ctx->getCtx(); $this->datacenter = $ctx->getDc(); $this->stream = yield $ctx->getStream(); @@ -101,16 +233,16 @@ class Connection } if (!isset($this->writer)) { - $this->writer = new WriteLoop($this->API, $this->datacenter); + $this->writer = new WriteLoop($this); } if (!isset($this->reader)) { - $this->reader = new ReadLoop($this->API, $this->datacenter); + $this->reader = new ReadLoop($this); } if (!isset($this->checker)) { - $this->checker = new CheckLoop($this->API, $this->datacenter); + $this->checker = new CheckLoop($this); } if (!isset($this->waiter)) { - $this->waiter = new HttpWaitLoop($this->API, $this->datacenter); + $this->waiter = new HttpWaitLoop($this); } foreach ($this->new_outgoing as $message_id) { if ($this->outgoing_messages[$message_id]['unencrypted']) { @@ -118,12 +250,11 @@ class Connection \Amp\Loop::defer(function () use ($promise) { $promise->fail(new Exception('Restart because we were reconnected')); }); - unset($this->new_outgoing[$message_id]); - unset($this->outgoing_messages[$message_id]); + unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]); } } - $this->http_req_count = 0; - $this->http_res_count = 0; + $this->httpReqCount = 0; + $this->httpResCount = 0; $this->writer->start(); $this->reader->start(); @@ -138,7 +269,7 @@ class Connection $deferred = new Deferred(); if (!isset($message['serialized_body'])) { - $body = is_object($message['body']) ? yield $message['body'] : $message['body']; + $body = \is_object($message['body']) ? yield $message['body'] : $message['body']; $refresh_next = isset($message['refresh_next']) && $message['refresh_next']; //$refresh_next = true; @@ -169,11 +300,28 @@ class Connection return $deferred->promise(); } - public function setExtra($extra) + /** + * Connect main instance. + * + * @param MTProto $extra + * + * @return void + */ + public function setExtra(MTProto $extra) { $this->API = $extra; + $this->logger = $extra->logger; } + /** + * Get main instance + * + * @return MTProto + */ + public function getExtra(): MTProto + { + return $this->API; + } public function disconnect() { $this->API->logger->logger("Disconnecting from DC {$this->datacenter}"); @@ -220,7 +368,7 @@ class Connection foreach ($this->new_outgoing as $message_id) { if (isset($this->outgoing_messages[$message_id]['sent']) - && $this->outgoing_messages[$message_id]['sent'] + $timeout < time() + && $this->outgoing_messages[$message_id]['sent'] + $timeout < \time() && ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted'] && $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req' ) { @@ -247,7 +395,7 @@ class Connection $result = []; foreach ($this->new_outgoing as $message_id) { if (isset($this->outgoing_messages[$message_id]['sent']) - && $this->outgoing_messages[$message_id]['sent'] + $timeout < time() + && $this->outgoing_messages[$message_id]['sent'] + $timeout < \time() && ($this->temp_auth_key === null) === $this->outgoing_messages[$message_id]['unencrypted'] && $this->outgoing_messages[$message_id]['_'] !== 'msgs_state_req' ) { diff --git a/src/danog/MadelineProto/DataCenter.php b/src/danog/MadelineProto/DataCenter.php index 723bd650..b19e47fb 100644 --- a/src/danog/MadelineProto/DataCenter.php +++ b/src/danog/MadelineProto/DataCenter.php @@ -80,6 +80,21 @@ class DataCenter return ['sockets', 'curdc', 'dclist', 'settings']; } + public function __wakeup() + { + foreach ($this->sockets as &$socket) { + if ($socket instanceof Connection) { + $new = new DataCenterConnection; + if ($socket->temp_auth_key) { + $new->setAuthKey($socket->temp_auth_key, true); + } + if ($socket->auth_key) { + $new->setAuthKey($socket->auth_key, false); + } + $new->authorized($socket->authorized); + } + } + } public function __magic_construct($API, $dclist, $settings, CookieJar $jar = null) { $this->API = $API; @@ -328,8 +343,7 @@ class DataCenter continue; // Could not connect to host, try next host in the list. } - if ($dc = $ctx->getDc()) { - $callback = [$this->sockets[$dc], 'haveRead']; + if ($ctx->hasReadCallback()) { $socket = new class($socket) extends ClientSocket { private $callback; @@ -350,7 +364,7 @@ class DataCenter return $promise; } }; - $socket->setReadCallback($callback); + $socket->setReadCallback($ctx->getReadCallback()); } else { $socket = new ClientSocket($socket); } diff --git a/src/danog/MadelineProto/DataCenterConnection.php b/src/danog/MadelineProto/DataCenterConnection.php new file mode 100644 index 00000000..cc11390c --- /dev/null +++ b/src/danog/MadelineProto/DataCenterConnection.php @@ -0,0 +1,202 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto; + +use danog\MadelineProto\Stream\ConnectionContext; + +class DataCenterConnection +{ + /** + * Temporary auth key. + * + * @var array + */ + private $tempAuthKey; + /** + * Permanent auth key. + * + * @var array + */ + private $authKey; + + /** + * Whether this auth key is authorized (as in logged in). + * + * @var boolean + */ + private $authorized = false; + + /** + * Connections open to a certain DC + * + * @var array + */ + private $connections = []; + + /** + * Main API instance + * + * @var \danog\MadelineProto\MTProto + */ + private $API; + + /** + * Connection context + * + * @var ConnectionContext + */ + private $ctx; + + /** + * DC ID + * + * @var string + */ + private $datacenter; + + /** + * Get auth key. + * + * @param boolean $temp Whether to fetch the temporary auth key + * + * @return array + */ + public function getAuthKey(bool $temp = true): array + { + return $this->{$temp ? 'tempAuthKey' : 'authKey'}; + } + /** + * Check if auth key is present. + * + * @param boolean $temp Whether to fetch the temporary auth key + * + * @return bool + */ + public function hasAuthKey(bool $temp = true): bool + { + return $this->{$temp ? 'tempAuthKey' : 'authKey'} !== null; + } + /** + * Set auth key. + * + * @param boolean $temp Whether to fetch the temporary auth key + * + * @return void + */ + public function setAuthKey(array $key, bool $temp = true) + { + $this->{$temp ? 'tempAuthKey' : 'authKey'} = $key; + } + + /** + * Check if we are logged in. + * + * @return boolean + */ + public function isAuthorized(): bool + { + return $this->authorized; + } + + /** + * Set the authorized boolean. + * + * @param boolean $authorized Whether we are authorized + * + * @return void + */ + public function authorized(bool $authorized) + { + $this->authorized = $authorized; + } + + /** + * Get connection context + * + * @return ConnectionContext + */ + public function getCtx(): ConnectionContext + { + return $this->ctx; + } + + /** + * Connect function. + * + * @param ConnectionContext $ctx Connection context + * + * @return \Generator + */ + public function connect(ConnectionContext $ctx): \Generator + { + $this->API->logger->logger("Trying connection via $ctx", \danog\MadelineProto\Logger::WARNING); + + $this->ctx = $ctx->getCtx(); + $this->datacenter = $ctx->getDc(); + $media = $ctx->isMedia(); + + $count = $media ? $this->API->settings['connection_settings']['media_socket_count'] : 1; + + $this->connections = []; + for ($x = 0; $x < $count; $x++) { + $this->connections[$x] = yield $ctx->getStream(); + $ctx = $this->ctx->getCtx(); + } + } + + public function sendMessage($message, $flush = true) + { + } + + public function setExtra(API $API) + { + $this->API = $API; + } + + public function disconnect() + { + $this->API->logger->logger("Disconnecting from DC {$this->datacenter}"); + foreach ($this->connections as $connection) { + $connection->disconnect(); + } + $this->connections = []; + } + + public function reconnect(): \Generator + { + $this->API->logger->logger("Reconnecting DC {$this->datacenter}"); + foreach ($this->connections as $connection) { + yield $connection->reconnect(); + } + $this->disconnect(); + yield $this->API->datacenter->dcConnectAsync($this->ctx->getDc()); + } + + /** + * Sleep function. + * + * @internal + * + * @return array + */ + public function __sleep() + { + return ['authKey', 'tempAuthKey', 'authorized']; + } +} diff --git a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php index de04b42f..f3a93bf7 100644 --- a/src/danog/MadelineProto/Loop/Connection/CheckLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/CheckLoop.php @@ -20,6 +20,7 @@ namespace danog\MadelineProto\Loop\Connection; use Amp\Deferred; use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; +use danog\MadelineProto\MTProto; /** * RPC call status check loop. @@ -28,14 +29,25 @@ use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; */ class CheckLoop extends ResumableSignalLoop { + /** + * Connection instance + * + * @var \danog\Madelineproto\Connection + */ protected $connection; + /** + * DC ID + * + * @var string + */ protected $datacenter; - public function __construct($API, $datacenter) + public function __construct(Connection $connection) { - $this->API = $API; - $this->datacenter = $datacenter; - $this->connection = $API->datacenter->sockets[$datacenter]; + $this->connection = $connection; + $this->API = $connection->getExtra(); + $ctx = $connection->getCtx(); + $this->datacenter = $ctx->getDc(); } public function loop() @@ -90,7 +102,7 @@ class CheckLoop extends ResumableSignalLoop case 2: case 3: if ($connection->outgoing_messages[$message_id]['_'] === 'msgs_state_req') { - $API->got_response_for_outgoing_message_id($message_id, $datacenter); + $connection->got_response_for_outgoing_message_id($message_id); break; } $API->logger->logger('Message '.$connection->outgoing_messages[$message_id]['_'].' with message ID '.($message_id).' not received by server, resending...', \danog\MadelineProto\Logger::ERROR); diff --git a/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php b/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php index 71f26ea3..b65eda42 100644 --- a/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/HttpWaitLoop.php @@ -18,6 +18,7 @@ namespace danog\MadelineProto\Loop\Connection; +use danog\MadelineProto\Connection; use danog\MadelineProto\Loop\Impl\ResumableSignalLoop; use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream; use danog\MadelineProto\Stream\MTProtoTransport\HttpStream; @@ -29,14 +30,25 @@ use danog\MadelineProto\Stream\MTProtoTransport\HttpStream; */ class HttpWaitLoop extends ResumableSignalLoop { + /** + * Connection instance + * + * @var \danog\Madelineproto\Connection + */ protected $connection; + /** + * DC ID + * + * @var string + */ protected $datacenter; - public function __construct($API, $datacenter) + public function __construct(Connection $connection) { - $this->API = $API; - $this->datacenter = $datacenter; - $this->connection = $API->datacenter->sockets[$datacenter]; + $this->connection = $connection; + $this->API = $connection->getExtra(); + $ctx = $connection->getCtx(); + $this->datacenter = $ctx->getDc(); } public function loop() @@ -62,11 +74,11 @@ class HttpWaitLoop extends ResumableSignalLoop return; } } - $API->logger->logger("DC $datacenter: request {$connection->http_req_count}, response {$connection->http_res_count}"); - if ($connection->http_req_count === $connection->http_res_count && (!empty($connection->pending_outgoing) || (!empty($connection->new_outgoing) && !$connection->hasPendingCalls()))) { + $API->logger->logger("DC $datacenter: request {$connection->countHttpSent()}, response {$connection->countHttpReceived()}"); + if ($connection->countHttpSent() === $connection->countHttpReceived() && (!empty($connection->pending_outgoing) || (!empty($connection->new_outgoing) && !$connection->hasPendingCalls()))) { yield $connection->sendMessage(['_' => 'http_wait', 'body' => ['max_wait' => 30000, 'wait_after' => 0, 'max_delay' => 0], 'content_related' => true, 'unencrypted' => false, 'method' => false]); } - $API->logger->logger("DC $datacenter: request {$connection->http_req_count}, response {$connection->http_res_count}"); + $API->logger->logger("DC $datacenter: request {$connection->countHttpSent()}, response {$connection->countHttpReceived()}"); } } diff --git a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php index a958e97d..6edafb26 100644 --- a/src/danog/MadelineProto/Loop/Connection/ReadLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/ReadLoop.php @@ -38,14 +38,25 @@ class ReadLoop extends SignalLoop use Tools; use Crypt; + /** + * Connection instance + * + * @var \danog\Madelineproto\Connection + */ protected $connection; + /** + * DC ID + * + * @var string + */ protected $datacenter; - public function __construct($API, $datacenter) + public function __construct(Connection $connection) { - $this->API = $API; - $this->datacenter = $datacenter; - $this->connection = $API->datacenter->sockets[$datacenter]; + $this->connection = $connection; + $this->API = $connection->getExtra(); + $ctx = $connection->getCtx(); + $this->datacenter = $ctx->getDc(); } public function loop() @@ -99,7 +110,7 @@ class ReadLoop extends SignalLoop return; } - $connection->http_res_count++; + $connection->httpReceived(); Loop::defer([$API, 'handle_messages'], $datacenter); @@ -214,7 +225,7 @@ class ReadLoop extends SignalLoop $connection->incoming_messages[$message_id]['content'] = $deserialized; $connection->incoming_messages[$message_id]['response'] = -1; $connection->new_incoming[$message_id] = $message_id; - $connection->last_http_wait = 0; + //$connection->last_http_wait = 0; $API->logger->logger('Received payload from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE); diff --git a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php index b3951765..c4ae81bb 100644 --- a/src/danog/MadelineProto/Loop/Connection/WriteLoop.php +++ b/src/danog/MadelineProto/Loop/Connection/WriteLoop.php @@ -36,14 +36,25 @@ class WriteLoop extends ResumableSignalLoop use Crypt; use Tools; + /** + * Connection instance + * + * @var \danog\Madelineproto\Connection + */ protected $connection; + /** + * DC ID + * + * @var string + */ protected $datacenter; - public function __construct($API, $datacenter) + public function __construct(Connection $connection) { - $this->API = $API; - $this->datacenter = $datacenter; - $this->connection = $API->datacenter->sockets[$datacenter]; + $this->connection = $connection; + $this->API = $connection->getExtra(); + $ctx = $connection->getCtx(); + $this->datacenter = $ctx->getDc(); } public function loop(): \Generator @@ -110,7 +121,7 @@ class WriteLoop extends ResumableSignalLoop yield $buffer->bufferWrite("\0\0\0\0\0\0\0\0".$message_id.$this->pack_unsigned_int($length).$message['serialized_body'].$pad); //var_dump("plain ".bin2hex($message_id)); - $connection->http_req_count++; + $connection->httpSent(); $connection->outgoing_messages[$message_id] = $message; $connection->outgoing_messages[$message_id]['sent'] = time(); $connection->outgoing_messages[$message_id]['tries'] = 0; @@ -303,7 +314,7 @@ class WriteLoop extends ResumableSignalLoop $t = microtime(true); yield $buffer->bufferWrite($message); - $connection->http_req_count++; + $connection->httpSent(); $API->logger->logger("Sent encrypted payload to DC {$datacenter}", \danog\MadelineProto\Logger::ULTRA_VERBOSE); @@ -313,11 +324,11 @@ class WriteLoop extends ResumableSignalLoop $connection->ack_queue = []; } - if ($has_http_wait) { + /*if ($has_http_wait) { $connection->last_http_wait = $sent; } elseif (Magic::$altervista) { $connection->last_http_wait = PHP_INT_MAX; - } + }*/ foreach ($keys as $key => $message_id) { $connection->outgoing_messages[$message_id] = &$connection->pending_outgoing[$key]; diff --git a/src/danog/MadelineProto/Loop/Impl/Loop.php b/src/danog/MadelineProto/Loop/Impl/Loop.php index 31026535..0b67a1a8 100644 --- a/src/danog/MadelineProto/Loop/Impl/Loop.php +++ b/src/danog/MadelineProto/Loop/Impl/Loop.php @@ -35,6 +35,11 @@ abstract class Loop implements LoopInterface private $count = 0; + /** + * MTProto instance + * + * @var \danog\MadelineProto\MTProto + */ public $API; public function __construct($API) diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 004721cf..7ecc3e31 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -72,7 +72,7 @@ class MTProto extends AsyncConstruct implements TLCallback /* const V = 71; */ - const V = 128; + const V = 129; const RELEASE = '4.0'; const NOT_LOGGED_IN = 0; const WAITING_CODE = 1; @@ -704,6 +704,7 @@ class MTProto extends AsyncConstruct implements TLCallback 'transport' => 'tcp', 'pfs' => extension_loaded('gmp'), ], + 'media_socket_count' => 5, 'default_dc' => 2, ], 'app_info' => [ // obtained in https://my.telegram.org @@ -1098,6 +1099,12 @@ class MTProto extends AsyncConstruct implements TLCallback yield $this->connect_to_all_dcs_async(); $this->datacenter->curdc = $curdc; } + public function content_related($method) + { + $method = is_array($method) && isset($method['_']) ? $method['_'] : $method; + + return is_string($method) ? !in_array($method, MTProto::NOT_CONTENT_RELATED) : true; + } public function get_self_async() { diff --git a/src/danog/MadelineProto/MTProtoTools/AckHandler.php b/src/danog/MadelineProto/MTProtoConnection/AckHandler.php similarity index 50% rename from src/danog/MadelineProto/MTProtoTools/AckHandler.php rename to src/danog/MadelineProto/MTProtoConnection/AckHandler.php index c0041bb5..4d2b8cf2 100644 --- a/src/danog/MadelineProto/MTProtoTools/AckHandler.php +++ b/src/danog/MadelineProto/MTProtoConnection/AckHandler.php @@ -17,64 +17,64 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\MTProtoTools; +namespace danog\MadelineProto\Stream\MTProtoTools; /** * Manages acknowledgement of messages. */ trait AckHandler { - public function ack_outgoing_message_id($message_id, $datacenter) + public function ack_outgoing_message_id($message_id) { // The server acknowledges that it received my message - if (!isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id])) { + if (!isset($this->outgoing_messages[$message_id])) { $this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING); return false; } - //$this->logger->logger("Ack-ed ".$this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['_']." with message ID $message_id on DC $datacenter"); + //$this->logger->logger("Ack-ed ".$this->outgoing_messages[$message_id]['_']." with message ID $message_id on DC $datacenter"); /* - if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['body'])) { - unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['body']); + if (isset($this->outgoing_messages[$message_id]['body'])) { + unset($this->outgoing_messages[$message_id]['body']); } - if (isset($this->datacenter->sockets[$datacenter]->new_outgoing[$message_id])) { - unset($this->datacenter->sockets[$datacenter]->new_outgoing[$message_id]); + if (isset($this->new_outgoing[$message_id])) { + unset($this->new_outgoing[$message_id]); }*/ return true; } - public function got_response_for_outgoing_message_id($message_id, $datacenter) + public function got_response_for_outgoing_message_id($message_id) { // The server acknowledges that it received my message - if (!isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id])) { + if (!isset($this->outgoing_messages[$message_id])) { $this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING); return false; } - if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['body'])) { - unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['body']); + if (isset($this->outgoing_messages[$message_id]['body'])) { + unset($this->outgoing_messages[$message_id]['body']); } - if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['serialized_body'])) { - unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id]['serialized_body']); + if (isset($this->outgoing_messages[$message_id]['serialized_body'])) { + unset($this->outgoing_messages[$message_id]['serialized_body']); } - if (isset($this->datacenter->sockets[$datacenter]->new_outgoing[$message_id])) { - unset($this->datacenter->sockets[$datacenter]->new_outgoing[$message_id]); + if (isset($this->new_outgoing[$message_id])) { + unset($this->new_outgoing[$message_id]); } return true; } - public function ack_incoming_message_id($message_id, $datacenter) + public function ack_incoming_message_id($message_id) { // I let the server know that I received its message - if (!isset($this->datacenter->sockets[$datacenter]->incoming_messages[$message_id])) { + if (!isset($this->incoming_messages[$message_id])) { $this->logger->logger("WARNING: Couldn't find message id ".$message_id.' in the array of incoming messages. Maybe try to increase its size?', \danog\MadelineProto\Logger::WARNING); } - /*if ($this->datacenter->sockets[$datacenter]->temp_auth_key['id'] === null || $this->datacenter->sockets[$datacenter]->temp_auth_key['id'] === "\0\0\0\0\0\0\0\0") { - // || (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack']) && $this->datacenter->sockets[$datacenter]->incoming_messages[$message_id]['ack'])) { + /*if ($this->temp_auth_key['id'] === null || $this->temp_auth_key['id'] === "\0\0\0\0\0\0\0\0") { + // || (isset($this->incoming_messages[$message_id]['ack']) && $this->incoming_messages[$message_id]['ack'])) { return; }*/ - $this->datacenter->sockets[$datacenter]->ack_queue[$message_id] = $message_id; + $this->ack_queue[$message_id] = $message_id; return true; } diff --git a/src/danog/MadelineProto/Stream/MTProtoTools/MsgIdHandler.php b/src/danog/MadelineProto/MTProtoConnection/MsgIdHandler.php similarity index 100% rename from src/danog/MadelineProto/Stream/MTProtoTools/MsgIdHandler.php rename to src/danog/MadelineProto/MTProtoConnection/MsgIdHandler.php diff --git a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php b/src/danog/MadelineProto/MTProtoConnection/ResponseHandler.php similarity index 64% rename from src/danog/MadelineProto/MTProtoTools/ResponseHandler.php rename to src/danog/MadelineProto/MTProtoConnection/ResponseHandler.php index 7b3057f5..d38c0ac6 100644 --- a/src/danog/MadelineProto/MTProtoTools/ResponseHandler.php +++ b/src/danog/MadelineProto/MTProtoConnection/ResponseHandler.php @@ -17,7 +17,7 @@ * @link https://docs.madelineproto.xyz MadelineProto documentation */ -namespace danog\MadelineProto\MTProtoTools; +namespace danog\MadelineProto\Stream\MTProtoTools; use Amp\Loop; @@ -26,18 +26,18 @@ use Amp\Loop; */ trait ResponseHandler { - public function send_msgs_state_info_async($req_msg_id, $msg_ids, $datacenter) + public function send_msgs_state_info_async($req_msg_id, $msg_ids) { - $this->logger->logger('Sending state info for '.count($msg_ids).' message IDs'); + $this->logger->logger('Sending state info for '.\count($msg_ids).' message IDs'); $info = ''; foreach ($msg_ids as $msg_id) { $cur_info = 0; - if (!isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id])) { - $msg_id = new \phpseclib\Math\BigInteger(strrev($msg_id), 256); - if ((new \phpseclib\Math\BigInteger(time() + $this->datacenter->sockets[$datacenter]->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) { + if (!isset($this->incoming_messages[$msg_id])) { + $msg_id = new \phpseclib\Math\BigInteger(\strrev($msg_id), 256); + if ((new \phpseclib\Math\BigInteger(\time() + $this->time_delta + 30))->bitwise_leftShift(32)->compare($msg_id) < 0) { $this->logger->logger("Do not know anything about $msg_id and it is too small"); $cur_info |= 3; - } elseif ((new \phpseclib\Math\BigInteger(time() + $this->datacenter->sockets[$datacenter]->time_delta - 300))->bitwise_leftShift(32)->compare($msg_id) > 0) { + } elseif ((new \phpseclib\Math\BigInteger(\time() + $this->time_delta - 300))->bitwise_leftShift(32)->compare($msg_id) > 0) { $this->logger->logger("Do not know anything about $msg_id and it is too big"); $cur_info |= 1; } else { @@ -48,9 +48,9 @@ trait ResponseHandler $this->logger->logger("Know about $msg_id"); $cur_info |= 4; } - $info .= chr($cur_info); + $info .= \chr($cur_info); } - $this->datacenter->sockets[$datacenter]->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter, 'postpone' => true])]['response'] = $req_msg_id; + $this->outgoing_messages[yield $this->object_call_async('msgs_state_info', ['req_msg_id' => $req_msg_id, 'info' => $info], ['datacenter' => $datacenter, 'postpone' => true])]['response'] = $req_msg_id; } public $n = 0; @@ -61,35 +61,35 @@ trait ResponseHandler $datacenter = $actual_datacenter; } $only_updates = true; - while ($this->datacenter->sockets[$datacenter]->new_incoming) { - reset($this->datacenter->sockets[$datacenter]->new_incoming); - $current_msg_id = key($this->datacenter->sockets[$datacenter]->new_incoming); - if (!isset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id])) { - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + while ($this->new_incoming) { + \reset($this->new_incoming); + $current_msg_id = \key($this->new_incoming); + if (!isset($this->incoming_messages[$current_msg_id])) { + unset($this->new_incoming[$current_msg_id]); continue; } - $this->logger->logger((isset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['_'].' from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE); + $this->logger->logger((isset($this->incoming_messages[$current_msg_id]['from_container']) ? 'Inside of container, received ' : 'Received ').$this->incoming_messages[$current_msg_id]['content']['_'].' from DC '.$datacenter, \danog\MadelineProto\Logger::ULTRA_VERBOSE); - switch ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['_']) { + switch ($this->incoming_messages[$current_msg_id]['content']['_']) { case 'msgs_ack': - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + unset($this->new_incoming[$current_msg_id]); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { + foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { $this->ack_outgoing_message_id($msg_id, $datacenter); // Acknowledge that the server received my message } - unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); + unset($this->incoming_messages[$current_msg_id]['content']); break; case 'rpc_result': - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); $this->ack_incoming_message_id($current_msg_id, $datacenter); $only_updates = false; // Acknowledge that the server received my request - $req_msg_id = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['req_msg_id']; - $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'] = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['result']; - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + $req_msg_id = $this->incoming_messages[$current_msg_id]['content']['req_msg_id']; + $this->incoming_messages[$current_msg_id]['content'] = $this->incoming_messages[$current_msg_id]['content']['result']; + $this->check_in_seq_no($current_msg_id); $this->handle_response($req_msg_id, $current_msg_id, $datacenter); break; @@ -97,25 +97,27 @@ trait ResponseHandler case 'future_salts': case 'msgs_state_info': $msg_id_type = 'req_msg_id'; + // no break case 'bad_server_salt': case 'bad_msg_notification': $msg_id_type = isset($msg_id_type) ? $msg_id_type : 'bad_msg_id'; + // no break case 'pong': $msg_id_type = isset($msg_id_type) ? $msg_id_type : 'msg_id'; - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + unset($this->new_incoming[$current_msg_id]); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - $this->handle_response($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'][$msg_id_type], $current_msg_id, $datacenter); + $this->handle_response($this->incoming_messages[$current_msg_id]['content'][$msg_id_type], $current_msg_id, $datacenter); unset($msg_id_type); break; case 'new_session_created': - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + unset($this->new_incoming[$current_msg_id]); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - $this->datacenter->sockets[$datacenter]->temp_auth_key['server_salt'] = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['server_salt']; + $this->temp_auth_key['server_salt'] = $this->incoming_messages[$current_msg_id]['content']['server_salt']; $this->ack_incoming_message_id($current_msg_id, $datacenter); // Acknowledge that I received the server's response @@ -123,69 +125,69 @@ trait ResponseHandler $this->updaters[false]->resumeDefer(); } - unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); + unset($this->incoming_messages[$current_msg_id]['content']); break; case 'msg_container': - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); $only_updates = false; - foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['messages'] as $message) { - $this->datacenter->sockets[$datacenter]->check_message_id($message['msg_id'], ['outgoing' => false, 'container' => true]); - $this->datacenter->sockets[$datacenter]->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body'], 'from_container' => true]; - $this->datacenter->sockets[$datacenter]->new_incoming[$message['msg_id']] = $message['msg_id']; + foreach ($this->incoming_messages[$current_msg_id]['content']['messages'] as $message) { + $this->check_message_id($message['msg_id'], ['outgoing' => false, 'container' => true]); + $this->incoming_messages[$message['msg_id']] = ['seq_no' => $message['seqno'], 'content' => $message['body'], 'from_container' => true]; + $this->new_incoming[$message['msg_id']] = $message['msg_id']; } - ksort($this->datacenter->sockets[$datacenter]->new_incoming); + \ksort($this->new_incoming); //$this->handle_messages($datacenter); - //$this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + //$this->check_in_seq_no($current_msg_id); - unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); + unset($this->incoming_messages[$current_msg_id]['content']); break; case 'msg_copy': - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + unset($this->new_incoming[$current_msg_id]); + $this->check_in_seq_no($current_msg_id); $only_updates = false; $this->ack_incoming_message_id($current_msg_id, $datacenter); // Acknowledge that I received the server's response - if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']])) { - $this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id'], $datacenter); + if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id']])) { + $this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['orig_message']['msg_id'], $datacenter); // Acknowledge that I received the server's response } else { - $message = $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']; - $this->datacenter->sockets[$datacenter]->check_message_id($message['orig_message']['msg_id'], ['outgoing' => false, 'container' => true]); - $this->datacenter->sockets[$datacenter]->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['orig_message']]; - $this->datacenter->sockets[$datacenter]->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id']; + $message = $this->incoming_messages[$current_msg_id]['content']; + $this->check_message_id($message['orig_message']['msg_id'], ['outgoing' => false, 'container' => true]); + $this->incoming_messages[$message['orig_message']['msg_id']] = ['content' => $this->incoming_messages[$current_msg_id]['content']['orig_message']]; + $this->new_incoming[$message['orig_message']['msg_id']] = $message['orig_message']['msg_id']; } - unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); + unset($this->incoming_messages[$current_msg_id]['content']); break; case 'http_wait': - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + unset($this->new_incoming[$current_msg_id]); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - $this->logger->logger($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'], \danog\MadelineProto\Logger::NOTICE); + $this->logger->logger($this->incoming_messages[$current_msg_id]['content'], \danog\MadelineProto\Logger::NOTICE); - unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); + unset($this->incoming_messages[$current_msg_id]['content']); break; case 'msgs_state_req': - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); - $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); - unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); + $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); + unset($this->incoming_messages[$current_msg_id]['content']); break; case 'msgs_all_info': - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); - foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $key => $msg_id) { - $info = ord($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['info'][$key]); - $msg_id = new \phpseclib\Math\BigInteger(strrev($msg_id), 256); + foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $key => $msg_id) { + $info = \ord($this->incoming_messages[$current_msg_id]['content']['info'][$key]); + $msg_id = new \phpseclib\Math\BigInteger(\strrev($msg_id), 256); $status = 'Status for message id '.$msg_id.': '; /*if ($info & 4) { *$this->got_response_for_outgoing_message_id($msg_id, $datacenter); @@ -200,83 +202,83 @@ trait ResponseHandler } break; case 'msg_detailed_info': - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + $this->check_in_seq_no($current_msg_id); + unset($this->new_incoming[$current_msg_id]); $only_updates = false; - if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_id']])) { - if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) { - $this->handle_response($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_id'], $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter); + if (isset($this->outgoing_messages[$this->incoming_messages[$current_msg_id]['content']['msg_id']])) { + if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) { + $this->handle_response($this->incoming_messages[$current_msg_id]['content']['msg_id'], $this->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter); } else { - $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true])); + $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true])); } } break; case 'msg_new_detailed_info': - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); - if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) { - $this->ack_incoming_message_id($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter); + if (isset($this->incoming_messages[$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']])) { + $this->ack_incoming_message_id($this->incoming_messages[$current_msg_id]['content']['answer_msg_id'], $datacenter); } else { - $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true])); + $this->callFork($this->object_call_async('msg_resend_req', ['msg_ids' => [$this->incoming_messages[$current_msg_id]['content']['answer_msg_id']]], ['datacenter' => $datacenter, 'postpone' => true])); } break; case 'msg_resend_req': - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); $ok = true; - foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { - if (!isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$msg_id]) || isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id])) { + foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { + if (!isset($this->outgoing_messages[$msg_id]) || isset($this->incoming_messages[$msg_id])) { $ok = false; } } if ($ok) { - foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { + foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { $this->method_recall('', ['message_id' => $msg_id, 'datacenter' => $datacenter, 'postpone' => true]); } } else { - $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); + $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); } break; case 'msg_resend_ans_req': - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + $this->check_in_seq_no($current_msg_id); $only_updates = false; - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); - $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); - foreach ($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { - if (isset($this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']) && isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']])) { - $this->callFork($this->object_call_async($this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['_'], $this->datacenter->sockets[$datacenter]->outgoing_messages[$this->datacenter->sockets[$datacenter]->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter, 'postpone' => true])); + $this->callFork($this->send_msgs_state_info_async($current_msg_id, $this->incoming_messages[$current_msg_id]['content']['msg_ids'], $datacenter)); + foreach ($this->incoming_messages[$current_msg_id]['content']['msg_ids'] as $msg_id) { + if (isset($this->incoming_messages[$msg_id]['response']) && isset($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']])) { + $this->callFork($this->object_call_async($this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['_'], $this->outgoing_messages[$this->incoming_messages[$msg_id]['response']]['body'], ['datacenter' => $datacenter, 'postpone' => true])); } } break; default: - $this->datacenter->sockets[$datacenter]->check_in_seq_no($current_msg_id); + $this->check_in_seq_no($current_msg_id); $this->ack_incoming_message_id($current_msg_id, $datacenter); // Acknowledge that I received the server's response - $response_type = $this->constructors->find_by_predicate($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']['_'])['type']; + $response_type = $this->constructors->find_by_predicate($this->incoming_messages[$current_msg_id]['content']['_'])['type']; switch ($response_type) { case 'Updates': - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); - if (strpos($datacenter, 'cdn') === false) { - $this->callForkDefer($this->handle_updates_async($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'])); + if (\strpos($datacenter, 'cdn') === false) { + $this->callForkDefer($this->handle_updates_async($this->incoming_messages[$current_msg_id]['content'])); } - unset($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content']); + unset($this->incoming_messages[$current_msg_id]['content']); $only_updates = true && $only_updates; break; default: $only_updates = false; $this->logger->logger('Trying to assign a response of type '.$response_type.' to its request...', \danog\MadelineProto\Logger::VERBOSE); - foreach ($this->datacenter->sockets[$datacenter]->new_outgoing as $key => $expecting_msg_id) { - $expecting = $this->datacenter->sockets[$datacenter]->outgoing_messages[$expecting_msg_id]; + foreach ($this->new_outgoing as $key => $expecting_msg_id) { + $expecting = $this->outgoing_messages[$expecting_msg_id]; if (!isset($expecting['type'])) { continue; } @@ -284,21 +286,22 @@ trait ResponseHandler $this->logger->logger('Does the request of return type '.$expecting['type'].' match?', \danog\MadelineProto\Logger::VERBOSE); if ($response_type === $expecting['type']) { $this->logger->logger('Yes', \danog\MadelineProto\Logger::VERBOSE); - unset($this->datacenter->sockets[$datacenter]->new_incoming[$current_msg_id]); + unset($this->new_incoming[$current_msg_id]); $this->handle_response($expecting_msg_id, $current_msg_id, $datacenter); break 2; } $this->logger->logger('No', \danog\MadelineProto\Logger::VERBOSE); } - throw new \danog\MadelineProto\ResponseException('Dunno how to handle '.PHP_EOL.var_export($this->datacenter->sockets[$datacenter]->incoming_messages[$current_msg_id]['content'], true)); + throw new \danog\MadelineProto\ResponseException('Dunno how to handle '.PHP_EOL.\var_export($this->incoming_messages[$current_msg_id]['content'], true)); break; } break; } } - if ($this->datacenter->sockets[$datacenter]->pending_outgoing) - $this->datacenter->sockets[$datacenter]->writer->resume(); + if ($this->pending_outgoing) { + $this->writer->resume(); + } //$this->n--; @@ -307,7 +310,7 @@ trait ResponseHandler public function handle_reject($datacenter, &$request, $data) { - if (isset($request['promise']) && is_object($request['promise'])) { + if (isset($request['promise']) && \is_object($request['promise'])) { Loop::defer(function () use (&$request, $data) { if (isset($request['promise'])) { $promise = $request['promise']; @@ -315,12 +318,11 @@ trait ResponseHandler try { $promise->fail($data); } catch (\Error $e) { - if (strpos($e->getMessage(), "Promise has already been resolved") !== 0) { + if (\strpos($e->getMessage(), "Promise has already been resolved") !== 0) { throw $e; } $this->logger->logger("Got promise already resolved error", \danog\MadelineProto\Logger::FATAL_ERROR); } - } else { $this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-')); $this->logger->logger("Rejecting: $data"); @@ -328,7 +330,7 @@ trait ResponseHandler }); } elseif (isset($request['container'])) { foreach ($request['container'] as $message_id) { - $this->handle_reject($datacenter, $this->datacenter->sockets[$datacenter]->outgoing_messages[$message_id], $data); + $this->handle_reject($datacenter, $this->outgoing_messages[$message_id], $data); } } else { $this->logger->logger('Rejecting: already got response for '.(isset($request['_']) ? $request['_'] : '-')); @@ -336,26 +338,26 @@ trait ResponseHandler } } - public function handle_response($request_id, $response_id, $datacenter) + public function handle_response($request_id, $response_id) { - $response = &$this->datacenter->sockets[$datacenter]->incoming_messages[$response_id]['content']; - unset($this->datacenter->sockets[$datacenter]->incoming_messages[$response_id]['content']); - $request = &$this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]; + $response = &$this->incoming_messages[$response_id]['content']; + unset($this->incoming_messages[$response_id]['content']); + $request = &$this->outgoing_messages[$request_id]; if (isset($response['_'])) { switch ($response['_']) { case 'rpc_error': - if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->datacenter->sockets[$datacenter]->temp_auth_key !== null && (!isset($this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited']) || $this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited'] === false)) { - $this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited'] = true; + if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->temp_auth_key !== null && (!isset($this->temp_auth_key['connection_inited']) || $this->temp_auth_key['connection_inited'] === false)) { + $this->temp_auth_key['connection_inited'] = true; } - if (in_array($response['error_message'], ['PERSISTENT_TIMESTAMP_EMPTY', 'PERSISTENT_TIMESTAMP_OUTDATED', 'PERSISTENT_TIMESTAMP_INVALID'])) { + if (\in_array($response['error_message'], ['PERSISTENT_TIMESTAMP_EMPTY', 'PERSISTENT_TIMESTAMP_OUTDATED', 'PERSISTENT_TIMESTAMP_INVALID'])) { $this->got_response_for_outgoing_message_id($request_id, $datacenter); $this->handle_reject($datacenter, $request, new \danog\MadelineProto\PTSException($response['error_message'])); return; } - if (strpos($response['error_message'], 'FILE_REFERENCE_') === 0) { + if (\strpos($response['error_message'], 'FILE_REFERENCE_') === 0) { $this->logger->logger("Got {$response['error_message']}, refreshing file reference and repeating method call..."); $request['refresh_references'] = true; @@ -371,12 +373,12 @@ trait ResponseHandler case 500: case -500: if ($response['error_message'] === 'MSG_WAIT_FAILED') { - $this->datacenter->sockets[$datacenter]->call_queue[$request['queue']] = []; + $this->call_queue[$request['queue']] = []; $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; } - if (in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'no workers running'])) { + if (\in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'no workers running'])) { Loop::delay(1 * 1000, [$this, 'method_recall'], ['message_id' => $request_id, 'datacenter' => $datacenter]); return; } @@ -387,7 +389,7 @@ trait ResponseHandler return; case 303: $old_datacenter = $datacenter; - $this->datacenter->curdc = $datacenter = (int) preg_replace('/[^0-9]+/', '', $response['error_message']); + $this->datacenter->curdc = $datacenter = (int) \preg_replace('/[^0-9]+/', '', $response['error_message']); if (isset($request['file']) && $request['file'] && isset($this->datacenter->sockets[$datacenter.'_media'])) { \danog\MadelineProto\Logger::log('Using media DC'); @@ -448,10 +450,10 @@ trait ResponseHandler return; } - $this->datacenter->sockets[$datacenter]->session_id = null; - $this->datacenter->sockets[$datacenter]->temp_auth_key = null; - $this->datacenter->sockets[$datacenter]->auth_key = null; - $this->datacenter->sockets[$datacenter]->authorized = false; + $this->session_id = null; + $this->temp_auth_key = null; + $this->auth_key = null; + $this->authorized = false; $this->logger->logger('Auth key not registered, resetting temporary and permanent auth keys...', \danog\MadelineProto\Logger::ERROR); @@ -492,7 +494,7 @@ trait ResponseHandler case 'AUTH_KEY_PERM_EMPTY': $this->logger->logger('Temporary auth key not bound, resetting temporary auth key...', \danog\MadelineProto\Logger::ERROR); - $this->datacenter->sockets[$datacenter]->temp_auth_key = null; + $this->temp_auth_key = null; $this->callFork((function () use ($request_id, $datacenter) { yield $this->init_authorization_async(); $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]); @@ -506,9 +508,9 @@ trait ResponseHandler return; case 420: - $seconds = preg_replace('/[^0-9]+/', '', $response['error_message']); + $seconds = \preg_replace('/[^0-9]+/', '', $response['error_message']); $limit = isset($request['FloodWaitLimit']) ? $request['FloodWaitLimit'] : $this->settings['flood_timeout']['wait_if_lt']; - if (is_numeric($seconds) && $seconds < $limit) { + if (\is_numeric($seconds) && $seconds < $limit) { //$this->got_response_for_outgoing_message_id($request_id, $datacenter); $this->logger->logger('Flood, waiting '.$seconds.' seconds before repeating async call...', \danog\MadelineProto\Logger::NOTICE); @@ -536,16 +538,16 @@ trait ResponseHandler $this->logger->logger('Received bad_msg_notification: '.self::BAD_MSG_ERROR_CODES[$response['error_code']], \danog\MadelineProto\Logger::WARNING); switch ($response['error_code']) { case 48: - $this->datacenter->sockets[$datacenter]->temp_auth_key['server_salt'] = $response['new_server_salt']; + $this->temp_auth_key['server_salt'] = $response['new_server_salt']; $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter, 'postpone' => true]); return; case 16: case 17: - $this->datacenter->sockets[$datacenter]->time_delta = (int) (new \phpseclib\Math\BigInteger(strrev($response_id), 256))->bitwise_rightShift(32)->subtract(new \phpseclib\Math\BigInteger(time()))->toString(); - $this->logger->logger('Set time delta to '.$this->datacenter->sockets[$datacenter]->time_delta, \danog\MadelineProto\Logger::WARNING); + $this->time_delta = (int) (new \phpseclib\Math\BigInteger(\strrev($response_id), 256))->bitwise_rightShift(32)->subtract(new \phpseclib\Math\BigInteger(\time()))->toString(); + $this->logger->logger('Set time delta to '.$this->time_delta, \danog\MadelineProto\Logger::WARNING); $this->reset_session(); - $this->datacenter->sockets[$datacenter]->temp_auth_key = null; + $this->temp_auth_key = null; $this->callFork((function () use ($datacenter, $request_id) { yield $this->init_authorization_async(); $this->method_recall('', ['message_id' => $request_id, 'datacenter' => $datacenter]); @@ -560,8 +562,8 @@ trait ResponseHandler } } - if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->datacenter->sockets[$datacenter]->temp_auth_key !== null && (!isset($this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited']) || $this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited'] === false)) { - $this->datacenter->sockets[$datacenter]->temp_auth_key['connection_inited'] = true; + if (isset($request['method']) && $request['method'] && $request['_'] !== 'auth.bindTempAuthKey' && $this->temp_auth_key !== null && (!isset($this->temp_auth_key['connection_inited']) || $this->temp_auth_key['connection_inited'] === false)) { + $this->temp_auth_key['connection_inited'] = true; } if (!isset($request['promise'])) { @@ -570,28 +572,28 @@ trait ResponseHandler return; } $botAPI = isset($request['botAPI']) && $request['botAPI']; - if (isset($response['_']) && strpos($datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') { + if (isset($response['_']) && \strpos($datacenter, 'cdn') === false && $this->constructors->find_by_predicate($response['_'])['type'] === 'Updates') { $response['request'] = $request; $this->callForkDefer($this->handle_updates_async($response)); } unset($request); $this->got_response_for_outgoing_message_id($request_id, $datacenter); - $r = isset($response['_']) ? $response['_'] : json_encode($response); + $r = isset($response['_']) ? $response['_'] : \json_encode($response); $this->logger->logger("Defer sending $r to deferred"); $this->callFork(( function () use ($request_id, $response, $datacenter, $botAPI) { - $r = isset($response['_']) ? $response['_'] : json_encode($response); + $r = isset($response['_']) ? $response['_'] : \json_encode($response); $this->logger->logger("Deferred: sent $r to deferred"); if ($botAPI) { $response = yield $this->MTProto_to_botAPI_async($response); } - if (isset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise'])) { // This should not happen but happens, should debug - $promise = $this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']; - unset($this->datacenter->sockets[$datacenter]->outgoing_messages[$request_id]['promise']); + if (isset($this->outgoing_messages[$request_id]['promise'])) { // This should not happen but happens, should debug + $promise = $this->outgoing_messages[$request_id]['promise']; + unset($this->outgoing_messages[$request_id]['promise']); try { $promise->resolve($response); } catch (\Error $e) { - if (strpos($e->getMessage(), "Promise has already been resolved") !== 0) { + if (\strpos($e->getMessage(), "Promise has already been resolved") !== 0) { throw $e; } $this->logger->logger("Got promise already resolved error", \danog\MadelineProto\Logger::FATAL_ERROR); @@ -644,6 +646,7 @@ trait ResponseHandler $updates['user_id'] = (yield $this->get_info_async($updates['request']['body']['peer']))['bot_api_id']; $updates['message'] = $updates['request']['body']['message']; unset($updates['request']); + // no break case 'updateShortMessage': case 'updateShortChatMessage': $from_id = isset($updates['from_id']) ? $updates['from_id'] : ($updates['out'] ? $this->authorization['user']['id'] : $updates['user_id']); @@ -675,7 +678,7 @@ trait ResponseHandler $this->updaters[false]->resume(); break; default: - throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.var_export($updates, true)); + throw new \danog\MadelineProto\ResponseException('Unrecognized update received: '.\var_export($updates, true)); break; } } diff --git a/src/danog/MadelineProto/Stream/MTProtoTools/SaltHandler.php b/src/danog/MadelineProto/MTProtoConnection/SaltHandler.php similarity index 85% rename from src/danog/MadelineProto/Stream/MTProtoTools/SaltHandler.php rename to src/danog/MadelineProto/MTProtoConnection/SaltHandler.php index 887c20ff..5d55886c 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTools/SaltHandler.php +++ b/src/danog/MadelineProto/MTProtoConnection/SaltHandler.php @@ -37,9 +37,4 @@ trait SaltHandler $this->temp_auth_key['salts'][$salt] = ['valid_since' => $valid_since, 'valid_until' => $valid_until]; } } - - public function handle_future_salts($salt) - { - yield $this->method_call_async_read('messages.sendMessage', ['peer' => $salt, 'message' => base64_decode('UG93ZXJlZCBieSBATWFkZWxpbmVQcm90bw==')], ['datacenter' => $this->datacenter->curdc]); - } } diff --git a/src/danog/MadelineProto/Stream/MTProtoTools/SeqNoHandler.php b/src/danog/MadelineProto/MTProtoConnection/SeqNoHandler.php similarity index 100% rename from src/danog/MadelineProto/Stream/MTProtoTools/SeqNoHandler.php rename to src/danog/MadelineProto/MTProtoConnection/SeqNoHandler.php diff --git a/src/danog/MadelineProto/Stream/MTProtoTools/Session.php b/src/danog/MadelineProto/MTProtoConnection/Session.php similarity index 76% rename from src/danog/MadelineProto/Stream/MTProtoTools/Session.php rename to src/danog/MadelineProto/MTProtoConnection/Session.php index 09308aae..274349b6 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTools/Session.php +++ b/src/danog/MadelineProto/MTProtoConnection/Session.php @@ -18,42 +18,30 @@ namespace danog\MadelineProto\Stream\MTProtoTools; +use danog\MadelineProto\Logger; + /** * Manages MTProto session-specific data */ -class Session +abstract class Session { + use AckHandler; use MsgIdHandler; + use ResponseHandler; use SaltHandler; use SeqNoHandler; + public $incoming_messages = []; public $outgoing_messages = []; public $new_incoming = []; public $new_outgoing = []; - public $http_req_count = 0; - public $http_res_count = 0; - - public $last_http_wait = 0; - private $last_chunk = 0; + public $pending_outgoing = []; + public $pending_outgoing_key = 0; public $time_delta = 0; public $call_queue = []; public $ack_queue = []; - - public function haveRead() - { - $this->last_chunk = microtime(true); - } - /** - * Get the receive date of the latest chunk of data from the socket - * - * @return void - */ - public function getLastChunk() - { - return $this->last_chunk; - } } \ No newline at end of file diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index 88fdb337..76ae3ded 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -266,7 +266,7 @@ trait Files $part_num++; $promises[] = $read_deferred->promise(); - if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this if every second) + if (!($part_num % $parallel_chunks)) { // 20 mb at a time, for a typical bandwidth of 1gbps (run the code in this every second) $result = yield $this->all($promises); foreach ($result as $kkey => $result) { if (!$result) { diff --git a/src/danog/MadelineProto/MTProtoTools/SeqNoHandler.php b/src/danog/MadelineProto/MTProtoTools/SeqNoHandler.php deleted file mode 100644 index 743241e3..00000000 --- a/src/danog/MadelineProto/MTProtoTools/SeqNoHandler.php +++ /dev/null @@ -1,35 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2019 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\MTProtoTools; - -use danog\MadelineProto\MTProto; - -/** - * Manages sequence number. - */ -trait SeqNoHandler -{ - public function content_related($method) - { - $method = is_array($method) && isset($method['_']) ? $method['_'] : $method; - - return is_string($method) ? !in_array($method, MTProto::NOT_CONTENT_RELATED) : true; - } -} diff --git a/src/danog/MadelineProto/RPCErrorException.php b/src/danog/MadelineProto/RPCErrorException.php index 03f63a2f..e9c40750 100644 --- a/src/danog/MadelineProto/RPCErrorException.php +++ b/src/danog/MadelineProto/RPCErrorException.php @@ -128,7 +128,7 @@ class RPCErrorException extends \Exception } } - if (!self::$rollbar || !class_exists('\\Rollbar\\Rollbar')) { + if (!self::$rollbar || !class_exists(\Rollbar\Rollbar::class)) { return; } if (in_array($this->rpc, ['CHANNEL_PRIVATE', -404, -429, 'USERNAME_NOT_OCCUPIED', 'ACCESS_TOKEN_INVALID', 'AUTH_KEY_UNREGISTERED', 'SESSION_PASSWORD_NEEDED', 'PHONE_NUMBER_UNOCCUPIED', 'PEER_ID_INVALID', 'CHAT_ID_INVALID', 'USERNAME_INVALID', 'CHAT_WRITE_FORBIDDEN', 'CHAT_ADMIN_REQUIRED', 'PEER_FLOOD'])) { diff --git a/src/danog/MadelineProto/Stream/ConnectionContext.php b/src/danog/MadelineProto/Stream/ConnectionContext.php index 17bc7def..1504639c 100644 --- a/src/danog/MadelineProto/Stream/ConnectionContext.php +++ b/src/danog/MadelineProto/Stream/ConnectionContext.php @@ -47,6 +47,12 @@ class ConnectionContext * @var bool */ private $test = false; + /** + * Whether to use media servers. + * + * @var bool + */ + private $media = false; /** * The connection URI. * @@ -96,6 +102,13 @@ class ConnectionContext */ private $key = 0; + /** + * Read callback + * + * @var callable + */ + private $readCallback; + /** * Set the socket context. * @@ -187,9 +200,9 @@ class ConnectionContext return clone $this; } /** - * Set the secure boolean. + * Set the test boolean. * - * @param bool $secure + * @param bool $test * * @return self */ @@ -201,7 +214,7 @@ class ConnectionContext } /** - * Whether to use TLS with socket connections. + * Whether this is a test connection * * @return bool */ @@ -209,6 +222,15 @@ class ConnectionContext { return $this->test; } + /** + * Whether this is a media connection + * + * @return bool + */ + public function isMedia(): bool + { + return $this->media; + } /** * Whether this connection context will only be used by the DNS client @@ -269,6 +291,7 @@ class ConnectionContext throw new Exception("Invalid DC id provided: $dc"); } $this->dc = $dc; + $this->media = strpos($dc, '_media') !== false; return $this; } @@ -294,7 +317,7 @@ class ConnectionContext if ($this->test) { $dc += 10000; } - if (strpos($this->dc, '_media')) { + if ($this->media) { $dc = -$dc; } @@ -341,6 +364,38 @@ class ConnectionContext return $this; } + /** + * Set read callback, called every time the socket reads at least a byte + * + * @param callback $callable Read callback + * + * @return void + */ + public function setReadCallback($callable) + { + $this->readCallback = $callable; + } + + /** + * Check if a read callback is present + * + * @return boolean + */ + public function hasReadCallback(): bool + { + return $this->readCallback !== null; + } + + /** + * Get read callback + * + * @return callable + */ + public function getReadCallback() + { + return $this->readCallback; + } + /** * Get the current stream name from the stream chain. *