Refactor, update amphp libs

This commit is contained in:
Daniil Gentili 2019-12-13 12:57:47 +01:00
parent 07b71923c8
commit 170eb6cf10
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
21 changed files with 211 additions and 492 deletions

View File

@ -2,6 +2,7 @@
"name": "danog/madelineproto", "name": "danog/madelineproto",
"description": "PHP implementation of telegram's MTProto protocol.", "description": "PHP implementation of telegram's MTProto protocol.",
"type": "project", "type": "project",
"minimum-stability": "dev",
"license": "AGPL-3.0-only", "license": "AGPL-3.0-only",
"homepage": "https://daniil.it/MadelineProto", "homepage": "https://daniil.it/MadelineProto",
"keywords": ["telegram", "mtproto", "protocol", "bytes", "messenger", "client", "PHP", "video", "stickers", "audio", "files", "GB"], "keywords": ["telegram", "mtproto", "protocol", "bytes", "messenger", "client", "PHP", "video", "stickers", "audio", "files", "GB"],
@ -12,32 +13,34 @@
"php": ">=7.1.0", "php": ">=7.1.0",
"danog/primemodule": "^1", "danog/primemodule": "^1",
"danog/magicalserializer": "^1.0", "danog/magicalserializer": "^1.0",
"phpseclib/phpseclib": "dev-master#5e7d39153270dfd19d585504b0a29ac65a62adf9 as 2.0.15", "phpseclib/phpseclib": "dev-master#f715b2928976aaef389839a056c947aa8023277b as 2.0.15",
"vlucas/phpdotenv": "^3", "vlucas/phpdotenv": "^3",
"erusev/parsedown": "^1.7", "erusev/parsedown": "^1.7",
"ext-mbstring": "*", "ext-mbstring": "*",
"ext-json": "*", "ext-json": "*",
"ext-xml": "*", "ext-xml": "*",
"ext-dom": "*",
"ext-filter": "*",
"ext-hash": "*",
"ext-zlib": "*",
"ext-fileinfo": "*", "ext-fileinfo": "*",
"amphp/amp": "^2.0", "amphp/amp": "^2.0",
"amphp/websocket-client": "dev-master as 1.0.0-rc2",
"amphp/http-client": "dev-master as 4.0.0-rc11",
"amphp/websocket": "dev-master#db2da8c5b3ed22eae37da5ffa10ab3ea8de19342 as 1", "amphp/socket": "^1",
"amphp/websocket-client": "dev-master#aff808025637bd705672338b4904ad03a4dbdc04 as 1", "amphp/dns": "dev-master#ecbeca2ae0e93c08e8150a92810a3961fad8ecbe as v1",
"amphp/socket": "0.10.12 as 1", "amphp/file": "^1",
"amphp/dns": "dev-master#aa1892bdf13b787d759df6f2523e8027a434d927 as v0.9.x-dev",
"amphp/artax": "dev-master as 3.0.99",
"amphp/file": "^0.3",
"amphp/uri": "^0.1.4",
"amphp/byte-stream": "^1.6", "amphp/byte-stream": "^1.6",
"danog/dns-over-https": "^0.1" "danog/dns-over-https": "^0.2",
"amphp/http-client-cookies": "dev-master"
}, },
"require-dev": { "require-dev": {
"phpdocumentor/reflection-docblock": "^4.3", "phpdocumentor/reflection-docblock": "^4.3",
"ennexa/amp-update-cache": "dev-master", "ennexa/amp-update-cache": "dev-master",
"phpunit/phpunit": "^8", "phpunit/phpunit": "^8",
"amphp/php-cs-fixer-config": "dev-master", "amphp/php-cs-fixer-config": "dev-master",
"haydenpierce/class-finder": "^0.4" "haydenpierce/class-finder": "^0.4",
"ext-ctype":"*"
}, },
"suggest": { "suggest": {
"ext-libtgvoip": "Install the php-libtgvoip extension to make phone calls (https://github.com/danog/php-libtgvoip)" "ext-libtgvoip": "Install the php-libtgvoip extension to make phone calls (https://github.com/danog/php-libtgvoip)"

View File

@ -19,11 +19,6 @@
namespace danog\MadelineProto; namespace danog\MadelineProto;
use Amp\Artax\Client;
use Amp\Artax\Cookie\ArrayCookieJar;
use Amp\Artax\Cookie\CookieJar;
use Amp\Artax\DefaultClient;
use Amp\Artax\HttpSocketPool;
use Amp\CancellationToken; use Amp\CancellationToken;
use Amp\Deferred; use Amp\Deferred;
use Amp\Dns\Record; use Amp\Dns\Record;
@ -32,15 +27,20 @@ use Amp\Dns\Rfc1035StubResolver;
use Amp\DoH\DoHConfig; use Amp\DoH\DoHConfig;
use Amp\DoH\Nameserver; use Amp\DoH\Nameserver;
use Amp\DoH\Rfc8484StubResolver; use Amp\DoH\Rfc8484StubResolver;
use Amp\Http\Client\Connection\DefaultConnectionFactory;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\Cookie\CookieInterceptor;
use Amp\Http\Client\Cookie\CookieJar;
use Amp\Http\Client\Cookie\InMemoryCookieJar;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Loop; use Amp\Loop;
use Amp\MultiReasonException; use Amp\MultiReasonException;
use Amp\NullCancellationToken; use Amp\NullCancellationToken;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\ClientConnectContext;
use Amp\Socket\ClientSocket; use Amp\Socket\ClientSocket;
use Amp\Socket\ClientTlsContext; use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectException; use Amp\Socket\ConnectException;
use Amp\Socket\Socket;
use Amp\TimeoutException; use Amp\TimeoutException;
use danog\MadelineProto\MTProto\PermAuthKey; use danog\MadelineProto\MTProto\PermAuthKey;
use danog\MadelineProto\MTProto\TempAuthKey; use danog\MadelineProto\MTProto\TempAuthKey;
@ -102,7 +102,7 @@ class DataCenter
/** /**
* HTTP client. * HTTP client.
* *
* @var \Amp\Artax\Client * @var \Amp\Http\Client\DelegateHttpClient
*/ */
private $HTTPClient; private $HTTPClient;
/** /**
@ -120,7 +120,7 @@ class DataCenter
/** /**
* Cookie jar. * Cookie jar.
* *
* @var \Amp\Artax\Cookie\CookieJar * @var \Amp\Http\Client\Cookie\CookieJar
*/ */
private $CookieJar; private $CookieJar;
@ -227,101 +227,40 @@ class DataCenter
} }
if ($reconnectAll || $changedSettings || !$this->CookieJar) { if ($reconnectAll || $changedSettings || !$this->CookieJar) {
$this->CookieJar = $jar ?? new ArrayCookieJar; $this->CookieJar = $jar ?? new InMemoryCookieJar;
$this->HTTPClient = new DefaultClient($this->CookieJar, new HttpSocketPool(new ProxySocketPool([$this, 'rawConnect']))); $this->HTTPClient = (new HttpClientBuilder)
->interceptNetwork(new CookieInterceptor($this->CookieJar))
->usingPool(new UnlimitedConnectionPool(new DefaultConnectionFactory(new ProxyConnector($this))))
->build();
$DoHHTTPClient = (new HttpClientBuilder)
->interceptNetwork(new CookieInterceptor($this->CookieJar))
->usingPool(new UnlimitedConnectionPool(new DefaultConnectionFactory(new ProxyConnector($this, true))))
->build();
$DoHHTTPClient = new DefaultClient(
$this->CookieJar,
new HttpSocketPool(
new ProxySocketPool(
function (string $uri, CancellationToken $token = null, ClientConnectContext $ctx = null) {
return $this->rawConnect($uri, $token, $ctx, true);
}
)
)
);
$DoHConfig = new DoHConfig( $DoHConfig = new DoHConfig(
[ [
new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'), new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'),
new Nameserver('https://google.com/resolve', Nameserver::GOOGLE_JSON, ["Host" => "dns.google.com"]), new Nameserver('https://dns.google/resolve'),
], ],
$DoHHTTPClient $DoHHTTPClient
); );
$NonProxiedDoHConfig = new DoHConfig( $NonProxiedDoHConfig = new DoHConfig(
[ [
new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'), new Nameserver('https://mozilla.cloudflare-dns.com/dns-query'),
new Nameserver('https://google.com/resolve', Nameserver::GOOGLE_JSON, ["Host" => "dns.google.com"]), new Nameserver('https://dns.google/resolve'),
] ]
); );
$this->DoHClient = Magic::$altervista || Magic::$zerowebhost ? new Rfc1035StubResolver() : new Rfc8484StubResolver($DoHConfig); $this->DoHClient = Magic::$altervista || Magic::$zerowebhost ?
$this->NonProxiedDoHClient = Magic::$altervista || Magic::$zerowebhost ? new Rfc1035StubResolver() : new Rfc8484StubResolver($NonProxiedDoHConfig); new Rfc1035StubResolver() :
new Rfc8484StubResolver($DoHConfig);
$this->NonProxiedDoHClient = Magic::$altervista || Magic::$zerowebhost ?
new Rfc1035StubResolver() :
new Rfc8484StubResolver($NonProxiedDoHConfig);
} }
} }
/**
* Asynchronously establish an encrypted TCP connection (non-blocking).
*
* Note: Once resolved the socket stream will already be set to non-blocking mode.
*
* @param ConnectionContext $ctx
* @param string $uricall
* @param ClientConnectContext $socketContext
* @param ClientTlsContext $tlsContext
* @param CancellationToken $token
*
* @return Promise<ClientSocket>
*/
public function cryptoConnect(
ConnectionContext $ctx,
string $uri,
ClientConnectContext $socketContext = null,
ClientTlsContext $tlsContext = null,
CancellationToken $token = null
): Promise {
return call(function () use ($ctx, $uri, $socketContext, $tlsContext, $token) {
$tlsContext = $tlsContext ?? new ClientTlsContext;
if ($tlsContext->getPeerName() === null) {
$tlsContext = $tlsContext->withPeerName(\parse_url($uri, PHP_URL_HOST));
}
/** @var ClientSocket $socket */
$socket = yield $this->socketConnect($ctx, $uri, $socketContext, $token);
$promise = $socket->enableCrypto($tlsContext);
if ($token) {
$deferred = new Deferred;
$id = $token->subscribe([$deferred, "fail"]);
$promise->onResolve(function ($exception) use ($id, $token, $deferred) {
if ($token->isRequested()) {
return;
}
$token->unsubscribe($id);
if ($exception) {
$deferred->fail($exception);
return;
}
$deferred->resolve();
});
$promise = $deferred->promise();
}
try {
yield $promise;
} catch (\Throwable $exception) {
$socket->close();
throw $exception;
}
return $socket;
});
}
/** /**
* Asynchronously establish a socket connection to the specified URI. * Asynchronously establish a socket connection to the specified URI.
* *
@ -493,38 +432,6 @@ class DataCenter
throw $e; throw $e;
}); });
} }
public function rawConnect(string $uri, CancellationToken $token = null, ClientConnectContext $ctx = null, $fromDns = false): \Generator
{
$ctxs = $this->generateContexts(0, $uri, $ctx);
if (empty($ctxs)) {
throw new Exception("No contexts for raw connection to URI $uri");
}
foreach ($ctxs as $ctx) {
/* @var $ctx \danog\MadelineProto\Stream\ConnectionContext */
try {
$ctx->setIsDns($fromDns);
$ctx->setCancellationToken($token);
$result = yield $ctx->getStream();
$this->API->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING);
return $result->getSocket();
} catch (\Throwable $e) {
if (\defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') {
throw $e;
}
$this->API->logger->logger('Connection failed: '.$e, \danog\MadelineProto\Logger::ERROR);
if ($e instanceof MultiReasonException) {
foreach ($e->getReasons() as $reason) {
$this->API->logger->logger('Multireason: '.$reason, \danog\MadelineProto\Logger::ERROR);
}
}
}
}
throw new \danog\MadelineProto\Exception("Could not connect to URI $uri");
}
public function dcConnect(string $dc_number, int $id = -1): \Generator public function dcConnect(string $dc_number, int $id = -1): \Generator
{ {
$old = isset($this->sockets[$dc_number]) && ( $old = isset($this->sockets[$dc_number]) && (
@ -732,15 +639,7 @@ class DataCenter
CancellationToken $token = null CancellationToken $token = null
) use ($ctx): Promise { ) use ($ctx): Promise {
return $this->socketConnect($ctx, $uri, $socketContext, $token); return $this->socketConnect($ctx, $uri, $socketContext, $token);
}, }
function (
string $uri,
ClientConnectContext $socketContext = null,
ClientTlsContext $tlsContext = null,
CancellationToken $token = null
) use ($ctx): Promise {
return $this->cryptoConnect($ctx, $uri, $socketContext, $tlsContext, $token);
},
]; ];
} }
$ctx->addStream(...$stream); $ctx->addStream(...$stream);
@ -844,19 +743,19 @@ class DataCenter
} }
/** /**
* Get Artax async HTTP client. * Get async HTTP client.
* *
* @return \Amp\Artax\Client * @return \Amp\Http\Client\DelegateHttpClient
*/ */
public function getHTTPClient(): Client public function getHTTPClient(): DelegateHttpClient
{ {
return $this->HTTPClient; return $this->HTTPClient;
} }
/** /**
* Get Artax async HTTP client. * Get async HTTP client cookies.
* *
* @return \Amp\Artax\CookieJar * @return \Amp\Http\Client\Cookie\CookieJar
*/ */
public function getCookieJar(): CookieJar public function getCookieJar(): CookieJar
{ {

View File

@ -19,7 +19,7 @@
namespace danog\MadelineProto\MTProtoTools; namespace danog\MadelineProto\MTProtoTools;
use Amp\Artax\Request; use Amp\Http\Client\Request;
use danog\MadelineProto\DataCenterConnection; use danog\MadelineProto\DataCenterConnection;
use danog\MadelineProto\MTProto\AuthKey; use danog\MadelineProto\MTProto\AuthKey;
use danog\MadelineProto\MTProto\PermAuthKey; use danog\MadelineProto\MTProto\PermAuthKey;
@ -605,9 +605,10 @@ trait AuthKeyHandler
]; ];
$url = 'https://www.wolframalpha.com/input/json.jsp?'.\http_build_query($params); $url = 'https://www.wolframalpha.com/input/json.jsp?'.\http_build_query($params);
$request = (new Request($url))->withHeader('referer', 'https://www.wolframalpha.com/input/?i='.\urlencode($query)); $request = new Request($url);
$request->setHeader('referer', 'https://www.wolframalpha.com/input/?i='.\urlencode($query));
$res = \json_decode(yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody(), true); $res = \json_decode(yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody()->buffer(), true);
if (!isset($res['queryresult']['pods'])) { if (!isset($res['queryresult']['pods'])) {
return false; return false;
} }

