. * * @author Daniil Gentili * @copyright 2016-2020 Daniil Gentili * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 * * @link https://docs.madelineproto.xyz MadelineProto documentation */

namespace danog\MadelineProto;

use Amp\ByteStream\ClosedException;
use Amp\Deferred;
use Amp\Failure;
use danog\MadelineProto\Loop\Connection\CheckLoop;
use danog\MadelineProto\Loop\Connection\HttpWaitLoop;
use danog\MadelineProto\Loop\Connection\PingLoop;
use danog\MadelineProto\Loop\Connection\ReadLoop;
use danog\MadelineProto\Loop\Connection\WriteLoop;
use danog\MadelineProto\MTProto\OutgoingMessage;
use danog\MadelineProto\MTProtoSession\Session;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
use danog\MadelineProto\Stream\StreamInterface;
use danog\MadelineProto\Stream\Transport\WssStream;
use danog\MadelineProto\Stream\Transport\WsStream;

/**
 * Connection class.
 *
 * Manages connection to Telegram datacenters
 *
 * @internal
 *
 * @author Daniil Gentili
 */
class Connection
{
    use Session;
    use \danog\Serializable;

    /**
     * Writer loop.
     *
     * @var \danog\MadelineProto\Loop\Connection\WriteLoop
     */
    protected $writer;
    /**
     * Reader loop.
     *
     * @var \danog\MadelineProto\Loop\Connection\ReadLoop
     */
    protected $reader;
    /**
     * Checker loop.
     *
     * @var \danog\MadelineProto\Loop\Connection\CheckLoop
     */
    protected $checker;
    /**
     * Waiter loop.
     *
     * @var \danog\MadelineProto\Loop\Connection\HttpWaitLoop
     */
    protected $waiter;
    /**
     * Ping loop (only created by connect() for websocket streams).
     *
     * @var \danog\MadelineProto\Loop\Connection\PingLoop
     */
    protected $pinger;
    /**
     * The actual socket.
     *
     * @var StreamInterface
     */
    public $stream;
    /**
     * Connection context.
     *
     * @var ConnectionContext
     */
    private $ctx;
    /**
     * HTTP request count.
     *
     * @var int
     */
    private $httpReqCount = 0;
    /**
     * HTTP response count.
     *
     * @var int
     */
    private $httpResCount = 0;
    /**
     * Date of last chunk received.
     *
     * @var float
     */
    private float $lastChunk = 0;
    /**
     * Logger instance.
     *
     * @var Logger
     */
    protected $logger;
    /**
     * Main instance.
     *
     * @var MTProto
     */
    public $API;
    /**
     * Shared connection instance.
     *
     * @var DataCenterConnection
     */
    protected $shared;
    /**
     * DC ID.
     *
     * @var string
     */
    protected $datacenter;
    /**
     * Connection ID.
     *
     * @var int
     */
    private $id = 0;
    /**
     * DC ID and connection ID concatenated.
     *
     * @var string
     */
    private $datacenterId = '';
    /**
     * Whether this socket has to be reconnected.
     *
     * @var bool
     */
    private $needsReconnect = false;

    /**
     * Indicate if this socket needs to be reconnected.
     *
     * @param bool $needsReconnect Whether the socket has to be reconnected
     *
     * @return void
     */
    public function needReconnect(bool $needsReconnect)
    {
        $this->needsReconnect = $needsReconnect;
    }

    /**
     * Whether this socket needs to be reconnected.
     *
     * @return bool
     */
    public function shouldReconnect(): bool
    {
        return $this->needsReconnect;
    }

    /**
     * Set writing boolean (forwarded to the shared connection together with this connection's ID).
     *
     * @param bool $writing
     *
     * @return void
     */
    public function writing(bool $writing): void
    {
        $this->shared->writing($writing, $this->id);
    }

    /**
     * Set reading boolean (forwarded to the shared connection together with this connection's ID).
     *
     * @param bool $reading
     *
     * @return void
     */
    public function reading(bool $reading): void
    {
        $this->shared->reading($reading, $this->id);
    }

