diff --git a/composer.json b/composer.json index 6fd7e692..6dc6574f 100644 --- a/composer.json +++ b/composer.json @@ -32,7 +32,8 @@ "amphp/file": "^1", "amphp/byte-stream": "^1.6", "danog/dns-over-https": "^0.2", - "amphp/http-client-cookies": "dev-master" + "amphp/http-client-cookies": "dev-master", + "amphp/uri": "dev-master" }, "require-dev": { "phpdocumentor/reflection-docblock": "^4.3", diff --git a/src/danog/MadelineProto/ContextConnector.php b/src/danog/MadelineProto/ContextConnector.php index bd9f39a3..58b4570c 100644 --- a/src/danog/MadelineProto/ContextConnector.php +++ b/src/danog/MadelineProto/ContextConnector.php @@ -18,6 +18,7 @@ */ namespace danog\MadelineProto; + use Amp\CancellationToken; use Amp\MultiReasonException; use Amp\NullCancellationToken; @@ -28,20 +29,22 @@ use Amp\Socket\Connector; class ContextConnector implements Connector { private $dataCenter; + private $logger; private $fromDns = false; public function __construct(DataCenter $dataCenter, bool $fromDns = false) { $this->dataCenter = $dataCenter; $this->fromDns = false; + $this->logger = $dataCenter->getAPI()->getLogger(); } public function connect(string $uri, ?ConnectContext $ctx = null, ?CancellationToken $token = null): Promise { - return Tools::call(function () use ($uri, $ctx, $token) { + return Tools::call((function () use ($uri, $ctx, $token) { $ctx = $ctx ?? new ConnectContext; $token = $token ?? new NullCancellationToken; - $ctxs = $this->datacenter->generateContexts(0, $uri, $ctx); + $ctxs = $this->dataCenter->generateContexts(0, $uri, $ctx); if (empty($ctxs)) { throw new Exception("No contexts for raw connection to URI $uri"); } @@ -51,24 +54,23 @@ class ContextConnector implements Connector $ctx->setIsDns($this->fromDns); $ctx->setCancellationToken($token); $result = yield $ctx->getStream(); - $this->API->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING); - + $this->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING); + return $result->getSocket(); } catch (\Throwable $e) { if (\MADELINEPROTO_TEST === 'pony') { throw $e; } - $this->API->logger->logger('Connection failed: '.$e, \danog\MadelineProto\Logger::ERROR); + $this->logger->logger('Connection failed: '.$e, \danog\MadelineProto\Logger::ERROR); if ($e instanceof MultiReasonException) { foreach ($e->getReasons() as $reason) { - $this->API->logger->logger('Multireason: '.$reason, \danog\MadelineProto\Logger::ERROR); + $this->logger->logger('Multireason: '.$reason, \danog\MadelineProto\Logger::ERROR); } } } } - - throw new \danog\MadelineProto\Exception("Could not connect to URI $uri"); - }); + throw new \danog\MadelineProto\Exception("Could not connect to URI $uri"); + })()); } } diff --git a/src/danog/MadelineProto/DataCenter.php b/src/danog/MadelineProto/DataCenter.php index f966d85d..4a5c58a9 100644 --- a/src/danog/MadelineProto/DataCenter.php +++ b/src/danog/MadelineProto/DataCenter.php @@ -31,6 +31,7 @@ use Amp\Http\Client\Cookie\CookieJar; use Amp\Http\Client\Cookie\InMemoryCookieJar; use Amp\Http\Client\DelegateHttpClient; use Amp\Http\Client\HttpClientBuilder; +use Amp\Http\Client\Request; use Amp\Socket\ConnectContext; use Amp\Websocket\Client\Rfc6455Connector; use danog\MadelineProto\MTProto\PermAuthKey; @@ -513,17 +514,7 @@ class DataCenter $stream[1] = new DoHConnector($this, $ctx); } if (\in_array($stream[0], [WsStream::class, WssStream::class]) && $stream[1] === []) { - $stream[1] = new Rfc6455Connector( - (new HttpClientBuilder) - ->usingPool( - new UnlimitedConnectionPool( - new DefaultConnectionFactory( - new DoHConnector($this, $ctx) - ) - ) - ) - ->build() - ); + $stream[1] = new Rfc6455Connector($this->HTTPClient); } $ctx->addStream(...$stream); } @@ -547,6 +538,16 @@ class DataCenter return $ctxs; } + /** + * Get main API. + * + * @return MTProto + */ + public function getAPI(): MTProto + { + return $this->API; + } + /** * Get async HTTP client. * @@ -594,7 +595,7 @@ class DataCenter */ public function fileGetContents(string $url): \Generator { - return yield (yield $this->getHTTPClient()->request($url))->getBody()->buffer(); + return yield (yield $this->getHTTPClient()->request(new Request($url)))->getBody()->buffer(); } /** diff --git a/src/danog/MadelineProto/DoHConnector.php b/src/danog/MadelineProto/DoHConnector.php index 0454eb5c..d5037cbe 100644 --- a/src/danog/MadelineProto/DoHConnector.php +++ b/src/danog/MadelineProto/DoHConnector.php @@ -37,13 +37,13 @@ use function Amp\Socket\Internal\parseUri; class DoHConnector implements Connector { /** - * Datacenter instance - * + * Datacenter instance. + * * @property DataCenter $dataCenter */ private $dataCenter; /** - * Connection context + * Connection context. * * @var ConnectionContext */ @@ -56,7 +56,7 @@ class DoHConnector implements Connector public function connect(string $uri, ?ConnectContext $socketContext = null, ?CancellationToken $token = null): Promise { - return Tools::call(function () use ($uri, $socketContext, $token) { + return Tools::call((function () use ($uri, $socketContext, $token) { $socketContext = $socketContext ?? new ConnectContext; $token = $token ?? new NullCancellationToken; @@ -115,7 +115,7 @@ class DoHConnector implements Connector } } } - + $flags = \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_ASYNC_CONNECT; $timeout = $socketContext->getConnectTimeout(); foreach ($uris as $builtUri) { @@ -176,7 +176,6 @@ class DoHConnector implements Connector // This is reached if either all URIs failed or the maximum number of attempts is reached. /** @noinspection PhpUndefinedVariableInspection */ throw $e; - - }); + })()); } } diff --git a/src/danog/MadelineProto/InternalDoc.php b/src/danog/MadelineProto/InternalDoc.php index 7b0e799d..d65ab2b3 100644 --- a/src/danog/MadelineProto/InternalDoc.php +++ b/src/danog/MadelineProto/InternalDoc.php @@ -4067,12 +4067,21 @@ class InternalDoc extends APIFactory { return $this->__call(__FUNCTION__, [$extra]); } + /** + * Get logger. + * + * @return Logger + */ + public function getLogger(array $extra = []): danog\MadelineProto\Logger + { + return $this->__call(__FUNCTION__, [$extra]); + } /** * Get async HTTP client. * - * @return \Amp\Artax\Client + * @return \Amp\Http\Client\DelegateHttpClient */ - public function getHTTPClient(array $extra = []): Amp\Artax\Client + public function getHTTPClient(array $extra = []): Amp\Http\Client\DelegateHttpClient { return $this->__call(__FUNCTION__, [$extra]); } diff --git a/src/danog/MadelineProto/MTProto.php b/src/danog/MadelineProto/MTProto.php index addab0ff..9b961b04 100644 --- a/src/danog/MadelineProto/MTProto.php +++ b/src/danog/MadelineProto/MTProto.php @@ -637,10 +637,19 @@ class MTProto extends AsyncConstruct implements TLCallback * * @return TL */ - public function getTL(): TL + public function getTL(): \danog\MadelineProto\TL\TL { return $this->TL; } + /** + * Get logger. + * + * @return Logger + */ + public function getLogger(): Logger + { + return $this->logger; + } /** * Get async HTTP client. diff --git a/src/danog/MadelineProto/Stream/ConnectionContext.php b/src/danog/MadelineProto/Stream/ConnectionContext.php index 7c6e4c6c..f026e078 100644 --- a/src/danog/MadelineProto/Stream/ConnectionContext.php +++ b/src/danog/MadelineProto/Stream/ConnectionContext.php @@ -20,6 +20,7 @@ namespace danog\MadelineProto\Stream; use Amp\CancellationToken; use Amp\Socket\ConnectContext; +use Amp\Uri\Uri; use danog\MadelineProto\Exception; use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream; use danog\MadelineProto\Stream\Transport\DefaultStream; diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/FullStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/FullStream.php index c8e8861b..7719734d 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/FullStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/FullStream.php @@ -102,7 +102,7 @@ class FullStream implements BufferedStreamInterface, MTProtoBufferInterface $this->in_seq_no++; $in_seq_no = \unpack('V', yield $buffer->bufferRead(4))[1]; if ($in_seq_no != $this->in_seq_no) { - throw new Exception('Incoming seq_no mismatch'); + throw new \danog\MadelineProto\Exception('Incoming seq_no mismatch'); } return $buffer; diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php index 88e13c58..1b8b2574 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php @@ -20,7 +20,6 @@ namespace danog\MadelineProto\Stream\MTProtoTransport; use Amp\Promise; use Amp\Socket\EncryptableSocket; -use danog\MadelineProto\Exception; use danog\MadelineProto\Stream\Async\Buffer; use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\BufferedProxyStreamInterface; @@ -171,7 +170,7 @@ class ObfuscatedStream implements BufferedProxyStreamInterface $this->append_after = 0; $this->append = ''; - throw new Exception('Tried to send too much out of frame data, cannot append'); + throw new \danog\MadelineProto\Exception('Tried to send too much out of frame data, cannot append'); } } diff --git a/src/danog/MadelineProto/Stream/Proxy/SocksProxy.php b/src/danog/MadelineProto/Stream/Proxy/SocksProxy.php index e3647ce1..5e3f07ad 100644 --- a/src/danog/MadelineProto/Stream/Proxy/SocksProxy.php +++ b/src/danog/MadelineProto/Stream/Proxy/SocksProxy.php @@ -20,6 +20,7 @@ namespace danog\MadelineProto\Stream\Proxy; use Amp\Promise; use Amp\Socket\ClientTlsContext; +use Amp\Socket\EncryptableSocket; use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\BufferedProxyStreamInterface; use danog\MadelineProto\Stream\ConnectionContext; diff --git a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php index e61c9c6d..68487131 100644 --- a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php +++ b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php @@ -21,6 +21,7 @@ namespace danog\MadelineProto\Stream\Transport; use Amp\ByteStream\ClosedException; use Amp\CancellationToken; use Amp\Promise; +use Amp\Socket\ClientTlsContext; use Amp\Socket\EncryptableSocket; use Amp\Socket\Socket; use danog\MadelineProto\Stream\Async\RawStream; @@ -67,7 +68,18 @@ class DefaultStream implements public function connectGenerator(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator { - $this->stream = yield ($this->connector ?? connector())($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken()); + $ctx = $ctx->getCtx(); + $uri = $ctx->getUri(); + $secure = $ctx->isSecure(); + if ($secure) { + $ctx->setSocketContext( + $ctx->getSocketContext()->withTlsContext( + new ClientTlsContext($uri->getHost()) + ) + ); + } + + $this->stream = yield ($this->connector ?? connector())->connect((string) $uri, $ctx->getSocketContext(), $ctx->getCancellationToken()); if ($ctx->isSecure()) { yield $this->stream->setupTls(); } diff --git a/src/danog/MadelineProto/Stream/Transport/WsStream.php b/src/danog/MadelineProto/Stream/Transport/WsStream.php index 045c03ad..6f1c7e39 100644 --- a/src/danog/MadelineProto/Stream/Transport/WsStream.php +++ b/src/danog/MadelineProto/Stream/Transport/WsStream.php @@ -18,28 +18,22 @@ namespace danog\MadelineProto\Stream\Transport; -use Amp\Http\Rfc7230; -use Amp\Http\Status; use Amp\Promise; -use Amp\Socket\ConnectException; -use Amp\Websocket\Client\ConnectionException; +use Amp\Socket\EncryptableSocket; use Amp\Websocket\Client\Handshake; -use Amp\Websocket\Client\Internal\ClientSocket; -use Amp\Websocket\Client\Rfc6455Connection; -use Amp\Websocket\Rfc6455Client; -use Amp\Websocket\Rfc7692CompressionFactory; use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\ConnectionContext; +use danog\MadelineProto\Stream\ProxyStreamInterface; use danog\MadelineProto\Stream\RawStreamInterface; -use function Amp\Websocket\generateKey; -use function Amp\Websocket\validateAcceptForKey; + +use function Amp\Websocket\Client\connector; /** * Websocket stream wrapper. * * @author Daniil Gentili */ -class WsStream implements RawStreamInterface +class WsStream implements RawStreamInterface, ProxyStreamInterface { use RawStream; @@ -55,6 +49,12 @@ class WsStream implements RawStreamInterface * @var Message */ private $message; + /** + * Websocket Connector. + * + * @var Connector + */ + private $connector; /** * Connect to stream. @@ -66,38 +66,11 @@ class WsStream implements RawStreamInterface public function connectGenerator(ConnectionContext $ctx, string $header = ''): \Generator { $this->dc = $ctx->getIntDc(); - $stream = yield $ctx->getStream(); - $resource = $stream->getStream()->getResource(); - - $this->compressionFactory = new Rfc7692CompressionFactory(); $handshake = new Handshake(\str_replace('tcp://', $ctx->isSecure() ? 'ws://' : 'wss://', $ctx->getStringUri())); - $key = generateKey(); - yield $stream->write($this->generateRequest($handshake, $key)); + $this->stream = yield ($this->connector ?? connector())->connect($handshake, $ctx->getCancellationToken()); - $buffer = ''; - while (($chunk = yield $stream->read()) !== null) { - $buffer .= $chunk; - if ($position = \strpos($buffer, "\r\n\r\n")) { - $headerBuffer = \substr($buffer, 0, $position + 4); - $buffer = \substr($buffer, $position + 4); - $headers = $this->handleResponse($headerBuffer, $key); - - $client = new Rfc6455Client( - new ClientSocket($resource, $buffer), - $handshake->getOptions(), - true - ); - $this->stream = new Rfc6455Connection($client, $headers); - //$this->stream = new Rfc6455Connection($this->stream, $headers, $buffer); - break; - } - } - - if (!$this->stream) { - throw new ConnectionException('Failed to read response from server'); - } yield $this->write($header); } @@ -108,7 +81,7 @@ class WsStream implements RawStreamInterface { try { $this->stream->close(); - } catch (Exception $e) { + } catch (\Throwable $e) { } } @@ -123,7 +96,7 @@ class WsStream implements RawStreamInterface $data = yield $this->message->buffer(); $this->message = null; } - } catch (Exception $e) { + } catch (\Exception $e) { if ($e->getReason() !== 'Client closed the underlying TCP connection') { throw $e; } @@ -145,88 +118,19 @@ class WsStream implements RawStreamInterface return $this->stream->sendBinary($data); } - private function generateRequest(Handshake $handshake, string $key): string - { - $uri = $handshake->getUri(); - $headers = $handshake->getHeaders(); - $headers['host'] = [$uri->getAuthority()]; - $headers['connection'] = ['Upgrade']; - $headers['upgrade'] = ['websocket']; - $headers['sec-websocket-version'] = ['13']; - $headers['sec-websocket-key'] = [$key]; - if ($handshake->getOptions()->isCompressionEnabled()) { - $headers['sec-websocket-extensions'] = [$this->compressionFactory->createRequestHeader()]; - } - if (($path = $uri->getPath()) === '') { - $path = '/'; - } - if (($query = $uri->getQuery()) !== '') { - $path .= '?'.$query; - } - - return \sprintf("GET %s HTTP/1.1\r\n%s\r\n", $path, Rfc7230::formatHeaders($headers)); - } - - private function handleResponse(string $headerBuffer, string $key): array - { - if (\substr($headerBuffer, -4) !== "\r\n\r\n") { - throw new ConnectException('Invalid header provided'); - } - $position = \strpos($headerBuffer, "\r\n"); - $startLine = \substr($headerBuffer, 0, $position); - if (!\preg_match("/^HTTP\/(1\.[01]) (\d{3}) ([^\x01-\x08\x10-\x19]*)$/i", $startLine, $matches)) { - throw new ConnectException('Invalid response start line: '.$startLine); - } - $version = $matches[1]; - $status = (int) $matches[2]; - $reason = $matches[3]; - - if ($version !== '1.1' || $status !== Status::SWITCHING_PROTOCOLS) { - throw new ConnectionException( - \sprintf('Did not receive switching protocols response: %d %s on DC %d', $status, $reason, $this->dc), - $status - ); - } - $headerBuffer = \substr($headerBuffer, $position + 2, -2); - $headers = Rfc7230::parseHeaders($headerBuffer); - $upgrade = $headers['upgrade'][0] ?? ''; - if (\strtolower($upgrade) !== 'websocket') { - throw new ConnectionException('Missing "Upgrade: websocket" header'); - } - $connection = $headers['connection'][0] ?? ''; - if (!\in_array('upgrade', \array_map('trim', \array_map('strtolower', \explode(',', $connection))), true)) { - throw new ConnectionException('Missing "Connection: upgrade" header'); - } - $secWebsocketAccept = $headers['sec-websocket-accept'][0] ?? ''; - if (!validateAcceptForKey($secWebsocketAccept, $key)) { - throw new ConnectionException('Invalid "Sec-WebSocket-Accept" header'); - } - - return $headers; - } - - final protected function createCompressionContext(array $headers): ?Websocket\CompressionContext - { - $extensions = $headers['sec-websocket-extensions'][0] ?? ''; - $extensions = \array_map('trim', \explode(',', $extensions)); - foreach ($extensions as $extension) { - if ($compressionContext = $this->compressionFactory->fromServerHeader($extension)) { - return $compressionContext; - } - } - - return null; - } - /** * {@inheritdoc} * * @return \Amp\Socket\Socket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } + public function setExtra($extra) + { + $this->connector = $extra; + } public static function getName(): string { diff --git a/src/danog/MadelineProto/Tools.php b/src/danog/MadelineProto/Tools.php index cc2bfb20..f5d7413a 100644 --- a/src/danog/MadelineProto/Tools.php +++ b/src/danog/MadelineProto/Tools.php @@ -665,9 +665,9 @@ trait Tools public static function isArrayOrAlike($var): bool { return \is_array($var) || - ($var instanceof ArrayAccess && - $var instanceof Traversable && - $var instanceof Countable); + ($var instanceof \ArrayAccess && + $var instanceof \Traversable && + $var instanceof \Countable); } /**