View File

@ -19,13 +19,12 @@
namespace danog\MadelineProto\MTProtoTools; namespace danog\MadelineProto\MTProtoTools;
use Amp\Artax\Client;
use Amp\ByteStream\InputStream; use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream; use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException; use Amp\ByteStream\StreamException;
use Amp\Deferred; use Amp\Deferred;
use Amp\File\BlockingHandle; use Amp\File\BlockingFile;
use Amp\File\Handle; use Amp\File\Handle;
use Amp\File\StatCache; use Amp\File\StatCache;
use Amp\Success; use Amp\Success;
@ -34,6 +33,7 @@ use danog\MadelineProto\Exception;
use danog\MadelineProto\FileCallbackInterface; use danog\MadelineProto\FileCallbackInterface;
use danog\MadelineProto\Logger; use danog\MadelineProto\Logger;
use danog\MadelineProto\RPCErrorException; use danog\MadelineProto\RPCErrorException;
use danog\MadelineProto\Stream\Common\BufferedRawStream;
use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream; use danog\MadelineProto\Stream\Common\SimpleBufferedRawStream;
use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\Transport\PremadeStream; use danog\MadelineProto\Stream\Transport\PremadeStream;
@ -91,7 +91,7 @@ trait Files
$cb = $url; $cb = $url;
$url = $url->getFile(); $url = $url->getFile();
} }
/** @var $response \Amp\Artax\Response */ /** @var $response \Amp\Http\Client\Response */
$response = yield $this->datacenter->getHTTPClient()->request($url, [Client::OP_MAX_BODY_BYTES => 512 * 1024 * 3000, Client::OP_TRANSFER_TIMEOUT => 10*1000*3600]); $response = yield $this->datacenter->getHTTPClient()->request($url, [Client::OP_MAX_BODY_BYTES => 512 * 1024 * 3000, Client::OP_TRANSFER_TIMEOUT => 10*1000*3600]);
if (200 !== $status = $response->getStatus()) { if (200 !== $status = $response->getStatus()) {
throw new Exception("Wrong status code: $status ".$response->getReason()); throw new Exception("Wrong status code: $status ".$response->getReason());
@ -104,7 +104,7 @@ trait Files
$this->logger->logger("No content length for $url, caching first"); $this->logger->logger("No content length for $url, caching first");
$body = $stream; $body = $stream;
$stream = new BlockingHandle(\fopen('php://temp', 'r+b'), 'php://temp', 'r+b'); $stream = new BlockingFile(\fopen('php://temp', 'r+b'), 'php://temp', 'r+b');
while (null !== $chunk = yield $body->read()) { while (null !== $chunk = yield $body->read()) {
yield $stream->write($chunk); yield $stream->write($chunk);

View File

@ -19,8 +19,8 @@
namespace danog\MadelineProto\MTProtoTools; namespace danog\MadelineProto\MTProtoTools;
use Amp\Artax\Request;
use Amp\Deferred; use Amp\Deferred;
use Amp\Http\Client\Request;
use Amp\Loop; use Amp\Loop;
/** /**
@ -392,9 +392,11 @@ trait UpdateHandler
return false; return false;
} }
\danog\MadelineProto\Tools::callFork((function () use ($payload) { \danog\MadelineProto\Tools::callFork((function () use ($payload) {
$request = (new Request($this->hook_url, 'POST'))->withHeader('content-type', 'application/json')->withBody($payload); $request = new Request($this->hook_url, 'POST');
$request->setHeader('content-type', 'application/json');
$request->setBody($payload);
$result = yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody(); $result = yield (yield $this->datacenter->getHTTPClient()->request($request))->getBody()->buffer();
$this->logger->logger('Result of webhook query is '.$result, \danog\MadelineProto\Logger::NOTICE); $this->logger->logger('Result of webhook query is '.$result, \danog\MadelineProto\Logger::NOTICE);
$result = \json_decode($result, true); $result = \json_decode($result, true);

View File

@ -1,57 +0,0 @@
<?php
/**
* Proxy module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2019 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto;
interface Proxy
{
public function __construct(int $domain, int $type, int $protocol);
public function setOption(int $level, int $name, $value);
public function getOption(int $level, int $name);
public function setBlocking(bool $blocking);
public function bind(string $address, int $port = 0);
public function listen(int $backlog = 0);
public function accept();
public function connect(string $address, int $port = 0);
public function read(int $length, int $flags = 0);
public function write(string $buffer, int $length = -1);
public function send(string $data, int $length, int $flags);
public function close();
public function getPeerName(bool $port = true);
public function getSockName(bool $port = true);
public function getProxyHeaders();
public function setExtra(array $extra = []);
public function getResource();
}

View File

@ -0,0 +1,71 @@
<?php
/**
* Proxying AMPHP connector.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2019 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto;
use Amp\MultiReasonException;
use Amp\Socket\Connector;
class ProxyConnector implements Connector
{
private $dataCenter;
private $fromDns = false;
public function __construct(DataCenter $dataCenter, bool $fromDns = false)
{
$this->dataCenter = $dataCenter;
$this->fromDns = false;
}
public function connect(string $uri, ?ConnectContext $ctx = null, ?CancellationToken $token = null): Promise
{
return Tools::call(static function () use ($uri, $ctx, $token) {
$ctx = $ctx ?? new ConnectContext;
$token = $token ?? new NullCancellationToken;
$ctxs = $this->datacenter->generateContexts(0, $uri, $ctx);
if (empty($ctxs)) {
throw new Exception("No contexts for raw connection to URI $uri");
}
foreach ($ctxs as $ctx) {
/* @var $ctx \danog\MadelineProto\Stream\ConnectionContext */
try {
$ctx->setIsDns($this->fromDns);
$ctx->setCancellationToken($token);
$result = yield $ctx->getStream();
$this->API->logger->logger('OK!', \danog\MadelineProto\Logger::WARNING);
return $result->getSocket();
} catch (\Throwable $e) {
if (\defined('MADELINEPROTO_TEST') && MADELINEPROTO_TEST === 'pony') {
throw $e;
}
$this->API->logger->logger('Connection failed: '.$e, \danog\MadelineProto\Logger::ERROR);
if ($e instanceof MultiReasonException) {
foreach ($e->getReasons() as $reason) {
$this->API->logger->logger('Multireason: '.$reason, \danog\MadelineProto\Logger::ERROR);
}
}
}
}
throw new \danog\MadelineProto\Exception("Could not connect to URI $uri");
});
}
}