    /**
     * Tell the class that we have read a chunk of data from the socket.
     *
     * Records the current timestamp; registered as the context read callback in connect().
     *
     * @return void
     */
    public function haveRead()
    {
        $this->lastChunk = \microtime(true);
    }

    /**
     * Get the receive date of the latest chunk of data from the socket.
     *
     * @return float
     */
    public function getLastChunk(): float
    {
        return $this->lastChunk;
    }

    /**
     * Indicate a received HTTP response.
     *
     * @return void
     */
    public function httpReceived()
    {
        $this->httpResCount++;
    }

    /**
     * Count received HTTP responses.
     *
     * @return int
     */
    public function countHttpReceived(): int
    {
        return $this->httpResCount;
    }

    /**
     * Indicate a sent HTTP request.
     *
     * @return void
     */
    public function httpSent()
    {
        $this->httpReqCount++;
    }

    /**
     * Count sent HTTP requests.
     *
     * @return int
     */
    public function countHttpSent(): int
    {
        return $this->httpReqCount;
    }

    /**
     * Get connection ID.
     *
     * @return int
     */
    public function getID(): int
    {
        return $this->id;
    }

    /**
     * Get datacenter concatenated with connection ID.
     *
     * @return string
     */
    public function getDatacenterID(): string
    {
        return $this->datacenterId;
    }

    /**
     * Get connection context.
     *
     * @return ConnectionContext
     */
    public function getCtx(): ConnectionContext
    {
        return $this->ctx;
    }

    /**
     * Check if is an HTTP connection.
     *
     * @return bool
     */
    public function isHttp(): bool
    {
        return \in_array($this->ctx->getStreamName(), [HttpStream::class, HttpsStream::class], true);
    }

    /**
     * Check if is a media connection.
     *
     * @return bool
     */
    public function isMedia(): bool
    {
        return $this->ctx->isMedia();
    }

    /**
     * Check if is a CDN connection.
     *
     * @return bool
     */
    public function isCDN(): bool
    {
        return $this->ctx->isCDN();
    }

