diff --git a/composer.json b/composer.json index b210647e..6fd7e692 100644 --- a/composer.json +++ b/composer.json @@ -2,6 +2,7 @@ "name": "danog/madelineproto", "description": "PHP implementation of telegram's MTProto protocol.", "type": "project", + "minimum-stability": "dev", "license": "AGPL-3.0-only", "homepage": "https://daniil.it/MadelineProto", "keywords": ["telegram", "mtproto", "protocol", "bytes", "messenger", "client", "PHP", "video", "stickers", "audio", "files", "GB"], @@ -12,32 +13,34 @@ "php": ">=7.1.0", "danog/primemodule": "^1", "danog/magicalserializer": "^1.0", - "phpseclib/phpseclib": "dev-master#5e7d39153270dfd19d585504b0a29ac65a62adf9 as 2.0.15", + "phpseclib/phpseclib": "dev-master#f715b2928976aaef389839a056c947aa8023277b as 2.0.15", "vlucas/phpdotenv": "^3", "erusev/parsedown": "^1.7", "ext-mbstring": "*", "ext-json": "*", "ext-xml": "*", + "ext-dom": "*", + "ext-filter": "*", + "ext-hash": "*", + "ext-zlib": "*", "ext-fileinfo": "*", "amphp/amp": "^2.0", - - - "amphp/websocket": "dev-master#db2da8c5b3ed22eae37da5ffa10ab3ea8de19342 as 1", - "amphp/websocket-client": "dev-master#aff808025637bd705672338b4904ad03a4dbdc04 as 1", - "amphp/socket": "0.10.12 as 1", - "amphp/dns": "dev-master#aa1892bdf13b787d759df6f2523e8027a434d927 as v0.9.x-dev", - "amphp/artax": "dev-master as 3.0.99", - "amphp/file": "^0.3", - "amphp/uri": "^0.1.4", + "amphp/websocket-client": "dev-master as 1.0.0-rc2", + "amphp/http-client": "dev-master as 4.0.0-rc11", + "amphp/socket": "^1", + "amphp/dns": "dev-master#ecbeca2ae0e93c08e8150a92810a3961fad8ecbe as v1", + "amphp/file": "^1", "amphp/byte-stream": "^1.6", - "danog/dns-over-https": "^0.1" + "danog/dns-over-https": "^0.2", + "amphp/http-client-cookies": "dev-master" }, "require-dev": { "phpdocumentor/reflection-docblock": "^4.3", "ennexa/amp-update-cache": "dev-master", "phpunit/phpunit": "^8", "amphp/php-cs-fixer-config": "dev-master", - "haydenpierce/class-finder": "^0.4" + "haydenpierce/class-finder": "^0.4", + "ext-ctype":"*" }, "suggest": { "ext-libtgvoip": "Install the php-libtgvoip extension to make phone calls (https://github.com/danog/php-libtgvoip)" diff --git a/src/danog/MadelineProto/DataCenter.php b/src/danog/MadelineProto/DataCenter.php index 4801dfc4..48c8c07a 100644 --- a/src/danog/MadelineProto/DataCenter.php +++ b/src/danog/MadelineProto/DataCenter.php @@ -19,11 +19,6 @@ namespace danog\MadelineProto; -use Amp\Artax\Client; -use Amp\Artax\Cookie\ArrayCookieJar; -use Amp\Artax\Cookie\CookieJar; -use Amp\Artax\DefaultClient; -use Amp\Artax\HttpSocketPool; use Amp\CancellationToken; use Amp\Deferred; use Amp\Dns\Record; @@ -32,15 +27,20 @@ use Amp\Dns\Rfc1035StubResolver; use Amp\DoH\DoHConfig; use Amp\DoH\Nameserver; use Amp\DoH\Rfc8484StubResolver; +use Amp\Http\Client\Connection\DefaultConnectionFactory; +use Amp\Http\Client\Connection\UnlimitedConnectionPool; +use Amp\Http\Client\Cookie\CookieInterceptor; +use Amp\Http\Client\Cookie\CookieJar; +use Amp\Http\Client\Cookie\InMemoryCookieJar; +use Amp\Http\Client\DelegateHttpClient; +use Amp\Http\Client\HttpClientBuilder; use Amp\Loop; use Amp\MultiReasonException; use Amp\NullCancellationToken; use Amp\Promise; -use Amp\Socket\ClientConnectContext; use Amp\Socket\ClientSocket; use Amp\Socket\ClientTlsContext; use Amp\Socket\ConnectException; -use Amp\Socket\Socket; use Amp\TimeoutException; use danog\MadelineProto\MTProto\PermAuthKey; use danog\MadelineProto\MTProto\TempAuthKey; @@ -102,7 +102,7 @@ class DataCenter /** * HTTP client. * - * @var \Amp\Artax\Client + * @var \Amp\Http\Client\DelegateHttpClient */ private $HTTPClient; /** @@ -120,7 +120,7 @@ class DataCenter /** * Cookie jar. * - * @var \Amp\Artax\Cookie\CookieJar + * @var \Amp\Http\Client\Cookie\CookieJar */ private $CookieJar; @@ -227,101 +227,40 @@ class DataCenter } if ($reconnectAll || $changedSettings || !$this->CookieJar) { - $this->CookieJar = $jar ?? new ArrayCookieJar; - $this->HTTPClient = new DefaultClient($this->CookieJar, new HttpSocketPool(new ProxySocketPool([$this, 'rawConnect']))); + $this->CookieJar = $jar ?? new InMemoryCookieJar; + $this->HTTPClient = (new HttpClientBuilder) + ->interceptNetwork(new CookieInterceptor($this->CookieJar)) + ->usingPool(new UnlimitedConnectionPool(new DefaultConnectionFactory(new ProxyConnector($this)))) + ->build(); + + $DoHHTTPClient = (new HttpClientBuilder) + ->interceptNetwork(new CookieInterceptor($this->CookieJar)) + ->usingPool(new UnlimitedConnectionPool(new DefaultConnectionFactory(new ProxyConnector($this, true)))) + ->build(); - $DoHHTTPClient = new DefaultClient( - $this->CookieJar, - new HttpSocketPool( - new ProxySocketPool( - function (string $uri, CancellationToken $token = null, ClientConnectContext $ctx = null) { - return $this->rawConnect($uri, $token, $ctx, true); - } - ) - ) - ); $DoHConfig = new DoHConfig( [ - new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'), - new Nameserver('https://google.com/resolve', Nameserver::GOOGLE_JSON, ["Host" => "dns.google.com"]), - ], + new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'), + new Nameserver('https://dns.google/resolve'), + ], $DoHHTTPClient ); $NonProxiedDoHConfig = new DoHConfig( [ - new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'), - new Nameserver('https://google.com/resolve', Nameserver::GOOGLE_JSON, ["Host" => "dns.google.com"]), - ] + new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'), + new Nameserver('https://dns.google/resolve'), + ] ); - $this->DoHClient = Magic::$altervista || Magic::$zerowebhost ? new Rfc1035StubResolver() : new Rfc8484StubResolver($DoHConfig); - $this->NonProxiedDoHClient = Magic::$altervista || Magic::$zerowebhost ? new Rfc1035StubResolver() : new Rfc8484StubResolver($NonProxiedDoHConfig); + $this->DoHClient = Magic::$altervista || Magic::$zerowebhost ? + new Rfc1035StubResolver() : + new Rfc8484StubResolver($DoHConfig); + + $this->NonProxiedDoHClient = Magic::$altervista || Magic::$zerowebhost ? + new Rfc1035StubResolver() : + new Rfc8484StubResolver($NonProxiedDoHConfig); } } - /** - * Asynchronously establish an encrypted TCP connection (non-blocking). - * - * Note: Once resolved the socket stream will already be set to non-blocking mode. - * - * @param ConnectionContext $ctx - * @param string $uricall - * @param ClientConnectContext $socketContext - * @param ClientTlsContext $tlsContext - * @param CancellationToken $token - * - * @return Promise - */ - public function cryptoConnect( - ConnectionContext $ctx, - string $uri, - ClientConnectContext $socketContext = null, - ClientTlsContext $tlsContext = null, - CancellationToken $token = null - ): Promise { - return call(function () use ($ctx, $uri, $socketContext, $tlsContext, $token) { - $tlsContext = $tlsContext ?? new ClientTlsContext; - - if ($tlsContext->getPeerName() === null) { - $tlsContext = $tlsContext->withPeerName(\parse_url($uri, PHP_URL_HOST)); - } - - /** @var ClientSocket $socket */ - $socket = yield $this->socketConnect($ctx, $uri, $socketContext, $token); - - $promise = $socket->enableCrypto($tlsContext); - - if ($token) { - $deferred = new Deferred; - $id = $token->subscribe([$deferred, "fail"]); - - $promise->onResolve(function ($exception) use ($id, $token, $deferred) { - if ($token->isRequested()) { - return; - } - - $token->unsubscribe($id); - - if ($exception) { - $deferred->fail($exception); - return; - } - - $deferred->resolve(); - }); - - $promise = $deferred->promise(); - } - - try { - yield $promise; - } catch (\Throwable $exception) { - $socket->close(); - throw $exception; - } - - return $socket; - }); - } /** * Asynchronously establish a socket connection to the specified URI. * @@ -493,38 +432,6 @@ class DataCenter throw $e; }); } - - public function rawConnect(string $uri, CancellationToken $token = null, ClientConnectContext $ctx = null, $fromDns = false): \Generator - { - $ctxs = $this->generateContexts(0, $uri, $ctx); - if (empty($ctxs)) { - throw new Exception("No contexts for raw connection to URI $uri"); - } - foreach ($ctxs as $ctx) { - /* @var $ctx \danog\MadelineProto\Stream\ConnectionContext */ - try { - $ctx->setIsDns($fromDns); - $ctx->setCancellationToken($token); - $result = yield $ctx->getStream(); - $this->API->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING); - - return $result->getSocket(); - } catch (\Throwable $e) { - if (\defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') { - throw $e; - } - $this->API->logger->logger('Connection failed: '.$e, \danog\MadelineProto\Logger::ERROR); - if ($e instanceof MultiReasonException) { - foreach ($e->getReasons() as $reason) { - $this->API->logger->logger('Multireason: '.$reason, \danog\MadelineProto\Logger::ERROR); - } - } - } - } - - throw new \danog\MadelineProto\Exception("Could not connect to URI $uri"); - } - public function dcConnect(string $dc_number, int $id = -1): \Generator { $old = isset($this->sockets[$dc_number]) && ( @@ -732,15 +639,7 @@ class DataCenter CancellationToken $token = null ) use ($ctx): Promise { return $this->socketConnect($ctx, $uri, $socketContext, $token); - }, - function ( - string $uri, - ClientConnectContext $socketContext = null, - ClientTlsContext $tlsContext = null, - CancellationToken $token = null - ) use ($ctx): Promise { - return $this->cryptoConnect($ctx, $uri, $socketContext, $tlsContext, $token); - }, + } ]; } $ctx->addStream(...$stream); @@ -844,19 +743,19 @@ class DataCenter } /** - * Get Artax async HTTP client. + * Get async HTTP client. * - * @return \Amp\Artax\Client + * @return \Amp\Http\Client\DelegateHttpClient */ - public function getHTTPClient(): Client + public function getHTTPClient(): DelegateHttpClient { return $this->HTTPClient; } /** - * Get Artax async HTTP client. + * Get async HTTP client cookies. * - * @return \Amp\Artax\CookieJar + * @return \Amp\Http\Client\Cookie\CookieJar */ public function getCookieJar(): CookieJar { diff --git a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php index cfa33d5a..d5ca6ce2 100644 --- a/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/AuthKeyHandler.php @@ -19,7 +19,7 @@ namespace danog\MadelineProto\MTProtoTools; -use Amp\Artax\Request; +use Amp\Http\Client\Request; use danog\MadelineProto\DataCenterConnection; use danog\MadelineProto\MTProto\AuthKey; use danog\MadelineProto\MTProto\PermAuthKey; @@ -605,9 +605,10 @@ trait AuthKeyHandler ]; $url = 'https://www.wolframalpha.com/input/json.jsp?'.\http_build_query($params); - $request = (new Request($url))->withHeader('referer', 'https://www.wolframalpha.com/input/?i='.\urlencode($query)); + $request = new Request($url); + $request->setHeader('referer', 'https://www.wolframalpha.com/input/?i='.\urlencode($query)); - $res = \json_decode(yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody(), true); + $res = \json_decode(yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody()->buffer(), true); if (!isset($res['queryresult']['pods'])) { return false; } diff --git a/src/danog/MadelineProto/MTProtoTools/Files.php b/src/danog/MadelineProto/MTProtoTools/Files.php index b3a19f62..6375abf6 100644 --- a/src/danog/MadelineProto/MTProtoTools/Files.php +++ b/src/danog/MadelineProto/MTProtoTools/Files.php @@ -19,13 +19,12 @@ namespace danog\MadelineProto\MTProtoTools; -use Amp\Artax\Client; use Amp\ByteStream\InputStream; use Amp\ByteStream\OutputStream; use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\StreamException; use Amp\Deferred; -use Amp\File\BlockingHandle; +use Amp\File\BlockingFile; use Amp\File\Handle; use Amp\File\StatCache; use Amp\Success; @@ -34,6 +33,7 @@ use danog\MadelineProto\Exception; use danog\MadelineProto\FileCallbackInterface; use danog\MadelineProto\Logger; use danog\MadelineProto\RPCErrorException; +use danog\MadelineProto\Stream\Common\BufferedRawStream; use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream; use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\Transport\PremadeStream; @@ -91,7 +91,7 @@ trait Files $cb = $url; $url = $url->getFile(); } - /** @var $response \Amp\Artax\Response */ + /** @var $response \Amp\Http\Client\Response */ $response = yield $this->datacenter->getHTTPClient()->request($url, [Client::OP_MAX_BODY_BYTES => 512 * 1024 * 3000, Client::OP_TRANSFER_TIMEOUT => 10*1000*3600]); if (200 !== $status = $response->getStatus()) { throw new Exception("Wrong status code: $status ".$response->getReason()); @@ -104,7 +104,7 @@ trait Files $this->logger->logger("No content length for $url, caching first"); $body = $stream; - $stream = new BlockingHandle(\fopen('php://temp', 'r+b'), 'php://temp', 'r+b'); + $stream = new BlockingFile(\fopen('php://temp', 'r+b'), 'php://temp', 'r+b'); while (null !== $chunk = yield $body->read()) { yield $stream->write($chunk); diff --git a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php index e02b13dc..de23aee6 100644 --- a/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php +++ b/src/danog/MadelineProto/MTProtoTools/UpdateHandler.php @@ -19,8 +19,8 @@ namespace danog\MadelineProto\MTProtoTools; -use Amp\Artax\Request; use Amp\Deferred; +use Amp\Http\Client\Request; use Amp\Loop; /** @@ -392,9 +392,11 @@ trait UpdateHandler return false; } \danog\MadelineProto\Tools::callFork((function () use ($payload) { - $request = (new Request($this->hook_url, 'POST'))->withHeader('content-type', 'application/json')->withBody($payload); + $request = new Request($this->hook_url, 'POST'); + $request->setHeader('content-type', 'application/json'); + $request->setBody($payload); - $result = yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody(); + $result = yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody()->buffer(); $this->logger->logger('Result of webhook query is '.$result, \danog\MadelineProto\Logger::NOTICE); $result = \json_decode($result, true); diff --git a/src/danog/MadelineProto/Proxy.php b/src/danog/MadelineProto/Proxy.php deleted file mode 100644 index e2b9047b..00000000 --- a/src/danog/MadelineProto/Proxy.php +++ /dev/null @@ -1,57 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2019 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto; - -interface Proxy -{ - public function __construct(int $domain, int $type, int $protocol); - - public function setOption(int $level, int $name, $value); - - public function getOption(int $level, int $name); - - public function setBlocking(bool $blocking); - - public function bind(string $address, int $port = 0); - - public function listen(int $backlog = 0); - - public function accept(); - - public function connect(string $address, int $port = 0); - - public function read(int $length, int $flags = 0); - - public function write(string $buffer, int $length = -1); - - public function send(string $data, int $length, int $flags); - - public function close(); - - public function getPeerName(bool $port = true); - - public function getSockName(bool $port = true); - - public function getProxyHeaders(); - - public function setExtra(array $extra = []); - - public function getResource(); -} diff --git a/src/danog/MadelineProto/ProxyConnector.php b/src/danog/MadelineProto/ProxyConnector.php new file mode 100644 index 00000000..a4cf872d --- /dev/null +++ b/src/danog/MadelineProto/ProxyConnector.php @@ -0,0 +1,71 @@ +. + * + * @author Daniil Gentili + * @copyright 2016-2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + * + * @link https://docs.madelineproto.xyz MadelineProto documentation + */ + +namespace danog\MadelineProto; + +use Amp\MultiReasonException; +use Amp\Socket\Connector; + +class ProxyConnector implements Connector +{ + private $dataCenter; + private $fromDns = false; + public function __construct(DataCenter $dataCenter, bool $fromDns = false) + { + $this->dataCenter = $dataCenter; + $this->fromDns = false; + } + + public function connect(string $uri, ?ConnectContext $ctx = null, ?CancellationToken $token = null): Promise + { + return Tools::call(static function () use ($uri, $ctx, $token) { + $ctx = $ctx ?? new ConnectContext; + $token = $token ?? new NullCancellationToken; + + $ctxs = $this->datacenter->generateContexts(0, $uri, $ctx); + if (empty($ctxs)) { + throw new Exception("No contexts for raw connection to URI $uri"); + } + foreach ($ctxs as $ctx) { + /* @var $ctx \danog\MadelineProto\Stream\ConnectionContext */ + try { + $ctx->setIsDns($this->fromDns); + $ctx->setCancellationToken($token); + $result = yield $ctx->getStream(); + $this->API->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING); + + return $result->getSocket(); + } catch (\Throwable $e) { + if (\defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') { + throw $e; + } + $this->API->logger->logger('Connection failed: '.$e, \danog\MadelineProto\Logger::ERROR); + if ($e instanceof MultiReasonException) { + foreach ($e->getReasons() as $reason) { + $this->API->logger->logger('Multireason: '.$reason, \danog\MadelineProto\Logger::ERROR); + } + } + } + } + + throw new \danog\MadelineProto\Exception("Could not connect to URI $uri"); + + }); + } +} \ No newline at end of file diff --git a/src/danog/MadelineProto/ProxySocketPool.php b/src/danog/MadelineProto/ProxySocketPool.php deleted file mode 100644 index a189fb58..00000000 --- a/src/danog/MadelineProto/ProxySocketPool.php +++ /dev/null @@ -1,215 +0,0 @@ - null, - 'udp' => null, - 'unix' => null, - 'udg' => null, - ]; - private $sockets = []; - private $socketIdUriMap = []; - private $pendingCount = []; - private $idleTimeout; - private $socketContext; - private $connectCallback; - - public function __construct(callable $connectCallback, int $idleTimeout = 10000, ClientConnectContext $socketContext = null) - { - $this->idleTimeout = $idleTimeout; - $this->socketContext = $socketContext ?? new ClientConnectContext(); - $this->connectCallback = $connectCallback; - } - - /** - * @param string $uri - * - * @throws SocketException - * - * @return string - */ - private function normalizeUri(string $uri): string - { - if (\stripos($uri, 'unix://') === 0) { - return $uri; - } - - try { - $parts = \League\Uri\UriString::parse($uri); - } catch (\Exception $exception) { - throw new SocketException('Could not parse URI', 0, $exception); - } - if ($parts['scheme'] === null) { - throw new SocketException('Invalid URI for socket pool; no scheme given'); - } - $port = $parts['port'] ?? 0; - if ($parts['host'] === null || $port === 0) { - throw new SocketException('Invalid URI for socket pool; missing host or port'); - } - $scheme = \strtolower($parts['scheme']); - $host = \strtolower($parts['host']); - if (!\array_key_exists($scheme, self::ALLOWED_SCHEMES)) { - throw new SocketException(\sprintf( - "Invalid URI for socket pool; '%s' scheme not allowed - scheme must be one of %s", - $scheme, - \implode(', ', \array_keys(self::ALLOWED_SCHEMES)) - )); - } - if ($parts['query'] !== null || $parts['fragment'] !== null) { - throw new SocketException('Invalid URI for socket pool; query or fragment components not allowed'); - } - if ($parts['path'] !== '') { - throw new SocketException('Invalid URI for socket pool; path component must be empty'); - } - if ($parts['user'] !== null) { - throw new SocketException('Invalid URI for socket pool; user component not allowed'); - } - - return $scheme.'://'.$host.':'.$port; - } - - /** {@inheritdoc} */ - public function checkout(string $uri, CancellationToken $token = null): Promise - { - // A request might already be cancelled before we reach the checkout, so do not even attempt to checkout in that - // case. The weird logic is required to throw the token's exception instead of creating a new one. - if ($token && $token->isRequested()) { - try { - $token->throwIfRequested(); - } catch (CancelledException $e) { - return new Failure($e); - } - } - $uri = $this->normalizeUri($uri); - if (empty($this->sockets[$uri])) { - return $this->checkoutNewSocket($uri, $token); - } - foreach ($this->sockets[$uri] as $socketId => $socket) { - if (!$socket->isAvailable) { - continue; - } - if (!\is_resource($socket->resource) || \feof($socket->resource)) { - $this->clearFromId((int) $socket->resource); - continue; - } - $socket->isAvailable = false; - if ($socket->idleWatcher !== null) { - Loop::disable($socket->idleWatcher); - } - - return new Success(new ClientSocket($socket->resource)); - } - - return $this->checkoutNewSocket($uri, $token); - } - - private function checkoutNewSocket(string $uri, CancellationToken $token = null): Promise - { - return call(function () use ($uri, $token) { - $this->pendingCount[$uri] = ($this->pendingCount[$uri] ?? 0) + 1; - - try { - /** @var ClientSocket $rawSocket */ - $rawSocket = yield \danog\MadelineProto\Tools::call(($this->connectCallback)($uri, $token, $this->socketContext)); - } finally { - if (--$this->pendingCount[$uri] === 0) { - unset($this->pendingCount[$uri]); - } - } - $socketId = (int) $rawSocket->getResource(); - $socket = new class() { - use Struct; - public $id; - public $uri; - public $resource; - public $isAvailable; - public $idleWatcher; - }; - $socket->id = $socketId; - $socket->uri = $uri; - $socket->resource = $rawSocket->getResource(); - $socket->isAvailable = false; - $this->sockets[$uri][$socketId] = $socket; - $this->socketIdUriMap[$socketId] = $uri; - - return $rawSocket; - }); - } - - /** {@inheritdoc} */ - public function clear(ClientSocket $socket): void - { - $this->clearFromId((int) $socket->getResource()); - } - - /** - * @param int $socketId - */ - private function clearFromId(int $socketId): void - { - if (!isset($this->socketIdUriMap[$socketId])) { - throw new \Error( - \sprintf('Unknown socket: %d', $socketId) - ); - } - $uri = $this->socketIdUriMap[$socketId]; - $socket = $this->sockets[$uri][$socketId]; - if ($socket->idleWatcher) { - Loop::cancel($socket->idleWatcher); - } - unset( - $this->sockets[$uri][$socketId], - $this->socketIdUriMap[$socketId] - ); - if (empty($this->sockets[$uri])) { - unset($this->sockets[$uri]); - } - } - - /** {@inheritdoc} */ - public function checkin(ClientSocket $socket): void - { - $socketId = (int) $socket->getResource(); - if (!isset($this->socketIdUriMap[$socketId])) { - throw new \Error( - \sprintf('Unknown socket: %d', $socketId) - ); - } - $uri = $this->socketIdUriMap[$socketId]; - $resource = $socket->getResource(); - if (!\is_resource($resource) || \feof($resource)) { - $this->clearFromId((int) $resource); - - return; - } - $socket = $this->sockets[$uri][$socketId]; - $socket->isAvailable = true; - if (isset($socket->idleWatcher)) { - Loop::enable($socket->idleWatcher); - } else { - $socket->idleWatcher = Loop::delay($this->idleTimeout, function () use ($socket) { - $this->clearFromId((int) $socket->resource); - }); - Loop::unreference($socket->idleWatcher); - } - } -} diff --git a/src/danog/MadelineProto/Stream/ConnectionContext.php b/src/danog/MadelineProto/Stream/ConnectionContext.php index b423da05..f026e078 100644 --- a/src/danog/MadelineProto/Stream/ConnectionContext.php +++ b/src/danog/MadelineProto/Stream/ConnectionContext.php @@ -19,7 +19,7 @@ namespace danog\MadelineProto\Stream; use Amp\CancellationToken; -use Amp\Socket\ClientConnectContext; +use Amp\Socket\ConnectContext; use Amp\Uri\Uri; use danog\MadelineProto\Exception; use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream; @@ -74,7 +74,7 @@ class ConnectionContext /** * Socket context. * - * @var \Amp\Socket\ClientConnectionContext + * @var \Amp\Socket\ConnectContext */ private $socketContext; /** @@ -118,11 +118,11 @@ class ConnectionContext /** * Set the socket context. * - * @param ClientConnectContext $socketContext + * @param ConnectContext $socketContext * * @return self */ - public function setSocketContext(ClientConnectContext $socketContext): self + public function setSocketContext(ConnectContext $socketContext): self { $this->socketContext = $socketContext; @@ -132,9 +132,9 @@ class ConnectionContext /** * Get the socket context. * - * @return ClientConnectContext + * @return ConnectContext */ - public function getSocketContext(): ClientConnectContext + public function getSocketContext(): ConnectContext { return $this->socketContext; } diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/AbridgedStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/AbridgedStream.php index e31b2b9b..e929e9cd 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/AbridgedStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/AbridgedStream.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\Stream\MTProtoTransport; use Amp\Promise; +use Amp\Socket\EncryptableSocket; use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\ConnectionContext; @@ -100,9 +101,9 @@ class AbridgedStream implements BufferedStreamInterface, MTProtoBufferInterface /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/FullStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/FullStream.php index c2b04f33..c8e8861b 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/FullStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/FullStream.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\Stream\MTProtoTransport; use Amp\Promise; +use Amp\Socket\EncryptableSocket; use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\Common\HashedBufferedStream; @@ -110,9 +111,9 @@ class FullStream implements BufferedStreamInterface, MTProtoBufferInterface /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/HttpStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/HttpStream.php index d1d43a48..9fc0d56e 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/HttpStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/HttpStream.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\Stream\MTProtoTransport; use Amp\Promise; +use Amp\Socket\EncryptableSocket; use Amp\Success; use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\BufferedProxyStreamInterface; @@ -183,9 +184,9 @@ class HttpStream implements MTProtoBufferInterface, BufferedProxyStreamInterface /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/HttpsStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/HttpsStream.php index 07a88f31..dd631c1c 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/HttpsStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/HttpsStream.php @@ -40,16 +40,6 @@ class HttpsStream extends HttpStream implements MTProtoBufferInterface return parent::connectGenerator($ctx->getCtx()->secure(true), $header); } - /** - * {@inheritdoc} - * - * @return \Amp\Socket\Socket - */ - public function getSocket(): \Amp\Socket\Socket - { - return $this->stream->getSocket(); - } - public static function getName(): string { return __CLASS__; diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/IntermediatePaddedStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/IntermediatePaddedStream.php index 2a6c67b9..39702f39 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/IntermediatePaddedStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/IntermediatePaddedStream.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\Stream\MTProtoTransport; use Amp\Promise; +use Amp\Socket\EncryptableSocket; use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\ConnectionContext; @@ -92,9 +93,9 @@ class IntermediatePaddedStream implements BufferedStreamInterface, MTProtoBuffer /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/IntermediateStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/IntermediateStream.php index 842cb7cd..94dd6070 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/IntermediateStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/IntermediateStream.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\Stream\MTProtoTransport; use Amp\Promise; +use Amp\Socket\EncryptableSocket; use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\ConnectionContext; @@ -91,13 +92,14 @@ class IntermediateStream implements BufferedStreamInterface, MTProtoBufferInterf /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } + public static function getName(): string { return __CLASS__; diff --git a/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php b/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php index cd3b89dc..8198fad4 100644 --- a/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php +++ b/src/danog/MadelineProto/Stream/MTProtoTransport/ObfuscatedStream.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\Stream\MTProtoTransport; use Amp\Promise; +use Amp\Socket\EncryptableSocket; use danog\MadelineProto\Exception; use danog\MadelineProto\Stream\Async\Buffer; use danog\MadelineProto\Stream\Async\BufferedStream; @@ -201,9 +202,9 @@ class ObfuscatedStream implements BufferedProxyStreamInterface /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } diff --git a/src/danog/MadelineProto/Stream/Proxy/HttpProxy.php b/src/danog/MadelineProto/Stream/Proxy/HttpProxy.php index 545b716d..3cfc6b97 100644 --- a/src/danog/MadelineProto/Stream/Proxy/HttpProxy.php +++ b/src/danog/MadelineProto/Stream/Proxy/HttpProxy.php @@ -20,6 +20,7 @@ namespace danog\MadelineProto\Stream\Proxy; use Amp\Promise; use Amp\Socket\ClientTlsContext; +use Amp\Socket\EncryptableSocket; use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\BufferedProxyStreamInterface; use danog\MadelineProto\Stream\ConnectionContext; @@ -47,6 +48,11 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface $ctx = $ctx->getCtx(); $uri = $ctx->getUri(); $secure = $ctx->isSecure(); + + if ($secure) { + $ctx->setSocketContext($ctx->getSocketContext()->withTlsContext(new ClientTlsContext($uri->getHost()))); + } + $ctx->setUri('tcp://'.$this->extra['address'].':'.$this->extra['port'])->secure(false); $this->stream = yield $ctx->getStream(); @@ -125,8 +131,8 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface $read = yield $buffer->bufferRead($length); } - if ($secure && \method_exists($this->getSocket(), 'enableCrypto')) { - yield $this->getSocket()->enableCrypto((new ClientTlsContext())->withPeerName($uri->getHost())); + if ($secure) { + yield $this->getSocket()->setupTls(); } \danog\MadelineProto\Logger::log('Connected to '.$address.':'.$port.' via http'); @@ -203,9 +209,9 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } diff --git a/src/danog/MadelineProto/Stream/Proxy/SocksProxy.php b/src/danog/MadelineProto/Stream/Proxy/SocksProxy.php index c0ab463d..e3647ce1 100644 --- a/src/danog/MadelineProto/Stream/Proxy/SocksProxy.php +++ b/src/danog/MadelineProto/Stream/Proxy/SocksProxy.php @@ -58,6 +58,11 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac $ctx = $ctx->getCtx(); $uri = $ctx->getUri(); $secure = $ctx->isSecure(); + + if ($secure) { + $ctx->setSocketContext($ctx->getSocketContext()->withTlsContext(new ClientTlsContext($uri->getHost()))); + } + $ctx->setUri('tcp://'.$this->extra['address'].':'.$this->extra['port'])->secure(false); $methods = \chr(0); @@ -150,8 +155,8 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac \danog\MadelineProto\Logger::log(['Connected to '.$ip.':'.$port.' via socks5']); - if ($secure && \method_exists($this->getSocket(), 'enableCrypto')) { - yield $this->getSocket()->enableCrypto((new ClientTlsContext())->withPeerName($uri->getHost())); + if ($secure) { + yield $this->getSocket()->setupTls(); } if (\strlen($header)) { yield (yield $this->stream->getWriteBuffer(\strlen($header)))->bufferWrite($header); @@ -217,9 +222,9 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream->getSocket(); } diff --git a/src/danog/MadelineProto/Stream/StreamInterface.php b/src/danog/MadelineProto/Stream/StreamInterface.php index 3745178b..4eb7ee1f 100644 --- a/src/danog/MadelineProto/Stream/StreamInterface.php +++ b/src/danog/MadelineProto/Stream/StreamInterface.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\Stream; use Amp\Promise; +use Amp\Socket\EncryptableSocket; use Amp\Socket\Socket; /** @@ -47,7 +48,7 @@ interface StreamInterface /** * Get underlying AMPHP socket resource. * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ public function getSocket(): Socket; } diff --git a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php index 8d0ebe73..885ddbe6 100644 --- a/src/danog/MadelineProto/Stream/Transport/DefaultStream.php +++ b/src/danog/MadelineProto/Stream/Transport/DefaultStream.php @@ -19,12 +19,16 @@ namespace danog\MadelineProto\Stream\Transport; use Amp\ByteStream\ClosedException; +use Amp\CancellationToken; use Amp\Promise; +use Amp\Socket\EncryptableSocket; use Amp\Socket\Socket; use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\ProxyStreamInterface; use danog\MadelineProto\Stream\RawStreamInterface; +use function Amp\Socket\connect; + /** * Default stream wrapper. * @@ -32,20 +36,23 @@ use danog\MadelineProto\Stream\RawStreamInterface; * * @author Daniil Gentili */ -class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInterface +class DefaultStream extends Socket implements RawStreamInterface { use RawStream; + /** + * Socket + * + * @var EncryptableSocket + */ private $stream; - private $connector = 'Amp\\Socket\\connect'; - private $cryptoConnector = 'Amp\\Socket\\cryptoConnect'; public function __construct() { } - public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise + public function setupTls(?CancellationToken $cancellationToken = null): \Amp\Promise { - return $this->stream->enableCrypto($tlsContext); + return $this->stream->setupTls($cancellationToken); } public function getStream() @@ -55,10 +62,9 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt public function connectGenerator(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator { + $this->stream = yield connect($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken()); if ($ctx->isSecure()) { - $this->stream = yield ($this->cryptoConnector)($ctx->getStringUri(), $ctx->getSocketContext(), null, $ctx->getCancellationToken()); - } else { - $this->stream = yield ($this->connector)($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken()); + yield $this->stream->setupTls(); } yield $this->stream->write($header); } @@ -89,9 +95,9 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt } /** - * Async close. + * Close. * - * @return Generator + * @return void */ public function disconnect() { @@ -107,6 +113,11 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt } } + /** + * Close + * + * @return void + */ public function close() { $this->disconnect(); @@ -115,20 +126,13 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt /** * {@inheritdoc} * - * @return \Amp\Socket\Socket + * @return EncryptableSocket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): EncryptableSocket { return $this->stream; } - /** - * {@inheritdoc} - */ - public function setExtra($extra) - { - list($this->connector, $this->cryptoConnector) = $extra; - } public static function getName(): string { return __CLASS__; diff --git a/src/danog/MadelineProto/Stream/Transport/PremadeStream.php b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php index 44db4421..ad209420 100644 --- a/src/danog/MadelineProto/Stream/Transport/PremadeStream.php +++ b/src/danog/MadelineProto/Stream/Transport/PremadeStream.php @@ -19,6 +19,7 @@ namespace danog\MadelineProto\Stream\Transport; use Amp\ByteStream\ClosedException; +use Amp\CancellationToken; use Amp\Promise; use Amp\Socket\Socket; use danog\MadelineProto\Stream\Async\RawStream; @@ -42,11 +43,12 @@ class PremadeStream extends Socket implements RawStreamInterface, ProxyStreamInt { } - public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise + public function setupTls(?CancellationToken $cancellationToken = null): \Amp\Promise { - return $this->stream->enableCrypto($tlsContext); + return $this->stream->setupTls($cancellationToken); } + public function getStream() { return $this->stream; @@ -115,7 +117,7 @@ class PremadeStream extends Socket implements RawStreamInterface, ProxyStreamInt * * @return \Amp\Socket\Socket */ - public function getSocket(): \Amp\Socket\Socket + public function getSocket(): Socket { return $this->stream; }