View File

@ -1,215 +0,0 @@
<?php
// Based on AMPHP's default socket pool
namespace danog\MadelineProto;
use Amp\CancellationToken;
use Amp\CancelledException;
use Amp\Failure;
use Amp\Loop;
use Amp\Promise;
use Amp\Socket\ClientConnectContext;
use Amp\Socket\ClientSocket;
use Amp\Socket\SocketPool;
use Amp\Struct;
use Amp\Success;
use League\Uri;
use function Amp\call;
class ProxySocketPool implements SocketPool
{
use Tools;
const ALLOWED_SCHEMES = [
'tcp' => null,
'udp' => null,
'unix' => null,
'udg' => null,
];
private $sockets = [];
private $socketIdUriMap = [];
private $pendingCount = [];
private $idleTimeout;
private $socketContext;
private $connectCallback;
public function __construct(callable $connectCallback, int $idleTimeout = 10000, ClientConnectContext $socketContext = null)
{
$this->idleTimeout = $idleTimeout;
$this->socketContext = $socketContext ?? new ClientConnectContext();
$this->connectCallback = $connectCallback;
}
/**
* @param string $uri
*
* @throws SocketException
*
* @return string
*/
private function normalizeUri(string $uri): string
{
if (\stripos($uri, 'unix://') === 0) {
return $uri;
}
try {
$parts = \League\Uri\UriString::parse($uri);
} catch (\Exception $exception) {
throw new SocketException('Could not parse URI', 0, $exception);
}
if ($parts['scheme'] === null) {
throw new SocketException('Invalid URI for socket pool; no scheme given');
}
$port = $parts['port'] ?? 0;
if ($parts['host'] === null || $port === 0) {
throw new SocketException('Invalid URI for socket pool; missing host or port');
}
$scheme = \strtolower($parts['scheme']);
$host = \strtolower($parts['host']);
if (!\array_key_exists($scheme, self::ALLOWED_SCHEMES)) {
throw new SocketException(\sprintf(
"Invalid URI for socket pool; '%s' scheme not allowed - scheme must be one of %s",
$scheme,
\implode(', ', \array_keys(self::ALLOWED_SCHEMES))
));
}
if ($parts['query'] !== null || $parts['fragment'] !== null) {
throw new SocketException('Invalid URI for socket pool; query or fragment components not allowed');
}
if ($parts['path'] !== '') {
throw new SocketException('Invalid URI for socket pool; path component must be empty');
}
if ($parts['user'] !== null) {
throw new SocketException('Invalid URI for socket pool; user component not allowed');
}
return $scheme.'://'.$host.':'.$port;
}
/** {@inheritdoc} */
public function checkout(string $uri, CancellationToken $token = null): Promise
{
// A request might already be cancelled before we reach the checkout, so do not even attempt to checkout in that
// case. The weird logic is required to throw the token's exception instead of creating a new one.
if ($token && $token->isRequested()) {
try {
$token->throwIfRequested();
} catch (CancelledException $e) {
return new Failure($e);
}
}
$uri = $this->normalizeUri($uri);
if (empty($this->sockets[$uri])) {
return $this->checkoutNewSocket($uri, $token);
}
foreach ($this->sockets[$uri] as $socketId => $socket) {
if (!$socket->isAvailable) {
continue;
}
if (!\is_resource($socket->resource) || \feof($socket->resource)) {
$this->clearFromId((int) $socket->resource);
continue;
}
$socket->isAvailable = false;
if ($socket->idleWatcher !== null) {
Loop::disable($socket->idleWatcher);
}
return new Success(new ClientSocket($socket->resource));
}
return $this->checkoutNewSocket($uri, $token);
}
private function checkoutNewSocket(string $uri, CancellationToken $token = null): Promise
{
return call(function () use ($uri, $token) {
$this->pendingCount[$uri] = ($this->pendingCount[$uri] ?? 0) + 1;
try {
/** @var ClientSocket $rawSocket */
$rawSocket = yield \danog\MadelineProto\Tools::call(($this->connectCallback)($uri, $token, $this->socketContext));
} finally {
if (--$this->pendingCount[$uri] === 0) {
unset($this->pendingCount[$uri]);
}
}
$socketId = (int) $rawSocket->getResource();
$socket = new class() {
use Struct;
public $id;
public $uri;
public $resource;
public $isAvailable;
public $idleWatcher;
};
$socket->id = $socketId;
$socket->uri = $uri;
$socket->resource = $rawSocket->getResource();
$socket->isAvailable = false;
$this->sockets[$uri][$socketId] = $socket;
$this->socketIdUriMap[$socketId] = $uri;
return $rawSocket;
});
}
/** {@inheritdoc} */
public function clear(ClientSocket $socket): void
{
$this->clearFromId((int) $socket->getResource());
}
/**
* @param int $socketId
*/
private function clearFromId(int $socketId): void
{
if (!isset($this->socketIdUriMap[$socketId])) {
throw new \Error(
\sprintf('Unknown socket: %d', $socketId)
);
}
$uri = $this->socketIdUriMap[$socketId];
$socket = $this->sockets[$uri][$socketId];
if ($socket->idleWatcher) {
Loop::cancel($socket->idleWatcher);
}
unset(
$this->sockets[$uri][$socketId],
$this->socketIdUriMap[$socketId]
);
if (empty($this->sockets[$uri])) {
unset($this->sockets[$uri]);
}
}
/** {@inheritdoc} */
public function checkin(ClientSocket $socket): void
{
$socketId = (int) $socket->getResource();
if (!isset($this->socketIdUriMap[$socketId])) {
throw new \Error(
\sprintf('Unknown socket: %d', $socketId)
);
}
$uri = $this->socketIdUriMap[$socketId];
$resource = $socket->getResource();
if (!\is_resource($resource) || \feof($resource)) {
$this->clearFromId((int) $resource);
return;
}
$socket = $this->sockets[$uri][$socketId];
$socket->isAvailable = true;
if (isset($socket->idleWatcher)) {
Loop::enable($socket->idleWatcher);
} else {
$socket->idleWatcher = Loop::delay($this->idleTimeout, function () use ($socket) {
$this->clearFromId((int) $socket->resource);
});
Loop::unreference($socket->idleWatcher);
}
}
}