    /**
     * Connects to a telegram DC using the specified protocol, proxy and connection parameters.
     *
     * Opens the stream, resets the reconnect flag and the HTTP counters, lazily creates the
     * loops, fails pending unencrypted messages and finally starts the loops.
     *
     * @param ConnectionContext $ctx Connection context
     *
     * @return \Generator
     *
     * @psalm-return \Generator
     */
    public function connect(ConnectionContext $ctx): \Generator
    {
        $this->ctx = $ctx->getCtx();
        $this->datacenter = $ctx->getDc();
        $this->datacenterId = $this->datacenter.'.'.$this->id;
        $this->API->logger->logger("Connecting to DC {$this->datacenterId}", \danog\MadelineProto\Logger::WARNING);
        $this->createSession();

        $ctx->setReadCallback([$this, 'haveRead']);
        $this->stream = (yield from $ctx->getStream());
        $this->API->logger->logger("Connected to DC {$this->datacenterId}!", \danog\MadelineProto\Logger::WARNING);

        // A fresh stream is available: clear the reconnect flag and restart the HTTP counters.
        $this->needsReconnect = false;
        $this->httpReqCount = 0;
        $this->httpResCount = 0;

        // Loops are created once and reused across reconnections.
        $this->writer ??= new WriteLoop($this);
        $this->reader ??= new ReadLoop($this);
        $this->checker ??= new CheckLoop($this);
        $this->waiter ??= new HttpWaitLoop($this);
        if (!isset($this->pinger)
            && ($this->ctx->hasStreamName(WssStream::class) || $this->ctx->hasStreamName(WsStream::class))
        ) {
            $this->pinger = new PingLoop($this);
        }

        // Unencrypted messages that were still pending are dropped from the queues and,
        // unless already replied to, failed on the next event loop tick.
        foreach ($this->new_outgoing as $message_id => $message) {
            if ($message->isUnencrypted()) {
                \Amp\Loop::defer(static function () use ($message) {
                    if (!($message->getState() & OutgoingMessage::STATE_REPLIED)) {
                        $message->reply(new Failure(new Exception('Restart because we were reconnected')));
                    }
                });
                unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]);
            }
        }

        $this->writer->start();
        $this->reader->start();
        // If start() reports falsy, resume the checker instead.
        if (!$this->checker->start()) {
            $this->checker->resume();
        }
        $this->waiter->start();
        if ($this->pinger) {
            $this->pinger->start();
        }
    }

    /**
     * Apply method abstractions.
     *
     * Rewrites the method name and/or its arguments in place for a set of convenience cases
     * (invite links, secret chat messages, basic group IDs, profile photos, media upload peer).
     *
     * @param string $method    Method name (may be replaced)
     * @param array  $arguments Arguments (may be rewritten)
     *
     * @return \Generator Whether we need to resolve a queue promise: yields the new Deferred
     *                    stored in $arguments['queuePromise'], or null
     */
    private function methodAbstractions(string &$method, array &$arguments): \Generator
    {
        if ($method === 'messages.importChatInvite'
            && isset($arguments['hash'])
            && \is_string($arguments['hash'])
            && \preg_match('@(?:t|telegram)\\.(?:me|dog)/(joinchat/)?([a-z0-9_-]*)@i', $arguments['hash'], $matches)
        ) {
            if ($matches[1] === '') {
                // Link without "joinchat/": join by the captured path segment instead.
                $method = 'channels.joinChannel';
                $arguments['channel'] = $matches[2];
            } else {
                $arguments['hash'] = $matches[2];
            }
        } elseif ($method === 'messages.checkChatInvite'
            && isset($arguments['hash'])
            && \is_string($arguments['hash'])
            && \preg_match('@(?:t|telegram)\\.(?:me|dog)/joinchat/([a-z0-9_-]*)@i', $arguments['hash'], $matches)
        ) {
            $arguments['hash'] = $matches[1];
        } elseif ($method === 'channels.joinChannel'
            && isset($arguments['channel'])
            && \is_string($arguments['channel'])
            && \preg_match('@(?:t|telegram)\\.(?:me|dog)/(joinchat/)?([a-z0-9_-]*)@i', $arguments['channel'], $matches)
        ) {
            if ($matches[1] !== '') {
                // "joinchat/" link: import the invite hash instead.
                $method = 'messages.importChatInvite';
                $arguments['hash'] = $matches[2];
            }
        } elseif ($method === 'messages.sendMessage'
            && isset($arguments['peer']['_'])
            && \in_array(
                $arguments['peer']['_'],
                [
                    'inputEncryptedChat',
                    'updateEncryption',
                    'updateEncryptedChatTyping',
                    'updateEncryptedMessagesRead',
                    'updateNewEncryptedMessage',
                    'encryptedMessage',
                    'encryptedMessageService',
                ],
                true
            )
        ) {
            // The original arguments become the "message" payload of messages.sendEncrypted.
            $method = 'messages.sendEncrypted';
            $arguments = ['peer' => $arguments['peer'], 'message' => $arguments];
            $arguments['message']['_'] ??= 'decryptedMessage';
            $arguments['message']['ttl'] ??= 0;
            if (isset($arguments['message']['reply_to_msg_id'])) {
                $arguments['message']['reply_to_random_id'] = $arguments['message']['reply_to_msg_id'];
            }
        } elseif ($method === 'messages.sendEncryptedFile' || $method === 'messages.uploadEncryptedFile') {
            if (isset($arguments['file'])) {
                $isInputEncryptedFile = \is_array($arguments['file'])
                    && isset($arguments['file']['_'])
                    && $this->API->getTL()->getConstructors()->findByPredicate($arguments['file']['_']) === 'InputEncryptedFile';
                if (!$isInputEncryptedFile && $this->API->getSettings()->getFiles()->getAllowAutomaticUpload()) {
                    $arguments['file'] = (yield from $this->API->uploadEncrypted($arguments['file']));
                }
                // Copy key/iv of the file (when present) into the message media.
                if (isset($arguments['file']['key'])) {
                    $arguments['message']['media']['key'] = $arguments['file']['key'];
                }
                if (isset($arguments['file']['iv'])) {
                    $arguments['message']['media']['iv'] = $arguments['file']['iv'];
                }
            }
            $arguments['queuePromise'] = new Deferred;
            return $arguments['queuePromise'];
        } elseif (\in_array(
            $method,
            [
                'messages.addChatUser',
                'messages.deleteChatUser',
                'messages.editChatAdmin',
                'messages.editChatPhoto',
                'messages.editChatTitle',
                'messages.getFullChat',
                'messages.exportChatInvite',
                'messages.migrateChat',
            ],
            true
        ) && isset($arguments['chat_id']) && (!\is_numeric($arguments['chat_id']) || $arguments['chat_id'] < 0)) {
            // Non-numeric or negative chat_id: resolve it through getInfo() to a plain chat ID.
            $res = (yield from $this->API->getInfo($arguments['chat_id']));
            if ($res['type'] !== 'chat') {
                throw new \danog\MadelineProto\Exception('chat_id is not a chat id (only normal groups allowed, not supergroups)!');
            }
            $arguments['chat_id'] = $res['chat_id'];
        } elseif ($method === 'photos.updateProfilePhoto') {
            if (isset($arguments['id'])) {
                if (!\is_array($arguments['id'])) {
                    // A non-array id is passed on as the file of an upload.
                    $method = 'photos.uploadProfilePhoto';
                    $arguments['file'] = $arguments['id'];
                }
            } elseif (isset($arguments['file'])) {
                $method = 'photos.uploadProfilePhoto';
            }
        } elseif ($method === 'photos.uploadProfilePhoto') {
            if (isset($arguments['file'])) {
                if (\is_array($arguments['file'])
                    && !\in_array($arguments['file']['_'] ?? null, ['inputFile', 'inputFileBig'], true)
                ) {
                    // NOTE(review): the method name is already 'photos.uploadProfilePhoto' in this
                    // branch, so only `id` is populated here; presumably the intent was to switch to
                    // 'photos.updateProfilePhoto' (mirroring the branch above) - confirm before changing.
                    $arguments['id'] = $arguments['file'];
                }
            } elseif (isset($arguments['id'])) {
                $method = 'photos.updateProfilePhoto';
            }
        } elseif ($method === 'messages.uploadMedia') {
            if (!isset($arguments['peer']) && !$this->API->getSelf()['bot']) {
                $arguments['peer'] = 'me';
            }
        }

        // Checked after the chain because the sendMessage branch may have rewritten $method.
        if ($method === 'messages.sendEncrypted' || $method === 'messages.sendEncryptedService') {
            $arguments['queuePromise'] = new Deferred;
            return $arguments['queuePromise'];
        }
        return null;
    }

    /**
     * Send an MTProto message.
     *
     * Serializes the body if needed, appends the message to the pending outgoing queue,
     * optionally wakes the writer loop and waits for the message's send promise.
     *
     * @param OutgoingMessage $message The message to send
     * @param bool            $flush   Whether to flush the message right away
     *
     * @return \Generator Yields the resolution value of the send promise
     */
    public function sendMessage(OutgoingMessage $message, bool $flush = true): \Generator
    {
        $message->trySend();
        $promise = $message->getSendPromise();

        if (!$message->hasSerializedBody() || $message->shouldRefreshReferences()) {
            $body = yield from $message->getBody();
            if ($message->shouldRefreshReferences()) {
                $this->API->referenceDatabase->refreshNext(true);
            }
            try {
                if ($message->isMethod()) {
                    $method = $message->getConstructor();
                    $queuePromise = yield from $this->methodAbstractions($method, $body);
                    $body = yield from $this->API->getTL()->serializeMethod($method, $body);
                } else {
                    $body['_'] = $message->getConstructor();
                    $body = yield from $this->API->getTL()->serializeObject(['type' => ''], $body, $message->getConstructor());
                }
            } finally {
                // Always undo refreshNext(true), even when serialization throws, so a failed
                // message does not leave the reference database in refresh mode.
                if ($message->shouldRefreshReferences()) {
                    $this->API->referenceDatabase->refreshNext(false);
                }
            }
            $message->setSerializedBody($body);
            unset($body);
        }

        $this->pendingOutgoing[$this->pendingOutgoingKey++] = $message;
        // The queue promise is resolved only once the message has been queued.
        if (isset($queuePromise)) {
            $queuePromise->resolve();
        }
        if ($flush && isset($this->writer)) {
            $this->writer->resume();
        }
        return yield $promise;
    }

    /**
     * Flush pending packets (resumes the writer loop, if it exists).
     *
     * @return void
     */
    public function flush()
    {
        if (isset($this->writer)) {
            $this->writer->resume();
        }
    }

    /**
     * Resume HttpWaiter (and the ping loop, if any).
     *
     * @return void
     */
    public function pingHttpWaiter()
    {
        if (isset($this->waiter)) {
            $this->waiter->resume();
        }
        if (isset($this->pinger)) {
            $this->pinger->resume();
        }
    }

    /**
     * Connect main instance.
     *
     * Also takes the main instance and its logger from the shared connection.
     *
     * @param DataCenterConnection $extra Shared instance
     * @param int                  $id    Connection ID
     *
     * @return void
     */
    public function setExtra($extra, int $id)
    {
        $this->shared = $extra;
        $this->id = $id;
        $this->API = $extra->getExtra();
        $this->logger = $this->API->logger;
    }

    /**
     * Get main instance.
     *
     * @return MTProto
     */
    public function getExtra(): MTProto
    {
        return $this->API;
    }

    /**
     * Get shared connection instance.
     *
     * @return DataCenterConnection
     */
    public function getShared(): DataCenterConnection
    {
        return $this->shared;
    }

    /**
     * Disconnect from DC.
     *
     * Marks the connection as needing a reconnect, signals every existing loop, closes the
     * stream and, unless the disconnection is temporary, notifies the shared connection.
     *
     * @param bool $temporary Whether the disconnection is temporary, triggered by the reconnect method
     *
     * @return void
     */
    public function disconnect(bool $temporary = false)
    {
        $this->API->logger->logger("Disconnecting from DC {$this->datacenterId}");
        $this->needsReconnect = true;

        // No `updater` property is declared in this class; the isset() guard makes that entry
        // a no-op unless one is defined elsewhere (e.g. by a trait).
        foreach (['reader', 'writer', 'checker', 'waiter', 'updater', 'pinger'] as $loop) {
            if (isset($this->{$loop}) && $this->{$loop}) {
                // The reader is signalled with an exception, every other loop with true.
                $this->{$loop}->signal($loop === 'reader' ? new NothingInTheSocketException() : true);
            }
        }

        if ($this->stream) {
            try {
                $this->stream->disconnect();
            } catch (ClosedException $e) {
                // An already closed stream is only logged.
                $this->API->logger->logger($e);
            }
        }

        if (!$temporary) {
            $this->shared->signalDisconnect($this->id);
        }
        $this->API->logger->logger("Disconnected from DC {$this->datacenterId}");
    }

    /**
     * Reconnect to DC.
     *
     * Performs a temporary disconnect and asks the datacenter manager to connect this
     * connection ID again.
     *
     * @return \Generator
     */
    public function reconnect(): \Generator
    {
        $this->API->logger->logger("Reconnecting DC {$this->datacenterId}");
        $this->disconnect(true);
        yield from $this->API->datacenter->dcConnect($this->ctx->getDc(), $this->id);
    }

    /**
     * Get name.
     *
     * @return string
     */
    public function getName(): string
    {
        return __CLASS__;
    }
}