diff --git a/src/danog/MadelineProto/ContextConnector.php b/src/danog/MadelineProto/ContextConnector.php new file mode 100644 index 00000000..f701ba0e --- /dev/null +++ b/src/danog/MadelineProto/ContextConnector.php @@ -0,0 +1,170 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto; + +use Amp\Socket\Connector; +use danog\MadelineProto\Stream\ConnectionContext; + +class ContextConnector implements Connector +{ + /** + * Datacenter instance + * + * @property DataCenter $dataCenter + */ + private $dataCenter; + /** + * Connection context + * + * @var ConnectionContext + */ + private $ctx; + public function __construct(DataCenter $dataCenter, ConnectionContext $ctx) + { + $this->dataCenter = $dataCenter; + $this->ctx = $ctx; + } + + public function connect(string $uri, ?ConnectContext $socketContext = null, ?CancellationToken $token = null): Promise + { + return Tools::call(function () use ($uri, $socketContext, $token) { + $socketContext = $socketContext ?? new ConnectContext; + $token = $token ?? new NullCancellationToken + + $attempt = 0; + $uris = []; + $failures = []; + [$scheme, $host, $port] = parseUri($uri); + if ($host[0] === '[') { + $host = \substr($host, 1, -1); + } + if ($port === 0 || @\inet_pton($host)) { + // Host is already an IP address or file path. + $uris = [$uri]; + } else { + // Host is not an IP address, so resolve the domain name. + // When we're connecting to a host, we may need to resolve the domain name, first. + // The resolution is usually done using DNS over HTTPS. + // + // The DNS over HTTPS resolver needs to resolve the domain name of the DOH server: + // this is handled internally by the DNS over HTTPS client, + // by redirecting the resolution request to the plain DNS client. + // + // However, if the DoH connection is proxied with a proxy that has a domain name itself, + // we cannot resolve it with the DoH resolver, since this will cause an infinite loop + // + // resolve host.com => (DoH resolver) => resolve dohserver.com => (simple resolver) => OK + // + // |> resolve dohserver.com => (simple resolver) => OK + // resolve host.com => (DoH resolver) =| + // |> resolve proxy.com => (non-proxied resolver) => OK + // + // + // This means that we must detect if the domain name we're trying to resolve is a proxy domain name. + // + // Here, we simply check if the connection URI has changed since we first set it: + // this would indicate that a proxy class has changed the connection URI to the proxy URI. + // + if ($this->ctx->isDns()) { + $records = yield $this->dataCenter->getNonProxiedDNSClient()->resolve($host, $socketContext->getDnsTypeRestriction()); + } else { + $records = yield $this->dataCenter->getDNSClient()->resolve($host, $socketContext->getDnsTypeRestriction()); + } + \usort($records, function (Record $a, Record $b) { + return $a->getType() - $b->getType(); + }); + if ($this->ctx->getIpv6()) { + $records = \array_reverse($records); + } + + foreach ($records as $record) { + /** @var Record $record */ + if ($record->getType() === Record::AAAA) { + $uris[] = \sprintf("%s://[%s]:%d", $scheme, $record->getValue(), $port); + } else { + $uris[] = \sprintf("%s://%s:%d", $scheme, $record->getValue(), $port); + } + } + } + + $flags = \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_ASYNC_CONNECT; + $timeout = $socketContext->getConnectTimeout(); + foreach ($uris as $builtUri) { + try { + $streamContext = \stream_context_create($socketContext->withoutTlsContext()->toStreamContextArray()); + if (!$socket = @\stream_socket_client($builtUri, $errno, $errstr, null, $flags, $streamContext)) { + throw new ConnectException(\sprintf( + 'Connection to %s failed: [Error #%d] %s%s', + $uri, + $errno, + $errstr, + $failures ? '; previous attempts: ' . \implode($failures) : '' + ), $errno); + } + \stream_set_blocking($socket, false); + $deferred = new Deferred; + $watcher = Loop::onWritable($socket, [$deferred, 'resolve']); + $id = $token->subscribe([$deferred, 'fail']); + try { + yield Promise\timeout($deferred->promise(), $timeout); + } catch (TimeoutException $e) { + throw new ConnectException(\sprintf( + 'Connecting to %s failed: timeout exceeded (%d ms)%s', + $uri, + $timeout, + $failures ? '; previous attempts: ' . \implode($failures) : '' + ), 110); // See ETIMEDOUT in http://www.virtsync.com/c-error-codes-include-errno + } finally { + Loop::cancel($watcher); + $token->unsubscribe($id); + } + // The following hack looks like the only way to detect connection refused errors with PHP's stream sockets. + if (\stream_socket_get_name($socket, true) === false) { + \fclose($socket); + throw new ConnectException(\sprintf( + 'Connection to %s refused%s', + $uri, + $failures ? '; previous attempts: ' . \implode($failures) : '' + ), 111); // See ECONNREFUSED in http://www.virtsync.com/c-error-codes-include-errno + } + } catch (ConnectException $e) { + // Includes only error codes used in this file, as error codes on other OS families might be different. + // In fact, this might show a confusing error message on OS families that return 110 or 111 by itself. + $knownReasons = [ + 110 => 'connection timeout', + 111 => 'connection refused', + ]; + $code = $e->getCode(); + $reason = $knownReasons[$code] ?? ('Error #' . $code); + if (++$attempt === $socketContext->getMaxAttempts()) { + break; + } + $failures[] = "{$uri} ({$reason})"; + continue; // Could not connect to host, try next host in the list. + } + return ResourceSocket::fromClientSocket($socket, $socketContext->getTlsContext()); + } + // This is reached if either all URIs failed or the maximum number of attempts is reached. + /** @noinspection PhpUndefinedVariableInspection */ + throw $e; + + }); + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/DataCenter.php b/src/danog/MadelineProto/DataCenter.php index 48c8c07a..b2fd8b43 100644 --- a/src/danog/MadelineProto/DataCenter.php +++ b/src/danog/MadelineProto/DataCenter.php @@ -40,6 +40,7 @@ use Amp\NullCancellationToken; use Amp\Promise; use Amp\Socket\ClientSocket; use Amp\Socket\ClientTlsContext; +use Amp\Socket\ConnectContext; use Amp\Socket\ConnectException; use Amp\TimeoutException; use danog\MadelineProto\MTProto\PermAuthKey; @@ -261,177 +262,6 @@ class DataCenter } } - /** - * Asynchronously establish a socket connection to the specified URI. - * - * @param ConnectionContext $ctx Connection context - * @param string $uri URI in scheme://host:port format. TCP is assumed if no scheme is present. - * @param ClientConnectContext $socketContext Socket connect context to use when connecting. - * @param CancellationToken|null $token - * - * @return Promise<\Amp\Socket\ClientSocket> - */ - public function socketConnect(ConnectionContext $ctx, string $uri, ClientConnectContext $socketContext = null, CancellationToken $token = null): Promise - { - return call(function () use ($ctx, $uri, $socketContext, $token) { - $socketContext = $socketContext ?? new ClientConnectContext; - $token = $token ?? new NullCancellationToken; - $attempt = 0; - $uris = []; - $failures = []; - - list($scheme, $host, $port) = parseUri($uri); - - if ($host[0] === '[') { - $host = \substr($host, 1, -1); - } - - if ($port === 0 || @\inet_pton($host)) { - // Host is already an IP address or file path. - $uris = [$uri]; - } else { - // Host is not an IP address, so resolve the domain name. - // When we're connecting to a host, we may need to resolve the domain name, first. - // The resolution is usually done using DNS over HTTPS. - // - // The DNS over HTTPS resolver needs to resolve the domain name of the DOH server: - // this is handled internally by the DNS over HTTPS client, - // by redirecting the resolution request to the plain DNS client. - // - // However, if the DoH connection is proxied with a proxy that has a domain name itself, - // we cannot resolve it with the DoH resolver, since this will cause an infinite loop - // - // resolve host.com => (DoH resolver) => resolve dohserver.com => (simple resolver) => OK - // - // |> resolve dohserver.com => (simple resolver) => OK - // resolve host.com => (DoH resolver) =| - // |> resolve proxy.com => (non-proxied resolver) => OK - // - // - // This means that we must detect if the domain name we're trying to resolve is a proxy domain name. - // - // Here, we simply check if the connection URI has changed since we first set it: - // this would indicate that a proxy class has changed the connection URI to the proxy URI. - // - if ($ctx->isDns()) { - $records = yield $this->NonProxiedDoHClient->resolve($host, $socketContext->getDnsTypeRestriction()); - } else { - $records = yield $this->DoHClient->resolve($host, $socketContext->getDnsTypeRestriction()); - } - \usort($records, function (Record $a, Record $b) { - return $a->getType() - $b->getType(); - }); - if ($ctx->getIpv6()) { - $records = \array_reverse($records); - } - - foreach ($records as $record) { - /** @var Record $record */ - if ($record->getType() === Record::AAAA) { - $uris[] = \sprintf("%s://[%s]:%d", $scheme, $record->getValue(), $port); - } else { - $uris[] = \sprintf("%s://%s:%d", $scheme, $record->getValue(), $port); - } - } - } - - $flags = \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_ASYNC_CONNECT; - $timeout = $socketContext->getConnectTimeout(); - - foreach ($uris as $builtUri) { - try { - $context = \stream_context_create($socketContext->toStreamContextArray()); - - if (!$socket = @\stream_socket_client($builtUri, $errno, $errstr, null, $flags, $context)) { - throw new ConnectException(\sprintf( - "Connection to %s failed: [Error #%d] %s%s", - $uri, - $errno, - $errstr, - $failures ? "; previous attempts: ".\implode($failures) : "" - ), $errno); - } - - \stream_set_blocking($socket, false); - - $deferred = new Deferred; - $watcher = Loop::onWritable($socket, [$deferred, 'resolve']); - $id = $token->subscribe([$deferred, 'fail']); - - try { - yield Promise\timeout($deferred->promise(), $timeout); - } catch (TimeoutException $e) { - throw new ConnectException(\sprintf( - "Connecting to %s failed: timeout exceeded (%d ms)%s", - $uri, - $timeout, - $failures ? "; previous attempts: ".\implode($failures) : "" - ), 110); // See ETIMEDOUT in http://www.virtsync.com/c-error-codes-include-errno - } finally { - Loop::cancel($watcher); - $token->unsubscribe($id); - } - - // The following hack looks like the only way to detect connection refused errors with PHP's stream sockets. - if (\stream_socket_get_name($socket, true) === false) { - \fclose($socket); - throw new ConnectException(\sprintf( - "Connection to %s refused%s", - $uri, - $failures ? "; previous attempts: ".\implode($failures) : "" - ), 111); // See ECONNREFUSED in http://www.virtsync.com/c-error-codes-include-errno - } - } catch (ConnectException $e) { - // Includes only error codes used in this file, as error codes on other OS families might be different. - // In fact, this might show a confusing error message on OS families that return 110 or 111 by itself. - $knownReasons = [ - 110 => "connection timeout", - 111 => "connection refused", - ]; - - $code = $e->getCode(); - $reason = $knownReasons[$code] ?? ("Error #".$code); - - if (++$attempt === $socketContext->getMaxAttempts()) { - break; - } - - $failures[] = "{$uri} ({$reason})"; - - continue; // Could not connect to host, try next host in the list. - } - if ($ctx->hasReadCallback()) { - $socket = new class($socket) extends ClientSocket { - private $callback; - public function setReadCallback($callback) - { - $this->callback = $callback; - } - - /** @inheritdoc */ - public function read(): Promise - { - $promise = parent::read(); - $promise->onResolve(function ($e, $res) { - if ($res) { - ($this->callback)(); - } - }); - return $promise; - } - }; - $socket->setReadCallback($ctx->getReadCallback()); - } else { - $socket = new ClientSocket($socket); - } - - return $socket; - } - - // This is reached if either all URIs failed or the maximum number of attempts is reached. - throw $e; - }); - } public function dcConnect(string $dc_number, int $id = -1): \Generator { $old = isset($this->sockets[$dc_number]) && ( @@ -479,7 +309,7 @@ class DataCenter throw new \danog\MadelineProto\Exception("Could not connect to DC $dc_number"); } - public function generateContexts($dc_number = 0, string $uri = '', ClientConnectContext $context = null) + public function generateContexts($dc_number = 0, string $uri = '', ConnectContext $context = null) { $ctxs = []; $combos = []; @@ -516,7 +346,7 @@ class DataCenter $default = [[DefaultStream::getName(), []], [BufferedRawStream::getName(), []], [HttpsStream::getName(), []]]; break; default: - throw new Exception(\danog\MadelineProto\Lang::$current_lang['protocol_invalid']); + throw new Exception(Lang::$current_lang['protocol_invalid']); } if ($this->settings[$dc_config_number]['obfuscated'] && !\in_array($default[2][0], [HttpsStream::getName(), HttpStream::getName()])) { $default = [[DefaultStream::getName(), []], [BufferedRawStream::getName(), []], [ObfuscatedStream::getName(), []], \end($default)]; @@ -615,8 +445,8 @@ class DataCenter } $combos = \array_unique($combos, SORT_REGULAR); } - /* @var $context \Amp\ClientConnectContext */ - $context = $context ?? (new ClientConnectContext())->withMaxAttempts(1)->withConnectTimeout(1000 * $this->settings[$dc_config_number]['timeout']); + /* @var $context \Amp\ConnectContext */ + $context = $context ?? (new ConnectContext())->withMaxAttempts(1)->withConnectTimeout(1000 * $this->settings[$dc_config_number]['timeout']); foreach ($combos as $combo) { $ipv6 = [$this->settings[$dc_config_number]['ipv6'] ? 'ipv6' : 'ipv4', $this->settings[$dc_config_number]['ipv6'] ? 'ipv4' : 'ipv6']; @@ -625,6 +455,7 @@ class DataCenter // This is only for non-MTProto connections if (!$dc_number) { /** @var $ctx \danog\MadelineProto\Stream\ConnectionContext */ + $context->with $ctx = (new ConnectionContext()) ->setSocketContext($context) ->setUri($uri) @@ -702,23 +533,7 @@ class DataCenter foreach ($combo as $stream) { if ($stream[0] === DefaultStream::getName() && $stream[1] === []) { - $stream[1] = [ - function ( - string $uri, - ClientConnectContext $socketContext = null, - CancellationToken $token = null - ) use ($ctx): Promise { - return $this->socketConnect($ctx, $uri, $socketContext, $token); - }, - function ( - string $uri, - ClientConnectContext $socketContext = null, - ClientTlsContext $tlsContext = null, - CancellationToken $token = null - ) use ($ctx): Promise { - return $this->cryptoConnect($ctx, $uri, $socketContext, $tlsContext, $token); - }, - ]; + $stream[1] = new ContextConnector($this, $ctx); } $ctx->addStream(...$stream); } @@ -770,6 +585,15 @@ class DataCenter { return $this->DoHClient; } + /** + * Get non-proxied DNS over HTTPS async DNS client. + * + * @return \Amp\Dns\Resolver + */ + public function getNonProxiedDNSClient(): Resolver + { + return $this->DoHClient; + } /** * Get contents of file. @@ -780,7 +604,7 @@ class DataCenter */ public function fileGetContents(string $url): \Generator { - return yield (yield $this->getHTTPClient()->request($url))->getBody(); + return yield (yield $this->getHTTPClient()->request($url))->getBody()->buffer(); } /** diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index 4edb0dda..addab0ff 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -19,8 +19,8 @@ namespace danog\MadelineProto; -use Amp\Artax\Client; use Amp\Dns\Resolver; +use Amp\Http\Client\DelegateHttpClient; use Amp\Loop; use danog\MadelineProto\Async\AsyncConstruct; use danog\MadelineProto\Loop\Generic\PeriodicLoop; @@ -645,9 +645,9 @@ class MTProto extends AsyncConstruct implements TLCallback /** * Get async HTTP client. * - * @return \Amp\Artax\Client + * @return \Amp\Http\Client\DelegateHttpClient */ - public function getHTTPClient(): Client + public function getHTTPClient(): DelegateHttpClient { return $this->datacenter->getHTTPClient(); } diff --git a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php index 10cbecc9..7e4552dd 100644 --- a/src/danog/MadelineProto/MTProtoTools/PeerHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/PeerHandler.php @@ -19,7 +19,7 @@ namespace danog\MadelineProto\MTProtoTools; -use Amp\Artax\Request; +use Amp\Http\Client\Request; /** * Manages peers. @@ -921,9 +921,13 @@ trait PeerHandler //file_put_contents($path, $payload); $id = isset($this->authorization['user']['username']) ? $this->authorization['user']['username'] : $this->authorization['user']['id']; - $request = (new Request('https://id.pwrtelegram.xyz/db'.$this->settings['pwr']['db_token'].'/addnewmadeline?d=pls&from='.$id, 'POST'))->withHeader('content-type', 'application/json')->withBody($payload); + $request = new Request('https://id.pwrtelegram.xyz/db'.$this->settings['pwr']['db_token'].'/addnewmadeline?d=pls&from='.$id, 'POST'); + $request->setHeader('content-type', 'application/json'); + $request->setBody($payload); - $result = yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody(); + $result = yield ( + yield $this->datacenter->getHTTPClient()->request($request) + )->getBody()->buffer(); $this->logger->logger("============ $result =============", \danog\MadelineProto\Logger::VERBOSE); $this->qres = []; diff --git a/src/danog/MadelineProto/MyTelegramOrgWrapper.php b/src/danog/MadelineProto/MyTelegramOrgWrapper.php index f6d5c2fa..f345c593 100644 --- a/src/danog/MadelineProto/MyTelegramOrgWrapper.php +++ b/src/danog/MadelineProto/MyTelegramOrgWrapper.php @@ -18,8 +18,8 @@ namespace danog\MadelineProto; -use Amp\Artax\Cookie\ArrayCookieJar; -use Amp\Artax\Request; +use Amp\Http\Client\Cookie\InMemoryCookieJar; +use Amp\Http\Client\Request; /** * Wrapper for my.telegram.org. @@ -54,7 +54,7 @@ class MyTelegramOrgWrapper $this->settings = []; } if (!$this->jar) { - $this->jar = new ArrayCookieJar; + $this->jar = new InMemoryCookieJar; } $this->settings = MTProto::getSettings($this->settings); $this->datacenter = new DataCenter( @@ -75,10 +75,10 @@ class MyTelegramOrgWrapper { $this->number = $number; $request = new Request(self::MY_TELEGRAM_URL.'/auth/send_password', 'POST'); - $request = $request->withBody(\http_build_query(['phone' => $number])); - $request = $request->withHeaders($this->getHeaders('origin')); + $request->setBody(\http_build_query(['phone' => $number])); + $request->setHeaders($this->getHeaders('origin')); $response = yield $this->datacenter->getHTTPClient()->request($request); - $result = yield $response->getBody(); + $result = yield $response->getBody()->buffer(); $resulta = \json_decode($result, true); if (!isset($resulta['random_hash'])) { @@ -94,11 +94,11 @@ class MyTelegramOrgWrapper } $request = new Request(self::MY_TELEGRAM_URL.'/auth/login', 'POST'); - $request = $request->withBody(\http_build_query(['phone' => $this->number, 'random_hash' => $this->hash, 'password' => $password])); - $request = $request->withHeaders($this->getHeaders('origin')); - $request = $request->withHeader('user-agent', 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.13) Gecko/20080311 Firefox/2.0.0.13'); + $request->setBody(\http_build_query(['phone' => $this->number, 'random_hash' => $this->hash, 'password' => $password])); + $request->setHeaders($this->getHeaders('origin')); + $request->setHeader('user-agent', 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.13) Gecko/20080311 Firefox/2.0.0.13'); $response = yield $this->datacenter->getHTTPClient()->request($request); - $result = yield $response->getBody(); + $result = yield $response->getBody()->buffer(); switch ($result) { @@ -124,9 +124,9 @@ class MyTelegramOrgWrapper } $request = new Request(self::MY_TELEGRAM_URL.'/apps'); - $request = $request->withHeaders($this->getHeaders('refer')); + $request->setHeaders($this->getHeaders('refer')); $response = yield $this->datacenter->getHTTPClient()->request($request); - $result = yield $response->getBody(); + $result = yield $response->getBody()->buffer(); $title = \explode('', \explode('', $result)[1])[0]; switch ($title) { @@ -150,9 +150,9 @@ class MyTelegramOrgWrapper } $request = new Request(self::MY_TELEGRAM_URL.'/apps'); - $request = $request->withHeaders($this->getHeaders('refer')); + $request->setHeaders($this->getHeaders('refer')); $response = yield $this->datacenter->getHTTPClient()->request($request); - $result = yield $response->getBody(); + $result = yield $response->getBody()->buffer(); $cose = \explode('<label for="app_id" class="col-md-4 text-right control-label">App api_id:</label> <div class="col-md-7"> @@ -178,19 +178,19 @@ class MyTelegramOrgWrapper } $request = new Request(self::MY_TELEGRAM_URL.'/apps/create', 'POST'); - $request = $request->withHeaders($this->getHeaders('app')); - $request = $request->withBody(\http_build_query(['hash' => $this->creation_hash, 'app_title' => $settings['app_title'], 'app_shortname' => $settings['app_shortname'], 'app_url' => $settings['app_url'], 'app_platform' => $settings['app_platform'], 'app_desc' => $settings['app_desc']])); + $request->setHeaders($this->getHeaders('app')); + $request = $request->setBody(\http_build_query(['hash' => $this->creation_hash, 'app_title' => $settings['app_title'], 'app_shortname' => $settings['app_shortname'], 'app_url' => $settings['app_url'], 'app_platform' => $settings['app_platform'], 'app_desc' => $settings['app_desc']])); $response = yield $this->datacenter->getHTTPClient()->request($request); - $result = yield $response->getBody(); + $result = yield $response->getBody()->buffer(); if ($result) { throw new Exception(\html_entity_decode($result)); } $request = new Request(self::MY_TELEGRAM_URL.'/apps'); - $request = $request->withHeaders($this->getHeaders('refer')); + $request->setHeaders($this->getHeaders('refer')); $response = yield $this->datacenter->getHTTPClient()->request($request); - $result = yield $response->getBody(); + $result = yield $response->getBody()->buffer(); $title = \explode('', \explode('', $result)[1])[0]; if ($title === 'Create new application') { @@ -267,7 +267,7 @@ class MyTelegramOrgWrapper } public function loop($callable) { - return \danog\MadelineProto\Tools::wait($callable()); + return Tools::wait($callable()); } public function __call($name, $arguments) { @@ -277,6 +277,6 @@ class MyTelegramOrgWrapper if (!\method_exists($this, $name)) { throw new Exception("$name does not exist!"); } - return $async ? $this->{$name}(...$arguments) : \danog\MadelineProto\Tools::wait($this->{$name}(...$arguments)); + return $async ? $this->{$name}(...$arguments) : Tools::wait($this->{$name}(...$arguments)); } } diff --git a/src/danog/MadelineProto/ProxyConnector.php b/src/danog/MadelineProto/ProxyConnector.php index a4cf872d..745ae9df 100644 --- a/src/danog/MadelineProto/ProxyConnector.php +++ b/src/danog/MadelineProto/ProxyConnector.php @@ -19,7 +19,6 @@ namespace danog\MadelineProto; -use Amp\MultiReasonException; use Amp\Socket\Connector; class ProxyConnector implements Connector @@ -34,7 +33,7 @@ class ProxyConnector implements Connector public function connect(string $uri, ?ConnectContext $ctx = null, ?CancellationToken $token = null): Promise { - return Tools::call(static function () use ($uri, $ctx, $token) { + return Tools::call(function () use ($uri, $ctx, $token) { $ctx = $ctx ?? new ConnectContext; $token = $token ?? new NullCancellationToken; diff --git a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php index 885ddbe6..3219cf76 100644 --- a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php +++ b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php @@ -27,7 +27,7 @@ use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\ProxyStreamInterface; use danog\MadelineProto\Stream\RawStreamInterface; -use function Amp\Socket\connect; +use function Amp\Socket\connector; /** * Default stream wrapper. @@ -36,19 +36,24 @@ use function Amp\Socket\connect; * * @author Daniil Gentili <daniil@daniil.it> */ -class DefaultStream extends Socket implements RawStreamInterface +class DefaultStream extends Socket implements + RawStreamInterface, + ProxyStreamInterface { use RawStream; /** - * Socket + * Socket. * * @var EncryptableSocket */ private $stream; - public function __construct() - { - } + /** + * Connector. + * + * @var Connector + */ + private $connector; public function setupTls(?CancellationToken $cancellationToken = null): \Amp\Promise { @@ -62,7 +67,7 @@ class DefaultStream extends Socket implements RawStreamInterface public function connectGenerator(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator { - $this->stream = yield connect($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken()); + $this->stream = yield ($this->connector ?? connector())($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken()); if ($ctx->isSecure()) { yield $this->stream->setupTls(); } @@ -114,8 +119,8 @@ class DefaultStream extends Socket implements RawStreamInterface } /** - * Close - * + * Close. + * * @return void */ public function close() @@ -133,6 +138,11 @@ class DefaultStream extends Socket implements RawStreamInterface return $this->stream; } + public function setExtra($extra) + { + $this->connector = $extra; + } + public static function getName(): string { return __CLASS__;