View File

@ -19,7 +19,7 @@
namespace danog\MadelineProto\Stream; namespace danog\MadelineProto\Stream;
use Amp\CancellationToken; use Amp\CancellationToken;
use Amp\Socket\ClientConnectContext; use Amp\Socket\ConnectContext;
use Amp\Uri\Uri; use Amp\Uri\Uri;
use danog\MadelineProto\Exception; use danog\MadelineProto\Exception;
use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream; use danog\MadelineProto\Stream\MTProtoTransport\ObfuscatedStream;
@ -74,7 +74,7 @@ class ConnectionContext
/** /**
* Socket context. * Socket context.
* *
* @var \Amp\Socket\ClientConnectionContext * @var \Amp\Socket\ConnectContext
*/ */
private $socketContext; private $socketContext;
/** /**
@ -118,11 +118,11 @@ class ConnectionContext
/** /**
* Set the socket context. * Set the socket context.
* *
* @param ClientConnectContext $socketContext * @param ConnectContext $socketContext
* *
* @return self * @return self
*/ */
public function setSocketContext(ClientConnectContext $socketContext): self public function setSocketContext(ConnectContext $socketContext): self
{ {
$this->socketContext = $socketContext; $this->socketContext = $socketContext;
@ -132,9 +132,9 @@ class ConnectionContext
/** /**
* Get the socket context. * Get the socket context.
* *
* @return ClientConnectContext * @return ConnectContext
*/ */
public function getSocketContext(): ClientConnectContext public function getSocketContext(): ConnectContext
{ {
return $this->socketContext; return $this->socketContext;
} }

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream\MTProtoTransport; namespace danog\MadelineProto\Stream\MTProtoTransport;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\Async\BufferedStream;
use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\BufferedStreamInterface;
use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\ConnectionContext;
@ -100,9 +101,9 @@ class AbridgedStream implements BufferedStreamInterface, MTProtoBufferInterface
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream\MTProtoTransport; namespace danog\MadelineProto\Stream\MTProtoTransport;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\Async\BufferedStream;
use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\BufferedStreamInterface;
use danog\MadelineProto\Stream\Common\HashedBufferedStream; use danog\MadelineProto\Stream\Common\HashedBufferedStream;
@ -110,9 +111,9 @@ class FullStream implements BufferedStreamInterface, MTProtoBufferInterface
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream\MTProtoTransport; namespace danog\MadelineProto\Stream\MTProtoTransport;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use Amp\Success; use Amp\Success;
use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\Async\BufferedStream;
use danog\MadelineProto\Stream\BufferedProxyStreamInterface; use danog\MadelineProto\Stream\BufferedProxyStreamInterface;
@ -183,9 +184,9 @@ class HttpStream implements MTProtoBufferInterface, BufferedProxyStreamInterface
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }

View File

@ -40,16 +40,6 @@ class HttpsStream extends HttpStream implements MTProtoBufferInterface
return parent::connectGenerator($ctx->getCtx()->secure(true), $header); return parent::connectGenerator($ctx->getCtx()->secure(true), $header);
} }
/**
* {@inheritdoc}
*
* @return \Amp\Socket\Socket
*/
public function getSocket(): \Amp\Socket\Socket
{
return $this->stream->getSocket();
}
public static function getName(): string public static function getName(): string
{ {
return __CLASS__; return __CLASS__;

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream\MTProtoTransport; namespace danog\MadelineProto\Stream\MTProtoTransport;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\Async\BufferedStream;
use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\BufferedStreamInterface;
use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\ConnectionContext;
@ -92,9 +93,9 @@ class IntermediatePaddedStream implements BufferedStreamInterface, MTProtoBuffer
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream\MTProtoTransport; namespace danog\MadelineProto\Stream\MTProtoTransport;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\Async\BufferedStream;
use danog\MadelineProto\Stream\BufferedStreamInterface; use danog\MadelineProto\Stream\BufferedStreamInterface;
use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\ConnectionContext;
@ -91,13 +92,14 @@ class IntermediateStream implements BufferedStreamInterface, MTProtoBufferInterf
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }
public static function getName(): string public static function getName(): string
{ {
return __CLASS__; return __CLASS__;

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream\MTProtoTransport; namespace danog\MadelineProto\Stream\MTProtoTransport;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use danog\MadelineProto\Exception; use danog\MadelineProto\Exception;
use danog\MadelineProto\Stream\Async\Buffer; use danog\MadelineProto\Stream\Async\Buffer;
use danog\MadelineProto\Stream\Async\BufferedStream; use danog\MadelineProto\Stream\Async\BufferedStream;
@ -201,9 +202,9 @@ class ObfuscatedStream implements BufferedProxyStreamInterface
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }

View File

@ -20,6 +20,7 @@ namespace danog\MadelineProto\Stream\Proxy;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\ClientTlsContext; use Amp\Socket\ClientTlsContext;
use Amp\Socket\EncryptableSocket;
use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\BufferedProxyStreamInterface; use danog\MadelineProto\Stream\BufferedProxyStreamInterface;
use danog\MadelineProto\Stream\ConnectionContext; use danog\MadelineProto\Stream\ConnectionContext;
@ -47,6 +48,11 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface
$ctx = $ctx->getCtx(); $ctx = $ctx->getCtx();
$uri = $ctx->getUri(); $uri = $ctx->getUri();
$secure = $ctx->isSecure(); $secure = $ctx->isSecure();
if ($secure) {
$ctx->setSocketContext($ctx->getSocketContext()->withTlsContext(new ClientTlsContext($uri->getHost())));
}
$ctx->setUri('tcp://'.$this->extra['address'].':'.$this->extra['port'])->secure(false); $ctx->setUri('tcp://'.$this->extra['address'].':'.$this->extra['port'])->secure(false);
$this->stream = yield $ctx->getStream(); $this->stream = yield $ctx->getStream();
@ -125,8 +131,8 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface
$read = yield $buffer->bufferRead($length); $read = yield $buffer->bufferRead($length);
} }
if ($secure && \method_exists($this->getSocket(), 'enableCrypto')) { if ($secure) {
yield $this->getSocket()->enableCrypto((new ClientTlsContext())->withPeerName($uri->getHost())); yield $this->getSocket()->setupTls();
} }
\danog\MadelineProto\Logger::log('Connected to '.$address.':'.$port.' via http'); \danog\MadelineProto\Logger::log('Connected to '.$address.':'.$port.' via http');
@ -203,9 +209,9 @@ class HttpProxy implements RawProxyStreamInterface, BufferedProxyStreamInterface
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }

View File

@ -58,6 +58,11 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac
$ctx = $ctx->getCtx(); $ctx = $ctx->getCtx();
$uri = $ctx->getUri(); $uri = $ctx->getUri();
$secure = $ctx->isSecure(); $secure = $ctx->isSecure();
if ($secure) {
$ctx->setSocketContext($ctx->getSocketContext()->withTlsContext(new ClientTlsContext($uri->getHost())));
}
$ctx->setUri('tcp://'.$this->extra['address'].':'.$this->extra['port'])->secure(false); $ctx->setUri('tcp://'.$this->extra['address'].':'.$this->extra['port'])->secure(false);
$methods = \chr(0); $methods = \chr(0);
@ -150,8 +155,8 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac
\danog\MadelineProto\Logger::log(['Connected to '.$ip.':'.$port.' via socks5']); \danog\MadelineProto\Logger::log(['Connected to '.$ip.':'.$port.' via socks5']);
if ($secure && \method_exists($this->getSocket(), 'enableCrypto')) { if ($secure) {
yield $this->getSocket()->enableCrypto((new ClientTlsContext())->withPeerName($uri->getHost())); yield $this->getSocket()->setupTls();
} }
if (\strlen($header)) { if (\strlen($header)) {
yield (yield $this->stream->getWriteBuffer(\strlen($header)))->bufferWrite($header); yield (yield $this->stream->getWriteBuffer(\strlen($header)))->bufferWrite($header);
@ -217,9 +222,9 @@ class SocksProxy implements RawProxyStreamInterface, BufferedProxyStreamInterfac
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream->getSocket(); return $this->stream->getSocket();
} }

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream; namespace danog\MadelineProto\Stream;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use Amp\Socket\Socket; use Amp\Socket\Socket;
/** /**
@ -47,7 +48,7 @@ interface StreamInterface
/** /**
* Get underlying AMPHP socket resource. * Get underlying AMPHP socket resource.
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): Socket; public function getSocket(): Socket;
} }

View File

@ -19,12 +19,16 @@
namespace danog\MadelineProto\Stream\Transport; namespace danog\MadelineProto\Stream\Transport;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\ClosedException;
use Amp\CancellationToken;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use Amp\Socket\Socket; use Amp\Socket\Socket;
use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\Async\RawStream;
use danog\MadelineProto\Stream\ProxyStreamInterface; use danog\MadelineProto\Stream\ProxyStreamInterface;
use danog\MadelineProto\Stream\RawStreamInterface; use danog\MadelineProto\Stream\RawStreamInterface;
use function Amp\Socket\connect;
/** /**
* Default stream wrapper. * Default stream wrapper.
* *
@ -32,20 +36,23 @@ use danog\MadelineProto\Stream\RawStreamInterface;
* *
* @author Daniil Gentili <daniil@daniil.it> * @author Daniil Gentili <daniil@daniil.it>
*/ */
class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInterface class DefaultStream extends Socket implements RawStreamInterface
{ {
use RawStream; use RawStream;
/**
* Socket
*
* @var EncryptableSocket
*/
private $stream; private $stream;
private $connector = 'Amp\\Socket\\connect';
private $cryptoConnector = 'Amp\\Socket\\cryptoConnect';
public function __construct() public function __construct()
{ {
} }
public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise public function setupTls(?CancellationToken $cancellationToken = null): \Amp\Promise
{ {
return $this->stream->enableCrypto($tlsContext); return $this->stream->setupTls($cancellationToken);
} }
public function getStream() public function getStream()
@ -55,10 +62,9 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt
public function connectGenerator(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator public function connectGenerator(\danog\MadelineProto\Stream\ConnectionContext $ctx, string $header = ''): \Generator
{ {
$this->stream = yield connect($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken());
if ($ctx->isSecure()) { if ($ctx->isSecure()) {
$this->stream = yield ($this->cryptoConnector)($ctx->getStringUri(), $ctx->getSocketContext(), null, $ctx->getCancellationToken()); yield $this->stream->setupTls();
} else {
$this->stream = yield ($this->connector)($ctx->getStringUri(), $ctx->getSocketContext(), $ctx->getCancellationToken());
} }
yield $this->stream->write($header); yield $this->stream->write($header);
} }
@ -89,9 +95,9 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt
} }
/** /**
* Async close. * Close.
* *
* @return Generator * @return void
*/ */
public function disconnect() public function disconnect()
{ {
@ -107,6 +113,11 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt
} }
} }
/**
* Close
*
* @return void
*/
public function close() public function close()
{ {
$this->disconnect(); $this->disconnect();
@ -115,20 +126,13 @@ class DefaultStream extends Socket implements RawStreamInterface, ProxyStreamInt
/** /**
* {@inheritdoc} * {@inheritdoc}
* *
* @return \Amp\Socket\Socket * @return EncryptableSocket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): EncryptableSocket
{ {
return $this->stream; return $this->stream;
} }
/**
* {@inheritdoc}
*/
public function setExtra($extra)
{
list($this->connector, $this->cryptoConnector) = $extra;
}
public static function getName(): string public static function getName(): string
{ {
return __CLASS__; return __CLASS__;

View File

@ -19,6 +19,7 @@
namespace danog\MadelineProto\Stream\Transport; namespace danog\MadelineProto\Stream\Transport;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\ClosedException;
use Amp\CancellationToken;
use Amp\Promise; use Amp\Promise;
use Amp\Socket\Socket; use Amp\Socket\Socket;
use danog\MadelineProto\Stream\Async\RawStream; use danog\MadelineProto\Stream\Async\RawStream;
@ -42,11 +43,12 @@ class PremadeStream extends Socket implements RawStreamInterface, ProxyStreamInt
{ {
} }
public function enableCrypto(ClientTlsContext $tlsContext = null): \Amp\Promise public function setupTls(?CancellationToken $cancellationToken = null): \Amp\Promise
{ {
return $this->stream->enableCrypto($tlsContext); return $this->stream->setupTls($cancellationToken);
} }
public function getStream() public function getStream()
{ {
return $this->stream; return $this->stream;
@ -115,7 +117,7 @@ class PremadeStream extends Socket implements RawStreamInterface, ProxyStreamInt
* *
* @return \Amp\Socket\Socket * @return \Amp\Socket\Socket
*/ */
public function getSocket(): \Amp\Socket\Socket public function getSocket(): Socket
{ {
return $this->stream; return $this->stream;
} }